Skip to content

Commit

Permalink
Add Correlations API (#101)
Browse files Browse the repository at this point in the history
This adds an implementation of the [Correlations
API](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/correlationcontext/api.md)
which is used to annotate telemetry, adding context and information to
metrics, traces, and logs. It is an abstract data type represented by a
set of name/value pairs describing user-defined properties.

Example:

```rust
let propagator = CorrelationContextPropagator::new();
// can extract from any type that impls `Carrier`, usually an HTTP header map
let cx = propagator.extract(&headers);

// Iterate over extracted name / value pairs
for (name, value) in cx.correlation_context() {
  // ...
}

// Add new correlations
let cx_with_additions = cx.with_correlations(vec![Key::new("server_id").u64(42)]);

// Inject correlations into http request
propagator.inject_context(&cx_with_additions, &mut headers);
```

Resolves #62
  • Loading branch information
jtescher authored May 7, 2020
1 parent c579248 commit e5f5860
Show file tree
Hide file tree
Showing 6 changed files with 463 additions and 14 deletions.
15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ edition = "2018"

[dependencies]
base64 = { version = "0.12", optional = true }
futures = { version = "0.3.4", optional = true }
lazy_static = "1.4.0"
pin-project = { version = "0.4.6", optional = true }
prometheus = { version = "0.7.0", optional = true }
rand = { version = "0.7.2", optional = true }
serde = { version = "1.0.104", features = ["derive", "rc"], optional = true }
bincode = { version = "1.2.1", optional = true }
futures = { version = "0.3", optional = true }
lazy_static = "1.4"
percent-encoding = "2.0"
pin-project = { version = "0.4", optional = true }
prometheus = { version = "0.7", optional = true }
rand = { version = "0.7", optional = true }
serde = { version = "1.0", features = ["derive", "rc"], optional = true }
bincode = { version = "1.2", optional = true }

[dev-dependencies]
criterion = "0.3.1"
Expand Down
9 changes: 8 additions & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use opentelemetry::api::{
Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter, MetricOptions, TraceContextExt, Tracer,
Context, CorrelationContextExt, Gauge, GaugeHandle, Key, Measure, MeasureHandle, Meter,
MetricOptions, TraceContextExt, Tracer,
};
use opentelemetry::{global, sdk};

Expand Down Expand Up @@ -33,6 +34,8 @@ fn main() -> thrift::Result<()> {
init_tracer()?;
let meter = sdk::Meter::new("ex_com_basic");

let foo_key = Key::new("ex.com/foo");
let bar_key = Key::new("ex.com/bar");
let lemons_key = Key::new("ex_com_lemons");
let another_key = Key::new("ex_com_another");

Expand All @@ -54,6 +57,10 @@ fn main() -> thrift::Result<()> {

let measure = measure_two.acquire_handle(&common_labels);

let _correlations =
Context::current_with_correlations(vec![foo_key.string("foo1"), bar_key.string("bar1")])
.attach();

global::tracer("component-main").in_span("operation", move |cx| {
let span = cx.span();
span.add_event(
Expand Down
36 changes: 30 additions & 6 deletions src/api/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,17 @@ impl From<&'static str> for Key {
}
}

impl Into<String> for Key {
impl From<String> for Key {
/// Convert a `String` to a `Key`.
fn from(string: String) -> Self {
Key(Cow::from(string))
}
}

impl From<Key> for String {
/// Converts `Key` instances into `String`.
fn into(self) -> String {
self.0.to_string()
fn from(key: Key) -> Self {
key.0.into_owned()
}
}

Expand Down Expand Up @@ -132,11 +139,11 @@ impl From<&str> for Value {
}
}

impl Into<String> for Value {
impl From<Value> for String {
/// Convert `Value` types to `String` for use by exporters that only use
/// `String` values.
fn into(self) -> String {
match self {
fn from(value: Value) -> Self {
match value {
Value::Bool(value) => value.to_string(),
Value::I64(value) => value.to_string(),
Value::U64(value) => value.to_string(),
Expand All @@ -147,6 +154,23 @@ impl Into<String> for Value {
}
}

impl From<&Value> for String {
/// Convert `&Value` types to `String` for use by exporters that only use
/// `String` values.
fn from(value: &Value) -> Self {
match value {
Value::Bool(value) => value.to_string(),
Value::I64(value) => value.to_string(),
Value::U64(value) => value.to_string(),
Value::F64(value) => value.to_string(),
Value::String(value) => value.clone(),
Value::Bytes(value) => {
String::from_utf8(value.clone()).unwrap_or_else(|_| String::new())
}
}
}
}

/// `KeyValue` pairs are used by `LabelSet`s and `Span` attributes.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Debug, PartialEq)]
Expand Down
161 changes: 161 additions & 0 deletions src/api/correlation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//! # OpenTelemetry Correlation Context API
//!
//! A Correlation Context is used to annotate telemetry, adding context and
//! information to metrics, traces, and logs. It is an abstract data type
//! represented by a set of name/value pairs describing user-defined properties.
//! Each name in a [`CorrelationContext`] is associated with exactly one value.
//! `CorrelationContext`s are serialized according to the editor's draft of
//! the [W3C Correlation Context] specification.
//!
//! [`CorrelationContext`]: struct.CorrelationContext.html
//! [W3C Correlation Context]: https://w3c.github.io/correlation-context/
//!
//! # Examples
//!
//! ```
//! use opentelemetry::api::{
//! CorrelationContextExt, CorrelationContextPropagator, HttpTextFormat, Key
//! };
//! use std::collections::HashMap;
//!
//! // Example correlation value passed in externally via http headers
//! let mut headers = HashMap::new();
//! headers.insert("Correlation-Context", "user_id=1".to_string());
//!
//! let propagator = CorrelationContextPropagator::new();
//! // can extract from any type that impls `Carrier`, usually an HTTP header map
//! let cx = propagator.extract(&headers);
//!
//! // Iterate over extracted name / value pairs
//! for (name, value) in cx.correlation_context() {
//! // ...
//! }
//!
//! // Add new correlations
//! let cx_with_additions = cx.with_correlations(vec![Key::new("server_id").u64(42)]);
//!
//! // Inject correlations into http request
//! propagator.inject_context(&cx_with_additions, &mut headers);
//!
//! let header_value = headers.get("Correlation-Context").expect("header is injected");
//! assert!(header_value.contains("user_id=1"), "still contains previous name / value");
//! assert!(header_value.contains("server_id=42"), "contains new name / value pair");
//! ```
use crate::api;
use std::collections::{hash_map, HashMap};
use std::iter::FromIterator;

mod propagation;

pub use propagation::{CorrelationContextExt, CorrelationContextPropagator};

/// A set of name/value pairs describing user-defined properties across systems.
#[derive(Debug, Default)]
pub struct CorrelationContext {
inner: HashMap<api::Key, api::Value>,
}

impl CorrelationContext {
/// Creates an empty `CorrelationContext`.
pub fn new() -> Self {
CorrelationContext {
inner: HashMap::default(),
}
}

/// Returns a reference to the value associated with a given name
///
/// # Examples
///
/// ```
/// use opentelemetry::api::{CorrelationContext, Value};
///
/// let mut cc = CorrelationContext::new();
/// let _ = cc.insert("my-name", "my-value");
///
/// assert_eq!(cc.get("my-name"), Some(&Value::String("my-value".to_string())))
/// ```
pub fn get<T: Into<api::Key>>(&self, key: T) -> Option<&api::Value> {
self.inner.get(&key.into())
}

/// Inserts a name-value pair into the correlation context.
///
/// If the name was not present, [`None`] is returned. If the name was present,
/// the value is updated, and the old value is returned.
///
/// # Examples
///
/// ```
/// use opentelemetry::api::{CorrelationContext, Value};
///
/// let mut cc = CorrelationContext::new();
/// let _ = cc.insert("my-name", "my-value");
///
/// assert_eq!(cc.get("my-name"), Some(&Value::String("my-value".to_string())))
/// ```
pub fn insert<K, V>(&mut self, key: K, value: V) -> Option<api::Value>
where
K: Into<api::Key>,
V: Into<api::Value>,
{
self.inner.insert(key.into(), value.into())
}

/// Removes a name from the correlation context, returning the value
/// corresponding to the name if the pair was previously in the map.
pub fn remove<K: Into<api::Key>>(&mut self, key: K) -> Option<api::Value> {
self.inner.remove(&key.into())
}

/// Returns the number of attributes for this correlation context
pub fn len(&self) -> usize {
self.inner.len()
}

/// Returns `true` if the correlation context contains no items.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// Gets an iterator over the correlation context items, sorted by name.
pub fn iter(&self) -> Iter {
self.into_iter()
}
}

/// An iterator over the entries of a `CorrelationContext`.
#[derive(Debug)]
pub struct Iter<'a>(hash_map::Iter<'a, api::Key, api::Value>);
impl<'a> Iterator for Iter<'a> {
type Item = (&'a api::Key, &'a api::Value);

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}

impl<'a> IntoIterator for &'a CorrelationContext {
type Item = (&'a api::Key, &'a api::Value);
type IntoIter = Iter<'a>;

fn into_iter(self) -> Self::IntoIter {
Iter(self.inner.iter())
}
}

impl FromIterator<(api::Key, api::Value)> for CorrelationContext {
fn from_iter<I: IntoIterator<Item = (api::Key, api::Value)>>(iter: I) -> Self {
CorrelationContext {
inner: iter.into_iter().collect(),
}
}
}

impl FromIterator<api::KeyValue> for CorrelationContext {
fn from_iter<I: IntoIterator<Item = api::KeyValue>>(iter: I) -> Self {
CorrelationContext {
inner: iter.into_iter().map(|kv| (kv.key, kv.value)).collect(),
}
}
}
Loading

0 comments on commit e5f5860

Please sign in to comment.