From c3bdce4c25c669f16dbce46df3305c368b1e2ea2 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Mon, 18 Jan 2021 10:36:29 -0800 Subject: [PATCH] metrics: implement batch observer (#429) This patch adds an implementation of the metrics batch observer. The API is not identical to the example in [the spec](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/api.md#batch-observer) as it is inconvenient to register metrics after they are moved into the observer callback in rust. Instead the `Meter::batch_observer` method accepts a closure in which instruments can be registered before being moved into the callback. ```rust meter.batch_observer(|batch| { let inst = batch.u64_sum_observer("example").init(); move |result| { result.observe(&[KeyValue::new("a", "1")], inst.observation(42)]); } }); ``` --- .../tests/integration_test.rs | 30 ++++++- opentelemetry/src/metrics/async_instrument.rs | 32 ++++--- opentelemetry/src/metrics/meter.rs | 90 +++++++++++++++---- opentelemetry/src/metrics/mod.rs | 2 +- opentelemetry/src/metrics/noop.rs | 6 +- opentelemetry/src/metrics/observer.rs | 73 +++++++++++++-- opentelemetry/src/metrics/registry.rs | 6 +- opentelemetry/src/metrics/sdk_api.rs | 7 +- opentelemetry/src/sdk/metrics/mod.rs | 44 ++++++--- 9 files changed, 237 insertions(+), 53 deletions(-) diff --git a/opentelemetry-prometheus/tests/integration_test.rs b/opentelemetry-prometheus/tests/integration_test.rs index 4633875244..890e779220 100644 --- a/opentelemetry-prometheus/tests/integration_test.rs +++ b/opentelemetry-prometheus/tests/integration_test.rs @@ -1,6 +1,6 @@ use opentelemetry::sdk::Resource; use opentelemetry::{ - metrics::{MeterProvider, ObserverResult}, + metrics::{BatchObserverResult, MeterProvider, ObserverResult}, KeyValue, }; use opentelemetry_prometheus::PrometheusExporter; @@ -33,6 +33,34 @@ fn free_unused_instruments() { compare_export(&exporter, expected); } +#[test] +fn batch() { + let exporter = opentelemetry_prometheus::exporter() + .with_resource(Resource::new(vec![KeyValue::new("R", "V")])) + .init(); + let meter = exporter.provider().unwrap().meter("test", None); + let mut expected = Vec::new(); + + meter.batch_observer(|batch| { + let uint_observer = batch.u64_value_observer("uint_observer").init(); + let float_observer = batch.f64_value_observer("float_observer").init(); + + move |result: BatchObserverResult| { + result.observe( + &[KeyValue::new("A", "B")], + &[ + uint_observer.observation(2), + float_observer.observation(3.1), + ], + ); + } + }); + + expected.push(r#"uint_observer{A="B",R="V"} 2"#); + expected.push(r#"float_observer{A="B",R="V"} 3.1"#); + compare_export(&exporter, expected); +} + #[test] fn test_add() { let exporter = opentelemetry_prometheus::exporter() diff --git a/opentelemetry/src/metrics/async_instrument.rs b/opentelemetry/src/metrics/async_instrument.rs index 18d2fa60ec..671803817a 100644 --- a/opentelemetry/src/metrics/async_instrument.rs +++ b/opentelemetry/src/metrics/async_instrument.rs @@ -1,6 +1,9 @@ //! Async metrics -use crate::metrics::{sdk_api, Number}; -use crate::KeyValue; +use crate::{ + global, + metrics::{sdk_api, MetricsError, Number}, + KeyValue, +}; use std::fmt; use std::marker; use std::sync::Arc; @@ -33,17 +36,17 @@ impl Observation { } /// A type of callback that `f64` observers run. -type F64ObserverCallback = Box) + Send + Sync + 'static>; +type F64ObserverCallback = Box) + Send + Sync>; /// A type of callback that `u64` observers run. -type U64ObserverCallback = Box) + Send + Sync + 'static>; +type U64ObserverCallback = Box) + Send + Sync>; /// A type of callback that `u64` observers run. -type I64ObserverCallback = Box) + Send + Sync + 'static>; +type I64ObserverCallback = Box) + Send + Sync>; /// A callback argument for use with any Observer instrument that will be /// reported as a batch of observations. -pub type BatchObserverCallback = Box; +type BatchObserverCallback = Box; /// Data passed to an observer callback to capture observations for one /// asynchronous metric instrument. @@ -137,16 +140,17 @@ impl AsyncRunner { /// implementation can be used for batch runners.) pub fn run( &self, - instrument: Arc, + instrument: &Option>, f: fn(&[KeyValue], &[Observation]), ) { - match self { - AsyncRunner::F64(run) => run(ObserverResult::new(instrument, f)), - AsyncRunner::I64(run) => run(ObserverResult::new(instrument, f)), - AsyncRunner::U64(run) => run(ObserverResult::new(instrument, f)), - // TODO: this should not require an instrument to call. consider - // moving to separate struct - AsyncRunner::Batch(run) => run(BatchObserverResult::new(f)), + match (instrument, self) { + (Some(i), AsyncRunner::F64(run)) => run(ObserverResult::new(i.clone(), f)), + (Some(i), AsyncRunner::I64(run)) => run(ObserverResult::new(i.clone(), f)), + (Some(i), AsyncRunner::U64(run)) => run(ObserverResult::new(i.clone(), f)), + (None, AsyncRunner::Batch(run)) => run(BatchObserverResult::new(f)), + _ => global::handle_error(MetricsError::Other( + "Invalid async runner / instrument pair".into(), + )), } } } diff --git a/opentelemetry/src/metrics/meter.rs b/opentelemetry/src/metrics/meter.rs index d223574ca2..22aa2cd8b1 100644 --- a/opentelemetry/src/metrics/meter.rs +++ b/opentelemetry/src/metrics/meter.rs @@ -1,7 +1,7 @@ use crate::sdk::InstrumentationLibrary; use crate::{ metrics::{ - sdk_api, AsyncRunner, BatchObserver, BatchObserverCallback, CounterBuilder, Descriptor, + sdk_api, AsyncRunner, BatchObserver, BatchObserverResult, CounterBuilder, Descriptor, Measurement, NumberKind, ObserverResult, Result, SumObserverBuilder, UpDownCounterBuilder, UpDownSumObserverBuilder, ValueObserverBuilder, ValueRecorderBuilder, }, @@ -93,7 +93,7 @@ impl Meter { SumObserverBuilder::new( self, name.into(), - AsyncRunner::F64(Box::new(callback)), + Some(AsyncRunner::F64(Box::new(callback))), NumberKind::F64, ) } @@ -111,7 +111,7 @@ impl Meter { UpDownSumObserverBuilder::new( self, name.into(), - AsyncRunner::F64(Box::new(callback)), + Some(AsyncRunner::F64(Box::new(callback))), NumberKind::F64, ) } @@ -125,7 +125,7 @@ impl Meter { ValueObserverBuilder::new( self, name.into(), - AsyncRunner::F64(Box::new(callback)), + Some(AsyncRunner::F64(Box::new(callback))), NumberKind::F64, ) } @@ -163,7 +163,7 @@ impl Meter { SumObserverBuilder::new( self, name.into(), - AsyncRunner::I64(Box::new(callback)), + Some(AsyncRunner::I64(Box::new(callback))), NumberKind::I64, ) } @@ -181,7 +181,7 @@ impl Meter { UpDownSumObserverBuilder::new( self, name.into(), - AsyncRunner::I64(Box::new(callback)), + Some(AsyncRunner::I64(Box::new(callback))), NumberKind::I64, ) } @@ -195,7 +195,7 @@ impl Meter { ValueObserverBuilder::new( self, name.into(), - AsyncRunner::I64(Box::new(callback)), + Some(AsyncRunner::I64(Box::new(callback))), NumberKind::I64, ) } @@ -233,7 +233,7 @@ impl Meter { SumObserverBuilder::new( self, name.into(), - AsyncRunner::U64(Box::new(callback)), + Some(AsyncRunner::U64(Box::new(callback))), NumberKind::U64, ) } @@ -251,7 +251,7 @@ impl Meter { UpDownSumObserverBuilder::new( self, name.into(), - AsyncRunner::U64(Box::new(callback)), + Some(AsyncRunner::U64(Box::new(callback))), NumberKind::U64, ) } @@ -265,15 +265,75 @@ impl Meter { ValueObserverBuilder::new( self, name.into(), - AsyncRunner::U64(Box::new(callback)), + Some(AsyncRunner::U64(Box::new(callback))), NumberKind::U64, ) } - /// Creates a new `BatchObserver` that supports making batches of observations for - /// multiple instruments. - pub fn batch_observer(&self, callback: BatchObserverCallback) -> BatchObserver<'_> { - BatchObserver::new(self, AsyncRunner::Batch(callback)) + /// Creates a new `BatchObserver` that supports making batches of observations + /// for multiple instruments or returns an error if instrument initialization + /// fails. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::{global, metrics::BatchObserverResult, KeyValue}; + /// + /// # fn init_observer() -> opentelemetry::metrics::Result<()> { + /// let meter = global::meter("test"); + /// + /// meter.build_batch_observer(|batch| { + /// let instrument = batch.u64_value_observer("test_instrument").try_init()?; + /// + /// Ok(move |result: BatchObserverResult| { + /// result.observe(&[KeyValue::new("my-key", "my-value")], &[instrument.observation(1)]); + /// }) + /// })?; + /// # Ok(()) + /// # } + /// ``` + pub fn build_batch_observer(&self, builder: B) -> Result<()> + where + B: Fn(BatchObserver<'_>) -> Result, + F: Fn(BatchObserverResult) + Send + Sync + 'static, + { + let observer = builder(BatchObserver::new(self))?; + self.core + .new_batch_observer(AsyncRunner::Batch(Box::new(observer))) + } + + /// Creates a new `BatchObserver` that supports making batches of observations + /// for multiple instruments. + /// + /// # Panics + /// + /// Panics if instrument initialization or observer registration returns an + /// error. + /// + /// # Examples + /// + /// ``` + /// use opentelemetry::{global, metrics::BatchObserverResult, KeyValue}; + /// + /// let meter = global::meter("test"); + /// + /// meter.batch_observer(|batch| { + /// let instrument = batch.u64_value_observer("test_instrument").init(); + /// + /// move |result: BatchObserverResult| { + /// result.observe(&[KeyValue::new("my-key", "my-value")], &[instrument.observation(1)]); + /// } + /// }); + /// ``` + pub fn batch_observer(&self, builder: B) + where + B: Fn(BatchObserver<'_>) -> F, + F: Fn(BatchObserverResult) + Send + Sync + 'static, + { + let observer = builder(BatchObserver::new(self)); + self.core + .new_batch_observer(AsyncRunner::Batch(Box::new(observer))) + .unwrap() } /// Atomically record a batch of measurements. @@ -306,7 +366,7 @@ impl Meter { pub(crate) fn new_async_instrument( &self, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, ) -> Result> { self.core.new_async_instrument(descriptor, runner) } diff --git a/opentelemetry/src/metrics/mod.rs b/opentelemetry/src/metrics/mod.rs index f7d5da4bd7..d7ffc1c2c6 100644 --- a/opentelemetry/src/metrics/mod.rs +++ b/opentelemetry/src/metrics/mod.rs @@ -20,7 +20,7 @@ mod up_down_counter; mod value_recorder; use crate::sdk::export::ExportError; -pub use async_instrument::{AsyncRunner, BatchObserverCallback, Observation, ObserverResult}; +pub use async_instrument::{AsyncRunner, BatchObserverResult, Observation, ObserverResult}; pub use config::InstrumentConfig; pub use counter::{BoundCounter, Counter, CounterBuilder}; pub use descriptor::Descriptor; diff --git a/opentelemetry/src/metrics/noop.rs b/opentelemetry/src/metrics/noop.rs index 2efbaf75a0..3ed88c87f1 100644 --- a/opentelemetry/src/metrics/noop.rs +++ b/opentelemetry/src/metrics/noop.rs @@ -61,7 +61,7 @@ impl MeterCore for NoopMeterCore { fn new_async_instrument( &self, _descriptor: Descriptor, - _runner: AsyncRunner, + _runner: Option, ) -> Result> { Ok(Arc::new(NoopAsyncInstrument::new())) } @@ -74,6 +74,10 @@ impl MeterCore for NoopMeterCore { ) { // Ignored } + + fn new_batch_observer(&self, _runner: AsyncRunner) -> Result<()> { + Ok(()) + } } /// A no-op sync instrument diff --git a/opentelemetry/src/metrics/observer.rs b/opentelemetry/src/metrics/observer.rs index c82007ccd1..49a8749721 100644 --- a/opentelemetry/src/metrics/observer.rs +++ b/opentelemetry/src/metrics/observer.rs @@ -8,12 +8,67 @@ use std::sync::Arc; #[derive(Debug)] pub struct BatchObserver<'a> { meter: &'a Meter, - runner: AsyncRunner, } impl<'a> BatchObserver<'a> { - pub(crate) fn new(meter: &'a Meter, runner: AsyncRunner) -> Self { - BatchObserver { meter, runner } + pub(crate) fn new(meter: &'a Meter) -> Self { + BatchObserver { meter } + } + + /// Creates a new integer `SumObserverBuilder` for `u64` values with the given name. + pub fn u64_sum_observer(&self, name: T) -> SumObserverBuilder<'_, u64> + where + T: Into, + { + SumObserverBuilder::new(self.meter, name.into(), None, NumberKind::U64) + } + + /// Creates a new floating point `SumObserverBuilder` for `f64` values with the given name. + pub fn f64_sum_observer(&self, name: T) -> SumObserverBuilder<'_, f64> + where + T: Into, + { + SumObserverBuilder::new(self.meter, name.into(), None, NumberKind::F64) + } + + /// Creates a new integer `UpDownSumObserverBuilder` for `i64` values with the given name. + pub fn i64_up_down_sum_observer(&self, name: T) -> UpDownSumObserverBuilder<'_, i64> + where + T: Into, + { + UpDownSumObserverBuilder::new(self.meter, name.into(), None, NumberKind::I64) + } + + /// Creates a new floating point `UpDownSumObserverBuilder` for `f64` values with the given name. + pub fn f64_up_down_sum_observer(&self, name: T) -> UpDownSumObserverBuilder<'_, f64> + where + T: Into, + { + UpDownSumObserverBuilder::new(self.meter, name.into(), None, NumberKind::F64) + } + + /// Creates a new integer `ValueObserverBuilder` for `u64` values with the given name. + pub fn u64_value_observer(&self, name: T) -> ValueObserverBuilder<'_, u64> + where + T: Into, + { + ValueObserverBuilder::new(self.meter, name.into(), None, NumberKind::U64) + } + + /// Creates a new integer `ValueObserverBuilder` for `i64` values with the given name. + pub fn i64_value_observer(&self, name: T) -> ValueObserverBuilder<'_, i64> + where + T: Into, + { + ValueObserverBuilder::new(self.meter, name.into(), None, NumberKind::I64) + } + + /// Creates a new floating point `ValueObserverBuilder` for `f64` values with the given name. + pub fn f64_value_observer(&self, name: T) -> ValueObserverBuilder<'_, f64> + where + T: Into, + { + ValueObserverBuilder::new(self.meter, name.into(), None, NumberKind::F64) } } @@ -41,7 +96,7 @@ where pub struct SumObserverBuilder<'a, T> { meter: &'a Meter, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, _marker: std::marker::PhantomData, } @@ -49,7 +104,7 @@ impl<'a, T> SumObserverBuilder<'a, T> { pub(crate) fn new( meter: &'a Meter, name: String, - runner: AsyncRunner, + runner: Option, number_kind: NumberKind, ) -> Self { SumObserverBuilder { @@ -128,7 +183,7 @@ where pub struct UpDownSumObserverBuilder<'a, T> { meter: &'a Meter, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, _marker: std::marker::PhantomData, } @@ -136,7 +191,7 @@ impl<'a, T> UpDownSumObserverBuilder<'a, T> { pub(crate) fn new( meter: &'a Meter, name: String, - runner: AsyncRunner, + runner: Option, number_kind: NumberKind, ) -> Self { UpDownSumObserverBuilder { @@ -214,7 +269,7 @@ where pub struct ValueObserverBuilder<'a, T> { meter: &'a Meter, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, _marker: std::marker::PhantomData, } @@ -222,7 +277,7 @@ impl<'a, T> ValueObserverBuilder<'a, T> { pub(crate) fn new( meter: &'a Meter, name: String, - runner: AsyncRunner, + runner: Option, number_kind: NumberKind, ) -> Self { ValueObserverBuilder { diff --git a/opentelemetry/src/metrics/registry.rs b/opentelemetry/src/metrics/registry.rs index 288dc56f05..c66b8c3aae 100644 --- a/opentelemetry/src/metrics/registry.rs +++ b/opentelemetry/src/metrics/registry.rs @@ -76,7 +76,7 @@ impl MeterCore for UniqueInstrumentMeterCore { fn new_async_instrument( &self, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, ) -> super::Result { self.async_state .lock() @@ -96,6 +96,10 @@ impl MeterCore for UniqueInstrumentMeterCore { }) }) } + + fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()> { + self.inner.new_batch_observer(runner) + } } fn check_sync_uniqueness( diff --git a/opentelemetry/src/metrics/sdk_api.rs b/opentelemetry/src/metrics/sdk_api.rs index 76b3ca3725..bca8e43892 100644 --- a/opentelemetry/src/metrics/sdk_api.rs +++ b/opentelemetry/src/metrics/sdk_api.rs @@ -19,11 +19,16 @@ pub trait MeterCore: fmt::Debug { fn new_sync_instrument(&self, descriptor: Descriptor) -> Result>; /// Create a new asynchronous instrument implementation. + /// + /// Runner is `None` if used in batch as the batch runner is registered separately. fn new_async_instrument( &self, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, ) -> Result>; + + /// Register a batch observer + fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()>; } /// A common interface for synchronous and asynchronous instruments. diff --git a/opentelemetry/src/sdk/metrics/mod.rs b/opentelemetry/src/sdk/metrics/mod.rs index f997c85d4f..f76468811e 100644 --- a/opentelemetry/src/sdk/metrics/mod.rs +++ b/opentelemetry/src/sdk/metrics/mod.rs @@ -77,11 +77,20 @@ struct MapKey { instrument_hash: u64, } +type AsyncRunnerPair = (AsyncRunner, Option>); + #[derive(Default, Debug)] struct AsyncInstrumentState { - /// runners maintains the set of runners in the order they were - /// registered. - runners: Vec<(AsyncRunner, Arc)>, + /// The set of runners in the order they were registered that will run each + /// collection interval. + /// + /// Non-batch observers are entered with an instrument, batch observers are + /// entered without an instrument, each is called once allowing both batch and + /// individual observations to be collected. + runners: Vec, + + /// The set of instruments in the order they were registered. + instruments: Vec>, } fn collect_async(labels: &[KeyValue], observations: &[Observation]) { @@ -99,10 +108,10 @@ fn collect_async(labels: &[KeyValue], observations: &[Observation]) { } impl AsyncInstrumentState { + /// Executes the complete set of observer callbacks. fn run(&self) { for (runner, instrument) in self.runners.iter() { - // TODO see if batch needs other logic - runner.run(instrument.clone(), collect_async) + runner.run(instrument, collect_async) } } } @@ -136,16 +145,28 @@ impl AccumulatorCore { fn register( &self, instrument: Arc, - runner: AsyncRunner, + runner: Option, ) -> Result<()> { self.async_instruments .lock() .map_err(Into::into) .map(|mut async_instruments| { - async_instruments.runners.push((runner, instrument)); + if let Some(runner) = runner { + async_instruments + .runners + .push((runner, Some(instrument.clone()))); + }; + async_instruments.instruments.push(instrument); }) } + fn register_runner(&self, runner: AsyncRunner) -> Result<()> { + self.async_instruments + .lock() + .map_err(Into::into) + .map(|mut async_instruments| async_instruments.runners.push((runner, None))) + } + fn collect(&self, locked_processor: &mut dyn LockedProcessor) -> usize { let mut checkpointed = self.observe_async_instruments(locked_processor); checkpointed += self.collect_sync_instruments(locked_processor); @@ -162,7 +183,7 @@ impl AccumulatorCore { async_instruments.run(); - for (_runner, instrument) in &async_instruments.runners { + for instrument in &async_instruments.instruments { if let Some(a) = instrument.as_any().downcast_ref::() { async_collected += self.checkpoint_async(a, locked_processor); } @@ -523,7 +544,6 @@ impl sdk_api::MeterCore for Accumulator { labels: &[KeyValue], measurements: Vec, ) { - // var labelsPtr *label.Set for measure in measurements.into_iter() { if let Some(instrument) = measure .instrument() @@ -540,7 +560,7 @@ impl sdk_api::MeterCore for Accumulator { fn new_async_instrument( &self, descriptor: Descriptor, - runner: AsyncRunner, + runner: Option, ) -> Result> { let instrument = Arc::new(AsyncInstrument { instrument: Arc::new(Instrument { @@ -554,4 +574,8 @@ impl sdk_api::MeterCore for Accumulator { Ok(instrument) } + + fn new_batch_observer(&self, runner: AsyncRunner) -> Result<()> { + self.0.register_runner(runner) + } }