diff --git a/Cargo.lock b/Cargo.lock index e30afe577..1f10156cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2250,7 +2250,7 @@ dependencies = [ "csv-core", "derivative", "dyn-clone", - "faster-hex", + "faster-hex 0.8.1", "futures 0.3.29", "indoc", "memchr", @@ -3506,6 +3506,15 @@ dependencies = [ "serde", ] +[[package]] +name = "faster-hex" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2a2b11eda1d40935b26cf18f6833c526845ae8c41e58d09af6adeb6f0269183" +dependencies = [ + "serde", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -4032,9 +4041,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes 1.5.0", "fnv", @@ -4042,7 +4051,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 1.9.3", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", @@ -6347,6 +6356,21 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + [[package]] name = "opentelemetry-proto" version = "0.1.0" @@ -6364,6 +6388,21 @@ dependencies = [ "vrl", ] +[[package]] +name = "opentelemetry-proto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +dependencies = [ + "hex", + "opentelemetry", + "opentelemetry_sdk", + "prost 0.12.1", + "schemars", + "serde", + "tonic 0.11.0", +] + [[package]] name = "opentelemetry-rs" version = "1.2.1" @@ -6378,6 +6417,27 @@ dependencies = [ "walkdir", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float 4.1.1", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -8340,6 +8400,30 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "schemars" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a28f4c49489add4ce10783f7911893516f15afe45d015608d41faca6bc4d29" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" +dependencies = [ + "proc-macro2 1.0.69", + "quote 1.0.33", + "serde_derive_internals 0.26.0", + "syn 1.0.109", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -8498,6 +8582,17 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "serde_derive_internals" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "85bf8229e7920a9f636479437026331ce11aa132b4dde37d121944a44d6e5f3c" +dependencies = [ + "proc-macro2 1.0.69", + "quote 1.0.33", + "syn 1.0.109", +] + [[package]] name = "serde_derive_internals" version = "0.29.0" @@ -9760,6 +9855,33 @@ dependencies = [ "tracing 0.1.40", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.5", + "bytes 1.5.0", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.12.1", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing 0.1.40", +] + [[package]] name = "tonic-build" version = "0.9.2" @@ -10482,6 +10604,7 @@ dependencies = [ "enum_dispatch", "exitcode", "fakedata", + "faster-hex 0.9.0", "flate2", "futures 0.3.29", "futures-util", @@ -10538,6 +10661,9 @@ dependencies = [ "openssl", "openssl-probe", "openssl-src", + "opentelemetry", + "opentelemetry-proto 0.5.0", + "opentelemetry_sdk", "ordered-float 4.1.1", "paste", "percent-encoding", @@ -10766,7 +10892,7 @@ dependencies = [ "proc-macro2 1.0.69", "quote 1.0.33", "serde", - "serde_derive_internals", + "serde_derive_internals 0.29.0", "syn 2.0.39", "vector-config", "vector-config-common", @@ -10866,7 +10992,7 @@ dependencies = [ "codecs", "enrichment", "file-source", - "opentelemetry-proto", + "opentelemetry-proto 0.1.0", "prometheus-parser", "vector-api-client", "vector-buffers", diff --git a/Cargo.toml b/Cargo.toml index 918e8af62..922ae6424 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -343,6 +343,12 @@ mlua = { version = "0.9.1", default-features = false, features = ["lua54", "send # MEZMO: added dependency for s3-sink file consolidation gethostname = "0.4.3" +# Opentelemetry Dependencies +opentelemetry = { version = "0.22", features = ["trace", "logs", "metrics", "logs_level_enabled"] } +opentelemetry_sdk = { version = "0.22", features = ["trace", "logs", "logs_level_enabled", "metrics"] } +opentelemetry-proto = { version = "0.5", features = ["gen-tonic", "gen-tonic-messages", "hex", "logs", "metrics", "prost", "schemars", "serde", "tonic", "trace", "with-schemars", "with-serde", "zpages"] } +faster-hex = "0.9.0" + [target.'cfg(windows)'.dependencies] windows-service = "0.6.0" @@ -763,6 +769,7 @@ sinks-logs-mezmo = [ "sinks-sumo_logic", "sinks-vector", "sinks-postgresql", + "sinks-opentelemetry", ] sinks-metrics = [ "sinks-appsignal", @@ -841,6 +848,7 @@ sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build"] sinks-websocket = ["dep:tokio-tungstenite"] sinks-sumo_logic = [] sinks-webhdfs = ["dep:opendal"] +sinks-opentelemetry = [] # Datadog integration enterprise = [ @@ -898,6 +906,7 @@ all-integration-tests = [ "sumo-logic-integration-tests", "dnstap-integration-tests", "webhdfs-integration-tests", + "opentelemetry-sink-integration-tests", ] amqp-integration-tests = ["sources-amqp", "sinks-amqp"] @@ -962,6 +971,7 @@ pulsar-integration-tests = ["sinks-pulsar"] redis-integration-tests = ["sinks-redis", "sources-redis"] splunk-integration-tests = ["sinks-splunk_hec"] sumo-logic-integration-tests = ["sinks-sumo_logic"] +opentelemetry-sink-integration-tests = ["sinks-opentelemetry"] dnstap-integration-tests = ["sources-dnstap", "dep:bollard"] webhdfs-integration-tests = ["sinks-webhdfs"] disable-resolv-conf = [] diff --git 
a/lib/codecs/src/decoding/format/mezmo/open_telemetry/log_parser.rs b/lib/codecs/src/decoding/format/mezmo/open_telemetry/log_parser.rs index 75c6c4c26..f05a3371d 100644 --- a/lib/codecs/src/decoding/format/mezmo/open_telemetry/log_parser.rs +++ b/lib/codecs/src/decoding/format/mezmo/open_telemetry/log_parser.rs @@ -1,6 +1,5 @@ use std::borrow::Cow; use std::collections::BTreeMap; -use std::time::SystemTime; use vector_core::event::metric::mezmo::IntoValue; use smallvec::SmallVec; @@ -12,8 +11,10 @@ use vector_core::{ event::{Event, EventMetadata, LogEvent, Value}, }; +use vector_common::btreemap; + use crate::decoding::format::mezmo::open_telemetry::{ - DeserializerError, OpenTelemetryAnyValue, OpenTelemetryKeyValue, + nano_to_timestamp, DeserializerError, OpenTelemetryAnyValue, OpenTelemetryKeyValue, }; pub fn parse_logs_request(bytes: &[u8]) -> vector_common::Result> { @@ -28,7 +29,6 @@ pub fn parse_logs_request(bytes: &[u8]) -> vector_common::Result SmallVec<[Event; 1]> { @@ -41,7 +41,6 @@ pub fn to_events(log_request: ExportLogsServiceRequest) -> SmallVec<[Event; 1]> log_request.resource_logs.into_iter().fold( SmallVec::with_capacity(log_count), |mut acc, resource_logs| { - // process resource let resource = resource_logs.resource; let mut resource_attrs = Value::from(BTreeMap::new()); @@ -52,104 +51,69 @@ pub fn to_events(log_request: ExportLogsServiceRequest) -> SmallVec<[Event; 1]> .to_value(); } - // done changing resource_attrs let resource_attrs = resource_attrs; for scope_logs in resource_logs.scope_logs.into_iter() { - // Scope attributes - let mut scope_attrs = Value::from(BTreeMap::new()); - if let Some(scope) = &scope_logs.scope { - scope_attrs = (OpenTelemetryKeyValue { - attributes: scope.attributes.clone(), - }) - .to_value(); + let mut scope: BTreeMap = BTreeMap::new(); + if let Some(s) = &scope_logs.scope { + let attributes = OpenTelemetryKeyValue { + attributes: s.attributes.clone(), + }; + + scope = btreemap! { + "name" => Value::from(s.name.clone()), + "version" => Value::from(s.version.clone()), + "attributes" => attributes.to_value(), + } } + scope.insert("schema_url".into(), Value::from(scope_logs.schema_url)); - // done changing scope_attrs - let scope_attrs = scope_attrs; + let scope = Value::from(scope); for log_record in scope_logs.log_records.into_iter() { - // Assemble metadata - let mut metadata = BTreeMap::new(); + let attributes = OpenTelemetryKeyValue { + attributes: log_record.attributes, + }; - metadata.insert("resource".to_string(), resource_attrs.clone()); - metadata.insert("scope".to_string(), scope_attrs.clone()); + let time = nano_to_timestamp(log_record.time_unix_nano); + + let mut metadata = btreemap! 
{ + "resource" => resource_attrs.clone(), + "scope" => scope.clone(), + "time" => time.clone(), + "observed_timestamp" => nano_to_timestamp(log_record.observed_time_unix_nano), + "severity_number" => log_record.severity_number as i32, + "trace_id" => faster_hex::hex_string(&log_record.trace_id), + "span_id" => faster_hex::hex_string(&log_record.span_id), + "flags" => log_record.flags, + "attributes" => attributes.to_value(), + }; - // "time":"2023-10-31T13:32:42.240772879-04:00", - let time_unix_millis = Value::from(if log_record.time_unix_nano == 0 { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|t| t.as_millis()) - .unwrap_or(0) - .try_into() - .unwrap_or(u64::MAX) - } else { - log_record.time_unix_nano / NANOS_IN_MILLIS - }); - metadata.insert("time".to_string(), Value::from(time_unix_millis.clone())); - // "observed_timestamp": "2023-10-31T13:32:42.240772879-04:00", - if log_record.observed_time_unix_nano != 0 { - metadata.insert( - "observed_timestamp".to_string(), - Value::from(log_record.observed_time_unix_nano / NANOS_IN_MILLIS), - ); - } - // "severity_text": "ERROR", let sev = log_record.severity_text; if !sev.is_empty() { - metadata.insert("severity_text".to_string(), Value::from(sev.clone())); + metadata.insert("severity_text".into(), Value::from(sev.clone())); metadata.insert( - "level".to_string(), + "level".into(), Value::from(Cow::from( &sev[..std::cmp::min(sev.len(), MAX_LOG_LEVEL_LEN)], )), ); } - // "severity_number": 17, - metadata.insert( - "severity_number".to_string(), - Value::from(log_record.severity_number as i32), - ); - // "trace_id": "0x5b8aa5a2d2c872e8321cf37308d69df2", - metadata.insert( - "trace_id".to_string(), - Value::from(faster_hex::hex_string(&log_record.trace_id)), - ); - // "span_id": "0x051581bf3cb55c13", - metadata.insert( - "span_id".to_string(), - Value::from(faster_hex::hex_string(&log_record.span_id)), - ); - // "trace_flags": "00", - metadata.insert("flags".to_string(), Value::from(log_record.flags)); - - // LogRecord attributes - let attributes = OpenTelemetryKeyValue { - attributes: log_record.attributes, - }; - metadata.insert("attributes".to_string(), attributes.to_value()); let line = match log_record.body { Some(av) => OpenTelemetryAnyValue { value: av }.to_value(), None => Value::Null, }; - let log_line = BTreeMap::from_iter([ - // Add the user metadata - ( - log_schema().user_metadata_key().to_string(), - Value::from(metadata), - ), - // Add the actual line - (log_schema().message_key().unwrap().to_string(), line), - ]); + let message_key = log_schema().message_key().unwrap().to_string(); - // Wrap line in mezmo format - let mut log_event = LogEvent::from_map(log_line, EventMetadata::default()); + let mut log_event = LogEvent::from_map(btreemap! 
{ + log_schema().user_metadata_key() => Value::Object(metadata), + message_key.as_str() => line, + }, EventMetadata::default()); if let Some(timestamp_key) = log_schema().timestamp_key() { - log_event - .insert((lookup::PathPrefix::Event, timestamp_key), time_unix_millis); + log_event.insert((lookup::PathPrefix::Event, timestamp_key), time); } acc.insert(i, Event::Log(log_event)); @@ -165,6 +129,7 @@ pub fn to_events(log_request: ExportLogsServiceRequest) -> SmallVec<[Event; 1]> mod tests { use super::*; + use chrono::{NaiveDateTime, TimeZone, Utc}; use std::borrow::Cow; use std::collections::BTreeMap; use std::ops::Deref; @@ -241,11 +206,21 @@ mod tests { ("flags".into(), Value::from(1)), ( "observed_timestamp".into(), - Value::from(1_579_134_612_000_000_011_i64 / 1_000_000) + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 11_u32) + .expect("timestamp should be a valid timestamp"), + ) + ) ), ( "time".into(), - Value::from(1_579_134_612_000_000_011_i64 / 1_000_000) + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 11_u32) + .expect("timestamp should be a valid timestamp"), + ) + ) ), ("severity_number".into(), 1.into()), ("severity_text".into(), "ERROR".into()), @@ -262,8 +237,16 @@ mod tests { ( "scope".into(), Value::Object(BTreeMap::from([ - ("foo".into(), "bar".into()), - ("empty".into(), Value::Null), + ("name".into(), "test_name".into()), + ("schema_url".into(), "https://some_url.com".into()), + ("version".into(), "1.2.3".into()), + ( + "attributes".into(), + Value::Object(BTreeMap::from([ + ("foo".into(), "bar".into()), + ("empty".into(), Value::Null), + ])) + ) ])) ), ])) diff --git a/lib/codecs/src/decoding/format/mezmo/open_telemetry/metric_parser.rs b/lib/codecs/src/decoding/format/mezmo/open_telemetry/metric_parser.rs index 5e31f3669..f3cd9c9b2 100644 --- a/lib/codecs/src/decoding/format/mezmo/open_telemetry/metric_parser.rs +++ b/lib/codecs/src/decoding/format/mezmo/open_telemetry/metric_parser.rs @@ -1,4 +1,3 @@ -use chrono::{NaiveDateTime, TimeZone, Utc}; use smallvec::SmallVec; use std::borrow::Cow; @@ -24,9 +23,12 @@ use vector_core::{ }, }; -use crate::decoding::format::mezmo::open_telemetry::{DeserializerError, OpenTelemetryKeyValue}; +use vector_common::btreemap; + +use crate::decoding::format::mezmo::open_telemetry::{ + nano_to_timestamp, DeserializerError, OpenTelemetryKeyValue, +}; -const NANO_RATIO: i64 = 1_000_000_000; const METRIC_TIMESTAMP_KEY: &str = "message.value.value.time_unix_nano"; #[derive(Debug, Default, PartialEq)] @@ -772,36 +774,32 @@ impl IntoValue for ScopeMetricValue<'_> { attributes: self.attributes.as_ref().unwrap().clone(), }; - let name = self.name.as_ref().unwrap().to_string(); - let version = self.version.as_ref().unwrap().to_string(); + let name = if let Some(name) = self.name.as_ref() { + if !name.to_string().is_empty() { + Value::from(name.to_string()) + } else { + Value::Null + } + } else { + Value::Null + }; - Value::Object( - [ - ( - "name".to_owned(), - if !name.is_empty() { - name.into() - } else { - Value::Null - }, - ), - ( - "version".to_owned(), - if !version.is_empty() { - version.into() - } else { - Value::Null - }, - ), - ("attributes".to_owned(), attributes.to_value()), - ( - "dropped_attributes_count".to_owned(), - self.dropped_attributes_count.into(), - ), - ] - .into_iter() - .collect(), - ) + let version = if let Some(version) = self.version.as_ref() { + if !version.to_string().is_empty() { + Value::from(version.to_string()) + } 
else { + Value::Null + } + } else { + Value::Null + }; + + Value::Object(btreemap! { + "name" => name, + "version" => version, + "attributes" => attributes.to_value(), + "dropped_attributes_count" => self.dropped_attributes_count, + }) } } @@ -832,17 +830,10 @@ impl<'a> ResourceMetricValue<'a> { impl IntoValue for ResourceMetricValue<'_> { fn to_value(&self) -> Value { - Value::Object( - [ - ("attributes".into(), self.attributes.to_value()), - ( - "dropped_attributes_count".into(), - self.dropped_attributes_count.into(), - ), - ] - .into_iter() - .collect(), - ) + Value::Object(btreemap! { + "attributes" => self.attributes.to_value(), + "dropped_attributes_count" => self.dropped_attributes_count, + }) } } @@ -857,14 +848,10 @@ impl IntoValue for QuantileValuesMetricValue { self.quantile_values .iter() .map(|quantile_value| { - Value::Object( - [ - ("quantile".into(), from_f64_or_zero(quantile_value.quantile)), - ("value".into(), from_f64_or_zero(quantile_value.value)), - ] - .into_iter() - .collect(), - ) + Value::Object(btreemap! { + "quantile" => from_f64_or_zero(quantile_value.quantile), + "value" => from_f64_or_zero(quantile_value.value), + }) }) .collect(), ) @@ -892,23 +879,13 @@ impl IntoValue for ExemplarsMetricValue<'_> { attributes: exemplar.filtered_attributes.clone(), }; - Value::Object( - [ - ("filtered_attributes".into(), filtered_attributes.to_value()), - ("value".into(), exemplar_value), - ("time_unix_nano".into(), exemplar.time_unix_nano.into()), - ( - "span_id".into(), - Value::from(faster_hex::hex_string(&exemplar.span_id)), - ), - ( - "trace_id".into(), - Value::from(faster_hex::hex_string(&exemplar.trace_id)), - ), - ] - .into_iter() - .collect(), - ) + Value::Object(btreemap! { + "filtered_attributes" => filtered_attributes.to_value(), + "value" => exemplar_value, + "time_unix_nano" => exemplar.time_unix_nano, + "span_id" => faster_hex::hex_string(&exemplar.span_id), + "trace_id" => faster_hex::hex_string(&exemplar.trace_id), + }) }) .collect(), ) @@ -938,22 +915,15 @@ impl DataPointBucketsMetricValue { impl IntoValue for DataPointBucketsMetricValue { fn to_value(&self) -> Value { - Value::Object( - [ - ("offset".to_owned(), self.offset.into()), - ( - "bucket_counts".to_owned(), - Value::Array( - self.bucket_counts - .iter() - .map(|count| Value::from(*count)) - .collect(), - ), - ), - ] - .into_iter() - .collect(), - ) + Value::Object(btreemap! 
{ + "offset" => self.offset, + "bucket_counts" => Value::Array( + self.bucket_counts + .iter() + .map(|count| Value::from(*count)) + .collect(), + ), + }) } } @@ -1201,17 +1171,11 @@ pub fn to_events(metric_request: ExportMetricsServiceRequest) -> SmallVec<[Event fn make_event(mut log_event: LogEvent) -> Event { if let Some(timestamp_key) = log_schema().timestamp_key() { let metric_timestamp_target = (lookup::PathPrefix::Event, METRIC_TIMESTAMP_KEY); - let timestamp = match log_event.get(metric_timestamp_target) { - Some(ts) => { - let ts = ts.as_integer().unwrap(); - let ms: i64 = ts / NANO_RATIO; - let nanos: u32 = (ts % NANO_RATIO) as u32; - Utc.from_utc_datetime( - &NaiveDateTime::from_timestamp_opt(ms, nanos) - .expect("timestamp should be a valid timestamp"), - ) - } - None => Utc::now(), + + let timestamp = if let Some(Value::Integer(time)) = log_event.get(metric_timestamp_target) { + nano_to_timestamp(time.to_owned().try_into().unwrap_or(0)) + } else { + nano_to_timestamp(0) }; log_event.insert((lookup::PathPrefix::Event, timestamp_key), timestamp); diff --git a/lib/codecs/src/decoding/format/mezmo/open_telemetry/mod.rs b/lib/codecs/src/decoding/format/mezmo/open_telemetry/mod.rs index 935aca00b..69a398edd 100644 --- a/lib/codecs/src/decoding/format/mezmo/open_telemetry/mod.rs +++ b/lib/codecs/src/decoding/format/mezmo/open_telemetry/mod.rs @@ -3,6 +3,7 @@ mod metric_parser; mod trace_parser; use bytes::Bytes; +use chrono::{NaiveDateTime, TimeZone, Utc}; use opentelemetry_rs::opentelemetry::common::{AnyValue, AnyValueOneOfvalue, KeyValue}; use std::borrow::Cow; @@ -28,6 +29,20 @@ use vrl::value::Kind; use opentelemetry_rs::Error as OpenTelemetryError; const MAX_METADATA_SIZE: usize = 32 * 1024; +const NANO_RATIO: u64 = 1_000_000_000; + +pub fn nano_to_timestamp(time_unix_nano: u64) -> Value { + Value::Timestamp(if time_unix_nano > 0 { + let ms: i64 = (time_unix_nano / NANO_RATIO).try_into().unwrap(); + let nanos: u32 = (time_unix_nano % NANO_RATIO) as u32; + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(ms, nanos) + .expect("timestamp should be a valid timestamp"), + ) + } else { + Utc::now() + }) +} /// OpenTelemetry protobuf deserializer error list #[derive(Debug, snafu::Snafu)] diff --git a/lib/codecs/src/decoding/format/mezmo/open_telemetry/trace_parser.rs b/lib/codecs/src/decoding/format/mezmo/open_telemetry/trace_parser.rs index 93e7cf948..99c04b1b6 100644 --- a/lib/codecs/src/decoding/format/mezmo/open_telemetry/trace_parser.rs +++ b/lib/codecs/src/decoding/format/mezmo/open_telemetry/trace_parser.rs @@ -1,6 +1,5 @@ use std::borrow::Cow; use std::collections::BTreeMap; -use std::time::SystemTime; use vector_core::event::metric::mezmo::IntoValue; use smallvec::SmallVec; @@ -15,9 +14,11 @@ use vector_core::{ event::{Event, EventMetadata, LogEvent, Value}, }; -use crate::decoding::format::mezmo::open_telemetry::{DeserializerError, OpenTelemetryKeyValue}; +use vector_common::btreemap; -const NANOS_IN_MILLIS: u64 = 1_000_000; +use crate::decoding::format::mezmo::open_telemetry::{ + nano_to_timestamp, DeserializerError, OpenTelemetryKeyValue, +}; pub fn parse_traces_request(bytes: &[u8]) -> vector_common::Result> { let parsed_traces = ExportTraceServiceRequest::try_from(bytes) @@ -60,19 +61,6 @@ fn string_to_value(value: String) -> Value { } } -fn nano_to_timestamp(time_unix_nano: u64) -> Value { - Value::from(if time_unix_nano == 0 { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|t| t.as_millis()) - .unwrap_or(0) - .try_into() - 
.unwrap_or(u64::MAX) - } else { - time_unix_nano / NANOS_IN_MILLIS - }) -} - #[allow(clippy::too_many_lines)] pub fn to_events(trace_request: ExportTraceServiceRequest) -> SmallVec<[Event; 1]> { let trace_count = trace_request.resource_spans.iter().fold(0, |acc, rlgs| { @@ -84,207 +72,131 @@ pub fn to_events(trace_request: ExportTraceServiceRequest) -> SmallVec<[Event; 1 trace_request.resource_spans.into_iter().fold( SmallVec::with_capacity(trace_count), |mut acc, resource_spans| { - let mut resource_host_name = None; + let mut resource_host_name = Value::Null; let resource = if let Some(resource) = resource_spans.resource.clone() { - resource_host_name = extract(resource.attributes.clone(), "host.name"); - - Value::Object( - [ - ( - "attributes".into(), - OpenTelemetryKeyValue { - attributes: resource.attributes, - } - .to_value(), - ), - ( - "dropped_attributes_count".into(), - resource.dropped_attributes_count.into(), - ), - ] - .into_iter() - .collect(), - ) + resource_host_name = string_to_value( + extract(resource.attributes.clone(), "host.name") + .unwrap_or(Cow::from("")) + .to_string() + ); + + let attributes = OpenTelemetryKeyValue { + attributes: resource.attributes, + } + .to_value(); + + Value::Object(btreemap! { + "attributes" => attributes, + "dropped_attributes_count" => resource.dropped_attributes_count, + }) } else { Value::Null }; for scope_spans in resource_spans.scope_spans.into_iter() { - let scope = if let Some(scope) = scope_spans.scope.clone() { - Value::Object(BTreeMap::from([ - ( - "attributes".into(), - OpenTelemetryKeyValue { - attributes: scope.attributes, - } - .to_value(), - ), - ( - "dropped_attributes_count".into(), - scope.dropped_attributes_count.into(), - ), - ("name".into(), string_to_value(scope.name.into())), - ("version".into(), string_to_value(scope.version.into())), - ])) - } else { - Value::Null - }; - - acc.extend(scope_spans.spans.into_iter().map(|span| { - // Assemble trace message - let mut message = std::collections::BTreeMap::new(); - - message.insert("name".to_string(), string_to_value(span.name.into())); - - if let Some(host_name) = &resource_host_name { - message.insert( - "hostname".to_string(), - string_to_value(host_name.to_string()), - ); + let mut scope: BTreeMap = BTreeMap::new(); + if let Some(s) = &scope_spans.scope { + let attributes = OpenTelemetryKeyValue { + attributes: s.attributes.clone(), + }; + + scope = btreemap! 
{ + "name" => Value::from(s.name.clone()), + "version" => Value::from(s.version.clone()), + "attributes" => attributes.to_value(), } + } + scope.insert("schema_url".into(), Value::from(scope_spans.schema_url)); - message.insert( - "trace.id".to_string(), - Value::from(faster_hex::hex_string(&span.trace_id)), - ); - message.insert("trace.state".to_string(), Value::from(span.trace_state)); - message.insert( - "span.id".to_string(), - Value::from(faster_hex::hex_string(&span.span_id)), - ); - message.insert( - "span.parent_id".to_string(), - Value::from(faster_hex::hex_string(&span.parent_span_id)), - ); - - let start_time_unix_nano = nano_to_timestamp(span.start_time_unix_nano); - - message.insert("start_timestamp".to_string(), start_time_unix_nano.clone()); - - message.insert( - "end_timestamp".to_string(), - nano_to_timestamp(span.end_time_unix_nano), - ); + let scope = Value::from(scope); - message.insert("kind".to_string(), Value::from(span.kind as i32)); - - message.insert( - "dropped_attributes_count".into(), - span.dropped_attributes_count.into(), - ); - - message.insert( - "events".to_string(), - Value::Array( - span.events - .iter() - .map(|event| { - Value::Object(BTreeMap::from([ - ("name".into(), string_to_value(event.name.clone().into())), - ( - "timestamp".into(), - nano_to_timestamp(event.time_unix_nano), - ), - ( - "attributes".into(), - OpenTelemetryKeyValue { - attributes: event.attributes.clone(), - } - .to_value(), - ), - ( - "dropped_attributes_count".into(), - event.dropped_attributes_count.into(), - ), - ])) + acc.extend(scope_spans.spans.into_iter().map(|span| { + let links = Value::Array( + span.links + .iter() + .map(|link| { + let attributes = OpenTelemetryKeyValue { + attributes: link.attributes.clone(), + } + .to_value(); + + Value::Object(btreemap! { + "trace_id" => faster_hex::hex_string(&link.trace_id), + "span_id" => faster_hex::hex_string(&link.span_id), + "trace_state" => link.trace_state.clone(), + "attributes" => attributes, + "dropped_attributes_count" => link.dropped_attributes_count, }) - .collect(), - ), - ); - - message.insert( - "dropped_events_count".into(), - span.dropped_events_count.into(), + }) + .collect(), ); - message.insert( - "links".to_string(), - Value::Array( - span.links - .iter() - .map(|link| { - Value::Object(BTreeMap::from([ - ( - "trace_id".into(), - Value::from(faster_hex::hex_string(&link.trace_id)), - ), - ( - "span_id".into(), - Value::from(faster_hex::hex_string(&link.span_id)), - ), - ("trace_state".into(), link.trace_state.clone().into()), - ( - "attributes".into(), - OpenTelemetryKeyValue { - attributes: link.attributes.clone(), - } - .to_value(), - ), - ( - "dropped_attributes_count".into(), - link.dropped_attributes_count.into(), - ), - ])) + let events = Value::Array( + span.events + .iter() + .map(|event| { + let attributes = OpenTelemetryKeyValue { + attributes: event.attributes.clone(), + } + .to_value(); + + Value::Object(btreemap! { + "name" => string_to_value(event.name.clone().into()), + "timestamp" => nano_to_timestamp(event.time_unix_nano), + "attributes" => attributes, + "dropped_attributes_count" => event.dropped_attributes_count, }) - .collect(), - ), + }) + .collect(), ); - message.insert( - "dropped_links_count".into(), - span.dropped_links_count.into(), - ); + let start_time_unix_nano = nano_to_timestamp(span.start_time_unix_nano); + + let mut message = btreemap! 
{ + "name" => string_to_value(span.name.into()), + "hostname" => resource_host_name.clone(), + "trace.id" => Value::from(faster_hex::hex_string(&span.trace_id)), + "trace.state" => Value::from(span.trace_state), + "span.id" => Value::from(faster_hex::hex_string(&span.span_id)), + "span.parent_id" => Value::from(faster_hex::hex_string(&span.parent_span_id)), + "start_timestamp" => start_time_unix_nano.clone(), + "end_timestamp" => nano_to_timestamp(span.end_time_unix_nano), + "kind" => Value::from(span.kind as i32), + "dropped_attributes_count" => span.dropped_attributes_count, + "events" => events, + "dropped_events_count" => span.dropped_events_count, + "links" => links, + "dropped_links_count" => span.dropped_links_count, + }; if let Some(status) = span.status { message.insert( - "status".to_string(), - Value::Object(BTreeMap::from([ - ( - "message".into(), - string_to_value(status.message.to_string()), - ), - ("code".into(), Value::from(status.code as i32)), - ])), + "status".into(), + Value::Object(btreemap! { + "message" => string_to_value(status.message.to_string()), + "code" => Value::from(status.code as i32), + }), ); } // Assemble metadata - let mut user_metadata = std::collections::BTreeMap::from_iter([( - "level".to_string(), - "trace".into(), - )]); - - user_metadata.insert("resource".to_string(), resource.clone()); - user_metadata.insert("scope".to_string(), scope.clone()); - let filtered_attributes = OpenTelemetryKeyValue { attributes: span.attributes, - } - .to_value(); - user_metadata.insert("attributes".to_string(), filtered_attributes); + }; - let log_line = BTreeMap::from_iter([ - ( - log_schema().message_key().unwrap().to_string(), - message.into(), - ), - ( - log_schema().user_metadata_key().to_string(), - user_metadata.into(), - ), - ]); + let user_metadata = btreemap! { + "level" => Cow::from("trace"), + "resource" => resource.clone(), + "scope" => scope.clone(), + "attributes" => filtered_attributes.to_value(), + }; + + let message_key = log_schema().message_key().unwrap().to_string(); - let mut log_event = LogEvent::from_map(log_line, EventMetadata::default()); + let mut log_event = LogEvent::from_map(btreemap! 
{ + message_key.as_str() => Value::Object(message), + log_schema().user_metadata_key() => Value::Object(user_metadata), + }, EventMetadata::default()); if let Some(timestamp_key) = log_schema().timestamp_key() { log_event.insert( @@ -304,6 +216,7 @@ pub fn to_events(trace_request: ExportTraceServiceRequest) -> SmallVec<[Event; 1 #[cfg(test)] mod tests { use super::*; + use chrono::{NaiveDateTime, TimeZone, Utc}; use std::ops::Deref; use opentelemetry_rs::opentelemetry::metrics::{AnyValue, AnyValueOneOfvalue, KeyValue}; @@ -344,12 +257,12 @@ mod tests { trace_state: Cow::from("test_state"), name: Cow::from("test_span_name"), kind: SpanKind::SPAN_KIND_UNSPECIFIED, - start_time_unix_nano: 1681339577345243523, - end_time_unix_nano: 1681339577345243523, + start_time_unix_nano: 1_579_134_612_000_000_011, + end_time_unix_nano: 1_579_134_612_000_000_012, attributes: vec![key_value.clone()], dropped_attributes_count: 10, events: vec![SpanEvent { - time_unix_nano: 1681339577345243523, + time_unix_nano: 1_579_134_612_000_000_013, name: Cow::from("test_name"), attributes: vec![key_value.clone()], dropped_attributes_count: 10, @@ -393,11 +306,27 @@ mod tests { "span.parent_id".into(), Value::from("706172656e745f7370616e5f6964") ), - ("start_timestamp".into(), Value::Integer(1_681_339_577_345)), + ( + "start_timestamp".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 11_u32) + .expect("timestamp should be a valid timestamp"), + ) + ) + ), ("dropped_attributes_count".into(), Value::Integer(10)), ("dropped_events_count".into(), Value::Integer(10)), ("dropped_links_count".into(), Value::Integer(10)), - ("end_timestamp".into(), Value::Integer(1_681_339_577_345)), + ( + "end_timestamp".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 12_u32) + .expect("timestamp should be a valid timestamp"), + ) + ) + ), ( "events".into(), Value::Array(Vec::from([Value::Object(BTreeMap::from([ @@ -407,9 +336,18 @@ mod tests { ), ("dropped_attributes_count".into(), Value::Integer(10)), ("name".into(), "test_name".into()), - ("timestamp".into(), Value::Integer(1_681_339_577_345)), + ( + "timestamp".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 13_u32) + .expect("timestamp should be a valid timestamp"), + ) + ) + ), ]))])) ), + ("hostname".into(), Value::Null), ("kind".into(), Value::Integer(0)), ( "links".into(), @@ -456,13 +394,13 @@ mod tests { ( "scope".into(), Value::Object(BTreeMap::from([ + ("name".into(), "test_name".into()), + ("schema_url".into(), "https://some_url.com".into()), + ("version".into(), "1.2.3".into()), ( "attributes".into(), Value::Object(BTreeMap::from([("test".into(), "test".into()),])) ), - ("dropped_attributes_count".into(), Value::Integer(10)), - ("name".into(), "test_name".into()), - ("version".into(), "1.2.3".into()), ])) ), ( diff --git a/scripts/integration/opentelemetry-sink/compose.yaml b/scripts/integration/opentelemetry-sink/compose.yaml new file mode 100644 index 000000000..8c2a05733 --- /dev/null +++ b/scripts/integration/opentelemetry-sink/compose.yaml @@ -0,0 +1,7 @@ +version: '3' + +services: + opentelemetry-collector: + image: docker.io/otel/opentelemetry-collector-contrib:${CONFIG_VERSION} + volumes: + - ../../../tests/data/opentelemetry/config.yaml:/etc/otelcol-contrib/config.yaml diff --git a/scripts/integration/opentelemetry-sink/test.yaml b/scripts/integration/opentelemetry-sink/test.yaml new file mode 100644 index 
000000000..d8a36fa5d
--- /dev/null
+++ b/scripts/integration/opentelemetry-sink/test.yaml
@@ -0,0 +1,18 @@
+features:
+- opentelemetry-sink-integration-tests
+
+test_filter: '::opentelemetry::'
+
+runner:
+  env:
+    TEST_OPENTELEMETRY_ENDPOINT: http://opentelemetry-collector:9876
+    RUST_BACKTRACE: true
+
+matrix:
+  version: [0.56.0]
+
+# changes to these files/paths will invoke the integration test in CI
+# expressions are evaluated using https://github.com/micromatch/picomatch
+paths:
+- "src/sinks/opentelemetry/**"
+- "scripts/integration/opentelemetry-sink/**"
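The compose file and test.yaml above stand up an otel/opentelemetry-collector-contrib container and point the integration test at it via TEST_OPENTELEMETRY_ENDPOINT, but the mounted tests/data/opentelemetry/config.yaml is not part of this diff. A minimal sketch of what that collector configuration presumably contains, inferred only from the OTLP port used in test.yaml (9876) and the health-check port the sink expects (13133, see config.rs below); the logging exporter and pipeline layout are assumptions, not taken from the repository:

extensions:
  health_check:
    endpoint: 0.0.0.0:13133

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:9876

exporters:
  logging:
    loglevel: debug

service:
  extensions: [health_check]
  pipelines:
    logs:
      receivers: [otlp]
      exporters: [logging]
    metrics:
      receivers: [otlp]
      exporters: [logging]
    traces:
      receivers: [otlp]
      exporters: [logging]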
diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs
index 6c1c5236d..9fcca8a7c 100644
--- a/src/sinks/mod.rs
+++ b/src/sinks/mod.rs
@@ -75,6 +75,8 @@ pub mod nats;
 pub mod new_relic;
 #[cfg(feature = "sinks-webhdfs")]
 pub mod opendal_common;
+#[cfg(feature = "sinks-opentelemetry")]
+pub mod opentelemetry;
 #[cfg(feature = "sinks-papertrail")]
 pub mod papertrail;
 #[cfg(feature = "sinks-postgresql")]
diff --git a/src/sinks/opentelemetry/config.rs b/src/sinks/opentelemetry/config.rs
new file mode 100644
index 000000000..cf575df35
--- /dev/null
+++ b/src/sinks/opentelemetry/config.rs
@@ -0,0 +1,247 @@
+use crate::mezmo::user_trace::MezmoLoggingService;
+use crate::sinks::opentelemetry::healthcheck::healthcheck;
+use crate::sinks::util::retries::RetryLogic;
+use crate::sinks::util::{ServiceBuilderExt, TowerRequestConfig};
+use crate::{
+    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
+    http::HttpClient,
+    sinks::{
+        opentelemetry::{Auth, OpentelemetrySinkAuth},
+        util::{BatchConfig, Compression, SinkBatchSettings},
+        Healthcheck, VectorSink,
+    },
+};
+
+use async_trait::async_trait;
+use futures_util::FutureExt;
+use http::{uri::InvalidUri, Uri};
+use tower::ServiceBuilder;
+use vector_lib::configurable::configurable_component;
+use vector_lib::tls::{TlsConfig, TlsSettings};
+
+use super::models::OpentelemetryModelType;
+use super::service::OpentelemetryApiResponse;
+use super::sink::OpentelemetrySinkError;
+use super::{
+    encoding::OpentelemetryEncoder, service::OpentelemetryService, sink::OpentelemetrySink,
+};
+
+const OPENTELEMETRY_HEALTHCHECK_PORT: &str = "13133";
+
+const DEFAULT_MAX_EVENTS: usize = 100;
+const DEFAULT_MAX_BYTES: usize = 1_000_000;
+
+#[derive(Clone, Debug, Default)]
+pub struct OpentelemetryRetry;
+
+impl RetryLogic for OpentelemetryRetry {
+    type Error = OpentelemetrySinkError;
+    type Response = OpentelemetryApiResponse;
+
+    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
+        false
+    }
+}
+
+#[derive(Clone, Copy, Debug, Default)]
+pub struct OpentelemetryDefaultBatchSettings;
+
+// Default batch settings for the `opentelemetry` sink.
+impl SinkBatchSettings for OpentelemetryDefaultBatchSettings {
+    const MAX_EVENTS: Option<usize> = Some(DEFAULT_MAX_EVENTS);
+    const MAX_BYTES: Option<usize> = Some(DEFAULT_MAX_BYTES);
+    const TIMEOUT_SECS: f64 = 1.0;
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct OpentelemetryEndpoint {
+    healthcheck_uri: Uri,
+    logs_uri: Uri,
+    metrics_uri: Uri,
+    traces_uri: Uri,
+}
+
+impl OpentelemetryEndpoint {
+    pub fn new(endpoint: Uri) -> Self {
+        let scheme = endpoint.scheme_str().unwrap();
+        let authority = endpoint.authority().map(|a| a.as_str()).unwrap();
+        let host = endpoint.host().unwrap().to_owned();
+
+        let healthcheck_uri = Uri::builder()
+            .scheme(scheme)
+            .authority(host + ":" + OPENTELEMETRY_HEALTHCHECK_PORT)
+            .path_and_query("/")
+            .build()
+            .unwrap();
+
+        let logs_uri = Uri::builder()
+            .scheme(scheme)
+            .authority(authority)
+            .path_and_query("/v1/logs")
+            .build()
+            .unwrap();
+
+        let metrics_uri = Uri::builder()
+            .scheme(scheme)
+            .authority(authority)
+            .path_and_query("/v1/metrics")
+            .build()
+            .unwrap();
+
+        let traces_uri = Uri::builder()
+            .scheme(scheme)
+            .authority(authority)
+            .path_and_query("/v1/traces")
+            .build()
+            .unwrap();
+
+        Self {
+            healthcheck_uri,
+            logs_uri,
+            metrics_uri,
+            traces_uri,
+        }
+    }
+
+    pub fn healthcheck(&self) -> Uri {
+        self.healthcheck_uri.clone()
+    }
+
+    pub fn endpoint(&self, model_type: OpentelemetryModelType) -> Uri {
+        match model_type {
+            OpentelemetryModelType::Logs => self.logs_uri.clone(),
+            OpentelemetryModelType::Metrics => self.metrics_uri.clone(),
+            OpentelemetryModelType::Traces => self.traces_uri.clone(),
+        }
+    }
+}
+
+impl TryFrom<String> for OpentelemetryEndpoint {
+    type Error = InvalidUri;
+
+    fn try_from(endpoint: String) -> Result<Self, Self::Error> {
+        let uri = endpoint.parse::<Uri>()?;
+        Ok(Self::new(uri))
+    }
+}
+
+/// Configuration for the `opentelemetry` sink.
+#[configurable_component(sink("opentelemetry"))]
+#[derive(Clone, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct OpentelemetrySinkConfig {
+    /// The endpoint to send data to.
+    ///
+    /// The endpoint should include the scheme and the port to send to.
+    #[configurable(metadata(docs::examples = "https://localhost:8087"))]
+    pub endpoint: String,
+
+    #[configurable(derived)]
+    pub auth: Option<OpentelemetrySinkAuth>,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub batch: BatchConfig<OpentelemetryDefaultBatchSettings>,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub compression: Compression,
+
+    #[configurable(derived)]
+    #[serde(default)]
+    pub request: TowerRequestConfig,
+
+    #[configurable(derived)]
+    pub tls: Option<TlsConfig>,
+
+    /// Acknowledgements option
+    #[configurable(derived)]
+    #[serde(
+        default,
+        deserialize_with = "crate::serde::bool_or_struct",
+        skip_serializing_if = "crate::serde::skip_serializing_if_default"
+    )]
+    acknowledgements: AcknowledgementsConfig,
+}
+
+impl OpentelemetrySinkConfig {
+    pub(super) fn build_client(&self, cx: SinkContext) -> crate::Result<HttpClient> {
+        let tls = TlsSettings::from_options(&self.tls)?;
+        let client = HttpClient::new(tls, cx.proxy())?;
+        Ok(client)
+    }
+}
+
+impl GenerateConfig for OpentelemetrySinkConfig {
+    fn generate_config() -> toml::Value {
+        toml::from_str(
+            r#"endpoint = "http://localhost:3100"
+            compression = "none""#,
+        )
+        .unwrap()
+    }
+}
+
+#[async_trait]
+#[typetag::serde(name = "opentelemetry")]
+impl SinkConfig for OpentelemetrySinkConfig {
+    async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+        let endpoint = OpentelemetryEndpoint::try_from(self.endpoint.clone())?;
+
+        let auth = match &self.auth {
+            Some(OpentelemetrySinkAuth::Basic { user, password }) => {
+                Some(Auth::Basic(crate::http::Auth::Basic {
+                    user: user.clone(),
+                    password: password.clone().into(),
+                }))
+            }
+            Some(OpentelemetrySinkAuth::Bearer { token }) => {
+                Some(Auth::Basic(crate::http::Auth::Bearer {
+                    token: token.clone(),
+                }))
+            }
+            None => None,
+        };
+
+        let batcher_settings = self
+            .batch
+            .validate()?
+            .limit_max_events(self.batch.max_events.unwrap_or(DEFAULT_MAX_EVENTS))?
+            .into_batcher_settings()?;
+
+        let request_limits = self.request.unwrap_with(&Default::default());
+
+        let client = self.build_client(ctx.clone())?;
+
+        let healthcheck =
+            healthcheck(endpoint.clone(), client.clone(), auth.clone(), ctx.clone()).boxed();
+
+        let service = ServiceBuilder::new()
+            .settings(request_limits, OpentelemetryRetry)
+            .service(MezmoLoggingService::new(
+                OpentelemetryService {
+                    endpoint: endpoint.clone(),
+                    client,
+                    auth,
+                },
+                ctx.mezmo_ctx,
+            ));
+
+        let compression = self.compression;
+        let sink = OpentelemetrySink {
+            service,
+            encoder: OpentelemetryEncoder,
+            compression,
+            batcher_settings,
+        };
+        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
+    }
+
+    fn input(&self) -> Input {
+        Input::new(DataType::Metric | DataType::Log)
+    }
+
+    fn acknowledgements(&self) -> &AcknowledgementsConfig {
+        &self.acknowledgements
+    }
+}
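Taken together, OpentelemetryEndpoint and OpentelemetrySinkConfig mean a pipeline only supplies the collector's base URL; the sink derives the /v1/logs, /v1/metrics, and /v1/traces paths itself and health-checks port 13133 on the same host. A minimal sketch of what a pipeline entry could look like under those assumptions — the sink and input names are placeholders, and the auth table is omitted because OpentelemetrySinkAuth is defined in the sink's mod.rs, outside this hunk:

[sinks.otel_collector]
type = "opentelemetry"
inputs = ["some_upstream_component"]
endpoint = "http://localhost:3100"
compression = "gzip"
batch.max_events = 100
acknowledgements.enabled = true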
diff --git a/src/sinks/opentelemetry/encoding.rs b/src/sinks/opentelemetry/encoding.rs
new file mode 100644
index 000000000..8d5858bcc
--- /dev/null
+++ b/src/sinks/opentelemetry/encoding.rs
@@ -0,0 +1,37 @@
+use std::io;
+
+use crate::sinks::prelude::*;
+use crate::sinks::util::encoding::{as_tracked_write, Encoder};
+
+use super::{
+    logs::encoding::encode as encode_log, metrics::encoding::encode as encode_metrics,
+    models::OpentelemetryModel, sink::OpentelemetrySinkError,
+    traces::encoding::encode as encode_traces,
+};
+
+#[derive(Clone, Debug)]
+pub struct OpentelemetryEncoder;
+
+impl Encoder<Result<OpentelemetryModel, OpentelemetrySinkError>> for OpentelemetryEncoder {
+    fn encode_input(
+        &self,
+        input: Result<OpentelemetryModel, OpentelemetrySinkError>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        let output = match input? {
+            OpentelemetryModel::Logs(log_model) => encode_log(&log_model)?,
+            OpentelemetryModel::Metrics(metric_model) => encode_metrics(&metric_model)?,
+            OpentelemetryModel::Traces(trace_model) => encode_traces(&trace_model)?,
+        };
+        let size = as_tracked_write::<_, _, io::Error>(writer, &output, |writer, output| {
+            writer.write_all(output)?;
+            Ok(())
+        })?;
+        io::Result::Ok((
+            size,
+            GroupedCountByteSize::Untagged {
+                size: CountByteSize(size, JsonSize::new(size)),
+            },
+        ))
+    }
+}
diff --git a/src/sinks/opentelemetry/healthcheck.rs b/src/sinks/opentelemetry/healthcheck.rs
new file mode 100644
index 000000000..23e84cbdf
--- /dev/null
+++ b/src/sinks/opentelemetry/healthcheck.rs
@@ -0,0 +1,43 @@
+use http::Request;
+
+use crate::{
+    config::SinkContext,
+    http::HttpClient,
+    mezmo::user_trace::MezmoUserLog,
+    sinks::{
+        opentelemetry::{config::OpentelemetryEndpoint, Auth},
+        HealthcheckError,
+    },
+    user_log_error,
+};
+
+use vrl::value::Value;
+
+pub(crate) async fn healthcheck(
+    endpoint: OpentelemetryEndpoint,
+    client: HttpClient,
+    auth: Option<Auth>,
+    cx: SinkContext,
+) -> crate::Result<()> {
+    let mut request = Request::post(endpoint.healthcheck()).body(hyper::Body::empty())?;
+
+    if let Some(auth) = auth {
+        match auth {
+            Auth::Basic(http_auth) => http_auth.apply(&mut request),
+        }
+    }
+
+    let response = client.send(request).await?;
+    let status = response.status();
+    if status.is_client_error() || status.is_server_error() {
+        let msg = Value::from(format!(
+            "Error returned from destination with status code: {}",
+            status
+        ));
+        user_log_error!(cx.mezmo_ctx, msg);
+    }
+    match status {
+        status if status.is_success() => Ok(()),
+        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
+    }
+}
diff --git a/src/sinks/opentelemetry/integration_tests.rs b/src/sinks/opentelemetry/integration_tests.rs
new file mode 100644
index
000000000..14db2e36c --- /dev/null +++ b/src/sinks/opentelemetry/integration_tests.rs @@ -0,0 +1,154 @@ +use chrono::{NaiveDateTime, TimeZone, Utc}; +use indoc::indoc; +use std::collections::BTreeMap; + +use vector_lib::{ + event::{Event, LogEvent}, + finalization::{BatchNotifier, BatchStatus}, +}; + +use crate::{ + config::SinkConfig, + event::Value, + sinks::{opentelemetry::config::OpentelemetrySinkConfig, util::test::load_sink}, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + generate_events_with_stream, + }, +}; + +fn line_generator(index: usize) -> String { + format!("opentelemetry test log index {}", index) +} + +fn event_generator(index: usize) -> Event { + Event::Log(LogEvent::from(line_generator(index))) +} + +#[cfg(feature = "opentelemetry-sink-integration-tests")] +#[tokio::test] +async fn test_opentelemetry_sink_endpoint() { + let config = indoc! {r#" + endpoint = "opentelemetry-endpoint" + compression = "gzip" + "#}; + + let endpoint = std::env::var("TEST_OPENTELEMETRY_ENDPOINT") + .expect("test endpoint environment variable not set"); + + assert!( + !endpoint.is_empty(), + "$TEST_OPENTELEMETRY_ENDPOINT required" + ); + + let config = config.replace("opentelemetry-endpoint", &endpoint); + let (config, cx) = load_sink::(config.as_str()).unwrap(); + + let (sink, _) = config.build(cx).await.unwrap(); + + let trace_id = [ + 95, 70, 127, 231, 191, 66, 103, 108, 5, 226, 11, 164, 169, 14, 68, 142, + ]; + let span_id = [76, 114, 27, 243, 62, 60, 175, 143]; + let expected_resource_attribute = Value::Object(BTreeMap::from([ + ("str".into(), "bar".into()), + ("int".into(), Value::from(100)), + ("flt".into(), Value::from(100.123_f64)), + ("bool".into(), Value::from(false)), + ("empty".into(), Value::Null), + ( + "list".into(), + Value::Array(vec![ + "bar".into(), + Value::from(100), + Value::from(100.123_f64), + Value::from(false), + Value::Null, + ]), + ), + ])); + let expected_scope_attributes = expected_resource_attribute.clone(); + let expected_log_attributes = Value::Object(BTreeMap::from([ + ("str".into(), "bar".into()), + ("int".into(), Value::from(100)), + ("flt".into(), Value::from(100.123_f64)), + ("bool".into(), Value::from(false)), + ("empty".into(), Value::Null), + ( + "attributes".into(), + Value::Object(BTreeMap::from([ + ("str".into(), "bar".into()), + ("int".into(), Value::from(100)), + ("flt".into(), Value::from(100.123_f64)), + ("bool".into(), Value::from(false)), + ("empty".into(), Value::Null), + ])), + ), + ])); + + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let generator = |idx| { + let mut event = event_generator(idx); + let log = event.as_mut_log(); + + log.insert( + "metadata", + Value::Object(BTreeMap::from([ + ("attributes".into(), expected_log_attributes.clone()), + ("flags".into(), Value::from(1)), + ( + "observed_timestamp".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 0o11_u32) + .expect("timestamp should be a valid timestamp"), + ), + ), + ), + ( + "time".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 0o11_u32) + .expect("timestamp should be a valid timestamp"), + ), + ), + ), + ("severity_number".into(), 17.into()), + ("severity_text".into(), "ERROR".into()), + ("level".into(), "ERROR".into()), + ( + "trace_id".into(), + Value::from(faster_hex::hex_string(&trace_id)), + ), + ( + "span_id".into(), + Value::from(faster_hex::hex_string(&span_id)), + ), + ("resource".into(), 
expected_resource_attribute.clone()), + ( + "scope".into(), + Value::Object(BTreeMap::from([ + ("attributes".into(), expected_scope_attributes.clone()), + ("name".into(), "sone_scope_name".into()), + ("version".into(), "1.0.0".into()), + ])), + ), + ])), + ); + + event + }; + let (messages, events) = generate_events_with_stream(generator, 5, Some(batch)); + + for (index, message) in messages.iter().enumerate() { + assert_eq!( + Value::from(format!("opentelemetry test log index {}", index)), + message.clone().into_log().get_message().unwrap().to_owned() + ); + } + + run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; + + assert_eq!(receiver.await, BatchStatus::Delivered); +} diff --git a/src/sinks/opentelemetry/logs/encoding.rs b/src/sinks/opentelemetry/logs/encoding.rs new file mode 100644 index 000000000..b23782453 --- /dev/null +++ b/src/sinks/opentelemetry/logs/encoding.rs @@ -0,0 +1,18 @@ +use crate::sinks::opentelemetry::{ + logs::model::OpentelemetryLogsModel, sink::OpentelemetrySinkError, +}; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use prost::Message; + +pub fn encode(model: &OpentelemetryLogsModel) -> Result, OpentelemetrySinkError> { + let logs = model.0[0].get("logs").unwrap().clone(); + + let req = ExportLogsServiceRequest { + resource_logs: logs.into_iter().map(Into::into).collect(), + }; + + let mut buf = vec![]; + req.encode(&mut buf).map_err(OpentelemetrySinkError::from)?; + + Ok(buf) +} diff --git a/src/sinks/opentelemetry/logs/mod.rs b/src/sinks/opentelemetry/logs/mod.rs new file mode 100644 index 000000000..f0a1d8405 --- /dev/null +++ b/src/sinks/opentelemetry/logs/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod encoding; +pub(crate) mod model; diff --git a/src/sinks/opentelemetry/logs/model.rs b/src/sinks/opentelemetry/logs/model.rs new file mode 100644 index 000000000..08b4062c5 --- /dev/null +++ b/src/sinks/opentelemetry/logs/model.rs @@ -0,0 +1,391 @@ +use std::collections::HashMap; +use vector_lib::{ + config::log_schema, + event::{Event, Value}, + lookup::PathPrefix, +}; + +use opentelemetry::{ + logs::{AnyValue as OtlpAnyValue, LogRecord, Severity}, + trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState}, +}; +use opentelemetry_sdk::export::logs::LogData; +use std::borrow::Cow; + +use crate::sinks::opentelemetry::{ + models::{ + value_to_otlp_any_value, value_to_system_time, OpentelemetryModelMatch, + OpentelemetryModelType, OpentelemetryResource, OpentelemetryScope, + }, + sink::OpentelemetrySinkError, +}; + +type DataStore = HashMap>; + +#[derive(Debug)] +pub struct OpentelemetryLogsModel(pub Vec); + +impl OpentelemetryModelMatch for OpentelemetryLogsModel { + fn maybe_match(event: &Event) -> Option { + let log = event.clone().into_log(); + let message = log.get_message(); + let metadata = log.get((PathPrefix::Event, log_schema().user_metadata_key())); + + if metadata.and(message).is_some() { + let scope = metadata.unwrap().get("scope"); + + if scope.is_some() { + let trace_id = metadata.unwrap().get("trace_id"); + let span_id = metadata.unwrap().get("span_id"); + let severity_number = metadata.unwrap().get("severity_number"); + let flags = metadata.unwrap().get("flags"); + + if scope + .and(trace_id) + .and(span_id) + .and(severity_number) + .and(flags) + .is_some() + { + return Some(OpentelemetryModelType::Logs); + } + } + } + + None + } +} + +impl OpentelemetryLogsModel { + pub fn new(log_data_array: Vec) -> Self { + let mut logs_store = DataStore::new(); + logs_store.insert("logs".to_owned(), 
log_data_array); + Self(vec![logs_store]) + } +} + +impl TryFrom> for OpentelemetryLogsModel { + type Error = OpentelemetrySinkError; + + fn try_from(buf_events: Vec) -> Result { + let mut logs_array = vec![]; + for buf_event in buf_events { + let log = &buf_event.into_log(); + + let mut record_builder = LogRecord::builder(); + + if let Some(message) = log.get_message() { + record_builder = record_builder.with_body(OtlpAnyValue::from(message.to_string())); + } + + let mut severity_number = None; + + if let Some(metadata) = log.get((PathPrefix::Event, log_schema().user_metadata_key())) { + if let Some(value) = metadata.get("attributes") { + if let OtlpAnyValue::Map(attrs) = value_to_otlp_any_value(value.clone()) { + let attributes = attrs + .clone() + .into_iter() + .map(|(key, val)| (key, val)) + .collect::>(); + + record_builder = record_builder.with_attributes(attributes); + } + } + + if let Some(timestamp) = metadata.get("time") { + record_builder = record_builder.with_timestamp(value_to_system_time(timestamp)); + } + + if let Some(timestamp) = metadata.get("observed_timestamp") { + record_builder = + record_builder.with_observed_timestamp(value_to_system_time(timestamp)); + } + + let raw_trace_id = metadata.get("trace_id"); + let raw_span_id = metadata.get("span_id"); + + if raw_trace_id.or(raw_span_id).is_some() { + let trace_id = if let Some(id) = raw_trace_id { + match &id { + Value::Bytes(bytes) => { + let mut trace_id = [0; 16]; + match faster_hex::hex_decode(bytes, &mut trace_id) { + Ok(_) => TraceId::from_bytes(trace_id), + Err(_) => TraceId::INVALID, + } + } + _ => TraceId::INVALID, + } + } else { + TraceId::INVALID + }; + + let span_id = if let Some(id) = raw_span_id { + match &id { + Value::Bytes(bytes) => { + let mut span_id = [0; 8]; + match faster_hex::hex_decode(bytes, &mut span_id) { + Ok(_) => SpanId::from_bytes(span_id), + Err(_) => SpanId::INVALID, + } + } + _ => SpanId::INVALID, + } + } else { + SpanId::INVALID + }; + + let trace_flags = if let Some(flags) = metadata.get("flags") { + match flags { + Value::Integer(flag) => TraceFlags::new( + u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8()), + ), + _ => TraceFlags::NOT_SAMPLED, + } + } else { + TraceFlags::NOT_SAMPLED + }; + + let context = + SpanContext::new(trace_id, span_id, trace_flags, false, TraceState::NONE); + + record_builder = record_builder.with_span_context(&context); + } + + if let Some(value) = metadata.get("severity_text") { + let severity_text = if let Value::Bytes(bytes) = value { + Cow::from(String::from_utf8_lossy(bytes).into_owned()) + } else { + Cow::from("") + }; + + record_builder = record_builder.with_severity_text(severity_text); + }; + + if let Some(Value::Integer(number)) = metadata.get("severity_number") { + severity_number = match *number { + 1 => Some(Severity::Trace), + 2 => Some(Severity::Trace2), + 3 => Some(Severity::Trace3), + 4 => Some(Severity::Trace4), + 5 => Some(Severity::Debug), + 6 => Some(Severity::Debug2), + 7 => Some(Severity::Debug3), + 8 => Some(Severity::Debug4), + 9 => Some(Severity::Info), + 10 => Some(Severity::Info2), + 11 => Some(Severity::Info3), + 12 => Some(Severity::Info4), + 13 => Some(Severity::Warn), + 14 => Some(Severity::Warn2), + 15 => Some(Severity::Warn3), + 16 => Some(Severity::Warn4), + 17 => Some(Severity::Error), + 18 => Some(Severity::Error2), + 19 => Some(Severity::Error3), + 20 => Some(Severity::Error4), + 21 => Some(Severity::Fatal), + 22 => Some(Severity::Fatal2), + 23 => Some(Severity::Fatal3), + 24 => Some(Severity::Fatal4), + _ 
=> None, + }; + }; + } + + let resource = OpentelemetryResource::from(log); + let scope = OpentelemetryScope::from(log); + + let mut log_record = record_builder.build(); + + log_record.severity_number = severity_number; + + logs_array.push(LogData { + record: log_record, + resource: Cow::Owned(resource.into()), + instrumentation: scope.into(), + }); + } + + Ok(Self::new(logs_array)) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::event::Value; + use chrono::{NaiveDateTime, TimeZone, Utc}; + use std::collections::BTreeMap; + use std::time::SystemTime; + use vector_lib::event::{Event, LogEvent}; + + use opentelemetry::{ + logs::{AnyValue as OtlpAnyValue, LogRecord, Severity, TraceContext}, + trace::{SpanId, TraceFlags, TraceId}, + InstrumentationLibrary, + }; + use opentelemetry_sdk::export::logs::LogData; + use opentelemetry_sdk::Resource; + + fn line_generator(index: usize) -> String { + format!("opentelemetry test log index {}", index) + } + + fn event_generator(index: usize) -> Event { + Event::Log(LogEvent::from(line_generator(index))) + } + + pub fn generate_events Event>(generator: Gen, count: usize) -> Vec { + (0..count).map(generator).collect::>() + } + + #[test] + fn test_otlp_sink_event_to_log_model() { + let trace_id = [ + 95, 70, 127, 231, 191, 66, 103, 108, 5, 226, 11, 164, 169, 14, 68, 142, + ]; + let span_id = [76, 114, 27, 243, 62, 60, 175, 143]; + + let expected_resource_attribute = Value::Object(BTreeMap::from([ + ("str".into(), "bar".into()), + ("int".into(), Value::from(100)), + ("flt".into(), Value::from(100.123_f64)), + ("bool".into(), Value::from(false)), + ("empty".into(), Value::Null), + ( + "list".into(), + Value::Array(vec![ + "bar".into(), + Value::from(100), + Value::from(100.123_f64), + Value::from(false), + Value::Null, + ]), + ), + ])); + let expected_scope_attributes = expected_resource_attribute.clone(); + let expected_log_attributes = Value::Object(BTreeMap::from([ + ("str".into(), "bar".into()), + ("int".into(), Value::from(100)), + ("flt".into(), Value::from(100.123_f64)), + ("bool".into(), Value::from(false)), + ("empty".into(), Value::Null), + ( + "attributes".into(), + Value::Object(BTreeMap::from([ + ("str".into(), "bar".into()), + ("int".into(), Value::from(100)), + ("flt".into(), Value::from(100.123_f64)), + ("bool".into(), Value::from(false)), + ("empty".into(), Value::Null), + ])), + ), + ])); + + let generator = |idx| { + let mut event = event_generator(idx); + let log = event.as_mut_log(); + + log.insert( + "metadata", + Value::Object(BTreeMap::from([ + ("attributes".into(), expected_log_attributes.clone()), + ("flags".into(), Value::from(1)), + ( + "observed_timestamp".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 0o11_u32) + .expect("timestamp should be a valid timestamp"), + ), + ), + ), + ( + "time".into(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(1_579_134_612_i64, 0o11_u32) + .expect("timestamp should be a valid timestamp"), + ), + ), + ), + ("severity_number".into(), 17.into()), + ("severity_text".into(), "ERROR".into()), + ("level".into(), "ERROR".into()), + ( + "trace_id".into(), + Value::from(faster_hex::hex_string(&trace_id)), + ), + ( + "span_id".into(), + Value::from(faster_hex::hex_string(&span_id)), + ), + ("resource".into(), expected_resource_attribute.clone()), + ( + "scope".into(), + Value::Object(BTreeMap::from([ + ("attributes".into(), expected_scope_attributes.clone()), + ("name".into(), "sone_scope_name".into()), 
+ ("version".into(), "1.0.0".into()), + ])), + ), + ])), + ); + event + }; + + let events = generate_events(generator, 1); + let model = OpentelemetryLogsModel::try_from(events).unwrap(); + + let logs = model.0[0].get("logs").expect("Logs data store not present"); + + let log_data: LogData = logs[0].clone(); + let record: LogRecord = log_data.record; + let _resource: Resource = log_data.resource.into_owned(); + let _instrumentation: InstrumentationLibrary = log_data.instrumentation; + + assert!(record.event_name.is_none()); + assert!(record.timestamp.is_some()); + assert!(record.severity_text.is_some()); + assert!(record.severity_number.is_some()); + assert!(record.body.is_some()); + assert!(record.trace_context.is_some()); + assert!(record.attributes.is_some()); + + let timestamp_duration = record + .timestamp + .unwrap() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + assert_eq!(timestamp_duration.as_millis(), 1_579_134_612_000_u128); + + let observed_timestamp_duration = record + .observed_timestamp + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + assert_eq!( + observed_timestamp_duration.as_millis(), + 1_579_134_612_000_u128 + ); + + assert_eq!(record.severity_text.unwrap().into_owned(), "ERROR"); + + assert_eq!(record.severity_number.unwrap(), Severity::Error); + + assert_eq!( + record.body.unwrap(), + OtlpAnyValue::from("\"opentelemetry test log index 0\"".to_string()) + ); + + let trace_context: TraceContext = record.trace_context.unwrap(); + assert_eq!(trace_context.trace_id, TraceId::from_bytes(trace_id)); + assert_eq!(trace_context.span_id, SpanId::from_bytes(span_id)); + assert!(trace_context.trace_flags.is_some()); + assert_eq!(trace_context.trace_flags.unwrap(), TraceFlags::SAMPLED); + } +} diff --git a/src/sinks/opentelemetry/metrics/encoding.rs b/src/sinks/opentelemetry/metrics/encoding.rs new file mode 100644 index 000000000..596a20018 --- /dev/null +++ b/src/sinks/opentelemetry/metrics/encoding.rs @@ -0,0 +1,10 @@ +use super::model::OpentelemetryMetricsModel; +use crate::sinks::opentelemetry::sink::OpentelemetrySinkError; + +pub fn encode(_model: &OpentelemetryMetricsModel) -> Result, OpentelemetrySinkError> { + // Metric model to Protobuf encoding + // https://github.com/open-telemetry/opentelemetry-rust/blob/936c46639aa1521bf49dbffba49bbd9795f8ea58/opentelemetry-otlp/src/exporter/http/metrics.rs#L52-L61 + Err(OpentelemetrySinkError::new( + "Opentelemetry metrics encoding is not implemented yet", + )) +} diff --git a/src/sinks/opentelemetry/metrics/mod.rs b/src/sinks/opentelemetry/metrics/mod.rs new file mode 100644 index 000000000..f0a1d8405 --- /dev/null +++ b/src/sinks/opentelemetry/metrics/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod encoding; +pub(crate) mod model; diff --git a/src/sinks/opentelemetry/metrics/model.rs b/src/sinks/opentelemetry/metrics/model.rs new file mode 100644 index 000000000..d4f47919d --- /dev/null +++ b/src/sinks/opentelemetry/metrics/model.rs @@ -0,0 +1,89 @@ +use crate::sinks::opentelemetry::{ + models::{OpentelemetryModelMatch, OpentelemetryModelType}, + sink::OpentelemetrySinkError, +}; +use opentelemetry_sdk::metrics::data::ResourceMetrics; +use std::collections::HashMap; +use std::str::FromStr; +use vector_lib::{ + config::log_schema, + event::{Event, Value}, + lookup::PathPrefix, +}; + +type DataStore = HashMap>; + +enum OpentelemetryMetricsType { + Gauge, + Sum, + Histogram, + ExponentialHistogram, + Summary, +} + +impl FromStr for OpentelemetryMetricsType { + type Err = (); + fn from_str(input: &str) -> Result 
{ + match input { + "gauge" => Ok(OpentelemetryMetricsType::Gauge), + "sum" => Ok(OpentelemetryMetricsType::Sum), + "histogram" => Ok(OpentelemetryMetricsType::Histogram), + "exponential_histogram" => Ok(OpentelemetryMetricsType::ExponentialHistogram), + "summary" => Ok(OpentelemetryMetricsType::Summary), + _ => Err(()), + } + } +} + +#[derive(Debug)] +pub struct OpentelemetryMetricsModel(pub Vec); + +impl OpentelemetryModelMatch for OpentelemetryMetricsModel { + fn maybe_match(event: &Event) -> Option { + let log = event.clone().into_log(); + let message = log.get_message(); + let metadata = log.get((PathPrefix::Event, log_schema().user_metadata_key())); + + if metadata.and(message).is_some() { + let scope = metadata.unwrap().get("scope"); + + if scope.is_some() { + if let Some(value) = message.unwrap().get("value") { + if let Some(Value::Bytes(ref metric_type)) = value.get("type") { + if OpentelemetryMetricsType::from_str( + String::from_utf8_lossy(metric_type).into_owned().as_str(), + ) + .is_ok() + { + return Some(OpentelemetryModelType::Metrics); + } + } + } + } + } + + None + } +} + +impl OpentelemetryMetricsModel { + pub fn new(metrics_data_array: Vec) -> Self { + let mut metrics_store = DataStore::new(); + metrics_store.insert("metrics".to_owned(), metrics_data_array); + Self(vec![metrics_store]) + } +} + +impl TryFrom> for OpentelemetryMetricsModel { + type Error = OpentelemetrySinkError; + + fn try_from(_buf_events: Vec) -> Result { + // https://github.com/open-telemetry/opentelemetry-rust/blob/936c46639aa1521bf49dbffba49bbd9795f8ea58/opentelemetry-sdk/src/metrics/data/mod.rs#L15 + let metrics_array = vec![]; + let _ = Self::new(metrics_array); + + Err(OpentelemetrySinkError::new( + "Opentelemetry metric model is not implemented yet", + )) + } +} diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs new file mode 100644 index 000000000..f2cc0c196 --- /dev/null +++ b/src/sinks/opentelemetry/mod.rs @@ -0,0 +1,46 @@ +pub(crate) mod config; +pub(crate) mod encoding; +pub(crate) mod healthcheck; +pub(crate) mod models; +pub(crate) mod service; +pub(crate) mod sink; + +pub(crate) mod logs; +pub(crate) mod metrics; +pub(crate) mod traces; + +#[cfg(feature = "opentelemetry-sink-integration-tests")] +#[cfg(test)] +pub(crate) mod integration_tests; + +use vector_lib::configurable::configurable_component; +use vector_lib::sensitive_string::SensitiveString; + +/// Authentication strategies. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")] +#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))] +pub enum OpentelemetrySinkAuth { + /// HTTP Basic Authentication. + Basic { + /// Basic authentication username. + user: String, + + /// Basic authentication password. + password: String, + }, + + /// Bearer authentication. + /// + /// A bearer token (OAuth2, JWT, etc) is passed as-is. + Bearer { + /// The bearer token to send. 
+ token: SensitiveString, + }, +} + +#[derive(Debug, Clone)] +pub enum Auth { + Basic(crate::http::Auth), +} diff --git a/src/sinks/opentelemetry/models.rs b/src/sinks/opentelemetry/models.rs new file mode 100644 index 000000000..7a2d06857 --- /dev/null +++ b/src/sinks/opentelemetry/models.rs @@ -0,0 +1,232 @@ +use chrono::SecondsFormat; +use std::ops::SubAssign; + +use std::collections::HashMap; +use vector_lib::{ + config::log_schema, + event::{Event, LogEvent, Value}, + lookup::PathPrefix, +}; + +use super::{ + logs::model::OpentelemetryLogsModel, metrics::model::OpentelemetryMetricsModel, + traces::model::OpentelemetryTracesModel, +}; +use opentelemetry::{ + logs::AnyValue as OtlpAnyValue, Array as OtlpArray, InstrumentationLibrary, Key, KeyValue, + StringValue, Value as OtlpValue, +}; +use opentelemetry_sdk::Resource; +use std::{ + borrow::Cow, + time::{Duration, SystemTime}, +}; + +pub fn value_to_otlp_any_value(value: Value) -> OtlpAnyValue { + match &value { + Value::Bytes(bytes) => OtlpAnyValue::from(String::from_utf8_lossy(bytes).into_owned()), + Value::Integer(int) => OtlpAnyValue::from(*int), + Value::Float(float) => OtlpAnyValue::from(*float.as_ref()), + Value::Boolean(bool) => OtlpAnyValue::from(*bool), + Value::Timestamp(timestamp) => { + OtlpAnyValue::from(timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)) + } + Value::Array(val_list) => val_list + .iter() + .map(|val| value_to_otlp_any_value(val.clone())) + .collect::(), + Value::Object(object) => OtlpAnyValue::Map( + object + .iter() + .map(|(key, value)| { + ( + key.to_string().into(), + value_to_otlp_any_value(value.clone()), + ) + }) + .collect::>(), + ), + Value::Null => OtlpAnyValue::from(""), + _ => OtlpAnyValue::from(""), + } +} + +pub fn value_to_otlp_value(value: Value) -> OtlpValue { + match &value { + Value::Bytes(bytes) => OtlpValue::from(String::from_utf8_lossy(bytes).into_owned()), + Value::Integer(int) => OtlpValue::I64(*int), + Value::Float(float) => OtlpValue::F64(*float.as_ref()), + Value::Boolean(bool) => OtlpValue::Bool(*bool), + Value::Array(val_list) => OtlpValue::Array(value_to_otlp_array(val_list.to_vec())), + Value::Timestamp(timestamp) => { + OtlpValue::from(timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)) + } + Value::Null => OtlpValue::from(""), + _ => OtlpValue::from(""), + // Other value types: Regex, Object are not supported by the OtlpValue enum. + } +} + +pub fn value_to_otlp_array(values: Vec) -> OtlpArray { + let mut string_values: Vec = vec![]; + + for val in values.iter() { + string_values.push(match &val { + Value::Bytes(bytes) => String::from_utf8_lossy(bytes).into_owned().into(), + Value::Integer(int) => int.to_string().into(), + Value::Float(float) => float.to_string().into(), + Value::Boolean(bool) => bool.to_string().into(), + Value::Timestamp(timestamp) => timestamp + .to_rfc3339_opts(SecondsFormat::AutoSi, true) + .into(), + Value::Null => "".to_string().into(), + _ => "".to_string().into(), + // Other value types: Array Regex, Object are not supported by the OtlpArray enum. 
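+ // For example, a Vector `Value::Array` holding `Bytes("bar")`, `Integer(100)`, and
+ // `Boolean(false)` is flattened here into the OTLP string array `["bar", "100", "false"]`;
+ // nested arrays and objects fall through the catch-all arm above and become empty strings.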
+ }); + } + + string_values.into() +} + +pub fn value_to_system_time(value: &Value) -> SystemTime { + match value { + Value::Timestamp(time) => { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + let diff = now + - Duration::from_nanos( + time.to_owned() + .timestamp_nanos_opt() + .unwrap() + .try_into() + .unwrap(), + ); + + let mut ts = SystemTime::now(); + ts.sub_assign(diff); + ts + } + Value::Integer(time) => { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + let diff = now - Duration::from_millis((*time).try_into().unwrap()); + let mut ts = SystemTime::now(); + ts.sub_assign(diff); + ts + } + _ => SystemTime::now(), + } +} + +#[derive(Debug)] +pub struct OpentelemetryResource { + pub attributes: Vec, +} + +impl From<&LogEvent> for OpentelemetryResource { + fn from(log: &LogEvent) -> Self { + let mut attributes = vec![]; + + if let Some(metadata) = log.get((PathPrefix::Event, log_schema().user_metadata_key())) { + if let Some(Value::Object(obj)) = metadata.get("resource") { + for (key, value) in obj.iter() { + attributes.push(KeyValue::new( + key.to_string(), + value_to_otlp_value(value.clone()), + )); + } + } + } + + OpentelemetryResource { attributes } + } +} + +impl From for Resource { + fn from(val: OpentelemetryResource) -> Self { + Resource::new(val.attributes) + } +} + +#[derive(Debug)] +pub struct OpentelemetryScope { + pub name: Cow<'static, str>, + pub version: Option>, + pub schema_url: Option>, + pub attributes: Vec, +} + +impl From<&LogEvent> for OpentelemetryScope { + fn from(log: &LogEvent) -> Self { + let mut name = Cow::from(""); + let mut version = None; + let mut schema_url = None; + let mut attributes = vec![]; + + if let Some(metadata) = log.get((PathPrefix::Event, log_schema().user_metadata_key())) { + if let Some(scope) = metadata.get("scope") { + name = if let Some(Value::Bytes(val)) = scope.get("name") { + Cow::from(String::from_utf8_lossy(val).into_owned()) + } else { + Cow::from("") + }; + + version = if let Some(Value::Bytes(val)) = scope.get("version") { + Some(Cow::from(String::from_utf8_lossy(val).into_owned())) + } else { + None + }; + + schema_url = if let Some(Value::Bytes(val)) = scope.get("schema_url") { + Some(Cow::from(String::from_utf8_lossy(val).into_owned())) + } else { + None + }; + + if let Some(Value::Object(obj)) = scope.get("attributes") { + for (key, value) in obj.iter() { + attributes.push(KeyValue::new( + key.to_string(), + value_to_otlp_value(value.clone()), + )); + } + } + } + } + + Self { + name, + version, + schema_url, + attributes, + } + } +} + +impl From for InstrumentationLibrary { + fn from(val: OpentelemetryScope) -> Self { + InstrumentationLibrary::new(val.name, val.version, val.schema_url, Some(val.attributes)) + } +} + +pub enum OpentelemetryModel { + Logs(OpentelemetryLogsModel), + Metrics(OpentelemetryMetricsModel), + Traces(OpentelemetryTracesModel), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum OpentelemetryModelType { + Logs, + Metrics, + Traces, +} + +pub trait OpentelemetryModelMatch { + fn maybe_match(event: &Event) -> Option + where + Self: Sized; +} diff --git a/src/sinks/opentelemetry/service.rs b/src/sinks/opentelemetry/service.rs new file mode 100644 index 000000000..1387e2c61 --- /dev/null +++ b/src/sinks/opentelemetry/service.rs @@ -0,0 +1,139 @@ +use std::task::{Context, Poll}; + +use crate::{ + http::HttpClient, + mezmo::user_trace::UserLoggingResponse, + sinks::{ + 
opentelemetry::{config::OpentelemetryEndpoint, models::OpentelemetryModelType, Auth}, + util::Compression, + }, +}; +use bytes::Bytes; +use futures::future::BoxFuture; +use http::{ + header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE}, + Request, +}; +use hyper::Body; +use tower::Service; +use vector_lib::{ + finalization::{EventFinalizers, EventStatus, Finalizable}, + request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, + stream::DriverResponse, +}; + +use super::sink::OpentelemetrySinkError; + +#[derive(Clone, Debug)] +pub struct OpentelemetryApiRequest { + pub payload: Bytes, + pub compression: Compression, + pub metadata: RequestMetadata, + pub finalizers: EventFinalizers, + pub model_type: OpentelemetryModelType, +} + +impl OpentelemetryApiRequest { + const fn get_model_type(&self) -> OpentelemetryModelType { + self.model_type + } +} + +impl Finalizable for OpentelemetryApiRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for OpentelemetryApiRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +#[derive(Debug)] +pub struct OpentelemetryApiResponse { + event_status: EventStatus, + metadata: RequestMetadata, + events_byte_size: GroupedCountByteSize, +} + +impl DriverResponse for OpentelemetryApiResponse { + fn event_status(&self) -> EventStatus { + self.event_status + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.metadata.request_encoded_size()) + } +} + +impl UserLoggingResponse for OpentelemetryApiResponse {} + +#[derive(Clone, Debug)] +pub struct OpentelemetryService { + pub endpoint: OpentelemetryEndpoint, + pub client: HttpClient, + pub auth: Option, +} + +impl Service for OpentelemetryService { + type Response = OpentelemetryApiResponse; + type Error = OpentelemetrySinkError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: OpentelemetryApiRequest) -> Self::Future { + let mut client = self.client.clone(); + let uri = self.endpoint.endpoint(request.get_model_type()); + + let metadata = std::mem::take(request.metadata_mut()); + let events_byte_size = metadata + .clone() + .into_events_estimated_json_encoded_byte_size(); + let http_request = Request::post(&uri); + + let http_request = http_request.header(CONTENT_TYPE, "application/x-protobuf"); + + let http_request = if let Some(ca) = request.compression.content_encoding() { + http_request.header(CONTENT_ENCODING, ca) + } else { + http_request + }; + + let mut http_request = http_request + .header(CONTENT_LENGTH, request.payload.len()) + .body(Body::from(request.payload)) + .expect("building HTTP request failed unexpectedly"); + + if let Some(auth) = &self.auth { + match auth { + Auth::Basic(http_auth) => http_auth.apply(&mut http_request), + } + } + + Box::pin(async move { + match client.call(http_request).await { + Ok(_) => Ok(OpentelemetryApiResponse { + event_status: EventStatus::Delivered, + metadata: metadata.clone(), + events_byte_size, + }), + Err(error) => Err(OpentelemetrySinkError::new(&format!( + "HTTP request error: {}", + error + ))), + } + }) + } +} diff --git a/src/sinks/opentelemetry/sink.rs b/src/sinks/opentelemetry/sink.rs new file mode 100644 index 000000000..2f4d54a13 --- /dev/null +++ 
b/src/sinks/opentelemetry/sink.rs @@ -0,0 +1,232 @@ +use std::{fmt::Debug, num::NonZeroUsize}; + +use super::{ + encoding::OpentelemetryEncoder, + logs::model::OpentelemetryLogsModel, + metrics::model::OpentelemetryMetricsModel, + models::{OpentelemetryModel, OpentelemetryModelMatch, OpentelemetryModelType}, + service::OpentelemetryApiRequest, + traces::model::OpentelemetryTracesModel, +}; + +use crate::{ + event::Event, + http::get_http_scheme_from_uri, + internal_events::SinkRequestBuildError, + mezmo::user_trace::UserLoggingError, + sinks::util::{ + metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, + RequestBuilder, SinkBuilderExt, + }, +}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::{BoxStream, StreamExt}; +use tower::Service; +use vector_lib::{ + event::Value, + finalization::{EventFinalizers, Finalizable}, + partition::Partitioner, + request_metadata::RequestMetadata, + sink::StreamSink, + stream::{BatcherSettings, DriverResponse}, +}; + +#[derive(Debug)] +pub struct OpentelemetrySinkError { + message: String, +} + +impl OpentelemetrySinkError { + pub fn new(msg: &str) -> Self { + OpentelemetrySinkError { + message: String::from(msg), + } + } + + pub fn boxed(msg: &str) -> Box<Self> { + Box::new(OpentelemetrySinkError { + message: String::from(msg), + }) + } +} + +impl From<std::io::Error> for OpentelemetrySinkError { + fn from(error: std::io::Error) -> Self { + Self::new(&error.to_string()) + } +} + +impl From<prost::EncodeError> for OpentelemetrySinkError { + fn from(error: prost::EncodeError) -> Self { + Self::new(&error.to_string()) + } +} + +impl std::fmt::Display for OpentelemetrySinkError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for OpentelemetrySinkError { + fn description(&self) -> &str { + &self.message + } +} + +impl From<OpentelemetrySinkError> for std::io::Error { + fn from(error: OpentelemetrySinkError) -> Self { + Self::new(std::io::ErrorKind::Other, error) + } +} + +impl UserLoggingError for OpentelemetrySinkError { + fn log_msg(&self) -> Option<Value> { + Some(self.to_string().into()) + } +} + +#[derive(Clone)] +struct OpentelemetryRequestBuilder { + encoder: OpentelemetryEncoder, + compression: Compression, +} + +impl RequestBuilder<(OpentelemetryModelType, Vec<Event>)> for OpentelemetryRequestBuilder { + type Metadata = (OpentelemetryModelType, EventFinalizers); + type Events = Result<OpentelemetryModel, OpentelemetrySinkError>; + type Encoder = OpentelemetryEncoder; + type Payload = Bytes; + type Request = OpentelemetryApiRequest; + type Error = OpentelemetrySinkError; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + input: (OpentelemetryModelType, Vec<Event>), + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let (model_type, mut events) = input; + let builder = RequestMetadataBuilder::from_events(&events); + + let finalizers = events.take_finalizers(); + let model: Result<OpentelemetryModel, OpentelemetrySinkError> = match model_type { + OpentelemetryModelType::Logs => Ok(OpentelemetryModel::Logs( + OpentelemetryLogsModel::try_from(events).expect("error with log events input"), + )), + OpentelemetryModelType::Metrics => Ok(OpentelemetryModel::Metrics( + OpentelemetryMetricsModel::try_from(events) + .expect("error with metric events input"), + )), + OpentelemetryModelType::Traces => Ok(OpentelemetryModel::Traces( + OpentelemetryTracesModel::try_from(events).expect("error with trace events input"), + )), + }; + + ((model_type, finalizers), builder, model) + } + + fn
build_request( + &self, + batch_metadata: Self::Metadata, + metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + let (model_type, finalizers) = batch_metadata; + + OpentelemetryApiRequest { + compression: self.compression, + payload: payload.into_payload(), + metadata, + finalizers, + model_type, + } + } +} + +struct OpentelemetryTypePartitioner; + +impl Partitioner for OpentelemetryTypePartitioner { + type Item = Event; + type Key = OpentelemetryModelType; + + fn partition(&self, item: &Self::Item) -> Self::Key { + let model_type = OpentelemetryLogsModel::maybe_match(item) + .or(OpentelemetryMetricsModel::maybe_match(item)) + .or(OpentelemetryTracesModel::maybe_match(item)); + + if let Some(model_type) = model_type { + return model_type; + } + + panic!("Unexpected event type") + } +} + +pub struct OpentelemetrySink { + pub service: S, + pub encoder: OpentelemetryEncoder, + pub compression: Compression, + pub batcher_settings: BatcherSettings, +} + +impl OpentelemetrySink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let builder_limit = NonZeroUsize::new(64).unwrap(); + let request_builder = OpentelemetryRequestBuilder { + encoder: self.encoder, + compression: self.compression, + }; + + use http::Uri; + // let protocol = get_http_scheme_from_uri(&self.endpoint); + let protocol = get_http_scheme_from_uri(&Uri::from_static("http://localhost")); + + input + .batched_partitioned(OpentelemetryTypePartitioner, || { + self.batcher_settings.as_byte_size_config() + }) + .request_builder(builder_limit, request_builder) + .filter_map( + |request: Result| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }, + ) + .into_driver(self.service) + .protocol(protocol) + .run() + .await + } +} + +#[async_trait] +impl StreamSink for OpentelemetrySink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/opentelemetry/traces/encoding.rs b/src/sinks/opentelemetry/traces/encoding.rs new file mode 100644 index 000000000..2bbd17ba4 --- /dev/null +++ b/src/sinks/opentelemetry/traces/encoding.rs @@ -0,0 +1,10 @@ +use super::model::OpentelemetryTracesModel; +use crate::sinks::opentelemetry::sink::OpentelemetrySinkError; + +pub fn encode(_model: &OpentelemetryTracesModel) -> Result, OpentelemetrySinkError> { + // Trace model to Protobuf encoding + // https://github.com/open-telemetry/opentelemetry-rust/blob/936c46639aa1521bf49dbffba49bbd9795f8ea58/opentelemetry-otlp/src/exporter/http/trace.rs#L72-L83 + Err(OpentelemetrySinkError::new( + "Opentelemetry traces encoding is not implemented yet", + )) +} diff --git a/src/sinks/opentelemetry/traces/mod.rs b/src/sinks/opentelemetry/traces/mod.rs new file mode 100644 index 000000000..f0a1d8405 --- /dev/null +++ b/src/sinks/opentelemetry/traces/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod encoding; +pub(crate) mod model; diff --git a/src/sinks/opentelemetry/traces/model.rs b/src/sinks/opentelemetry/traces/model.rs new file mode 100644 index 000000000..48d971520 --- /dev/null +++ b/src/sinks/opentelemetry/traces/model.rs @@ -0,0 +1,65 @@ +use 
crate::sinks::opentelemetry::{ + models::{OpentelemetryModelMatch, OpentelemetryModelType}, + sink::OpentelemetrySinkError, +}; +use opentelemetry_sdk::export::trace::SpanData; +use std::collections::HashMap; +use vector_lib::{ + config::log_schema, + event::{Event, Value}, + lookup::PathPrefix, +}; + +type DataStore = HashMap>; + +#[derive(Debug)] +pub struct OpentelemetryTracesModel(pub Vec); + +impl OpentelemetryModelMatch for OpentelemetryTracesModel { + fn maybe_match(event: &Event) -> Option { + let log = event.clone().into_log(); + let message = log.get_message(); + let metadata = log.get((PathPrefix::Event, log_schema().user_metadata_key())); + + if metadata.and(message).is_some() { + let scope = metadata.unwrap().get("scope"); + + if scope.is_some() { + if let Some(Value::Bytes(ref level)) = metadata.unwrap().get("level") { + let trace_id = message.unwrap().get("trace.id"); + let span_id = message.unwrap().get("span.id"); + + if String::from_utf8_lossy(level).into_owned() == "trace" + && trace_id.and(span_id).is_some() + { + return Some(OpentelemetryModelType::Traces); + } + } + } + } + + None + } +} + +impl OpentelemetryTracesModel { + pub fn new(traces_data_array: Vec) -> Self { + let mut traces_store = DataStore::new(); + traces_store.insert("traces".to_owned(), traces_data_array); + Self(vec![traces_store]) + } +} + +impl TryFrom> for OpentelemetryTracesModel { + type Error = OpentelemetrySinkError; + + fn try_from(_buf_events: Vec) -> Result { + // https://github.com/open-telemetry/opentelemetry-rust/blob/936c46639aa1521bf49dbffba49bbd9795f8ea58/opentelemetry-sdk/src/export/trace.rs#L71 + let traces_array = vec![]; + let _ = Self::new(traces_array); + + Err(OpentelemetrySinkError::new( + "Opentelemetry trace model is not implemented yet", + )) + } +} diff --git a/src/transforms/protobuf_to_log.rs b/src/transforms/protobuf_to_log.rs index 9bb3d43ff..78c9b32dd 100644 --- a/src/transforms/protobuf_to_log.rs +++ b/src/transforms/protobuf_to_log.rs @@ -200,7 +200,7 @@ impl FunctionTransform for ProtobufToLog { #[cfg(test)] mod tests { use super::*; - use chrono::{DateTime, Utc}; + use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use std::collections::BTreeMap; use std::time::Duration; use tokio::sync::mpsc; @@ -255,24 +255,35 @@ mod tests { async fn metric_protobuf_test() { let logs: &[u8] = b"\n\x85\x06\x12\x82\x06\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 0\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 1\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 2\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 3\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 4\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 5\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 6\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 7\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 8\x12$\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x12\n\x10test log line: 9\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 10\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 11\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 12\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 13\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 14\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 
15\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 16\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 17\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 18\x12%\t@B\x0f\x00\x00\x00\x00\x00\x1a\x05ERROR*\x13\n\x11test log line: 19"; - let expect_metadata = Value::Object(BTreeMap::from([ + let mut expect_metadata = BTreeMap::from([ ( "headers".to_owned(), Value::Object(BTreeMap::from([("key".into(), "value".into())])), ), ("attributes".to_owned(), Value::Object(BTreeMap::from([]))), ("resource".to_owned(), Value::Object(BTreeMap::from([]))), - ("scope".to_owned(), Value::Object(BTreeMap::from([]))), + ( + "scope".to_owned(), + Value::Object(BTreeMap::from([("schema_url".to_owned(), "".into())])), + ), ("flags".to_owned(), Value::Integer(0)), ("severity_number".to_owned(), Value::Integer(0)), ("severity_text".to_owned(), "ERROR".into()), ("level".to_owned(), "ERROR".into()), ("span_id".to_owned(), "".into()), ("trace_id".to_owned(), "".into()), - ("time".to_owned(), Value::Integer(1)), - ])); + ( + "time".to_owned(), + Value::from( + Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(0_i64, 1000000_u32) + .expect("timestamp should be a valid timestamp"), + ), + ), + ), + ]); - let event = log_event_from_bytes(logs, &expect_metadata); + let event = log_event_from_bytes(logs, &Value::Object(expect_metadata.clone())); let result = do_transform( event.into(), @@ -289,7 +300,11 @@ mod tests { let log = &event.into_log(); let event_metadata = log.get("metadata").expect("Metadata is empty"); - assert_eq!(*event_metadata, expect_metadata); + if let Some(Value::Timestamp(ts)) = event_metadata.get("observed_timestamp") { + expect_metadata.insert("observed_timestamp".into(), Value::from(*ts)); + } + + assert_eq!(*event_metadata, Value::Object(expect_metadata.clone())); } } @@ -412,8 +427,8 @@ mod tests { "scope".to_owned(), Value::Object(BTreeMap::from([ ("attributes".into(), Value::Object(BTreeMap::new())), - ("dropped_attributes_count".into(), Value::Integer(0)), ("name".into(), Value::from("opentelemetry_phoenix")), + ("schema_url".into(), Value::from("")), ("version".into(), Value::from("1.0.0")), ])), ), @@ -473,8 +488,8 @@ mod tests { "scope".to_owned(), Value::Object(BTreeMap::from([ ("attributes".into(), Value::Object(BTreeMap::new())), - ("dropped_attributes_count".into(), Value::Integer(0)), ("name".into(), Value::from("opentelemetry_ecto")), + ("schema_url".into(), Value::from("")), ("version".into(), Value::from("1.0.0")), ])), ),
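+ // Both scope fixtures above now expect a `schema_url` entry (defaulting to "") and no longer
+ // carry `dropped_attributes_count`; `name`, `version`, `schema_url`, and `attributes` are the
+ // scope fields the new opentelemetry sink's `OpentelemetryScope` reads.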