diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index 83d210e063b..c353a0e0032 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -25,7 +25,8 @@ from abc import ABC, abstractmethod from logging import getLogger from os import environ -from typing import Optional, cast +from threading import Lock +from typing import List, Optional, cast from opentelemetry.environment_variables import OTEL_PYTHON_METER_PROVIDER from opentelemetry.metrics.instrument import ( @@ -41,7 +42,15 @@ ObservableGauge, ObservableUpDownCounter, UpDownCounter, + _ProxyCounter, + _ProxyHistogram, + _ProxyInstrument, + _ProxyObservableCounter, + _ProxyObservableGauge, + _ProxyObservableUpDownCounter, + _ProxyUpDownCounter, ) +from opentelemetry.util._once import Once from opentelemetry.util._providers import _load_provider _logger = getLogger(__name__) @@ -70,18 +79,33 @@ def get_meter( return _DefaultMeter(name, version=version, schema_url=schema_url) -class ProxyMeterProvider(MeterProvider): +class _ProxyMeterProvider(MeterProvider): + def __init__(self) -> None: + self._lock = Lock() + self._meters: List[_ProxyMeter] = [] + self._real_meter_provider: Optional[MeterProvider] = None + def get_meter( self, name, version=None, schema_url=None, ) -> "Meter": - if _METER_PROVIDER: - return _METER_PROVIDER.get_meter( - name, version=version, schema_url=schema_url - ) - return ProxyMeter(name, version=version, schema_url=schema_url) + with self._lock: + if self._real_meter_provider is not None: + return self._real_meter_provider.get_meter( + name, version, schema_url + ) + + meter = _ProxyMeter(name, version=version, schema_url=schema_url) + self._meters.append(meter) + return meter + + def on_set_meter_provider(self, meter_provider: MeterProvider) -> None: + with self._lock: + self._real_meter_provider = meter_provider + for meter in self._meters: + meter.on_set_meter_provider(meter_provider) class Meter(ABC): @@ -225,7 +249,7 @@ def create_observable_up_down_counter( self._secure_instrument_name(name) -class ProxyMeter(Meter): +class _ProxyMeter(Meter): def __init__( self, name, @@ -233,43 +257,101 @@ def __init__( schema_url=None, ): super().__init__(name, version=version, schema_url=schema_url) + self._lock = Lock() + self._instruments: List[_ProxyInstrument] = [] self._real_meter: Optional[Meter] = None - self._noop_meter = _DefaultMeter( - name, version=version, schema_url=schema_url + + def on_set_meter_provider(self, meter_provider: MeterProvider) -> None: + """Called when a real meter provider is set on the creating _ProxyMeterProvider + + Creates a real backing meter for this instance and notifies all created + instruments so they can create real backing instruments. + """ + real_meter = meter_provider.get_meter( + self._name, self._version, self._schema_url ) - @property - def _meter(self) -> Meter: - if self._real_meter is not None: - return self._real_meter - - if _METER_PROVIDER: - self._real_meter = _METER_PROVIDER.get_meter( - self._name, - self._version, - ) - return self._real_meter - return self._noop_meter + with self._lock: + self._real_meter = real_meter + # notify all proxy instruments of the new meter so they can create + # real instruments to back themselves + for instrument in self._instruments: + instrument.on_meter_set(real_meter) - def create_counter(self, *args, **kwargs) -> Counter: - return self._meter.create_counter(*args, **kwargs) + def create_counter(self, name, unit="", description="") -> Counter: + with self._lock: + if self._real_meter: + return self._real_meter.create_counter(name, unit, description) + proxy = _ProxyCounter(name, unit, description) + self._instruments.append(proxy) + return proxy - def create_up_down_counter(self, *args, **kwargs) -> UpDownCounter: - return self._meter.create_up_down_counter(*args, **kwargs) + def create_up_down_counter( + self, name, unit="", description="" + ) -> UpDownCounter: + with self._lock: + if self._real_meter: + return self._real_meter.create_up_down_counter( + name, unit, description + ) + proxy = _ProxyUpDownCounter(name, unit, description) + self._instruments.append(proxy) + return proxy - def create_observable_counter(self, *args, **kwargs) -> ObservableCounter: - return self._meter.create_observable_counter(*args, **kwargs) + def create_observable_counter( + self, name, callback, unit="", description="" + ) -> ObservableCounter: + with self._lock: + if self._real_meter: + return self._real_meter.create_observable_counter( + name, callback, unit, description + ) + proxy = _ProxyObservableCounter( + name, callback, unit=unit, description=description + ) + self._instruments.append(proxy) + return proxy - def create_histogram(self, *args, **kwargs) -> Histogram: - return self._meter.create_histogram(*args, **kwargs) + def create_histogram(self, name, unit="", description="") -> Histogram: + with self._lock: + if self._real_meter: + return self._real_meter.create_histogram( + name, unit, description + ) + proxy = _ProxyHistogram(name, unit, description) + self._instruments.append(proxy) + return proxy - def create_observable_gauge(self, *args, **kwargs) -> ObservableGauge: - return self._meter.create_observable_gauge(*args, **kwargs) + def create_observable_gauge( + self, name, callback, unit="", description="" + ) -> ObservableGauge: + with self._lock: + if self._real_meter: + return self._real_meter.create_observable_gauge( + name, callback, unit, description + ) + proxy = _ProxyObservableGauge( + name, callback, unit=unit, description=description + ) + self._instruments.append(proxy) + return proxy def create_observable_up_down_counter( - self, *args, **kwargs + self, name, callback, unit="", description="" ) -> ObservableUpDownCounter: - return self._meter.create_observable_up_down_counter(*args, **kwargs) + with self._lock: + if self._real_meter: + return self._real_meter.create_observable_up_down_counter( + name, + callback, + unit, + description, + ) + proxy = _ProxyObservableUpDownCounter( + name, callback, unit=unit, description=description + ) + self._instruments.append(proxy) + return proxy class _DefaultMeter(Meter): @@ -329,8 +411,9 @@ def create_observable_up_down_counter( ) -_METER_PROVIDER = None -_PROXY_METER_PROVIDER = None +_METER_PROVIDER_SET_ONCE = Once() +_METER_PROVIDER: Optional[MeterProvider] = None +_PROXY_METER_PROVIDER = _ProxyMeterProvider() def get_meter( @@ -350,35 +433,40 @@ def get_meter( return meter_provider.get_meter(name, version) +def _set_meter_provider(meter_provider: MeterProvider, log: bool) -> None: + def set_mp() -> None: + global _METER_PROVIDER # pylint: disable=global-statement + _METER_PROVIDER = meter_provider + + # gives all proxies real instruments off the newly set meter provider + _PROXY_METER_PROVIDER.on_set_meter_provider(meter_provider) + + did_set = _METER_PROVIDER_SET_ONCE.do_once(set_mp) + + if log and not did_set: + _logger.warning("Overriding of current MeterProvider is not allowed") + + def set_meter_provider(meter_provider: MeterProvider) -> None: """Sets the current global :class:`~.MeterProvider` object. This can only be done once, a warning will be logged if any furter attempt is made. """ - global _METER_PROVIDER # pylint: disable=global-statement - - if _METER_PROVIDER is not None: - _logger.warning("Overriding of current MeterProvider is not allowed") - return - - _METER_PROVIDER = meter_provider + _set_meter_provider(meter_provider, log=True) def get_meter_provider() -> MeterProvider: """Gets the current global :class:`~.MeterProvider` object.""" - # pylint: disable=global-statement - global _METER_PROVIDER - global _PROXY_METER_PROVIDER if _METER_PROVIDER is None: if OTEL_PYTHON_METER_PROVIDER not in environ.keys(): - if _PROXY_METER_PROVIDER is None: - _PROXY_METER_PROVIDER = ProxyMeterProvider() return _PROXY_METER_PROVIDER - _METER_PROVIDER = cast( - "MeterProvider", - _load_provider(OTEL_PYTHON_METER_PROVIDER, "meter_provider"), + meter_provider: MeterProvider = _load_provider( + OTEL_PYTHON_METER_PROVIDER, "meter_provider" ) - return _METER_PROVIDER + _set_meter_provider(meter_provider, log=False) + + # _METER_PROVIDER will have been set by one thread + return cast("MeterProvider", _METER_PROVIDER) diff --git a/opentelemetry-api/src/opentelemetry/metrics/instrument.py b/opentelemetry-api/src/opentelemetry/metrics/instrument.py index 5d382056408..7ff4de4b6eb 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/instrument.py +++ b/opentelemetry-api/src/opentelemetry/metrics/instrument.py @@ -27,6 +27,7 @@ _TInstrumentCallback = Callable[[], Iterable[Measurement]] _TInstrumentCallbackGenerator = Generator[Iterable[Measurement], None, None] TCallback = Union[_TInstrumentCallback, _TInstrumentCallbackGenerator] +InstrumentT = TypeVar("InstrumentT", bound="Instrument") _logger = getLogger(__name__) @@ -73,6 +74,32 @@ def __init__(self, name, unit="", description=""): self._description = description +class _ProxyInstrument(ABC, Generic[InstrumentT]): + def __init__(self, name, unit, description) -> None: + self._name = name + self._unit = unit + self._description = description + self._real_instrument: Optional[InstrumentT] = None + + def on_meter_set(self, meter: "metrics.Meter") -> None: + """Called when a real meter is set on the creating _ProxyMeter""" + + # We don't need any locking on proxy instruments because it's OK if some + # measurements get dropped while a real backing instrument is being + # created. + self._real_instrument = self._create_real_instrument(meter) + + @abstractmethod + def _create_real_instrument(self, meter: "metrics.Meter") -> InstrumentT: + """Create an instance of the real instrument. Implement this.""" + + +class _ProxyAsynchronousInstrument(_ProxyInstrument[InstrumentT]): + def __init__(self, name, callback, unit, description) -> None: + super().__init__(name, unit, description) + self._callback = callback + + class Synchronous(Instrument): pass @@ -172,6 +199,15 @@ def add(self, amount, attributes=None): return super().add(amount, attributes=attributes) +class _ProxyCounter(_ProxyInstrument[Counter], Counter): + def add(self, amount, attributes=None): + if self._real_instrument: + self._real_instrument.add(amount, attributes) + + def _create_real_instrument(self, meter: "metrics.Meter") -> Counter: + return meter.create_counter(self._name, self._unit, self._description) + + class UpDownCounter(_NonMonotonic, Synchronous): @abstractmethod def add(self, amount, attributes=None): @@ -186,6 +222,17 @@ def add(self, amount, attributes=None): return super().add(amount, attributes=attributes) +class _ProxyUpDownCounter(_ProxyInstrument[UpDownCounter], UpDownCounter): + def add(self, amount, attributes=None): + if self._real_instrument: + self._real_instrument.add(amount, attributes) + + def _create_real_instrument(self, meter: "metrics.Meter") -> UpDownCounter: + return meter.create_up_down_counter( + self._name, self._unit, self._description + ) + + class ObservableCounter(_Monotonic, Asynchronous): def callback(self): measurements = super().callback() @@ -201,8 +248,18 @@ def __init__(self, name, callback, unit="", description=""): super().__init__(name, callback, unit=unit, description=description) -class ObservableUpDownCounter(_NonMonotonic, Asynchronous): +class _ProxyObservableCounter( + _ProxyAsynchronousInstrument[ObservableCounter], ObservableCounter +): + def _create_real_instrument( + self, meter: "metrics.Meter" + ) -> ObservableCounter: + return meter.create_observable_counter( + self._name, self._callback, self._unit, self._description + ) + +class ObservableUpDownCounter(_NonMonotonic, Asynchronous): pass @@ -211,6 +268,18 @@ def __init__(self, name, callback, unit="", description=""): super().__init__(name, callback, unit=unit, description=description) +class _ProxyObservableUpDownCounter( + _ProxyAsynchronousInstrument[ObservableUpDownCounter], + ObservableUpDownCounter, +): + def _create_real_instrument( + self, meter: "metrics.Meter" + ) -> ObservableUpDownCounter: + return meter.create_observable_up_down_counter( + self._name, self._callback, self._unit, self._description + ) + + class Histogram(_Grouping, Synchronous): @abstractmethod def record(self, amount, attributes=None): @@ -225,6 +294,17 @@ def record(self, amount, attributes=None): return super().record(amount, attributes=attributes) +class _ProxyHistogram(_ProxyInstrument[Histogram], Histogram): + def record(self, amount, attributes=None): + if self._real_instrument: + self._real_instrument.record(amount, attributes) + + def _create_real_instrument(self, meter: "metrics.Meter") -> Histogram: + return meter.create_histogram( + self._name, self._unit, self._description + ) + + class ObservableGauge(_Grouping, Asynchronous): pass @@ -232,3 +312,15 @@ class ObservableGauge(_Grouping, Asynchronous): class DefaultObservableGauge(ObservableGauge): def __init__(self, name, callback, unit="", description=""): super().__init__(name, callback, unit=unit, description=description) + + +class _ProxyObservableGauge( + _ProxyAsynchronousInstrument[ObservableGauge], + ObservableGauge, +): + def _create_real_instrument( + self, meter: "metrics.Meter" + ) -> ObservableGauge: + return meter.create_observable_gauge( + self._name, self._callback, self._unit, self._description + )