Skip to content

Commit

Permalink
Bugfix/instrument basic publish in pika (#759)
Browse files Browse the repository at this point in the history
  • Loading branch information
oxeye-nikolay committed Oct 19, 2021
1 parent 433b856 commit c24c77d
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-sdk-extension-aws` & `opentelemetry-propagator-aws` Release AWS Python SDK Extension as 2.0.1 and AWS Propagator as 1.0.1
([#753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/753))
- `opentelemetry-instrumentation-pika` Add `_decorate_basic_consume` to ensure post instrumentation `basic_consume` calls are also instrumented.
([#759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/759))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def _uninstrument_channel_functions(channel: Channel) -> None:
function = getattr(channel, function_name)
if hasattr(function, "_original_function"):
channel.__setattr__(function_name, function._original_function)
unwrap(channel, "basic_consume")

@staticmethod
def instrument_channel(
Expand All @@ -90,6 +91,7 @@ def instrument_channel(
PikaInstrumentor._instrument_consumers(
channel._impl._consumers, tracer
)
PikaInstrumentor._decorate_basic_consume(channel, tracer)
PikaInstrumentor._instrument_channel_functions(channel, tracer)

@staticmethod
Expand Down Expand Up @@ -120,6 +122,33 @@ def wrapper(wrapped, instance, args, kwargs):

wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)

@staticmethod
def _decorate_basic_consume(channel, tracer: Optional[Tracer]) -> None:
def wrapper(wrapped, instance, args, kwargs):
if not hasattr(channel, "_impl"):
_LOG.error(
"Could not find implementation for provided channel!"
)
return wrapped(*args, **kwargs)
current_keys = set(channel._impl._consumers.keys())
return_value = wrapped(*args, **kwargs)
new_key_list = list(
set(channel._impl._consumers.keys()) - current_keys
)
if not new_key_list:
_LOG.error("Could not find added callback")
return return_value
new_key = new_key_list[0]
callback = channel._impl._consumers[new_key]
decorated_callback = utils._decorate_callback(
callback, tracer, new_key
)
setattr(decorated_callback, "_original_callback", callback)
channel._impl._consumers[new_key] = decorated_callback
return return_value

wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
self.__setattr__("__opentelemetry_tracer_provider", tracer_provider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def decorated_callback(
) -> Any:
if not properties:
properties = BasicProperties(headers={})
if properties.headers is None:
properties.headers = {}
ctx = propagate.extract(properties.headers, getter=_pika_getter)
if not ctx:
ctx = context.get_current()
Expand Down Expand Up @@ -74,6 +76,8 @@ def decorated_function(
) -> Any:
if not properties:
properties = BasicProperties(headers={})
if properties.headers is None:
properties.headers = {}
ctx = context.get_current()
span = _get_span(
tracer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,24 @@ def test_instrument_api(self) -> None:
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers"
)
def test_instrument(
self,
instrument_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
PikaInstrumentor.instrument_channel(channel=self.channel)
assert hasattr(
self.channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_consumers.assert_called_once()
instrument_basic_consume.assert_called_once()
instrument_channel_functions.assert_called_once()

@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from wrapt import ObjectProxy

# pylint: disable=unused-import
# pylint: disable=E0611
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY # noqa: F401
from opentelemetry.trace import StatusCode

Expand Down

0 comments on commit c24c77d

Please sign in to comment.