Skip to content

Commit

Permalink
boto3sqs: Make propagation compatible with other instrumentations and…
Browse files Browse the repository at this point in the history
… add 'messaging.url' span attribute (#1234)

* boto3sqs: Fix various issues

* do not use 'otel' prefix for propagation keys to make propagation
  compatible with other SQS instrumentations like Node.Js
  Inject propergator.fields keys into the MessageAttributeNames argument
  for 'receive_message' calls to retreive the corresponding message attributes
* add 'messaging.url' span attribute to SQS spans
* add boto3sqs instrumentation to tox.ini to run tests in CI
* add some basic unit tests

* changelog

* fix linting issues

* unset instrumented flag on uninstrument
  • Loading branch information
mariojonke committed Aug 23, 2022
1 parent 7625b82 commit f48b313
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 143 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234))

## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08

- Adding multiple db connections support for django-instrumentation's sqlcommenter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api ~= 1.12
opentelemetry-semantic-conventions == 0.33b0
opentelemetry-instrumentation == 0.33b0
wrapt >= 1.0.0, < 2.0.0

[options.extras_require]
test =
opentelemetry-test-utils == 0.33b0

[options.packages.find]
where = src
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Boto3SQSInstrumentor().instrument()
"""
import logging
from typing import Any, Collection, Dict, Generator, List, Optional
from typing import Any, Collection, Dict, Generator, List, Mapping, Optional

import boto3
import botocore.client
Expand All @@ -53,33 +53,31 @@
from .version import __version__

_logger = logging.getLogger(__name__)
# We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming
# existing filters
_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER: str = "otel."
_OTEL_IDENTIFIER_LENGTH = len(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER)

_IS_SQS_INSTRUMENTED_ATTRIBUTE = "_otel_boto3sqs_instrumented"


class Boto3SQSGetter(Getter[CarrierT]):
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
value = carrier.get(f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {})
if not value:
msg_attr = carrier.get(key)
if not isinstance(msg_attr, Mapping):
return None

value = msg_attr.get("StringValue")
if value is None:
return None
return [value.get("StringValue")]

return [value]

def keys(self, carrier: CarrierT) -> List[str]:
return [
key[_OTEL_IDENTIFIER_LENGTH:]
if key.startswith(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER)
else key
for key in carrier.keys()
]
return list(carrier.keys())


class Boto3SQSSetter(Setter[CarrierT]):
def set(self, carrier: CarrierT, key: str, value: str) -> None:
# This is a limitation defined by AWS for SQS MessageAttributes size
if len(carrier.items()) < 10:
carrier[f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] = {
carrier[key] = {
"StringValue": value,
"DataType": "String",
}
Expand Down Expand Up @@ -145,6 +143,7 @@ def instrumentation_dependencies(self) -> Collection[str]:
def _enrich_span(
span: Span,
queue_name: str,
queue_url: str,
conversation_id: Optional[str] = None,
operation: Optional[MessagingOperationValues] = None,
message_id: Optional[str] = None,
Expand All @@ -157,12 +156,12 @@ def _enrich_span(
SpanAttributes.MESSAGING_DESTINATION_KIND,
MessagingDestinationKindValues.QUEUE.value,
)
span.set_attribute(SpanAttributes.MESSAGING_URL, queue_url)

if operation:
span.set_attribute(
SpanAttributes.MESSAGING_OPERATION, operation.value
)
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
if conversation_id:
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id
Expand Down Expand Up @@ -190,15 +189,19 @@ def _extract_queue_name_from_url(queue_url: str) -> str:
return queue_url.split("/")[-1]

def _create_processing_span(
self, queue_name: str, receipt_handle: str, message: Dict[str, Any]
self,
queue_name: str,
queue_url: str,
receipt_handle: str,
message: Dict[str, Any],
) -> None:
message_attributes = message.get("MessageAttributes", {})
links = []
ctx = propagate.extract(message_attributes, getter=boto3sqs_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))
parent_span_ctx = trace.get_current_span(ctx).get_span_context()
if parent_span_ctx.is_valid:
links.append(Link(context=parent_span_ctx))

span = self._tracer.start_span(
name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER
)
Expand All @@ -208,11 +211,12 @@ def _create_processing_span(
Boto3SQSInstrumentor._enrich_span(
span,
queue_name,
queue_url,
message_id=message_id,
operation=MessagingOperationValues.PROCESS,
)

def _wrap_send_message(self) -> None:
def _wrap_send_message(self, sqs_class: type) -> None:
def send_wrapper(wrapped, instance, args, kwargs):
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)
Expand All @@ -227,7 +231,7 @@ def send_wrapper(wrapped, instance, args, kwargs):
kind=SpanKind.PRODUCER,
end_on_exit=True,
) as span:
Boto3SQSInstrumentor._enrich_span(span, queue_name)
Boto3SQSInstrumentor._enrich_span(span, queue_name, queue_url)
attributes = kwargs.pop("MessageAttributes", {})
propagate.inject(attributes, setter=boto3sqs_setter)
retval = wrapped(*args, MessageAttributes=attributes, **kwargs)
Expand All @@ -239,9 +243,9 @@ def send_wrapper(wrapped, instance, args, kwargs):
)
return retval

wrap_function_wrapper(self._sqs_class, "send_message", send_wrapper)
wrap_function_wrapper(sqs_class, "send_message", send_wrapper)

def _wrap_send_message_batch(self) -> None:
def _wrap_send_message_batch(self, sqs_class: type) -> None:
def send_batch_wrapper(wrapped, instance, args, kwargs):
queue_url = kwargs.get("QueueUrl")
entries = kwargs.get("Entries")
Expand All @@ -260,12 +264,11 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
for entry in entries:
entry_id = entry["Id"]
span = self._tracer.start_span(
name=f"{queue_name} send",
kind=SpanKind.PRODUCER,
name=f"{queue_name} send", kind=SpanKind.PRODUCER
)
ids_to_spans[entry_id] = span
Boto3SQSInstrumentor._enrich_span(
span, queue_name, conversation_id=entry_id
span, queue_name, queue_url, conversation_id=entry_id
)
with trace.use_span(span):
if "MessageAttributes" not in entry:
Expand All @@ -288,15 +291,15 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
return retval

wrap_function_wrapper(
self._sqs_class, "send_message_batch", send_batch_wrapper
sqs_class, "send_message_batch", send_batch_wrapper
)

def _wrap_receive_message(self) -> None:
def _wrap_receive_message(self, sqs_class: type) -> None:
def receive_message_wrapper(wrapped, instance, args, kwargs):
queue_url = kwargs.get("QueueUrl")
message_attribute_names = kwargs.pop("MessageAttributeNames", [])
message_attribute_names.append(
f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}*"
message_attribute_names.extend(
propagate.get_global_textmap().fields
)
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
queue_url
Expand All @@ -309,6 +312,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
Boto3SQSInstrumentor._enrich_span(
span,
queue_name,
queue_url,
operation=MessagingOperationValues.RECEIVE,
)
retval = wrapped(
Expand All @@ -327,29 +331,31 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
receipt_handle
)
self._create_processing_span(
queue_name, receipt_handle, message
queue_name, queue_url, receipt_handle, message
)
retval["Messages"] = Boto3SQSInstrumentor.ContextableList(
messages
)
return retval

wrap_function_wrapper(
self._sqs_class, "receive_message", receive_message_wrapper
sqs_class, "receive_message", receive_message_wrapper
)

def _wrap_delete_message(self) -> None:
@staticmethod
def _wrap_delete_message(sqs_class: type) -> None:
def delete_message_wrapper(wrapped, instance, args, kwargs):
receipt_handle = kwargs.get("ReceiptHandle")
if receipt_handle:
Boto3SQSInstrumentor._safe_end_processing_span(receipt_handle)
return wrapped(*args, **kwargs)

wrap_function_wrapper(
self._sqs_class, "delete_message", delete_message_wrapper
sqs_class, "delete_message", delete_message_wrapper
)

def _wrap_delete_message_batch(self) -> None:
@staticmethod
def _wrap_delete_message_batch(sqs_class: type) -> None:
def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
entries = kwargs.get("Entries")
for entry in entries:
Expand All @@ -361,9 +367,7 @@ def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
return wrapped(*args, **kwargs)

wrap_function_wrapper(
self._sqs_class,
"delete_message_batch",
delete_message_wrapper_batch,
sqs_class, "delete_message_batch", delete_message_wrapper_batch
)

def _wrap_client_creation(self) -> None:
Expand All @@ -375,52 +379,58 @@ def _wrap_client_creation(self) -> None:

def client_wrapper(wrapped, instance, args, kwargs):
retval = wrapped(*args, **kwargs)
if not self._did_decorate:
self._decorate_sqs()
self._decorate_sqs(type(retval))
return retval

wrap_function_wrapper(boto3, "client", client_wrapper)

def _decorate_sqs(self) -> None:
def _decorate_sqs(self, sqs_class: type) -> None:
"""
Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base
class and is SQS to wrap.
"""
# We define SQS client as the only client that implements send_message_batch
sqs_class = [
cls
for cls in botocore.client.BaseClient.__subclasses__()
if hasattr(cls, "send_message_batch")
]
if sqs_class:
self._sqs_class = sqs_class[0]
self._did_decorate = True
self._wrap_send_message()
self._wrap_send_message_batch()
self._wrap_receive_message()
self._wrap_delete_message()
self._wrap_delete_message_batch()

def _un_decorate_sqs(self) -> None:
if self._did_decorate:
unwrap(self._sqs_class, "send_message")
unwrap(self._sqs_class, "send_message_batch")
unwrap(self._sqs_class, "receive_message")
unwrap(self._sqs_class, "delete_message")
unwrap(self._sqs_class, "delete_message_batch")
self._did_decorate = False
if not hasattr(sqs_class, "send_message_batch"):
return

if getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False):
return

setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, True)

self._wrap_send_message(sqs_class)
self._wrap_send_message_batch(sqs_class)
self._wrap_receive_message(sqs_class)
self._wrap_delete_message(sqs_class)
self._wrap_delete_message_batch(sqs_class)

@staticmethod
def _un_decorate_sqs(sqs_class: type) -> None:
if not getattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False):
return

unwrap(sqs_class, "send_message")
unwrap(sqs_class, "send_message_batch")
unwrap(sqs_class, "receive_message")
unwrap(sqs_class, "delete_message")
unwrap(sqs_class, "delete_message_batch")

setattr(sqs_class, _IS_SQS_INSTRUMENTED_ATTRIBUTE, False)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
self._did_decorate: bool = False
self._tracer_provider: Optional[TracerProvider] = kwargs.get(
"tracer_provider"
)
self._tracer: Tracer = trace.get_tracer(
__name__, __version__, self._tracer_provider
)
self._wrap_client_creation()
self._decorate_sqs()

for client_cls in botocore.client.BaseClient.__subclasses__():
self._decorate_sqs(client_cls)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
unwrap(boto3, "client")
self._un_decorate_sqs()

for client_cls in botocore.client.BaseClient.__subclasses__():
self._un_decorate_sqs(client_cls)
Loading

0 comments on commit f48b313

Please sign in to comment.