Skip to content

Commit

Permalink
Fixing lifetimes for dogstatsd flusher.
Browse files Browse the repository at this point in the history
I think Rust is too dumb :-)

Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
bwoebi committed Sep 18, 2024
1 parent 8fecb9a commit ef7adb3
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 31 additions & 5 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use hyper::http::uri::PathAndQuery;
use hyper::{Body, Client, Method, Uri};
use log::error;
use std::{borrow::Borrow, collections::HashMap, str::FromStr};
use std::iter::Chain;
use std::marker::PhantomData;
use std::sync::Mutex;
use tokio::runtime::Runtime;

/// TraceExporterInputFormat represents the format of the input traces.
Expand Down Expand Up @@ -209,7 +212,7 @@ impl TraceExporter {
Err(err) => {
self.emit_metric(HealthMetric::Count(STAT_SEND_ERRORS, 1), None);
anyhow::bail!("Failed to send traces: {err}")
},
}
}
})
.or_else(|err| {
Expand All @@ -220,18 +223,38 @@ impl TraceExporter {

/// Emit a health metric to dogstatsd
fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<Tag>>) {
struct WrapIt<'a, 'b: 'a, V: 'b, I: IntoIterator<Item = &'a V>> {
it: Mutex<Option<I>>,
_p: PhantomData<&'b I>,
}
impl<'a, 'b, V, I: IntoIterator<Item = &'a V>> WrapIt<'a, 'b, V, I> {
pub fn wrap(iter: I) -> Self {
WrapIt {
it: Mutex::new(Some(iter)),
_p: PhantomData,
}
}
}
impl<'a, 'b, V, I: IntoIterator<Item = &'a V>> IntoIterator for &'b WrapIt<'a, 'b, V, I> {
type Item = I::Item;
type IntoIter = I::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.it.lock().unwrap().take().unwrap().into_iter()
}
}
if let Some(flusher) = &self.dogstatsd {
if custom_tags.is_some() {
match metric {
HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count(
HealthMetric::Count(name, c) => flusher.send(&vec![DogStatsDAction::Count(
name,
c,
&self.common_stats_tags.iter().chain(&custom_tags.unwrap()),
&WrapIt::wrap(self.common_stats_tags.iter().chain(&custom_tags.unwrap())),
)]),
}
} else {
match metric {
HealthMetric::Count(name, c) => flusher.send(vec![DogStatsDAction::Count(
HealthMetric::Count(name, c) => flusher.send(&vec![DogStatsDAction::Count(
name,
c,
&self.common_stats_tags,
Expand All @@ -258,7 +281,10 @@ impl TraceExporter {
return Ok(String::from("{}"));
}

self.emit_metric(HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64), None);
self.emit_metric(
HealthMetric::Count(STAT_DESER_TRACES, traces.len() as i64),
None,
);

let header_tags: TracerHeaderTags<'_> = (&self.tags).into();

Expand Down
41 changes: 19 additions & 22 deletions dogstatsd-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ const QUEUE_SIZE: usize = 32 * 1024;
/// The `DogStatsDActionRef` enum gathers the metric types that can be sent to the DogStatsD server.
#[derive(Debug, Serialize, Deserialize)]
pub enum DogStatsDAction<T: AsRef<str>, V: std::ops::Deref>
where
for<'a> &'a <V as std::ops::Deref>::Target: IntoIterator<Item = &'a Tag>,
{
// TODO: instead of AsRef<str> we can accept a marker Trait that users of this crate implement
Count(T, i64, V),
Expand Down Expand Up @@ -70,34 +68,34 @@ impl Flusher {
Ok(())
}

pub fn send<T: AsRef<str>, V: std::ops::Deref>(&self, actions: Vec<DogStatsDAction<T, V>>)
pub fn send<'a, T: AsRef<str> + 'a, V: std::ops::Deref + 'a>(&self, actions: &'a Vec<DogStatsDAction<T, V>>)
where
for<'a> &'a <V as std::ops::Deref>::Target: IntoIterator<Item = &'a Tag>,
&'a <V as std::ops::Deref>::Target: IntoIterator<Item = &'a Tag> + 'a,
{
if self.client.is_none() {
return;
}
let client = self.client.as_ref().unwrap();

for action in actions {
for action in actions.iter() {
if let Err(err) = match action {
DogStatsDAction::Count(metric, value, ref tags) => {
let metric_builder = client.count_with_tags(metric.as_ref(), value);
do_send(metric_builder, tags.deref())
DogStatsDAction::Count(metric, value, tags) => {
let metric_builder = client.count_with_tags(metric.as_ref(), *value);
do_send(metric_builder, &mut tags.deref().into_iter())
}
DogStatsDAction::Distribution(metric, value, ref tags) => do_send(
client.distribution_with_tags(metric.as_ref(), value),
tags.deref(),
DogStatsDAction::Distribution(metric, value, tags) => do_send(
client.distribution_with_tags(metric.as_ref(), *value),
&mut tags.deref().into_iter(),
),
DogStatsDAction::Gauge(metric, value, ref tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), tags.deref())
DogStatsDAction::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), *value), &mut tags.deref().into_iter())
}
DogStatsDAction::Histogram(metric, value, ref tags) => do_send(
client.histogram_with_tags(metric.as_ref(), value),
tags.deref(),
DogStatsDAction::Histogram(metric, value, tags) => do_send(
client.histogram_with_tags(metric.as_ref(), *value),
&mut tags.deref().into_iter(),
),
DogStatsDAction::Set(metric, value, ref tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), tags.deref())
DogStatsDAction::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), *value), &mut tags.deref().into_iter())
}
} {
error!("Error while sending metric: {}", err);
Expand All @@ -106,15 +104,14 @@ impl Flusher {
}
}

fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
fn do_send<'m, 't, T, V: Iterator<Item = &'t Tag> + 't>(
mut builder: MetricBuilder<'m, '_, T>,
tags: V,
tags_iter: &mut V,
) -> anyhow::Result<()>
where
T: Metric + From<String>,
't: 'm,
{
let mut tags_iter = tags.into_iter();
let mut tag_opt = tags_iter.next();
while tag_opt.is_some() {
builder = builder.with_tag_value(tag_opt.unwrap().as_ref());
Expand Down Expand Up @@ -185,7 +182,7 @@ mod test {
_ = flusher.set_endpoint(Endpoint::from_slice(
socket.local_addr().unwrap().to_string().as_str(),
));
flusher.send(vec![
flusher.send(&vec![
Count("test_count", 3, vec![tag!("foo", "bar")]),
Count("test_neg_count", -2, vec![]),
Distribution("test_distribution", 4.2, vec![]),
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ impl SidecarInterface for SidecarServer {
tokio::spawn(async move {
self.get_session(&instance_id.session_id)
.get_dogstatsd()
.send(actions);
.send(&actions);
});

no_response()
Expand Down

0 comments on commit ef7adb3

Please sign in to comment.