Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mongo db - fix db statement capturing #1512

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-aws-lambda` Adds an option to configure `disable_aws_context_propagation` by
environment variable: `OTEL_LAMBDA_DISABLE_AWS_CONTEXT_PROPAGATION`
([#1507](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1507))
- mongo db - fix db statement capturing
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))


## Version 1.15.0/0.36b0 (2022-12-10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
failed_hook (Callable) -
a function with extra user-defined logic to be performed after the query returns with a failed response
this function signature is: def failed_hook(span: Span, event: CommandFailedEvent) -> None
capture_statement (bool) - an optional value to enable capturing the database statement that is being executed

for example:

Expand Down Expand Up @@ -81,6 +82,9 @@ def failed_hook(span, event):
from opentelemetry import context
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pymongo.package import _instruments
from opentelemetry.instrumentation.pymongo.utils import (
COMMAND_TO_ATTRIBUTE_MAPPING,
)
from opentelemetry.instrumentation.pymongo.version import __version__
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
Expand All @@ -106,29 +110,28 @@ def __init__(
request_hook: RequestHookT = dummy_callback,
response_hook: ResponseHookT = dummy_callback,
failed_hook: FailedHookT = dummy_callback,
capture_statement: bool = False,
):
self._tracer = tracer
self._span_dict = {}
self.is_enabled = True
self.start_hook = request_hook
self.success_hook = response_hook
self.failed_hook = failed_hook
self.capture_statement = capture_statement

def started(self, event: monitoring.CommandStartedEvent):
"""Method to handle a pymongo CommandStartedEvent"""
if not self.is_enabled or context.get_value(
_SUPPRESS_INSTRUMENTATION_KEY
):
return
command = event.command.get(event.command_name, "")
name = event.database_name
name += "." + event.command_name
statement = event.command_name
if command:
statement += " " + str(command)
command_name = event.command_name
span_name = event.database_name + "." + command_name
avzis marked this conversation as resolved.
Show resolved Hide resolved
statement = self._get_statement_by_command_name(command_name, event)

try:
span = self._tracer.start_span(name, kind=SpanKind.CLIENT)
span = self._tracer.start_span(span_name, kind=SpanKind.CLIENT)
if span.is_recording():
span.set_attribute(
SpanAttributes.DB_SYSTEM, DbSystemValues.MONGODB.value
Expand Down Expand Up @@ -191,6 +194,14 @@ def failed(self, event: monitoring.CommandFailedEvent):
def _pop_span(self, event):
return self._span_dict.pop(_get_span_dict_key(event), None)

def _get_statement_by_command_name(self, command_name, event):
statement = command_name
command_attribute = COMMAND_TO_ATTRIBUTE_MAPPING.get(command_name)
command = event.command.get(command_attribute)
if command and self.capture_statement:
statement += " " + str(command)
return statement


def _get_span_dict_key(event):
if event.connection_id is not None:
Expand Down Expand Up @@ -223,6 +234,7 @@ def _instrument(self, **kwargs):
request_hook = kwargs.get("request_hook", dummy_callback)
response_hook = kwargs.get("response_hook", dummy_callback)
failed_hook = kwargs.get("failed_hook", dummy_callback)
capture_statement = kwargs.get("capture_statement")
# Create and register a CommandTracer only the first time
if self._commandtracer_instance is None:
tracer = get_tracer(__name__, __version__, tracer_provider)
Expand All @@ -232,6 +244,7 @@ def _instrument(self, **kwargs):
request_hook=request_hook,
response_hook=response_hook,
failed_hook=failed_hook,
capture_statement=capture_statement,
)
monitoring.register(self._commandtracer_instance)
# If already created, just enable it
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

COMMAND_TO_ATTRIBUTE_MAPPING = {
"insert": "documents",
"delete": "deletes",
"update": "updates",
"find": "filter",
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_started(self):
span.attributes[SpanAttributes.DB_NAME], "database_name"
)
self.assertEqual(
span.attributes[SpanAttributes.DB_STATEMENT], "command_name find"
span.attributes[SpanAttributes.DB_STATEMENT], "command_name"
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], "test.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def setUp(self):
self.instrumentor = PymongoInstrumentor()
self.instrumentor.instrument()
self.instrumentor._commandtracer_instance._tracer = self._tracer
self.instrumentor._commandtracer_instance.capture_statement = True
client = MongoClient(
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
)
Expand All @@ -44,7 +45,7 @@ def tearDown(self):
self.instrumentor.uninstrument()
super().tearDown()

def validate_spans(self):
def validate_spans(self, expected_db_statement):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
Expand All @@ -68,34 +69,65 @@ def validate_spans(self):
self.assertEqual(
pymongo_span.attributes[SpanAttributes.NET_PEER_PORT], MONGODB_PORT
)
self.assertEqual(
pymongo_span.attributes[SpanAttributes.DB_STATEMENT],
expected_db_statement,
)

def test_insert(self):
"""Should create a child span for insert"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.insert_one(
insert_result = self._collection.insert_one(
{"name": "testName", "value": "testValue"}
)
self.validate_spans()
insert_result_id = insert_result.inserted_id

expected_db_statement = (
f"insert [{{'name': 'testName', 'value': 'testValue', '_id': "
f"ObjectId('{insert_result_id}')}}]"
)
self.validate_spans(expected_db_statement)

def test_update(self):
"""Should create a child span for update"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.update_one(
{"name": "testName"}, {"$set": {"value": "someOtherValue"}}
)
self.validate_spans()

expected_db_statement = (
"update [SON([('q', {'name': 'testName'}), ('u', "
"{'$set': {'value': 'someOtherValue'}}), ('multi', False), ('upsert', False)])]"
)
self.validate_spans(expected_db_statement)

def test_find(self):
"""Should create a child span for find"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one()
self.validate_spans()
self._collection.find_one({"name": "testName"})

expected_db_statement = "find {'name': 'testName'}"
self.validate_spans(expected_db_statement)

def test_delete(self):
"""Should create a child span for delete"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"})
self.validate_spans()

expected_db_statement = (
"delete [SON([('q', {'name': 'testName'}), ('limit', 1)])]"
)
self.validate_spans(expected_db_statement)

def test_find_without_capture_statement(self):
"""Should create a child span for find"""
self.instrumentor._commandtracer_instance.capture_statement = False

with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one({"name": "testName"})

expected_db_statement = "find"
self.validate_spans(expected_db_statement)

def test_uninstrument(self):
# check that integration is working
Expand Down