From 09d1ef506d59c65cbe58f0a2e153d493716efaa3 Mon Sep 17 00:00:00 2001 From: vianney Date: Fri, 20 Sep 2024 11:09:58 +0200 Subject: [PATCH] [APMSP-1013] Add stats exporter (#584) * Move concentrator * Fix clippy warnings * Use helper function to ignore span * Make struct fields private * Change visibility to super * Move tests to a separate module * Add stats exporter and compute stats from traces * Send client side stats header to the agent * Drop chunks once stats have been computed * Add test for drop_chunks * Add client-side stats to headers * Fix clippy lint * Update LICENSE-3rdparty * Fix header tags test * Ignore shutdown test on miri * Add support for client computed top level * Add docs * Fix typos and remove TODOs * Apply suggestions Co-authored-by: Andrew Glaude * Update shutdown signature * Add support for timeout in shutdown * Add example to send traces and stats to an agent * Apply suggestions * Add doc --------- Co-authored-by: Andrew Glaude APMSP-1013 --- Cargo.lock | 8 +- LICENSE-3rdparty.yml | 4 +- data-pipeline-ffi/src/trace_exporter.rs | 27 +- data-pipeline/Cargo.toml | 8 +- .../examples/send-traces-with-stats.rs | 63 +++ data-pipeline/src/lib.rs | 2 + data-pipeline/src/span_concentrator/mod.rs | 1 - data-pipeline/src/stats_exporter.rs | 357 ++++++++++++ data-pipeline/src/trace_exporter.rs | 515 +++++++++++++++++- ddcommon/src/lib.rs | 2 + examples/ffi/trace_exporter.c | 9 + trace-utils/src/tracer_header_tags.rs | 29 +- 12 files changed, 991 insertions(+), 34 deletions(-) create mode 100644 data-pipeline/examples/send-traces-with-stats.rs create mode 100644 data-pipeline/src/stats_exporter.rs diff --git a/Cargo.lock b/Cargo.lock index 6019d1ae8..ff32eb484 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1342,15 +1342,19 @@ dependencies = [ "criterion", "datadog-ddsketch", "datadog-trace-normalization", + "datadog-trace-obfuscation", "datadog-trace-protobuf", "datadog-trace-utils", "ddcommon", "futures", + "httpmock", "hyper 0.14.28", "log", "rand", "rmp-serde", "tokio", + "tokio-util 0.7.11", + "uuid", ] [[package]] @@ -5873,9 +5877,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ "getrandom", "serde", diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index c6afa00e2..08be29e5f 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -1,4 +1,4 @@ -root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, datadog-trace-protobuf, datadog-trace-utils, ddcommon, tinybytes, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, datadog-trace-obfuscation, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent +root_name: datadog-alloc, builder, build_common, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-ddsketch, datadog-trace-normalization, 
datadog-trace-protobuf, datadog-trace-obfuscation, datadog-trace-utils, ddcommon, tinybytes, ddcommon-ffi, datadog-crashtracker-ffi, datadog-crashtracker, ddtelemetry, datadog-profiling, ddtelemetry-ffi, symbolizer-ffi, tools, datadog-profiling-replayer, dogstatsd, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, spawn_worker, cc_utils, datadog-sidecar, datadog-remote-config, datadog-dynamic-configuration, datadog-sidecar-macros, datadog-sidecar-ffi, sidecar_mockgen, test_spawn_from_lib, datadog-serverless-trace-mini-agent, datadog-trace-mini-agent third_party_libraries: - package_name: addr2line package_version: 0.21.0 @@ -30830,7 +30830,7 @@ third_party_libraries: IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - package_name: uuid - package_version: 1.8.0 + package_version: 1.10.0 repository: https://github.com/uuid-rs/uuid license: Apache-2.0 OR MIT licenses: diff --git a/data-pipeline-ffi/src/trace_exporter.rs b/data-pipeline-ffi/src/trace_exporter.rs index 3786451bd..b4dbbf81c 100644 --- a/data-pipeline-ffi/src/trace_exporter.rs +++ b/data-pipeline-ffi/src/trace_exporter.rs @@ -8,7 +8,7 @@ use ddcommon_ffi::{ slice::{AsBytes, ByteSlice}, CharSlice, MaybeError, }; -use std::{ffi::c_char, ptr::NonNull}; +use std::{ffi::c_char, ptr::NonNull, time::Duration}; /// Create a new TraceExporter instance. /// @@ -20,6 +20,10 @@ use std::{ffi::c_char, ptr::NonNull}; /// * `language` - The language of the client library. /// * `language_version` - The version of the language of the client library. /// * `language_interpreter` - The interpreter of the language of the client library. +/// * `hostname` - The hostname of the application, used for stats aggregation +/// * `env` - The environment of the application, used for stats aggregation +/// * `version` - The version of the application, used for stats aggregation +/// * `service` - The service name of the application, used for stats aggregation /// * `input_format` - The input format of the traces. Setting this to Proxy will send the trace /// data to the Datadog Agent as is. /// * `output_format` - The output format of the traces to send to the Datadog Agent. 
If using the @@ -35,25 +39,38 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( language: CharSlice, language_version: CharSlice, language_interpreter: CharSlice, + hostname: CharSlice, + env: CharSlice, + version: CharSlice, + service: CharSlice, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, + compute_stats: bool, agent_response_callback: extern "C" fn(*const c_char), ) -> MaybeError { let callback_wrapper = ResponseCallbackWrapper { response_callback: agent_response_callback, }; // TODO - handle errors - https://datadoghq.atlassian.net/browse/APMSP-1095 - let exporter = TraceExporter::builder() + let mut builder = TraceExporter::builder() .set_url(url.to_utf8_lossy().as_ref()) .set_tracer_version(tracer_version.to_utf8_lossy().as_ref()) .set_language(language.to_utf8_lossy().as_ref()) .set_language_version(language_version.to_utf8_lossy().as_ref()) .set_language_interpreter(language_interpreter.to_utf8_lossy().as_ref()) + .set_hostname(hostname.to_utf8_lossy().as_ref()) + .set_env(env.to_utf8_lossy().as_ref()) + .set_version(version.to_utf8_lossy().as_ref()) + .set_service(service.to_utf8_lossy().as_ref()) .set_input_format(input_format) .set_output_format(output_format) - .set_response_callback(Box::new(callback_wrapper)) - .build() - .unwrap(); + .set_response_callback(Box::new(callback_wrapper)); + if compute_stats { + builder = builder.enable_stats(Duration::from_secs(10)) + // TODO: APMSP-1317 Enable peer tags aggregation and stats by span_kind based on agent + // configuration + } + let exporter = builder.build().unwrap(); out_handle.as_ptr().write(Box::new(exporter)); MaybeError::None } diff --git a/data-pipeline/Cargo.toml b/data-pipeline/Cargo.toml index 556f38618..df5825917 100644 --- a/data-pipeline/Cargo.toml +++ b/data-pipeline/Cargo.toml @@ -16,13 +16,16 @@ hyper = {version = "0.14", features = ["client"], default-features = false} log = "0.4" rmp-serde = "1.1.1" bytes = "1.4" -tokio = {version = "1.23", features = ["rt"], default-features = false} +tokio = { version = "1.23", features = ["rt", "test-util", "time"], default-features = false } ddcommon = { path = "../ddcommon" } datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-utils = { path = "../trace-utils" } datadog-trace-normalization = { path = "../trace-normalization" } -datadog-ddsketch = { path = "../ddsketch"} +datadog-trace-obfuscation = { path = "../trace-obfuscation" } +datadog-ddsketch = { path = "../ddsketch" } +uuid = { version = "1.10.0", features = ["v4"] } +tokio-util = "0.7.11" [lib] bench = false @@ -33,5 +36,6 @@ harness = false path = "benches/main.rs" [dev-dependencies] +httpmock = "0.7.0" criterion = "0.5.1" rand = "0.8.5" diff --git a/data-pipeline/examples/send-traces-with-stats.rs b/data-pipeline/examples/send-traces-with-stats.rs new file mode 100644 index 000000000..31962f167 --- /dev/null +++ b/data-pipeline/examples/send-traces-with-stats.rs @@ -0,0 +1,63 @@ +// Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use data_pipeline::trace_exporter::{ + TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat, +}; +use datadog_trace_protobuf::pb; +use std::{ + collections::HashMap, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span { + pb::Span { + trace_id, + span_id, + parent_id: span_id - 1, + duration: trace_id as i64 % 3 * 10_000_000 + span_id as i64 * 1_000_000, + start: now + trace_id as i64 * 1_000_000_000 + span_id as i64 * 1_000_000, + service: "data-pipeline-test".to_string(), + name: format!("test-name-{}", span_id % 2), + resource: format!("test-resource-{}", (span_id + trace_id) % 3), + error: if trace_id % 10 == 0 { 1 } else { 0 }, + metrics: HashMap::from([ + ("_sampling_priority_v1".to_string(), 1.0), + ("_dd.measured".to_string(), 1.0), + ]), + ..Default::default() + } +} + +fn main() { + let exporter = TraceExporter::builder() + .set_url("http://localhost:8126") + .set_hostname("test") + .set_env("testing") + .set_version(env!("CARGO_PKG_VERSION")) + .set_service("data-pipeline-test") + .set_tracer_version(env!("CARGO_PKG_VERSION")) + .set_language("rust") + .set_language_version(env!("CARGO_PKG_RUST_VERSION")) + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V07) + .enable_stats(Duration::from_secs(10)) + .build() + .unwrap(); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + + let mut traces = Vec::new(); + for trace_id in 1..=100 { + let mut trace = Vec::new(); + for span_id in 1..=1000 { + trace.push(get_span(now, trace_id, span_id)); + } + traces.push(trace); + } + let data = rmp_serde::to_vec_named(&traces).unwrap(); + exporter.send(&data, 100).unwrap(); + exporter.shutdown(None).unwrap(); +} diff --git a/data-pipeline/src/lib.rs b/data-pipeline/src/lib.rs index f0a1fcb61..9886087e4 100644 --- a/data-pipeline/src/lib.rs +++ b/data-pipeline/src/lib.rs @@ -9,4 +9,6 @@ #[allow(missing_docs)] pub mod span_concentrator; #[allow(missing_docs)] +pub mod stats_exporter; +#[allow(missing_docs)] pub mod trace_exporter; diff --git a/data-pipeline/src/span_concentrator/mod.rs b/data-pipeline/src/span_concentrator/mod.rs index e42de140c..e17aa7322 100644 --- a/data-pipeline/src/span_concentrator/mod.rs +++ b/data-pipeline/src/span_concentrator/mod.rs @@ -1,7 +1,6 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 //! This module implements the SpanConcentrator used to aggregate spans into stats -#![allow(dead_code)] // TODO: Remove once the trace exporter uses the SpanConcentrator use std::collections::HashMap; use std::time::{self, Duration, SystemTime}; diff --git a/data-pipeline/src/stats_exporter.rs b/data-pipeline/src/stats_exporter.rs new file mode 100644 index 000000000..51fb56172 --- /dev/null +++ b/data-pipeline/src/stats_exporter.rs @@ -0,0 +1,357 @@ +// Copyright 2024-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, + time, +}; + +use datadog_trace_protobuf::pb; +use ddcommon::{connector, tag::Tag, Endpoint}; +use hyper; +use tokio::select; +use tokio_util::sync::CancellationToken; + +use crate::span_concentrator::SpanConcentrator; + +const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; + +/// Metadata required in a ClientStatsPayload +#[derive(Debug, Default, Clone)] +pub struct LibraryMetadata { + pub hostname: String, + pub env: String, + pub version: String, + pub lang: String, + pub tracer_version: String, + pub runtime_id: String, + pub service: String, + pub container_id: String, + pub git_commit_sha: String, + /// Should be left empty by the client, except for some specific environments + pub tags: Vec<Tag>, +} + +/// An exporter that concentrates and sends stats to the agent +#[derive(Debug)] +pub struct StatsExporter { + flush_interval: time::Duration, + concentrator: Arc<Mutex<SpanConcentrator>>, + endpoint: Endpoint, + meta: LibraryMetadata, + sequence_id: AtomicU64, + client: ddcommon::HttpClient, + cancellation_token: CancellationToken, +} + +impl StatsExporter { + /// Return a new StatsExporter + /// + /// - `flush_interval` the interval on which the concentrator is flushed + /// - `concentrator` SpanConcentrator storing the stats to be sent to the agent + /// - `meta` the metadata used when sending the ClientStatsPayload to the agent + /// - `endpoint` the Endpoint used to send stats to the agent + /// - `cancellation_token` Token used to safely shut down the exporter by force flushing the + /// concentrator + pub fn new( + flush_interval: time::Duration, + concentrator: Arc<Mutex<SpanConcentrator>>, + meta: LibraryMetadata, + endpoint: Endpoint, + cancellation_token: CancellationToken, + ) -> Self { + Self { + flush_interval, + concentrator, + endpoint, + meta, + sequence_id: AtomicU64::new(0), + client: hyper::Client::builder().build(connector::Connector::default()), + cancellation_token, + } + } + + /// Flush the stats stored in the concentrator and send them + /// + /// If the stats flushed from the concentrator contain at least one time bucket, the stats are + /// sent to `self.endpoint`. The stats are serialized as msgpack. + /// + /// # Errors + /// The function will return an error in the following cases: + /// - The endpoint failed to build + /// - The stats payload cannot be serialized as a valid http body + /// - The http client failed while sending the request + /// - The http status of the response is not 2xx + /// + /// # Panics + /// Will panic if another thread panicked while holding the concentrator lock, in which + /// case stats cannot be flushed since the concentrator might be corrupted. + pub async fn send(&self, force_flush: bool) -> anyhow::Result<()> { + let payload = self.flush(force_flush); + if payload.stats.is_empty() { + return Ok(()); + } + let body = rmp_serde::encode::to_vec_named(&payload)?; + let req = self + .endpoint + .into_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))?
+ .header( + hyper::header::CONTENT_TYPE, + ddcommon::header::APPLICATION_MSGPACK, + ) + .method(hyper::Method::POST) + .body(hyper::Body::from(body))?; + + let resp = self.client.request(req).await?; + + if !resp.status().is_success() { + anyhow::bail!( + "received {} status code from the agent", + resp.status().as_u16() + ); + } + Ok(()) + } + + /// Flush stats from the concentrator into a payload + /// + /// # Arguments + /// - `force_flush` if true, triggers a force flush on the concentrator, causing all buckets to + /// be flushed regardless of their age. + /// + /// # Panics + /// Will panic if another thread panicked while holding the concentrator lock, in which + /// case stats cannot be flushed since the concentrator might be corrupted. + fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload { + let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed); + encode_stats_payload( + self.meta.clone(), + sequence, + self.concentrator + .lock() + .unwrap() + .flush(time::SystemTime::now(), force_flush), + ) + } + + /// Run loop of the stats exporter + /// + /// Once started, the stats exporter will flush and send stats every `self.flush_interval`. + /// If `self.cancellation_token` is cancelled, the exporter will force flush all stats and + /// return. + pub async fn run(&mut self) { + loop { + select! { + _ = self.cancellation_token.cancelled() => { + let _ = self.send(true).await; + break; + }, + _ = tokio::time::sleep(self.flush_interval) => { + let _ = self.send(false).await; + }, + }; + } + } +} + +fn encode_stats_payload( + meta: LibraryMetadata, + sequence: u64, + buckets: Vec<pb::ClientStatsBucket>, +) -> pb::ClientStatsPayload { + pb::ClientStatsPayload { + hostname: meta.hostname, + env: meta.env, + lang: meta.lang, + version: meta.version, + runtime_id: meta.runtime_id, + tracer_version: meta.tracer_version, + service: meta.service, + container_id: meta.container_id, + git_commit_sha: meta.git_commit_sha, + tags: meta.tags.into_iter().map(|t| t.to_string()).collect(), + sequence, + stats: buckets, + // Agent-only fields + agent_aggregation: String::new(), + image_tag: String::new(), + } +} + +/// Return the stats endpoint URL used to send stats to the agent at `agent_url` +pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> { + let mut parts = agent_url.parse::<hyper::Uri>()?.into_parts(); + parts.path_and_query = Some(hyper::http::uri::PathAndQuery::from_static( + STATS_ENDPOINT_PATH, + )); + Ok(hyper::Uri::from_parts(parts)?)
+} + +#[cfg(test)] +mod tests { + use super::*; + use datadog_trace_utils::trace_utils; + use httpmock::prelude::*; + use httpmock::MockServer; + use time::Duration; + use time::SystemTime; + + fn is_send<T: Send>() {} + fn is_sync<T: Sync>() {} + + const BUCKETS_DURATION: Duration = Duration::from_secs(10); + + /// Fails to compile if the stats exporter is not Send and Sync + #[test] + fn test_stats_exporter_sync_send() { + let _ = is_send::<StatsExporter>; + let _ = is_sync::<StatsExporter>; + } + + fn get_test_metadata() -> LibraryMetadata { + LibraryMetadata { + hostname: "libdatadog-test".into(), + env: "test".into(), + version: "0.0.0".into(), + lang: "rust".into(), + tracer_version: "0.0.0".into(), + runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(), + service: "stats_exporter_test".into(), + ..Default::default() + } + } + + fn get_test_concentrator() -> SpanConcentrator { + let mut concentrator = SpanConcentrator::new( + BUCKETS_DURATION, + // Make sure the oldest bucket will be flushed on next send + SystemTime::now() - BUCKETS_DURATION * 3, + false, + false, + vec![], + ); + let mut trace = vec![]; + + for i in 1..100 { + trace.push(pb::Span { + service: "libdatadog-test".to_string(), + duration: i, + ..Default::default() + }) + } + + trace_utils::compute_top_level_span(trace.as_mut_slice()); + + for span in trace.iter() { + concentrator.add_span(span); + } + concentrator + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_send_stats() { + let server = MockServer::start_async().await; + + let mock = server + .mock_async(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.6/stats") + .body_contains("libdatadog-test"); + then.status(200).body(""); + }) + .await; + + let stats_exporter = StatsExporter::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_test_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + CancellationToken::new(), + ); + + let send_status = stats_exporter.send(true).await; + send_status.unwrap(); + + mock.assert_async().await; + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_run() { + let server = MockServer::start_async().await; + + let mock = server + .mock_async(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.6/stats") + .body_contains("libdatadog-test"); + then.status(200).body(""); + }) + .await; + + let mut stats_exporter = StatsExporter::new( + BUCKETS_DURATION, + Arc::new(Mutex::new(get_test_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + CancellationToken::new(), + ); + + tokio::time::pause(); + tokio::spawn(async move { + stats_exporter.run().await; + }); + // Wait for the stats to be flushed + tokio::time::sleep(BUCKETS_DURATION + Duration::from_secs(1)).await; + // Resume time to sleep while the stats are being sent + tokio::time::resume(); + tokio::time::sleep(Duration::from_millis(10)).await; + + mock.assert_async().await; + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn test_cancellation_token() { + let server = MockServer::start_async().await; + + let mock = server + .mock_async(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.6/stats") + .body_contains("libdatadog-test"); + then.status(200).body(""); + }) + .await; + + let buckets_duration = Duration::from_secs(10); + let cancellation_token = CancellationToken::new(); + + let mut stats_exporter = StatsExporter::new( +
buckets_duration, + Arc::new(Mutex::new(get_test_concentrator())), + get_test_metadata(), + Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()), + cancellation_token.clone(), + ); + + tokio::time::pause(); + tokio::spawn(async move { + stats_exporter.run().await; + }); + // Cancel token to trigger force flush + cancellation_token.cancel(); + // Resume time to sleep while the stats are being sent + tokio::time::resume(); + tokio::time::sleep(Duration::from_millis(10)).await; + + mock.assert_async().await; + } +} diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index 4b2f7eb1f..236bd8d25 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -1,17 +1,22 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 - +use crate::{span_concentrator::SpanConcentrator, stats_exporter}; use bytes::Bytes; use datadog_trace_protobuf::pb; -use datadog_trace_utils::trace_utils::{self, SendData, TracerHeaderTags}; +use datadog_trace_utils::trace_utils::{ + self, compute_top_level_span, has_top_level, SendData, TracerHeaderTags, +}; use datadog_trace_utils::tracer_payload; use datadog_trace_utils::tracer_payload::TraceCollection; use ddcommon::{connector, Endpoint}; use hyper::http::uri::PathAndQuery; use hyper::{Body, Client, Method, Uri}; use log::error; -use std::{borrow::Borrow, collections::HashMap, str::FromStr}; -use tokio::runtime::Runtime; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::{borrow::Borrow, collections::HashMap, str::FromStr, time}; +use tokio::{runtime::Runtime, task::JoinHandle}; +use tokio_util::sync::CancellationToken; /// TraceExporterInputFormat represents the format of the input traces. /// The input format can be either Proxy or V0.4, where V0.4 is the default. 
@@ -76,11 +81,51 @@ fn add_path(url: &Uri, path: &str) -> Uri { Uri::from_parts(parts).unwrap() } +/// Remove spans and chunks, keeping only the ones that may be sampled by the agent +fn drop_chunks(traces: &mut Vec<Vec<pb::Span>>) { + traces.retain_mut(|chunk| { + let mut sampled_indexes = Vec::new(); + for (index, span) in chunk.iter().enumerate() { + if span.error == 1 { + // We send chunks containing an error + return true; + } + let priority = span.metrics.get("_sampling_priority_v1"); + if priority.is_some_and(|p| *p > 0.0) { + if has_top_level(span) { + // We send chunks with positive priority + return true; + } + // We send single spans with positive priority + sampled_indexes.push(index); + } else if priority.is_none() && has_top_level(span) { + // We send chunks with no priority + return true; + } else if span.metrics.contains_key("_dd.sr.eausr") { + // We send analyzed spans + sampled_indexes.push(index); + } + } + if sampled_indexes.is_empty() { + // If no spans were sampled we can drop the whole chunk + return false; + } + let sampled_spans = sampled_indexes + .iter() + .map(|i| std::mem::take(&mut chunk[*i])) + .collect(); + *chunk = sampled_spans; + true + }) +} + struct TracerTags { tracer_version: String, language: String, language_version: String, language_interpreter: String, + client_computed_stats: bool, + client_computed_top_level: bool, } impl<'a> From<&'a TracerTags> for TracerHeaderTags<'a> { @@ -90,6 +135,8 @@ impl<'a> From<&'a TracerTags> for TracerHeaderTags<'a> { lang_version: &tags.language_version, tracer_version: &tags.tracer_version, lang_interpreter: &tags.language_interpreter, + client_computed_stats: tags.client_computed_stats, + client_computed_top_level: tags.client_computed_top_level, ..Default::default() } } @@ -97,17 +144,37 @@ impl<'a> From<&'a TracerTags> for TracerHeaderTags<'a> { impl<'a> From<&'a TracerTags> for HashMap<&'static str, String> { fn from(tags: &'a TracerTags) -> HashMap<&'static str, String> { - TracerHeaderTags::<'_> { - lang: &tags.language, - lang_version: &tags.language_version, - tracer_version: &tags.tracer_version, - lang_interpreter: &tags.language_interpreter, - ..Default::default() - } - .into() + TracerHeaderTags::from(tags).into() } } +enum StatsComputationStatus { + StatsDisabled, + StatsEnabled { + stats_concentrator: Arc<Mutex<SpanConcentrator>>, + cancellation_token: CancellationToken, + exporter_handle: JoinHandle<()>, + }, +} + +/// The TraceExporter ingests traces from the tracers, serialized as msgpack, and forwards them to +/// the agent while applying some transformations. +/// +/// # Proxy +/// If the input format is set as `Proxy`, the exporter will forward traces to the agent without +/// deserializing them. +/// +/// # Features +/// When the input format is set to `V04`, the TraceExporter will deserialize the traces and perform +/// some operations before sending them to the agent. The available operations are described below. +/// +/// ## V07 Serialization +/// The TraceExporter can serialize the traces to V07 before sending them to the agent. +/// +/// ## Stats computation +/// The TraceExporter can compute stats on traces. In this case, the trace exporter will start +/// another task to send stats whenever a time bucket expires. When this feature is enabled, the +/// TraceExporter drops all spans that may not be sampled by the agent. 
#[allow(missing_docs)] pub struct TraceExporter { endpoint: Endpoint, @@ -117,6 +184,8 @@ pub struct TraceExporter { // TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019 _response_callback: Option<Box<dyn ResponseCallback>>, runtime: Runtime, + client_computed_top_level: bool, + stats: StatsComputationStatus, } impl TraceExporter { @@ -125,6 +194,7 @@ impl TraceExporter { TraceExporterBuilder::default() } + /// Send msgpack-serialized traces to the agent #[allow(missing_docs)] pub fn send(&self, data: &[u8], trace_count: usize) -> Result<String, String> { match self.input_format { @@ -133,6 +203,37 @@ } } + /// Safely shut down the TraceExporter and all related tasks + pub fn shutdown(self, timeout: Option<Duration>) -> Result<(), String> { + match self.stats { + StatsComputationStatus::StatsEnabled { + stats_concentrator: _, + cancellation_token, + exporter_handle, + } => { + if let Some(timeout) = timeout { + match self.runtime.block_on(async { + tokio::time::timeout(timeout, async { + cancellation_token.cancel(); + let _ = exporter_handle.await; + }) + .await + }) { + Ok(_) => Ok(()), + Err(_) => Err("Shutdown timed out".to_string()), + } + } else { + self.runtime.block_on(async { + cancellation_token.cancel(); + let _ = exporter_handle.await; + }); + Ok(()) + } + } + StatsComputationStatus::StatsDisabled => Ok(()), + } + } + fn send_proxy(&self, data: &[u8], trace_count: usize) -> Result<String, String> { self.send_data_to_url( data, @@ -197,10 +298,27 @@ }) } + /// Add all spans from the given iterator into the stats concentrator + /// # Panics + /// Will panic if another thread panicked while holding the lock on `stats_concentrator` + fn add_spans_to_stats<'a>(&self, spans: impl Iterator<Item = &'a pb::Span>) { + if let StatsComputationStatus::StatsEnabled { + stats_concentrator, + cancellation_token: _, + exporter_handle: _, + } = &self.stats + { + let mut stats_concentrator = stats_concentrator.lock().unwrap(); + for span in spans { + stats_concentrator.add_span(span); + } + } + } + fn send_deser_ser(&self, data: &[u8]) -> Result<String, String> { let size = data.len(); // TODO base on input format - let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_slice(data) { + let mut traces: Vec<Vec<pb::Span>> = match rmp_serde::from_slice(data) { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); @@ -213,6 +331,19 @@ return Ok(String::from("{}")); } + // Stats computation + if let StatsComputationStatus::StatsEnabled { .. 
} = &self.stats { + if !self.client_computed_top_level { + for chunk in traces.iter_mut() { + compute_top_level_span(chunk); + } + } + self.add_spans_to_stats(traces.iter().flat_map(|trace| trace.iter())); + // Once stats have been computed we can drop all chunks that are not going to be + // sampled by the agent + drop_chunks(&mut traces); + } + let header_tags: TracerHeaderTags<'_> = (&self.tags).into(); match self.output_format { @@ -262,17 +393,32 @@ } } +const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126"; + #[allow(missing_docs)] #[derive(Default)] pub struct TraceExporterBuilder { url: Option<String>, tracer_version: String, + hostname: String, + env: String, + version: String, + service: String, language: String, language_version: String, language_interpreter: String, input_format: TraceExporterInputFormat, output_format: TraceExporterOutputFormat, response_callback: Option<Box<dyn ResponseCallback>>, + client_computed_stats: bool, + client_computed_top_level: bool, + + // Stats-specific fields + /// A `Some` value enables stats computation; `None` disables it + stats_bucket_size: Option<time::Duration>, + peer_tags_aggregation: bool, + compute_stats_by_span_kind: bool, + peer_tags: Vec<String>, } impl TraceExporterBuilder { @@ -282,6 +428,26 @@ self } + pub fn set_hostname(mut self, hostname: &str) -> Self { + hostname.clone_into(&mut self.hostname); + self + } + + pub fn set_env(mut self, env: &str) -> Self { + env.clone_into(&mut self.env); + self + } + + pub fn set_version(mut self, version: &str) -> Self { + version.clone_into(&mut self.version); + self + } + + pub fn set_service(mut self, service: &str) -> Self { + service.clone_into(&mut self.service); + self + } + #[allow(missing_docs)] pub fn set_tracer_version(mut self, tracer_version: &str) -> Self { tracer_version.clone_into(&mut self.tracer_version); @@ -324,24 +490,111 @@ self } + /// Set the header indicating the tracer has computed the top-level tag + pub fn set_client_computed_top_level(mut self) -> Self { + self.client_computed_top_level = true; + self + } + + /// Set the header indicating the tracer has already computed stats. + /// This should not be used when stats computation is enabled. 
+ pub fn set_client_computed_stats(mut self) -> Self { + self.client_computed_stats = true; + self + } + + /// Enable stats computation on traces sent through this exporter + pub fn enable_stats(mut self, bucket_size: time::Duration) -> Self { + self.stats_bucket_size = Some(bucket_size); + self + } + + /// Enable peer tags aggregation for stats computation (requires stats computation to be + /// enabled) + pub fn enable_stats_peer_tags_aggregation(mut self, peer_tags: Vec<String>) -> Self { + self.peer_tags_aggregation = true; + self.peer_tags = peer_tags; + self + } + + /// Enable stats eligibility by span kind (requires stats computation to be + /// enabled) + pub fn enable_compute_stats_by_span_kind(mut self) -> Self { + self.compute_stats_by_span_kind = true; + self + } + #[allow(missing_docs)] - pub fn build(mut self) -> anyhow::Result<TraceExporter> { + pub fn build(self) -> anyhow::Result<TraceExporter> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; + let mut stats = StatsComputationStatus::StatsDisabled; + + // Proxy mode does not support stats + if self.input_format != TraceExporterInputFormat::Proxy { + if let Some(bucket_size) = self.stats_bucket_size { + let stats_concentrator = Arc::new(Mutex::new(SpanConcentrator::new( + bucket_size, + time::SystemTime::now(), + self.peer_tags_aggregation, + self.compute_stats_by_span_kind, + self.peer_tags, + ))); + + let cancellation_token = CancellationToken::new(); + + let mut stats_exporter = stats_exporter::StatsExporter::new( + bucket_size, + stats_concentrator.clone(), + stats_exporter::LibraryMetadata { + hostname: self.hostname, + env: self.env, + version: self.version, + lang: self.language.clone(), + tracer_version: self.tracer_version.clone(), + runtime_id: uuid::Uuid::new_v4().to_string(), + service: self.service, + ..Default::default() + }, + Endpoint::from_url(stats_exporter::stats_url_from_agent_url( + self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL), + )?), + cancellation_token.clone(), + ); + + let exporter_handle = runtime.spawn(async move { + stats_exporter.run().await; + }); + + stats = StatsComputationStatus::StatsEnabled { + stats_concentrator, + cancellation_token, + exporter_handle, + } + } + } + Ok(TraceExporter { - endpoint: Endpoint::from_slice(self.url.as_deref().unwrap_or("http://127.0.0.1:8126")), + endpoint: Endpoint::from_slice(self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL)), tags: TracerTags { - tracer_version: std::mem::take(&mut self.tracer_version), - language_version: std::mem::take(&mut self.language_version), - language_interpreter: std::mem::take(&mut self.language_interpreter), - language: std::mem::take(&mut self.language), + tracer_version: self.tracer_version, + language_version: self.language_version, + language_interpreter: self.language_interpreter, + language: self.language, + client_computed_stats: self.client_computed_stats + || self.stats_bucket_size.is_some(), + client_computed_top_level: self.client_computed_top_level + || self.stats_bucket_size.is_some(), /* Client side stats enforce client + * computed top level */ }, input_format: self.input_format, output_format: self.output_format, _response_callback: self.response_callback, + client_computed_top_level: self.client_computed_top_level, runtime, + stats, }) } } @@ -355,7 +608,10 @@ pub trait ResponseCallback { #[cfg(test)] mod tests { use super::*; + use httpmock::prelude::*; + use httpmock::MockServer; use std::collections::HashMap; + use time::Duration; #[test] fn new() { @@ -383,6 +639,7 @@ 
assert_eq!(exporter.tags.language, "nodejs"); assert_eq!(exporter.tags.language_version, "1.0"); assert_eq!(exporter.tags.language_interpreter, "v8"); + assert!(!exporter.tags.client_computed_stats); } #[test] @@ -393,6 +650,7 @@ mod tests { .set_language("nodejs") .set_language_version("1.0") .set_language_interpreter("v8") + .enable_stats(Duration::from_secs(10)) .build() .unwrap(); @@ -408,6 +666,7 @@ mod tests { assert_eq!(exporter.tags.language, "nodejs"); assert_eq!(exporter.tags.language_version, "1.0"); assert_eq!(exporter.tags.language_interpreter, "v8"); + assert!(exporter.tags.client_computed_stats); } #[test] @@ -417,6 +676,8 @@ mod tests { language: "rust".to_string(), language_version: "1.52.1".to_string(), language_interpreter: "rustc".to_string(), + client_computed_stats: true, + client_computed_top_level: true, }; let tracer_header_tags: TracerHeaderTags = (&tracer_tags).into(); @@ -425,6 +686,8 @@ mod tests { assert_eq!(tracer_header_tags.lang, "rust"); assert_eq!(tracer_header_tags.lang_version, "1.52.1"); assert_eq!(tracer_header_tags.lang_interpreter, "rustc"); + assert!(tracer_header_tags.client_computed_stats); + assert!(tracer_header_tags.client_computed_top_level); } #[test] @@ -434,6 +697,8 @@ mod tests { language: "rust".to_string(), language_version: "1.52.1".to_string(), language_interpreter: "rustc".to_string(), + client_computed_stats: true, + client_computed_top_level: true, }; let hashmap: HashMap<&'static str, String> = (&tracer_tags).into(); @@ -445,5 +710,217 @@ mod tests { hashmap.get("datadog-meta-lang-interpreter").unwrap(), "rustc" ); + assert!(hashmap.contains_key("datadog-client-computed-stats")); + assert!(hashmap.contains_key("datadog-client-computed-top-level")); + } + + #[test] + fn test_drop_chunks() { + let chunk_with_priority = vec![ + pb::Span { + span_id: 1, + metrics: HashMap::from([ + ("_sampling_priority_v1".to_string(), 1.0), + ("_dd.top_level".to_string(), 1.0), + ]), + ..Default::default() + }, + pb::Span { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_null_priority = vec![ + pb::Span { + span_id: 1, + metrics: HashMap::from([ + ("_sampling_priority_v1".to_string(), 0.0), + ("_dd.top_level".to_string(), 1.0), + ]), + ..Default::default() + }, + pb::Span { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_without_priority = vec![ + pb::Span { + span_id: 1, + metrics: HashMap::from([("_dd.top_level".to_string(), 1.0)]), + ..Default::default() + }, + pb::Span { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_error = vec![ + pb::Span { + span_id: 1, + error: 1, + metrics: HashMap::from([ + ("_sampling_priority_v1".to_string(), 0.0), + ("_dd.top_level".to_string(), 1.0), + ]), + ..Default::default() + }, + pb::Span { + span_id: 2, + parent_id: 1, + ..Default::default() + }, + ]; + let chunk_with_a_single_span = vec![ + pb::Span { + span_id: 1, + metrics: HashMap::from([ + ("_sampling_priority_v1".to_string(), 0.0), + ("_dd.top_level".to_string(), 1.0), + ]), + ..Default::default() + }, + pb::Span { + span_id: 2, + parent_id: 1, + metrics: HashMap::from([("_sampling_priority_v1".to_string(), 1.0)]), + ..Default::default() + }, + ]; + let chunk_with_analyzed_span = vec![ + pb::Span { + span_id: 1, + metrics: HashMap::from([ + ("_sampling_priority_v1".to_string(), 0.0), + ("_dd.top_level".to_string(), 1.0), + ]), + ..Default::default() + }, + pb::Span { + span_id: 2, + parent_id: 1, + metrics: HashMap::from([("_dd.sr.eausr".to_string(), 1.0)]), + 
..Default::default() + }, + ]; + + let chunks_and_expected_sampled_spans = vec![ + (chunk_with_priority, 2), + (chunk_with_null_priority, 0), + (chunk_without_priority, 2), + (chunk_with_error, 2), + (chunk_with_a_single_span, 1), + (chunk_with_analyzed_span, 1), + ]; + + for (chunk, expected_count) in chunks_and_expected_sampled_spans.into_iter() { + let mut traces = vec![chunk]; + drop_chunks(&mut traces); + if expected_count == 0 { + assert!(traces.is_empty()); + } else { + println!("{:?}", traces[0]); + assert_eq!(traces[0].len(), expected_count); + println!("----") + } + } + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_shutdown() { + let server = MockServer::start(); + + let mock_traces = server.mock(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.7/traces"); + then.status(200).body(""); + }); + + let mock_stats = server.mock(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.6/stats"); + then.status(200).body(""); + }); + let builder = TraceExporterBuilder::default(); + let exporter = builder + .set_url(&server.url("/")) + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V07) + .enable_stats(Duration::from_secs(10)) + .build() + .unwrap(); + + let mut trace_chunk = vec![pb::Span { + duration: 10, + ..Default::default() + }]; + + trace_utils::compute_top_level_span(&mut trace_chunk); + + let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap(); + + exporter.send(data.as_slice(), 1).unwrap(); + exporter.shutdown(None).unwrap(); + + mock_traces.assert(); + mock_stats.assert(); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_shutdown_with_timeout() { + let server = MockServer::start(); + + let mock_traces = server.mock(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.7/traces"); + then.status(200).body(""); + }); + + let _mock_stats = server.mock(|when, then| { + when.method(POST) + .header("Content-type", "application/msgpack") + .path("/v0.6/stats"); + then.delay(Duration::from_secs(10)).status(200).body(""); + }); + let builder = TraceExporterBuilder::default(); + let exporter = builder + .set_url(&server.url("/")) + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_format(TraceExporterOutputFormat::V07) + .enable_stats(Duration::from_secs(10)) + .build() + .unwrap(); + + let mut trace_chunk = vec![pb::Span { + duration: 10, + ..Default::default() + }]; + + trace_utils::compute_top_level_span(&mut trace_chunk); + + let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap(); + + exporter.send(data.as_slice(), 1).unwrap(); + exporter + .shutdown(Some(Duration::from_millis(500))) + .unwrap_err(); // The shutdown should time out + + mock_traces.assert(); } } diff --git a/ddcommon/src/lib.rs b/ddcommon/src/lib.rs index 03f5056ee..c0243ed02 100644 --- a/ddcommon/src/lib.rs +++ b/ddcommon/src/lib.rs @@ -25,8 +25,10 @@ pub mod header { pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id"); pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id"); pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env"); + pub const 
DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count"); pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key"); pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); + pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static("application/msgpack"); pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName = HeaderName::from_static("x-datadog-test-session-token"); } diff --git a/examples/ffi/trace_exporter.c b/examples/ffi/trace_exporter.c index c8cfb9993..055d4748f 100644 --- a/examples/ffi/trace_exporter.c +++ b/examples/ffi/trace_exporter.c @@ -29,6 +29,10 @@ int main(int argc, char** argv) ddog_CharSlice language = DDOG_CHARSLICE_C("dotnet"); ddog_CharSlice language_version = DDOG_CHARSLICE_C("10.0"); ddog_CharSlice language_interpreter = DDOG_CHARSLICE_C("X"); + ddog_CharSlice hostname = DDOG_CHARSLICE_C("host1"); + ddog_CharSlice env = DDOG_CHARSLICE_C("staging"); + ddog_CharSlice version = DDOG_CHARSLICE_C("1.0"); + ddog_CharSlice service = DDOG_CHARSLICE_C("test_app"); TRY(ddog_trace_exporter_new( &trace_exporter, url, @@ -36,8 +40,13 @@ int main(int argc, char** argv) language, language_version, language_interpreter, + hostname, + env, + version, + service, DDOG_TRACE_EXPORTER_INPUT_FORMAT_PROXY, DDOG_TRACE_EXPORTER_OUTPUT_FORMAT_V04, + true, &agent_response_callback )); diff --git a/trace-utils/src/tracer_header_tags.rs b/trace-utils/src/tracer_header_tags.rs index 9c1228159..e1bca47cc 100644 --- a/trace-utils/src/tracer_header_tags.rs +++ b/trace-utils/src/tracer_header_tags.rs @@ -52,6 +52,22 @@ impl<'a> From> for HashMap<&'static str, String> { tags.tracer_version.to_string(), ), ("datadog-container-id", tags.container_id.to_string()), + ( + "datadog-client-computed-stats", + if tags.client_computed_stats { + "true".to_string() + } else { + String::new() + }, + ), + ( + "datadog-client-computed-top-level", + if tags.client_computed_top_level { + "true".to_string() + } else { + String::new() + }, + ), ]); headers.retain(|_, v| !v.is_empty()); headers @@ -96,13 +112,13 @@ mod tests { lang_vendor: "vendor", tracer_version: "1.0", container_id: "id", - client_computed_top_level: false, - client_computed_stats: false, + client_computed_top_level: true, + client_computed_stats: true, }; let map: HashMap<&'static str, String> = header_tags.into(); - assert_eq!(map.len(), 6); + assert_eq!(map.len(), 8); assert_eq!(map.get("datadog-meta-lang").unwrap(), "test-lang"); assert_eq!(map.get("datadog-meta-lang-version").unwrap(), "2.0"); assert_eq!( @@ -112,6 +128,11 @@ mod tests { assert_eq!(map.get("datadog-meta-lang-vendor").unwrap(), "vendor"); assert_eq!(map.get("datadog-meta-tracer-version").unwrap(), "1.0"); assert_eq!(map.get("datadog-container-id").unwrap(), "id"); + assert_eq!( + map.get("datadog-client-computed-top-level").unwrap(), + "true" + ); + assert_eq!(map.get("datadog-client-computed-stats").unwrap(), "true"); } #[test] fn tags_to_hashmap_empty_value() { @@ -138,6 +159,8 @@ mod tests { assert_eq!(map.get("datadog-meta-lang-vendor").unwrap(), "vendor"); assert_eq!(map.get("datadog-meta-tracer-version").unwrap(), "1.0"); assert_eq!(map.get("datadog-container-id"), None); + assert_eq!(map.get("datadog-client-computed-top-level"), None); + assert_eq!(map.get("datadog-client-computed-stats"), None); } #[test]
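
A minimal end-to-end sketch of the API this patch adds, for orientation only: the agent URL, service names, language version, and the 10-second bucket size below are illustrative, and error handling is collapsed to unwrap as in the bundled send-traces-with-stats example.

use data_pipeline::trace_exporter::{
    TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
};
use std::time::Duration;

fn main() {
    let exporter = TraceExporter::builder()
        .set_url("http://localhost:8126")
        // hostname/env/version/service are the stats aggregation keys
        .set_hostname("example-host")
        .set_env("staging")
        .set_version("1.0.0")
        .set_service("example-service")
        .set_tracer_version("0.1.0")
        .set_language("rust")
        .set_language_version("1.80.0")
        .set_input_format(TraceExporterInputFormat::V04)
        .set_output_format(TraceExporterOutputFormat::V07)
        // Starts the background stats task and sets the
        // datadog-client-computed-stats/top-level headers
        .enable_stats(Duration::from_secs(10))
        .build()
        .unwrap();

    // `data` would be a msgpack-encoded Vec<Vec<pb::Span>> produced by the
    // tracer; see data-pipeline/examples/send-traces-with-stats.rs.
    // exporter.send(&data, trace_count).unwrap();

    // Force flush the remaining stats buckets, waiting at most one second.
    exporter.shutdown(Some(Duration::from_secs(1))).unwrap();
}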