Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: implement batch observer #429

Merged
merged 3 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion opentelemetry-prometheus/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use opentelemetry::sdk::Resource;
use opentelemetry::{
metrics::{MeterProvider, ObserverResult},
metrics::{BatchObserverResult, MeterProvider, ObserverResult},
KeyValue,
};
use opentelemetry_prometheus::PrometheusExporter;
Expand Down Expand Up @@ -33,6 +33,34 @@ fn free_unused_instruments() {
compare_export(&exporter, expected);
}

#[test]
fn batch() {
let exporter = opentelemetry_prometheus::exporter()
.with_resource(Resource::new(vec![KeyValue::new("R", "V")]))
.init();
let meter = exporter.provider().unwrap().meter("test", None);
let mut expected = Vec::new();

meter.batch_observer(|batch| {
let uint_observer = batch.u64_value_observer("uint_observer").init();
let float_observer = batch.f64_value_observer("float_observer").init();

move |result: BatchObserverResult| {
result.observe(
&[KeyValue::new("A", "B")],
&[
uint_observer.observation(2),
float_observer.observation(3.1),
],
);
}
});

expected.push(r#"uint_observer{A="B",R="V"} 2"#);
expected.push(r#"float_observer{A="B",R="V"} 3.1"#);
compare_export(&exporter, expected);
}

#[test]
fn test_add() {
let exporter = opentelemetry_prometheus::exporter()
Expand Down
32 changes: 18 additions & 14 deletions opentelemetry/src/metrics/async_instrument.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Async metrics
use crate::metrics::{sdk_api, Number};
use crate::KeyValue;
use crate::{
global,
metrics::{sdk_api, MetricsError, Number},
KeyValue,
};
use std::fmt;
use std::marker;
use std::sync::Arc;
Expand Down Expand Up @@ -33,17 +36,17 @@ impl Observation {
}

/// A type of callback that `f64` observers run.
type F64ObserverCallback = Box<dyn Fn(ObserverResult<f64>) + Send + Sync + 'static>;
type F64ObserverCallback = Box<dyn Fn(ObserverResult<f64>) + Send + Sync>;

/// A type of callback that `u64` observers run.
type U64ObserverCallback = Box<dyn Fn(ObserverResult<u64>) + Send + Sync + 'static>;
type U64ObserverCallback = Box<dyn Fn(ObserverResult<u64>) + Send + Sync>;

/// A type of callback that `u64` observers run.
type I64ObserverCallback = Box<dyn Fn(ObserverResult<i64>) + Send + Sync + 'static>;
type I64ObserverCallback = Box<dyn Fn(ObserverResult<i64>) + Send + Sync>;

/// A callback argument for use with any Observer instrument that will be
/// reported as a batch of observations.
pub type BatchObserverCallback = Box<dyn Fn(BatchObserverResult) + Send + Sync>;
type BatchObserverCallback = Box<dyn Fn(BatchObserverResult) + Send + Sync>;

/// Data passed to an observer callback to capture observations for one
/// asynchronous metric instrument.
Expand Down Expand Up @@ -137,16 +140,17 @@ impl AsyncRunner {
/// implementation can be used for batch runners.)
pub fn run(
&self,
instrument: Arc<dyn sdk_api::AsyncInstrumentCore>,
instrument: &Option<Arc<dyn sdk_api::AsyncInstrumentCore>>,
f: fn(&[KeyValue], &[Observation]),
) {
match self {
AsyncRunner::F64(run) => run(ObserverResult::new(instrument, f)),
AsyncRunner::I64(run) => run(ObserverResult::new(instrument, f)),
AsyncRunner::U64(run) => run(ObserverResult::new(instrument, f)),
// TODO: this should not require an instrument to call. consider
// moving to separate struct
AsyncRunner::Batch(run) => run(BatchObserverResult::new(f)),
match (instrument, self) {
(Some(i), AsyncRunner::F64(run)) => run(ObserverResult::new(i.clone(), f)),
(Some(i), AsyncRunner::I64(run)) => run(ObserverResult::new(i.clone(), f)),
(Some(i), AsyncRunner::U64(run)) => run(ObserverResult::new(i.clone(), f)),
(None, AsyncRunner::Batch(run)) => run(BatchObserverResult::new(f)),
_ => global::handle_error(MetricsError::Other(
"Invalid async runner / instrument pair".into(),
)),
}
}
}
Expand Down
90 changes: 75 additions & 15 deletions opentelemetry/src/metrics/meter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::sdk::InstrumentationLibrary;
use crate::{
metrics::{
sdk_api, AsyncRunner, BatchObserver, BatchObserverCallback, CounterBuilder, Descriptor,
sdk_api, AsyncRunner, BatchObserver, BatchObserverResult, CounterBuilder, Descriptor,
Measurement, NumberKind, ObserverResult, Result, SumObserverBuilder, UpDownCounterBuilder,
UpDownSumObserverBuilder, ValueObserverBuilder, ValueRecorderBuilder,
},
Expand Down Expand Up @@ -93,7 +93,7 @@ impl Meter {
SumObserverBuilder::new(
self,
name.into(),
AsyncRunner::F64(Box::new(callback)),
Some(AsyncRunner::F64(Box::new(callback))),
NumberKind::F64,
)
}
Expand All @@ -111,7 +111,7 @@ impl Meter {
UpDownSumObserverBuilder::new(
self,
name.into(),
AsyncRunner::F64(Box::new(callback)),
Some(AsyncRunner::F64(Box::new(callback))),
NumberKind::F64,
)
}
Expand All @@ -125,7 +125,7 @@ impl Meter {
ValueObserverBuilder::new(
self,
name.into(),
AsyncRunner::F64(Box::new(callback)),
Some(AsyncRunner::F64(Box::new(callback))),
NumberKind::F64,
)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ impl Meter {
SumObserverBuilder::new(
self,
name.into(),
AsyncRunner::I64(Box::new(callback)),
Some(AsyncRunner::I64(Box::new(callback))),
NumberKind::I64,
)
}
Expand All @@ -181,7 +181,7 @@ impl Meter {
UpDownSumObserverBuilder::new(
self,
name.into(),
AsyncRunner::I64(Box::new(callback)),
Some(AsyncRunner::I64(Box::new(callback))),
NumberKind::I64,
)
}
Expand All @@ -195,7 +195,7 @@ impl Meter {
ValueObserverBuilder::new(
self,
name.into(),
AsyncRunner::I64(Box::new(callback)),
Some(AsyncRunner::I64(Box::new(callback))),
NumberKind::I64,
)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ impl Meter {
SumObserverBuilder::new(
self,
name.into(),
AsyncRunner::U64(Box::new(callback)),
Some(AsyncRunner::U64(Box::new(callback))),
NumberKind::U64,
)
}
Expand All @@ -251,7 +251,7 @@ impl Meter {
UpDownSumObserverBuilder::new(
self,
name.into(),
AsyncRunner::U64(Box::new(callback)),
Some(AsyncRunner::U64(Box::new(callback))),
NumberKind::U64,
)
}
Expand All @@ -265,15 +265,75 @@ impl Meter {
ValueObserverBuilder::new(
self,
name.into(),
AsyncRunner::U64(Box::new(callback)),
Some(AsyncRunner::U64(Box::new(callback))),
NumberKind::U64,
)
}

/// Creates a new `BatchObserver` that supports making batches of observations for
/// multiple instruments.
pub fn batch_observer(&self, callback: BatchObserverCallback) -> BatchObserver<'_> {
BatchObserver::new(self, AsyncRunner::Batch(callback))
/// Creates a new `BatchObserver` that supports making batches of observations
/// for multiple instruments or returns an error if instrument initialization
/// fails.
///
/// # Examples
///
/// ```
/// use opentelemetry::{global, metrics::BatchObserverResult, KeyValue};
///
/// # fn init_observer() -> opentelemetry::metrics::Result<()> {
/// let meter = global::meter("test");
///
/// meter.build_batch_observer(|batch| {
/// let instrument = batch.u64_value_observer("test_instrument").try_init()?;
///
/// Ok(move |result: BatchObserverResult| {
/// result.observe(&[KeyValue::new("my-key", "my-value")], &[instrument.observation(1)]);
/// })
/// })?;
/// # Ok(())
/// # }
/// ```
pub fn build_batch_observer<B, F>(&self, builder: B) -> Result<()>
where
B: Fn(BatchObserver<'_>) -> Result<F>,
F: Fn(BatchObserverResult) + Send + Sync + 'static,
{
let observer = builder(BatchObserver::new(self))?;
self.core
.new_batch_observer(AsyncRunner::Batch(Box::new(observer)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably just inline the call to builder() here, so you don't have to name the intermediate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting this specifically?

self.core
    .new_batch_observer(AsyncRunner::Batch(Box::new(builder(BatchObserver::new(
        self,
    ))?)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Maybe the way rustfmt treats it's not actually an improvement in this case...

}

/// Creates a new `BatchObserver` that supports making batches of observations
/// for multiple instruments.
///
/// # Panics
///
/// Panics if instrument initialization or observer registration returns an
/// error.
///
/// # Examples
///
/// ```
/// use opentelemetry::{global, metrics::BatchObserverResult, KeyValue};
///
/// let meter = global::meter("test");
///
/// meter.batch_observer(|batch| {
/// let instrument = batch.u64_value_observer("test_instrument").init();
///
/// move |result: BatchObserverResult| {
/// result.observe(&[KeyValue::new("my-key", "my-value")], &[instrument.observation(1)]);
/// }
/// });
/// ```
pub fn batch_observer<B, F>(&self, builder: B)
where
B: Fn(BatchObserver<'_>) -> F,
F: Fn(BatchObserverResult) + Send + Sync + 'static,
{
let observer = builder(BatchObserver::new(self));
self.core
.new_batch_observer(AsyncRunner::Batch(Box::new(observer)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

.unwrap()
}

/// Atomically record a batch of measurements.
Expand Down Expand Up @@ -306,7 +366,7 @@ impl Meter {
pub(crate) fn new_async_instrument(
&self,
descriptor: Descriptor,
runner: AsyncRunner,
runner: Option<AsyncRunner>,
) -> Result<Arc<dyn sdk_api::AsyncInstrumentCore>> {
self.core.new_async_instrument(descriptor, runner)
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod up_down_counter;
mod value_recorder;

use crate::sdk::export::ExportError;
pub use async_instrument::{AsyncRunner, BatchObserverCallback, Observation, ObserverResult};
pub use async_instrument::{AsyncRunner, BatchObserverResult, Observation, ObserverResult};
pub use config::InstrumentConfig;
pub use counter::{BoundCounter, Counter, CounterBuilder};
pub use descriptor::Descriptor;
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry/src/metrics/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl MeterCore for NoopMeterCore {
fn new_async_instrument(
&self,
_descriptor: Descriptor,
_runner: AsyncRunner,
_runner: Option<AsyncRunner>,
) -> Result<Arc<dyn AsyncInstrumentCore>> {
Ok(Arc::new(NoopAsyncInstrument::new()))
}
Expand All @@ -74,6 +74,10 @@ impl MeterCore for NoopMeterCore {
) {
// Ignored
}

fn new_batch_observer(&self, _runner: AsyncRunner) -> Result<()> {
Ok(())
}
}

/// A no-op sync instrument
Expand Down
Loading