Skip to content

Commit

Permalink
metrics: implement batch observer (#429)
Browse files Browse the repository at this point in the history
This patch adds an implementation of the metrics batch observer. The API
is not identical to the example in [the
spec](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/api.md#batch-observer)
as it is inconvenient to register metrics after they are moved into the
observer callback in rust. Instead the `Meter::batch_observer` method
accepts a closure in which instruments can be registered before being
moved into the callback.

```rust
meter.batch_observer(|batch| {
    let inst = batch.u64_sum_observer("example").init();

    move |result| {
        result.observe(&[KeyValue::new("a", "1")], inst.observation(42)]);
    }
});
```
  • Loading branch information
jtescher authored Jan 18, 2021
1 parent 6728799 commit c3bdce4
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 53 deletions.
30 changes: 29 additions & 1 deletion opentelemetry-prometheus/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use opentelemetry::sdk::Resource;
use opentelemetry::{
metrics::{MeterProvider, ObserverResult},
metrics::{BatchObserverResult, MeterProvider, ObserverResult},
KeyValue,
};
use opentelemetry_prometheus::PrometheusExporter;
Expand Down Expand Up @@ -33,6 +33,34 @@ fn free_unused_instruments() {
compare_export(&exporter, expected);
}

#[test]
fn batch() {
let exporter = opentelemetry_prometheus::exporter()
.with_resource(Resource::new(vec![KeyValue::new("R", "V")]))
.init();
let meter = exporter.provider().unwrap().meter("test", None);
let mut expected = Vec::new();

meter.batch_observer(|batch| {
let uint_observer = batch.u64_value_observer("uint_observer").init();
let float_observer = batch.f64_value_observer("float_observer").init();

move |result: BatchObserverResult| {
result.observe(
&[KeyValue::new("A", "B")],
&[
uint_observer.observation(2),
float_observer.observation(3.1),
],
);
}
});

expected.push(r#"uint_observer{A="B",R="V"} 2"#);
expected.push(r#"float_observer{A="B",R="V"} 3.1"#);
compare_export(&exporter, expected);
}

#[test]
fn test_add() {
let exporter = opentelemetry_prometheus::exporter()
Expand Down
32 changes: 18 additions & 14 deletions opentelemetry/src/metrics/async_instrument.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Async metrics
use crate::metrics::{sdk_api, Number};
use crate::KeyValue;
use crate::{
global,
metrics::{sdk_api, MetricsError, Number},
KeyValue,
};
use std::fmt;
use std::marker;
use std::sync::Arc;
Expand Down Expand Up @@ -33,17 +36,17 @@ impl Observation {
}

/// A type of callback that `f64` observers run.
type F64ObserverCallback = Box<dyn Fn(ObserverResult<f64>) + Send + Sync + 'static>;
type F64ObserverCallback = Box<dyn Fn(ObserverResult<f64>) + Send + Sync>;

/// A type of callback that `u64` observers run.
type U64ObserverCallback = Box<dyn Fn(ObserverResult<u64>) + Send + Sync + 'static>;
type U64ObserverCallback = Box<dyn Fn(ObserverResult<u64>) + Send + Sync>;

/// A type of callback that `u64` observers run.
type I64ObserverCallback = Box<dyn Fn(ObserverResult<i64>) + Send + Sync + 'static>;
type I64ObserverCallback = Box<dyn Fn(ObserverResult<i64>) + Send + Sync>;

/// A callback argument for use with any Observer instrument that will be
/// reported as a batch of observations.
pub type BatchObserverCallback = Box<dyn Fn(BatchObserverResult) + Send + Sync>;
type BatchObserverCallback = Box<dyn Fn(BatchObserverResult) + Send + Sync>;

/// Data passed to an observer callback to capture observations for one
/// asynchronous metric instrument.
Expand Down Expand Up @@ -137,16 +140,17 @@ impl AsyncRunner {
/// implementation can be used for batch runners.)
pub fn run(
&self,
instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
instrument: &Option<Arc<dyn sdk_api::AsyncInstrumentCore>>,
f: fn(&[KeyValue], &[Observation]),
) {
match self {
AsyncRunner::F64(run) => run(ObserverResult::new(instrument, f)),
AsyncRunner::I64(run) => run(ObserverResult::new(instrument, f)),
AsyncRunner::U64(run) => run(ObserverResult::new(instrument, f)),
// TODO: this should not require an instrument to call. consider
// moving to separate struct
AsyncRunner::Batch(run) => run(BatchObserverResult::new(f)),
match (instrument, self) {
(Some(i), AsyncRunner::F64(run)) => run(ObserverResult::new(i.clone(), f)),
(Some(i), AsyncRunner::I64(run)) => run(ObserverResult::new(i.clone(), f)),
(Some(i), AsyncRunner::U64(run)) => run(ObserverResult::new(i.clone(), f)),
(None, AsyncRunner::Batch(run)) => run(BatchObserverResult::new(f)),
_ => global::handle_error(MetricsError::Other(
"Invalid async runner / instrument pair".into(),
)),
}
}
}
Expand Down
90 changes: 75 additions & 15 deletions opentelemetry/src/metrics/meter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::sdk::InstrumentationLibrary;
use crate::{
metrics::{
sdk_api, AsyncRunner, BatchObserver, BatchObserverCallback, CounterBuilder, Descriptor,
sdk_api, AsyncRunner, BatchObserver, BatchObserverResult, CounterBuilder, Descriptor,
Measurement, NumberKind, ObserverResult, Result, SumObserverBuilder, UpDownCounterBuilder,
UpDownSumObserverBuilder, ValueObserverBuilder, ValueRecorderBuilder,
},
Expand Down Expand Up @@ -93,7 +93,7 @@ impl Meter {
SumObserverBuilder::new(
self,
name.into(),
AsyncRunner::F64(Box::new(callback)),
Some(AsyncRunner::F64(Box::new(callback))),
NumberKind::F64,
)
}
Expand All @@ -111,7 +111,7 @@ impl Meter {
UpDownSumObserverBuilder::new(
self,
name.into(),
AsyncRunner::F64(Box::new(callback)),
Some(AsyncRunner::F64(Box::new(callback))),
NumberKind::F64,
)
}
Expand All @@ -125,7 +125,7 @@ impl Meter {
ValueObserverBuilder::new(
self,
name.into(),
AsyncRunner::F64(Box::new(callback)),
Some(AsyncRunner::F64(Box::new(callback))),
NumberKind::F64,
)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ impl Meter {
SumObserverBuilder::new(
self,
name.into(),
AsyncRunner::I64(Box::new(callback)),
Some(AsyncRunner::I64(Box::new(callback))),
NumberKind::I64,
)
}
Expand All @@ -181,7 +181,7 @@ impl Meter {
UpDownSumObserverBuilder::new(
self,
name.into(),
AsyncRunner::I64(Box::new(callback)),
Some(AsyncRunner::I64(Box::new(callback))),
NumberKind::I64,
)
}
Expand All @@ -195,7 +195,7 @@ impl Meter {
ValueObserverBuilder::new(
self,
name.into(),
AsyncRunner::I64(Box::new(callback)),
Some(AsyncRunner::I64(Box::new(callback))),
NumberKind::I64,
)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ impl Meter {
SumObserverBuilder::new(
self,
name.into(),
AsyncRunner::U64(Box::new(callback)),
Some(AsyncRunner::U64(Box::new(callback))),
NumberKind::U64,
)
}
Expand All @@ -251,7 +251,7 @@ impl Meter {
UpDownSumObserverBuilder::new(
self,
name.into(),
AsyncRunner::U64(Box::new(callback)),
Some(AsyncRunner::U64(Box::new(callback))),
NumberKind::U64,
)
}
Expand All @@ -265,15 +265,75 @@ impl Meter {
ValueObserverBuilder::new(
self,
name.into(),
AsyncRunner::U64(Box::new(callback)),
Some(AsyncRunner::U64(Box::new(callback))),
NumberKind::U64,
)
}

/// Creates a new `BatchObserver` that supports making batches of observations for
/// multiple instruments.
pub fn batch_observer(&self, callback: BatchObserverCallback) -> BatchObserver<'_> {
BatchObserver::new(self, AsyncRunner::Batch(callback))
/// Creates a new `BatchObserver` that supports making batches of observations
/// for multiple instruments or returns an error if instrument initialization
/// fails.
///
/// # Examples
///
/// ```
/// use opentelemetry::{global, metrics::BatchObserverResult, KeyValue};
///
/// # fn init_observer() -> opentelemetry::metrics::Result<()> {
/// let meter = global::meter("test");
///
/// meter.build_batch_observer(|batch| {
/// let instrument = batch.u64_value_observer("test_instrument").try_init()?;
///
/// Ok(move |result: BatchObserverResult| {
/// result.observe(&[KeyValue::new("my-key", "my-value")], &[instrument.observation(1)]);
/// })
/// })?;
/// # Ok(())
/// # }
/// ```
pub fn build_batch_observer<B, F>(&self, builder: B) -> Result<()>
where
B: Fn(BatchObserver<'_>) -> Result<F>,
F: Fn(BatchObserverResult) + Send + Sync + 'static,
{
let observer = builder(BatchObserver::new(self))?;
self.core
.new_batch_observer(AsyncRunner::Batch(Box::new(observer)))
}

/// Creates a new `BatchObserver` that supports making batches of observations
/// for multiple instruments.
///
/// # Panics
///
/// Panics if instrument initialization or observer registration returns an
/// error.
///
/// # Examples
///
/// ```
/// use opentelemetry::{global, metrics::BatchObserverResult, KeyValue};
///
/// let meter = global::meter("test");
///
/// meter.batch_observer(|batch| {
/// let instrument = batch.u64_value_observer("test_instrument").init();
///
/// move |result: BatchObserverResult| {
/// result.observe(&[KeyValue::new("my-key", "my-value")], &[instrument.observation(1)]);
/// }
/// });
/// ```
pub fn batch_observer<B, F>(&self, builder: B)
where
B: Fn(BatchObserver<'_>) -> F,
F: Fn(BatchObserverResult) + Send + Sync + 'static,
{
let observer = builder(BatchObserver::new(self));
self.core
.new_batch_observer(AsyncRunner::Batch(Box::new(observer)))
.unwrap()
}

/// Atomically record a batch of measurements.
Expand Down Expand Up @@ -306,7 +366,7 @@ impl Meter {
pub(crate) fn new_async_instrument(
&self,
descriptor: Descriptor,
runner: AsyncRunner,
runner: Option<AsyncRunner>,
) -> Result<Arc<dyn sdk_api::AsyncInstrumentCore>> {
self.core.new_async_instrument(descriptor, runner)
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod up_down_counter;
mod value_recorder;

use crate::sdk::export::ExportError;
pub use async_instrument::{AsyncRunner, BatchObserverCallback, Observation, ObserverResult};
pub use async_instrument::{AsyncRunner, BatchObserverResult, Observation, ObserverResult};
pub use config::InstrumentConfig;
pub use counter::{BoundCounter, Counter, CounterBuilder};
pub use descriptor::Descriptor;
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry/src/metrics/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl MeterCore for NoopMeterCore {
fn new_async_instrument(
&self,
_descriptor: Descriptor,
_runner: AsyncRunner,
_runner: Option<AsyncRunner>,
) -> Result<Arc<dyn AsyncInstrumentCore>> {
Ok(Arc::new(NoopAsyncInstrument::new()))
}
Expand All @@ -74,6 +74,10 @@ impl MeterCore for NoopMeterCore {
) {
// Ignored
}

fn new_batch_observer(&self, _runner: AsyncRunner) -> Result<()> {
Ok(())
}
}

/// A no-op sync instrument
Expand Down
Loading

0 comments on commit c3bdce4

Please sign in to comment.