Skip to content

Commit

Permalink
refactor: consolidate handling of otel type conversion from events
Browse files Browse the repository at this point in the history
Handles extracting common values from LogEvent Value types:
- `TraceId`
- `SpanId`
- `TraceState`
- `TraceFlags`
- Attribute fields (`Vec<KeyVal>`)

Ref: LOG-19371
  • Loading branch information
mdeltito committed Apr 23, 2024
1 parent 86b98dc commit 722893a
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 195 deletions.
58 changes: 13 additions & 45 deletions src/sinks/opentelemetry/logs/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use vector_lib::{

use opentelemetry::{
logs::{AnyValue as OtlpAnyValue, LogRecord, Severity},
trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
trace::{SpanContext, TraceState},
};
use opentelemetry_sdk::export::logs::LogData;
use std::borrow::Cow;

use crate::sinks::opentelemetry::{
models::{
value_to_otlp_any_value, value_to_system_time, OpentelemetryModelMatch,
OpentelemetryModelType, OpentelemetryResource, OpentelemetryScope,
OpentelemetryModelType, OpentelemetryResource, OpentelemetryScope, OpentelemetrySpanId,
OpentelemetryTraceFlags, OpentelemetryTraceId,
},
sink::OpentelemetrySinkError,
};
Expand Down Expand Up @@ -106,49 +107,16 @@ impl TryFrom<Vec<Event>> for OpentelemetryLogsModel {
let raw_span_id = metadata.get("span_id");

if raw_trace_id.or(raw_span_id).is_some() {
let trace_id = if let Some(id) = raw_trace_id {
match &id {
Value::Bytes(bytes) => {
let mut trace_id = [0; 16];
match faster_hex::hex_decode(bytes, &mut trace_id) {
Ok(_) => TraceId::from_bytes(trace_id),
Err(_) => TraceId::INVALID,
}
}
_ => TraceId::INVALID,
}
} else {
TraceId::INVALID
};

let span_id = if let Some(id) = raw_span_id {
match &id {
Value::Bytes(bytes) => {
let mut span_id = [0; 8];
match faster_hex::hex_decode(bytes, &mut span_id) {
Ok(_) => SpanId::from_bytes(span_id),
Err(_) => SpanId::INVALID,
}
}
_ => SpanId::INVALID,
}
} else {
SpanId::INVALID
};

let trace_flags = if let Some(flags) = metadata.get("flags") {
match flags {
Value::Integer(flag) => TraceFlags::new(
u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8()),
),
_ => TraceFlags::NOT_SAMPLED,
}
} else {
TraceFlags::NOT_SAMPLED
};

let context =
SpanContext::new(trace_id, span_id, trace_flags, false, TraceState::NONE);
let trace_id: OpentelemetryTraceId = raw_trace_id.into();
let span_id: OpentelemetrySpanId = raw_span_id.into();
let trace_flags: OpentelemetryTraceFlags = metadata.get("flags").into();
let context = SpanContext::new(
trace_id.into(),
span_id.into(),
trace_flags.into(),
false,
TraceState::NONE,
);

record_builder = record_builder.with_span_context(&context);
}
Expand Down
134 changes: 125 additions & 9 deletions src/sinks/opentelemetry/models.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::SecondsFormat;
use std::ops::SubAssign;
use std::{ops::SubAssign, str::FromStr};

use std::collections::HashMap;
use vector_lib::{
Expand All @@ -13,9 +13,11 @@ use super::{
traces::model::OpentelemetryTracesModel,
};
use opentelemetry::{
logs::AnyValue as OtlpAnyValue, Array as OtlpArray, InstrumentationLibrary, Key, KeyValue,
StringValue, Value as OtlpValue,
logs::AnyValue as OtlpAnyValue,
trace::{SpanId, TraceFlags, TraceId, TraceState},
Array as OtlpArray, InstrumentationLibrary, Key, KeyValue, StringValue, Value as OtlpValue,
};

use opentelemetry_sdk::Resource;
use std::{
borrow::Cow,
Expand Down Expand Up @@ -119,10 +121,119 @@ pub fn value_to_system_time(value: &Value) -> SystemTime {

#[derive(Debug)]
pub struct OpentelemetryResource {
pub attributes: Vec<KeyValue>,
pub attributes: OpentelemetryAttributes,
pub schema_url: Cow<'static, str>,
}

pub struct OpentelemetryTraceId(TraceId);

impl From<Option<&Value>> for OpentelemetryTraceId {
fn from(bytes: Option<&Value>) -> Self {
match bytes {
Some(Value::Bytes(bytes)) => {
let mut trace_id = [0; 16];
match faster_hex::hex_decode(bytes, &mut trace_id) {
Ok(_) => Self(TraceId::from_bytes(trace_id)),
Err(_) => Self(TraceId::INVALID),
}
}
_ => Self(TraceId::INVALID),
}
}
}

impl From<OpentelemetryTraceId> for TraceId {
fn from(trace_id: OpentelemetryTraceId) -> TraceId {
trace_id.0
}
}

pub struct OpentelemetrySpanId(SpanId);

impl From<Option<&Value>> for OpentelemetrySpanId {
fn from(bytes: Option<&Value>) -> Self {
match bytes {
Some(Value::Bytes(bytes)) => {
let mut span_id = [0; 8];
match faster_hex::hex_decode(bytes, &mut span_id) {
Ok(_) => Self(SpanId::from_bytes(span_id)),
Err(_) => Self(SpanId::INVALID),
}
}
_ => Self(SpanId::INVALID),
}
}
}

impl From<OpentelemetrySpanId> for SpanId {
fn from(span_id: OpentelemetrySpanId) -> Self {
span_id.0
}
}

pub struct OpentelemetryTraceState(TraceState);

impl From<Option<&Value>> for OpentelemetryTraceState {
fn from(bytes: Option<&Value>) -> Self {
match bytes {
Some(Value::Bytes(bytes)) => {
let str = String::from_utf8_lossy(bytes);
Self(TraceState::from_str(&str).unwrap_or_default())
}
_ => Self(TraceState::NONE),
}
}
}

impl From<OpentelemetryTraceState> for TraceState {
fn from(state: OpentelemetryTraceState) -> Self {
state.0
}
}

pub struct OpentelemetryTraceFlags(TraceFlags);

impl From<Option<&Value>> for OpentelemetryTraceFlags {
fn from(bytes: Option<&Value>) -> Self {
match bytes {
Some(Value::Integer(flag)) => Self(TraceFlags::new(
u8::try_from(*flag).unwrap_or(TraceFlags::NOT_SAMPLED.to_u8()),
)),
_ => Self(TraceFlags::NOT_SAMPLED),
}
}
}

impl From<OpentelemetryTraceFlags> for TraceFlags {
fn from(flags: OpentelemetryTraceFlags) -> Self {
flags.0
}
}

#[derive(Default, Debug)]
pub struct OpentelemetryAttributes(Vec<KeyValue>);

impl From<Option<&Value>> for OpentelemetryAttributes {
fn from(value: Option<&Value>) -> Self {
match value {
Some(Value::Object(obj)) => Self(
obj.iter()
.map(|(key, value)| {
KeyValue::new(key.to_string(), value_to_otlp_value(value.clone()))
})
.collect(),
),
_ => Self(vec![]),
}
}
}

impl From<OpentelemetryAttributes> for Vec<KeyValue> {
fn from(attrs: OpentelemetryAttributes) -> Self {
attrs.0
}
}

impl From<&LogEvent> for OpentelemetryResource {
fn from(log: &LogEvent) -> Self {
let mut attributes = vec![];
Expand All @@ -143,15 +254,15 @@ impl From<&LogEvent> for OpentelemetryResource {
}

OpentelemetryResource {
attributes,
attributes: OpentelemetryAttributes(attributes),
schema_url,
}
}
}

impl From<OpentelemetryResource> for Resource {
fn from(val: OpentelemetryResource) -> Self {
Resource::from_schema_url(val.attributes, val.schema_url)
Resource::from_schema_url(Into::<Vec<KeyValue>>::into(val.attributes), val.schema_url)
}
}

Expand All @@ -160,7 +271,7 @@ pub struct OpentelemetryScope {
pub name: Cow<'static, str>,
pub version: Option<Cow<'static, str>>,
pub schema_url: Option<Cow<'static, str>>,
pub attributes: Vec<KeyValue>,
pub attributes: OpentelemetryAttributes,
}

impl From<&LogEvent> for OpentelemetryScope {
Expand Down Expand Up @@ -205,14 +316,19 @@ impl From<&LogEvent> for OpentelemetryScope {
name,
version,
schema_url,
attributes,
attributes: OpentelemetryAttributes(attributes),
}
}
}

impl From<OpentelemetryScope> for InstrumentationLibrary {
fn from(val: OpentelemetryScope) -> Self {
InstrumentationLibrary::new(val.name, val.version, val.schema_url, Some(val.attributes))
InstrumentationLibrary::new(
val.name,
val.version,
val.schema_url,
Some(val.attributes.into()),
)
}
}

Expand Down
Loading

0 comments on commit 722893a

Please sign in to comment.