From 722893ae028cb520292bd7a61b81d779e3a97c39 Mon Sep 17 00:00:00 2001 From: Mike Del Tito Date: Tue, 23 Apr 2024 16:38:09 -0400 Subject: [PATCH] refactor: consolidate handling of otel type conversion from events Handles extracting common values from LogEvent Value types: - `TraceId` - `SpanId` - `TraceState` - `TraceFlags` - Attribute fields (`Vec`) Ref: LOG-19371 --- src/sinks/opentelemetry/logs/model.rs | 58 ++------ src/sinks/opentelemetry/models.rs | 134 +++++++++++++++-- src/sinks/opentelemetry/traces/model.rs | 188 ++++++------------------ 3 files changed, 185 insertions(+), 195 deletions(-) diff --git a/src/sinks/opentelemetry/logs/model.rs b/src/sinks/opentelemetry/logs/model.rs index 08b4062c5..4bdf3a108 100644 --- a/src/sinks/opentelemetry/logs/model.rs +++ b/src/sinks/opentelemetry/logs/model.rs @@ -7,7 +7,7 @@ use vector_lib::{ use opentelemetry::{ logs::{AnyValue as OtlpAnyValue, LogRecord, Severity}, - trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState}, + trace::{SpanContext, TraceState}, }; use opentelemetry_sdk::export::logs::LogData; use std::borrow::Cow; @@ -15,7 +15,8 @@ use std::borrow::Cow; use crate::sinks::opentelemetry::{ models::{ value_to_otlp_any_value, value_to_system_time, OpentelemetryModelMatch, - OpentelemetryModelType, OpentelemetryResource, OpentelemetryScope, + OpentelemetryModelType, OpentelemetryResource, OpentelemetryScope, OpentelemetrySpanId, + OpentelemetryTraceFlags, OpentelemetryTraceId, }, sink::OpentelemetrySinkError, }; @@ -106,49 +107,16 @@ impl TryFrom> for OpentelemetryLogsModel { let raw_span_id = metadata.get("span_id"); if raw_trace_id.or(raw_span_id).is_some() { - let trace_id = if let Some(id) = raw_trace_id { - match &id { - Value::Bytes(bytes) => { - let mut trace_id = [0; 16]; - match faster_hex::hex_decode(bytes, &mut trace_id) { - Ok(_) => TraceId::from_bytes(trace_id), - Err(_) => TraceId::INVALID, - } - } - _ => TraceId::INVALID, - } - } else { - TraceId::INVALID - }; - - let span_id = if let Some(id) = raw_span_id { - match &id { - Value::Bytes(bytes) => { - let mut span_id = [0; 8]; - match faster_hex::hex_decode(bytes, &mut span_id) { - Ok(_) => SpanId::from_bytes(span_id), - Err(_) => SpanId::INVALID, - } - } - _ => SpanId::INVALID, - } - } else { - SpanId::INVALID - }; - - let trace_flags = if let Some(flags) = metadata.get("flags") { - match flags { - Value::Integer(flag) => TraceFlags::new( - u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8()), - ), - _ => TraceFlags::NOT_SAMPLED, - } - } else { - TraceFlags::NOT_SAMPLED - }; - - let context = - SpanContext::new(trace_id, span_id, trace_flags, false, TraceState::NONE); + let trace_id: OpentelemetryTraceId = raw_trace_id.into(); + let span_id: OpentelemetrySpanId = raw_span_id.into(); + let trace_flags: OpentelemetryTraceFlags = metadata.get("flags").into(); + let context = SpanContext::new( + trace_id.into(), + span_id.into(), + trace_flags.into(), + false, + TraceState::NONE, + ); record_builder = record_builder.with_span_context(&context); } diff --git a/src/sinks/opentelemetry/models.rs b/src/sinks/opentelemetry/models.rs index b55d486ad..f7a2cdc2e 100644 --- a/src/sinks/opentelemetry/models.rs +++ b/src/sinks/opentelemetry/models.rs @@ -1,5 +1,5 @@ use chrono::SecondsFormat; -use std::ops::SubAssign; +use std::{ops::SubAssign, str::FromStr}; use std::collections::HashMap; use vector_lib::{ @@ -13,9 +13,11 @@ use super::{ traces::model::OpentelemetryTracesModel, }; use opentelemetry::{ - logs::AnyValue as OtlpAnyValue, Array as OtlpArray, InstrumentationLibrary, Key, KeyValue, - StringValue, Value as OtlpValue, + logs::AnyValue as OtlpAnyValue, + trace::{SpanId, TraceFlags, TraceId, TraceState}, + Array as OtlpArray, InstrumentationLibrary, Key, KeyValue, StringValue, Value as OtlpValue, }; + use opentelemetry_sdk::Resource; use std::{ borrow::Cow, @@ -119,10 +121,119 @@ pub fn value_to_system_time(value: &Value) -> SystemTime { #[derive(Debug)] pub struct OpentelemetryResource { - pub attributes: Vec, + pub attributes: OpentelemetryAttributes, pub schema_url: Cow<'static, str>, } +pub struct OpentelemetryTraceId(TraceId); + +impl From> for OpentelemetryTraceId { + fn from(bytes: Option<&Value>) -> Self { + match bytes { + Some(Value::Bytes(bytes)) => { + let mut trace_id = [0; 16]; + match faster_hex::hex_decode(bytes, &mut trace_id) { + Ok(_) => Self(TraceId::from_bytes(trace_id)), + Err(_) => Self(TraceId::INVALID), + } + } + _ => Self(TraceId::INVALID), + } + } +} + +impl From for TraceId { + fn from(trace_id: OpentelemetryTraceId) -> TraceId { + trace_id.0 + } +} + +pub struct OpentelemetrySpanId(SpanId); + +impl From> for OpentelemetrySpanId { + fn from(bytes: Option<&Value>) -> Self { + match bytes { + Some(Value::Bytes(bytes)) => { + let mut span_id = [0; 8]; + match faster_hex::hex_decode(bytes, &mut span_id) { + Ok(_) => Self(SpanId::from_bytes(span_id)), + Err(_) => Self(SpanId::INVALID), + } + } + _ => Self(SpanId::INVALID), + } + } +} + +impl From for SpanId { + fn from(span_id: OpentelemetrySpanId) -> Self { + span_id.0 + } +} + +pub struct OpentelemetryTraceState(TraceState); + +impl From> for OpentelemetryTraceState { + fn from(bytes: Option<&Value>) -> Self { + match bytes { + Some(Value::Bytes(bytes)) => { + let str = String::from_utf8_lossy(bytes); + Self(TraceState::from_str(&str).unwrap_or_default()) + } + _ => Self(TraceState::NONE), + } + } +} + +impl From for TraceState { + fn from(state: OpentelemetryTraceState) -> Self { + state.0 + } +} + +pub struct OpentelemetryTraceFlags(TraceFlags); + +impl From> for OpentelemetryTraceFlags { + fn from(bytes: Option<&Value>) -> Self { + match bytes { + Some(Value::Integer(flag)) => Self(TraceFlags::new( + u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8()), + )), + _ => Self(TraceFlags::NOT_SAMPLED), + } + } +} + +impl From for TraceFlags { + fn from(flags: OpentelemetryTraceFlags) -> Self { + flags.0 + } +} + +#[derive(Default, Debug)] +pub struct OpentelemetryAttributes(Vec); + +impl From> for OpentelemetryAttributes { + fn from(value: Option<&Value>) -> Self { + match value { + Some(Value::Object(obj)) => Self( + obj.iter() + .map(|(key, value)| { + KeyValue::new(key.to_string(), value_to_otlp_value(value.clone())) + }) + .collect(), + ), + _ => Self(vec![]), + } + } +} + +impl From for Vec { + fn from(attrs: OpentelemetryAttributes) -> Self { + attrs.0 + } +} + impl From<&LogEvent> for OpentelemetryResource { fn from(log: &LogEvent) -> Self { let mut attributes = vec![]; @@ -143,7 +254,7 @@ impl From<&LogEvent> for OpentelemetryResource { } OpentelemetryResource { - attributes, + attributes: OpentelemetryAttributes(attributes), schema_url, } } @@ -151,7 +262,7 @@ impl From<&LogEvent> for OpentelemetryResource { impl From for Resource { fn from(val: OpentelemetryResource) -> Self { - Resource::from_schema_url(val.attributes, val.schema_url) + Resource::from_schema_url(Into::>::into(val.attributes), val.schema_url) } } @@ -160,7 +271,7 @@ pub struct OpentelemetryScope { pub name: Cow<'static, str>, pub version: Option>, pub schema_url: Option>, - pub attributes: Vec, + pub attributes: OpentelemetryAttributes, } impl From<&LogEvent> for OpentelemetryScope { @@ -205,14 +316,19 @@ impl From<&LogEvent> for OpentelemetryScope { name, version, schema_url, - attributes, + attributes: OpentelemetryAttributes(attributes), } } } impl From for InstrumentationLibrary { fn from(val: OpentelemetryScope) -> Self { - InstrumentationLibrary::new(val.name, val.version, val.schema_url, Some(val.attributes)) + InstrumentationLibrary::new( + val.name, + val.version, + val.schema_url, + Some(val.attributes.into()), + ) } } diff --git a/src/sinks/opentelemetry/traces/model.rs b/src/sinks/opentelemetry/traces/model.rs index 8d822a203..8c2c9dda7 100644 --- a/src/sinks/opentelemetry/traces/model.rs +++ b/src/sinks/opentelemetry/traces/model.rs @@ -1,18 +1,13 @@ use crate::sinks::opentelemetry::{ models::{ - value_to_otlp_value, value_to_system_time, OpentelemetryModelMatch, OpentelemetryModelType, - OpentelemetryResource, OpentelemetryScope, + value_to_system_time, OpentelemetryAttributes, OpentelemetryModelMatch, + OpentelemetryModelType, OpentelemetryResource, OpentelemetryScope, OpentelemetrySpanId, + OpentelemetryTraceFlags, OpentelemetryTraceId, OpentelemetryTraceState, }, sink::OpentelemetrySinkError, }; -use opentelemetry::{ - trace::{ - Event as TraceEvent, Link, SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, - TraceState, - }, - KeyValue, -}; +use opentelemetry::trace::{Event as TraceEvent, Link, SpanContext, SpanKind, Status}; use opentelemetry_proto::tonic::trace::v1::status::StatusCode; use opentelemetry_sdk::{ @@ -20,7 +15,6 @@ use opentelemetry_sdk::{ trace::{SpanEvents, SpanLinks}, }; use std::borrow::Cow; -use std::str::FromStr; use vector_lib::{ config::log_schema, event::{Event, Value}, @@ -82,18 +76,12 @@ impl TryFrom> for OpentelemetryTracesModel { let instrumentation_lib = OpentelemetryScope::from(trace); // Extract span attributes from metadata - let mut span_attributes: Vec = vec![]; - if let Some(metadata) = trace.get((PathPrefix::Event, log_schema().user_metadata_key())) - { - if let Some(Value::Object(obj)) = metadata.get("attributes") { - for (key, value) in obj.iter() { - span_attributes.push(KeyValue::new( - key.to_string(), - value_to_otlp_value(value.clone()), - )); - } - } - } + let span_attributes: OpentelemetryAttributes = trace + .get(( + PathPrefix::Event, + format!("{}.attributes", log_schema().user_metadata_key()).as_str(), + )) + .into(); // Extract from message if let Some(message) = trace.get_message() { @@ -104,62 +92,25 @@ impl TryFrom> for OpentelemetryTracesModel { _ => Cow::from(""), }; - let trace_id = match message.get("trace_id") { - Some(Value::Bytes(bytes)) => { - let mut trace_id = [0; 16]; - match faster_hex::hex_decode(bytes, &mut trace_id) { - Ok(_) => TraceId::from_bytes(trace_id), - Err(_) => TraceId::INVALID, - } - } - _ => TraceId::INVALID, - }; - - let span_id = match message.get("span_id") { - Some(Value::Bytes(bytes)) => { - let mut span_id = [0; 8]; - match faster_hex::hex_decode(bytes, &mut span_id) { - Ok(_) => SpanId::from_bytes(span_id), - Err(_) => SpanId::INVALID, - } - } - _ => SpanId::INVALID, - }; - - let parent_span_id = match message.get("parent_span_id") { - Some(Value::Bytes(bytes)) => { - let mut parent_span_id = [0; 8]; - match faster_hex::hex_decode(bytes, &mut parent_span_id) { - Ok(_) => SpanId::from_bytes(parent_span_id), - Err(_) => SpanId::INVALID, - } - } - _ => SpanId::INVALID, - }; - // LOG-19724: trace_flags are not currently captured/defined, this field was added // in a more recent version of the protocol, but our source does not include it in the // protocol impl it uses. // https://github.com/open-telemetry/opentelemetry-rust/commit/27b19b60261f342cec0559f26634ca8f02ed02ac#diff-cfa0a91439f7fb81c51a342043e87175f75c5394b0ff5c9aa7e55c3589a7bb11R91-R106 - let trace_flags = match message.get("flags") { - Some(Value::Integer(flag)) => TraceFlags::new( - u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8()), - ), - _ => TraceFlags::NOT_SAMPLED, - }; - - let trace_state = match message.get("trace_state") { - Some(Value::Bytes(state_bytes)) => { - let str = String::from_utf8_lossy(state_bytes); - TraceState::from_str(&str).unwrap_or_default() - } - _ => TraceState::NONE, - }; + let trace_flags: OpentelemetryTraceFlags = message.get("flags").into(); + let trace_id: OpentelemetryTraceId = message.get("trace_id").into(); + let trace_state: OpentelemetryTraceState = message.get("trace_state").into(); + let span_id: OpentelemetrySpanId = message.get("span_id").into(); + let parent_span_id: OpentelemetrySpanId = message.get("parent_span_id").into(); // TODO: determine correct value for `is_remote`. This marker is not included // in the incoming request/event. - let span_context = - SpanContext::new(trace_id, span_id, trace_flags, false, trace_state); + let span_context = SpanContext::new( + trace_id.into(), + span_id.into(), + trace_flags.into(), + false, + trace_state.into(), + ); let start_time = value_to_system_time(message.get("start_timestamp").unwrap_or(&Value::Null)); @@ -245,12 +196,12 @@ impl TryFrom> for OpentelemetryTracesModel { traces_array.push(SpanData { span_context, - parent_span_id, + parent_span_id: parent_span_id.into(), span_kind, name, start_time, end_time, - attributes: span_attributes, + attributes: span_attributes.into(), dropped_attributes_count, events, links, @@ -274,66 +225,26 @@ impl TryFrom> for OpentelemetryTracesModel { fn value_to_link(value: &Value) -> Option { match value { Value::Object(link) => { - let trace_id = match link.get("trace_id") { - Some(Value::Bytes(bytes)) => { - let mut trace_id = [0; 16]; - match faster_hex::hex_decode(bytes, &mut trace_id) { - Ok(_) => TraceId::from_bytes(trace_id), - Err(_) => TraceId::INVALID, - } - } - _ => TraceId::INVALID, - }; - - let span_id = match link.get("span_id") { - Some(Value::Bytes(bytes)) => { - let mut span_id = [0; 8]; - match faster_hex::hex_decode(bytes, &mut span_id) { - Ok(_) => SpanId::from_bytes(span_id), - Err(_) => SpanId::INVALID, - } - } - _ => SpanId::INVALID, - }; - - let mut span_attributes: Vec = vec![]; - if let Some(Value::Object(obj)) = link.get("attributes") { - for (key, value) in obj.iter() { - span_attributes.push(KeyValue::new( - key.to_string(), - value_to_otlp_value(value.clone()), - )); - } - }; - - let trace_state = match link.get("trace_state") { - Some(Value::Bytes(state_bytes)) => { - let str = String::from_utf8_lossy(state_bytes); - TraceState::from_str(&str).unwrap_or_default() - } - _ => TraceState::NONE, - }; - + let trace_flags: OpentelemetryTraceFlags = link.get("flags").into(); + let trace_state: OpentelemetryTraceState = link.get("trace_state").into(); + let trace_id: OpentelemetryTraceId = link.get("trace_id").into(); + let span_id: OpentelemetrySpanId = link.get("span_id").into(); + let span_attributes: OpentelemetryAttributes = link.get("attributes").into(); let dropped_attributes_count = match link.get("dropped_attributes_count") { Some(Value::Integer(count)) => *count as u32, _ => 0, }; - // LOG-19724: trace_flags are not currently captured/defined, this field was added - // in a more recent version of the protocol, and we have diverged from upstream with - // our own `opentelemetry-rs` implementation. - // https://github.com/open-telemetry/opentelemetry-rust/commit/27b19b60261f342cec0559f26634ca8f02ed02ac#diff-cfa0a91439f7fb81c51a342043e87175f75c5394b0ff5c9aa7e55c3589a7bb11R91-R106 - let trace_flags = match link.get("flags") { - Some(Value::Integer(flag)) => { - TraceFlags::new(u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8())) - } - _ => TraceFlags::NOT_SAMPLED, - }; - // TODO: determine correct value for `is_remote` - let span_context = SpanContext::new(trace_id, span_id, trace_flags, false, trace_state); + let span_context = SpanContext::new( + trace_id.into(), + span_id.into(), + trace_flags.into(), + false, + trace_state.into(), + ); - let mut link = Link::new(span_context, span_attributes); + let mut link = Link::new(span_context, span_attributes.into()); link.dropped_attributes_count = dropped_attributes_count; Some(link) } @@ -352,17 +263,7 @@ fn value_to_event(value: &Value) -> Option { }; let timestamp = value_to_system_time(event.get("timestamp").unwrap_or(&Value::Null)); - - let mut attributes: Vec = vec![]; - if let Some(Value::Object(obj)) = event.get("attributes") { - for (key, value) in obj.iter() { - attributes.push(KeyValue::new( - key.to_string(), - value_to_otlp_value(value.clone()), - )); - } - }; - + let attributes: OpentelemetryAttributes = event.get("attributes").into(); let dropped_attributes_count = match event.get("dropped_attributes_count") { Some(Value::Integer(count)) => *count as u32, _ => 0, @@ -371,7 +272,7 @@ fn value_to_event(value: &Value) -> Option { Some(TraceEvent::new( name, timestamp, - attributes, + attributes.into(), dropped_attributes_count, )) } @@ -381,17 +282,22 @@ fn value_to_event(value: &Value) -> Option { #[cfg(test)] mod test { - use std::time::SystemTime; + use std::{str::FromStr, time::SystemTime}; use super::*; use crate::event::Value; use chrono::{NaiveDateTime, TimeZone, Utc}; + use opentelemetry::{ + trace::{ + Event as TraceEvent, Link, SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, + TraceState, + }, + KeyValue, + }; use opentelemetry_sdk::{Resource, Scope}; use vector_lib::event::{Event, EventMetadata, LogEvent}; - use opentelemetry::trace::{SpanId, TraceFlags, TraceId}; - #[test] fn test_otlp_sink_trace_model_matcher_matches() { let event = Event::Log(LogEvent::from_map(