diff --git a/docs-requirements.txt b/docs-requirements.txt index 26b74cccc29..55b647402f6 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -3,6 +3,7 @@ sphinx-rtd-theme~=0.4 sphinx-autodoc-typehints~=1.10.2 # Required by ext packages +ddtrace>=0.34.0 aiohttp ~= 3.0 Deprecated>=1.2.6 django>=2.2 diff --git a/docs/examples/datadog_exporter/README.rst b/docs/examples/datadog_exporter/README.rst new file mode 100644 index 00000000000..961ac9ca05c --- /dev/null +++ b/docs/examples/datadog_exporter/README.rst @@ -0,0 +1,81 @@ +Datadog Exporter Example +======================== + +These examples show how to use OpenTelemetry to send tracing data to Datadog. + + +Basic Example +------------- + +* Installation + +.. code-block:: sh + + pip install opentelemetry-api + pip install opentelemetry-sdk + pip install opentelemetry-ext-datadog + +* Start Datadog Agent + +.. code-block:: sh + + docker run --rm \ + -v /var/run/docker.sock:/var/run/docker.sock:ro \ + -v /proc/:/host/proc/:ro \ + -v /sys/fs/cgroup/:/host/sys/fs/cgroup:ro \ + -p 127.0.0.1:8126:8126/tcp \ + -e DD_API_KEY="" \ + -e DD_APM_ENABLED=true \ + datadog/agent:latest + +* Run example + +.. code-block:: sh + + python datadog_exporter.py + +Auto-Instrumention Example +-------------------------- + +* Installation + +.. code-block:: sh + + pip install opentelemetry-api + pip install opentelemetry-sdk + pip install opentelemetry-ext-datadog + pip install opentelemetry-auto-instrumentation + pip install opentelemetry-ext-flask + pip install flask + pip install requests + +* Start Datadog Agent + +.. code-block:: sh + + docker run --rm \ + -v /var/run/docker.sock:/var/run/docker.sock:ro \ + -v /proc/:/host/proc/:ro \ + -v /sys/fs/cgroup/:/host/sys/fs/cgroup:ro \ + -p 127.0.0.1:8126:8126/tcp \ + -e DD_API_KEY="" \ + -e DD_APM_ENABLED=true \ + datadog/agent:latest + +* Start server + +.. code-block:: sh + + opentelemetry-auto-instrumentation python server.py + +* Run client + +.. code-block:: sh + + opentelemetry-auto-instrumentation python client.py testing + +* Run client with parameter to raise error + +.. code-block:: sh + + opentelemetry-auto-instrumentation python client.py error diff --git a/docs/examples/datadog_exporter/client.py b/docs/examples/datadog_exporter/client.py new file mode 100644 index 00000000000..3969ef04d9a --- /dev/null +++ b/docs/examples/datadog_exporter/client.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sys import argv + +from requests import get + +from opentelemetry import propagators, trace +from opentelemetry.ext.datadog import ( + DatadogExportSpanProcessor, + DatadogSpanExporter, +) +from opentelemetry.sdk.trace import TracerProvider + +trace.set_tracer_provider(TracerProvider()) + +trace.get_tracer_provider().add_span_processor( + DatadogExportSpanProcessor( + DatadogSpanExporter( + agent_url="http://localhost:8126", service="example-client" + ) + ) +) + +tracer = trace.get_tracer(__name__) + +assert len(argv) == 2 + +with tracer.start_as_current_span("client"): + + with tracer.start_as_current_span("client-server"): + headers = {} + propagators.inject(dict.__setitem__, headers) + requested = get( + "http://localhost:8082/server_request", + params={"param": argv[1]}, + headers=headers, + ) + + assert requested.status_code == 200 + print(requested.text) diff --git a/docs/examples/datadog_exporter/datadog_exporter.py b/docs/examples/datadog_exporter/datadog_exporter.py new file mode 100644 index 00000000000..0b3af99223e --- /dev/null +++ b/docs/examples/datadog_exporter/datadog_exporter.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry import trace +from opentelemetry.ext.datadog import ( + DatadogExportSpanProcessor, + DatadogSpanExporter, +) +from opentelemetry.sdk.trace import TracerProvider + +trace.set_tracer_provider(TracerProvider()) +tracer = trace.get_tracer(__name__) + +exporter = DatadogSpanExporter( + agent_url="http://localhost:8126", service="example" +) + +span_processor = DatadogExportSpanProcessor(exporter) +trace.get_tracer_provider().add_span_processor(span_processor) + +with tracer.start_as_current_span("foo"): + with tracer.start_as_current_span("bar"): + with tracer.start_as_current_span("baz"): + print("Hello world from OpenTelemetry Python!") diff --git a/docs/examples/datadog_exporter/server.py b/docs/examples/datadog_exporter/server.py new file mode 100644 index 00000000000..0d545e2b7b6 --- /dev/null +++ b/docs/examples/datadog_exporter/server.py @@ -0,0 +1,49 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from flask import Flask, request + +from opentelemetry import trace +from opentelemetry.ext.datadog import ( + DatadogExportSpanProcessor, + DatadogSpanExporter, +) +from opentelemetry.sdk.trace import TracerProvider + +app = Flask(__name__) + +trace.set_tracer_provider(TracerProvider()) + +trace.get_tracer_provider().add_span_processor( + DatadogExportSpanProcessor( + DatadogSpanExporter( + agent_url="http://localhost:8126", service="example-server" + ) + ) +) + +tracer = trace.get_tracer(__name__) + + +@app.route("/server_request") +def server_request(): + param = request.args.get("param") + with tracer.start_as_current_span("server-inner"): + if param == "error": + raise ValueError("forced server error") + return "served: {}".format(param) + + +if __name__ == "__main__": + app.run(port=8082) diff --git a/docs/ext/datadog/datadog.rst b/docs/ext/datadog/datadog.rst new file mode 100644 index 00000000000..5ae7e042890 --- /dev/null +++ b/docs/ext/datadog/datadog.rst @@ -0,0 +1,7 @@ +OpenTelemetry Datadog Exporter +============================== + +.. automodule:: opentelemetry.ext.datadog + :members: + :undoc-members: + :show-inheritance: diff --git a/ext/opentelemetry-ext-datadog/CHANGELOG.md b/ext/opentelemetry-ext-datadog/CHANGELOG.md new file mode 100644 index 00000000000..333b2b3f8b2 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## Unreleased + +- Add exporter to Datadog + ([#572](https://github.com/open-telemetry/opentelemetry-python/pull/572)) + diff --git a/ext/opentelemetry-ext-datadog/README.rst b/ext/opentelemetry-ext-datadog/README.rst new file mode 100644 index 00000000000..9f9a2aeb889 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/README.rst @@ -0,0 +1,29 @@ +OpenTelemetry Datadog Exporter +============================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-datadog.svg + :target: https://pypi.org/project/opentelemetry-ext-datadog/ + +This library allows to export tracing data to `Datadog +`_. OpenTelemetry span event and links are not +supported. + +Installation +------------ + +:: + + pip install opentelemetry-ext-datadog + + +.. _Datadog: https://www.datadoghq.com/ +.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ + + +References +---------- + +* `Datadog `_ +* `OpenTelemetry Project `_ diff --git a/ext/opentelemetry-ext-datadog/setup.cfg b/ext/opentelemetry-ext-datadog/setup.cfg new file mode 100644 index 00000000000..f17192709b0 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/setup.cfg @@ -0,0 +1,47 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-ext-datadog +description = Datadog Span Exporter for OpenTelemetry +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-datadog +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.5 +package_dir= + =src +packages=find_namespace: +install_requires = + ddtrace>=0.34.0 + opentelemetry-api==0.7.dev0 + opentelemetry-sdk==0.7.dev0 + +[options.packages.find] +where = src diff --git a/ext/opentelemetry-ext-datadog/setup.py b/ext/opentelemetry-ext-datadog/setup.py new file mode 100644 index 00000000000..f6573911046 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/setup.py @@ -0,0 +1,27 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "ext", "datadog", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/__init__.py b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/__init__.py new file mode 100644 index 00000000000..0c01cf7fba4 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/__init__.py @@ -0,0 +1,51 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The **OpenTelemetry Datadog Exporter** provides a span exporter from +`OpenTelemetry`_ traces to `Datadog`_ by using the Datadog Agent. + +Usage +----- + +.. code:: python + + from opentelemetry import trace + from opentelemetry.ext.datadog import DatadogExportSpanProcessor, DatadogSpanExporter + from opentelemetry.sdk.trace import TracerProvider + + trace.set_tracer_provider(TracerProvider()) + tracer = trace.get_tracer(__name__) + + exporter = DatadogSpanExporter( + agent_url="http://agent:8126", service="my-helloworld-service" + ) + + span_processor = DatadogExportSpanProcessor(exporter) + trace.get_tracer_provider().add_span_processor(span_processor) + + with tracer.start_as_current_span("foo"): + print("Hello world!") + +API +--- +.. _Datadog: https://www.datadoghq.com/ +.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ +""" +# pylint: disable=import-error + +from .exporter import DatadogSpanExporter +from .spanprocessor import DatadogExportSpanProcessor + +__all__ = ["DatadogExportSpanProcessor", "DatadogSpanExporter"] diff --git a/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/exporter.py b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/exporter.py new file mode 100644 index 00000000000..4843200a2c0 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/exporter.py @@ -0,0 +1,204 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +from urllib.parse import urlparse + +from ddtrace.ext import SpanTypes as DatadogSpanTypes +from ddtrace.internal.writer import AgentWriter +from ddtrace.span import Span as DatadogSpan + +import opentelemetry.trace as trace_api +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.trace.status import StatusCanonicalCode + +logger = logging.getLogger(__name__) + + +DEFAULT_AGENT_URL = "http://localhost:8126" +_INSTRUMENTATION_SPAN_TYPES = { + "opentelemetry.ext.aiohttp-client": DatadogSpanTypes.HTTP, + "opentelemetry.ext.dbapi": DatadogSpanTypes.SQL, + "opentelemetry.ext.django": DatadogSpanTypes.WEB, + "opentelemetry.ext.flask": DatadogSpanTypes.WEB, + "opentelemetry.ext.grpc": DatadogSpanTypes.GRPC, + "opentelemetry.ext.jinja2": DatadogSpanTypes.TEMPLATE, + "opentelemetry.ext.mysql": DatadogSpanTypes.SQL, + "opentelemetry.ext.psycopg2": DatadogSpanTypes.SQL, + "opentelemetry.ext.pymongo": DatadogSpanTypes.MONGODB, + "opentelemetry.ext.pymysql": DatadogSpanTypes.SQL, + "opentelemetry.ext.redis": DatadogSpanTypes.REDIS, + "opentelemetry.ext.requests": DatadogSpanTypes.HTTP, + "opentelemetry.ext.sqlalchemy": DatadogSpanTypes.SQL, + "opentelemetry.ext.wsgi": DatadogSpanTypes.WEB, +} + + +class DatadogSpanExporter(SpanExporter): + """Datadog span exporter for OpenTelemetry. + + Args: + agent_url: The url of the Datadog Agent or use `DD_TRACE_AGENT_URL` environment variable + service: The service to be used for the application or use `DD_SERVICE` environment variable + """ + + def __init__(self, agent_url=None, service=None): + self.agent_url = ( + agent_url + if agent_url + else os.environ.get("DD_TRACE_AGENT_URL", DEFAULT_AGENT_URL) + ) + self.service = service if service else os.environ.get("DD_SERVICE") + self._agent_writer = None + + @property + def agent_writer(self): + if self._agent_writer is None: + url_parsed = urlparse(self.agent_url) + if url_parsed.scheme in ("http", "https"): + self._agent_writer = AgentWriter( + hostname=url_parsed.hostname, + port=url_parsed.port, + https=url_parsed.scheme == "https", + ) + elif url_parsed.scheme == "unix": + self._agent_writer = AgentWriter(uds_path=url_parsed.path) + else: + raise ValueError( + "Unknown scheme `%s` for agent URL" % url_parsed.scheme + ) + return self._agent_writer + + def export(self, spans): + datadog_spans = self._translate_to_datadog(spans) + + self.agent_writer.write(spans=datadog_spans) + + return SpanExportResult.SUCCESS + + def shutdown(self): + if self.agent_writer.started: + self.agent_writer.stop() + self.agent_writer.join(self.agent_writer.exit_timeout) + + def _translate_to_datadog(self, spans): + datadog_spans = [] + + for span in spans: + trace_id, parent_id, span_id = _get_trace_ids(span) + + # datadog Span is initialized with a reference to the tracer which is + # used to record the span when it is finished. We can skip ignore this + # because we are not calling the finish method and explictly set the + # duration. + tracer = None + + datadog_span = DatadogSpan( + tracer, + _get_span_name(span), + service=self.service, + resource=_get_resource(span), + span_type=_get_span_type(span), + trace_id=trace_id, + span_id=span_id, + parent_id=parent_id, + ) + datadog_span.start_ns = span.start_time + datadog_span.duration_ns = span.end_time - span.start_time + + if span.status.canonical_code is not StatusCanonicalCode.OK: + datadog_span.error = 1 + if span.status.description: + exc_type, exc_val = _get_exc_info(span) + # no mapping for error.stack since traceback not recorded + datadog_span.set_tag("error.msg", exc_val) + datadog_span.set_tag("error.type", exc_type) + + datadog_span.set_tags(span.attributes) + + # span events and span links are not supported + + datadog_spans.append(datadog_span) + + return datadog_spans + + +def _get_trace_ids(span): + """Extract tracer ids from span""" + ctx = span.get_context() + trace_id = ctx.trace_id + span_id = ctx.span_id + + if isinstance(span.parent, trace_api.Span): + parent_id = span.parent.get_context().span_id + elif isinstance(span.parent, trace_api.SpanContext): + parent_id = span.parent.span_id + else: + parent_id = 0 + + trace_id = _convert_trace_id_uint64(trace_id) + + return trace_id, parent_id, span_id + + +def _convert_trace_id_uint64(otel_id): + """Convert 128-bit int used for trace_id to 64-bit unsigned int""" + return otel_id & 0xFFFFFFFFFFFFFFFF + + +def _get_span_name(span): + """Get span name by using instrumentation and kind while backing off to + span.name + """ + instrumentation_name = ( + span.instrumentation_info.name if span.instrumentation_info else None + ) + span_kind_name = span.kind.name if span.kind else None + name = ( + "{}.{}".format(instrumentation_name, span_kind_name) + if instrumentation_name and span_kind_name + else span.name + ) + return name + + +def _get_resource(span): + """Get resource name for span""" + if "http.method" in span.attributes: + route = span.attributes.get( + "http.route", span.attributes.get("http.path") + ) + return ( + span.attributes["http.method"] + " " + route + if route + else span.attributes["http.method"] + ) + + return span.name + + +def _get_span_type(span): + """Get Datadog span type""" + instrumentation_name = ( + span.instrumentation_info.name if span.instrumentation_info else None + ) + span_type = _INSTRUMENTATION_SPAN_TYPES.get(instrumentation_name) + return span_type + + +def _get_exc_info(span): + """Parse span status description for exception type and value""" + exc_type, exc_val = span.status.description.split(":", 1) + return exc_type, exc_val.strip() diff --git a/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/spanprocessor.py b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/spanprocessor.py new file mode 100644 index 00000000000..600778c88c2 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/spanprocessor.py @@ -0,0 +1,222 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import threading +import typing + +from opentelemetry.context import attach, detach, set_value +from opentelemetry.sdk.trace import Span, SpanProcessor +from opentelemetry.sdk.trace.export import SpanExporter +from opentelemetry.trace import INVALID_TRACE_ID +from opentelemetry.util import time_ns + +logger = logging.getLogger(__name__) + + +class DatadogExportSpanProcessor(SpanProcessor): + """Datadog exporter span processor + + DatadogExportSpanProcessor is an implementation of `SpanProcessor` that + batches all opened spans into a list per trace. When all spans for a trace + are ended, the trace is queues up for export. This is required for exporting + to the Datadog Agent which expects to received list of spans for each trace. + """ + + _FLUSH_TOKEN = INVALID_TRACE_ID + + def __init__( + self, + span_exporter: SpanExporter, + schedule_delay_millis: float = 5000, + max_trace_size: int = 4096, + ): + if max_trace_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + self.span_exporter = span_exporter + + # queue trace_ids for traces with recently ended spans for worker thread to check + # for exporting + self.check_traces_queue = ( + collections.deque() + ) # type: typing.Deque[int] + + self.traces_lock = threading.Lock() + # dictionary of trace_ids to a list of spans where the first span is the + # first opened span for the trace + self.traces = collections.defaultdict(list) + # counter to keep track of the number of spans and ended spans for a + # trace_id + self.traces_spans_count = collections.Counter() + self.traces_spans_ended_count = collections.Counter() + + self.worker_thread = threading.Thread(target=self.worker, daemon=True) + + # threading conditions used for flushing and shutdown + self.condition = threading.Condition(threading.Lock()) + self.flush_condition = threading.Condition(threading.Lock()) + + # flag to indicate that there is a flush operation on progress + self._flushing = False + + self.max_trace_size = max_trace_size + self._spans_dropped = False + self.schedule_delay_millis = schedule_delay_millis + self.done = False + self.worker_thread.start() + + def on_start(self, span: Span) -> None: + ctx = span.get_context() + trace_id = ctx.trace_id + + with self.traces_lock: + # check upper bound on number of spans for trace before adding new + # span + if self.traces_spans_count[trace_id] == self.max_trace_size: + logger.warning("Max spans for trace, spans will be dropped.") + self._spans_dropped = True + return + + # add span to end of list for a trace and update the counter + self.traces[trace_id].append(span) + self.traces_spans_count[trace_id] += 1 + + def on_end(self, span: Span) -> None: + if self.done: + logger.warning("Already shutdown, dropping span.") + return + + ctx = span.get_context() + trace_id = ctx.trace_id + + with self.traces_lock: + self.traces_spans_ended_count[trace_id] += 1 + if self.is_trace_exportable(trace_id): + self.check_traces_queue.appendleft(trace_id) + + def worker(self): + timeout = self.schedule_delay_millis / 1e3 + while not self.done: + if not self._flushing: + with self.condition: + self.condition.wait(timeout) + if not self.check_traces_queue: + # spurious notification, let's wait again + continue + if self.done: + # missing spans will be sent when calling flush + break + + # substract the duration of this export call to the next timeout + start = time_ns() + self.export() + end = time_ns() + duration = (end - start) / 1e9 + timeout = self.schedule_delay_millis / 1e3 - duration + + # be sure that all spans are sent + self._drain_queue() + + def is_trace_exportable(self, trace_id): + return ( + self.traces_spans_count[trace_id] + - self.traces_spans_ended_count[trace_id] + <= 0 + ) + + def export(self) -> None: + """Exports traces with finished spans.""" + notify_flush = False + export_trace_ids = [] + + while self.check_traces_queue: + trace_id = self.check_traces_queue.pop() + if trace_id is self._FLUSH_TOKEN: + notify_flush = True + else: + with self.traces_lock: + # check whether trace is exportable again in case that new + # spans were started since we last concluded trace was + # exportable + if self.is_trace_exportable(trace_id): + export_trace_ids.append(trace_id) + del self.traces_spans_count[trace_id] + del self.traces_spans_ended_count[trace_id] + + if len(export_trace_ids) > 0: + token = attach(set_value("suppress_instrumentation", True)) + + for trace_id in export_trace_ids: + with self.traces_lock: + try: + # Ignore type b/c the Optional[None]+slicing is too "clever" + # for mypy + self.span_exporter.export(self.traces[trace_id]) # type: ignore + # pylint: disable=broad-except + except Exception: + logger.exception( + "Exception while exporting Span batch." + ) + finally: + del self.traces[trace_id] + + detach(token) + + if notify_flush: + with self.flush_condition: + self.flush_condition.notify() + + def _drain_queue(self): + """"Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. + """ + while self.check_traces_queue: + self.export() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + if self.done: + logger.warning("Already shutdown, ignoring call to force_flush().") + return True + + self._flushing = True + self.check_traces_queue.appendleft(self._FLUSH_TOKEN) + + # wake up worker thread + with self.condition: + self.condition.notify_all() + + # wait for token to be processed + with self.flush_condition: + ret = self.flush_condition.wait(timeout_millis / 1e3) + + self._flushing = False + + if not ret: + logger.warning("Timeout was exceeded in force_flush().") + return ret + + def shutdown(self) -> None: + # signal the worker thread to finish and then wait for it + self.done = True + with self.condition: + self.condition.notify_all() + self.worker_thread.join() + self.span_exporter.shutdown() diff --git a/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/version.py b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/version.py new file mode 100644 index 00000000000..86c61362ab5 --- /dev/null +++ b/ext/opentelemetry-ext-datadog/src/opentelemetry/ext/datadog/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.7.dev0" diff --git a/ext/opentelemetry-ext-datadog/tests/__init__.py b/ext/opentelemetry-ext-datadog/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ext/opentelemetry-ext-datadog/tests/test_datadog_exporter.py b/ext/opentelemetry-ext-datadog/tests/test_datadog_exporter.py new file mode 100644 index 00000000000..97ca3fa9b9c --- /dev/null +++ b/ext/opentelemetry-ext-datadog/tests/test_datadog_exporter.py @@ -0,0 +1,405 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import logging +import time +import unittest +from unittest import mock + +from ddtrace.internal.writer import AgentWriter + +from opentelemetry import trace as trace_api +from opentelemetry.ext import datadog +from opentelemetry.sdk import trace +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo + + +class MockDatadogSpanExporter(datadog.DatadogSpanExporter): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + agent_writer_mock = mock.Mock(spec=AgentWriter) + agent_writer_mock.started = True + agent_writer_mock.exit_timeout = 1 + self._agent_writer = agent_writer_mock + + +def get_spans(tracer, exporter, shutdown=True): + if shutdown: + tracer.source.shutdown() + + spans = [ + call_args[-1]["spans"] + for call_args in exporter.agent_writer.write.call_args_list + ] + + return [span.to_dict() for span in itertools.chain.from_iterable(spans)] + + +class TestDatadogSpanExporter(unittest.TestCase): + def setUp(self): + self.exporter = MockDatadogSpanExporter() + self.span_processor = datadog.DatadogExportSpanProcessor(self.exporter) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(self.span_processor) + self.tracer_provider = tracer_provider + self.tracer = tracer_provider.get_tracer(__name__) + + def tearDown(self): + self.tracer_provider.shutdown() + + def test_constructor_default(self): + """Test the default values assigned by constructor.""" + exporter = datadog.DatadogSpanExporter() + + self.assertEqual(exporter.agent_url, "http://localhost:8126") + self.assertIsNone(exporter.service) + self.assertIsNotNone(exporter.agent_writer) + + def test_constructor_explicit(self): + """Test the constructor passing all the options.""" + agent_url = "http://localhost:8126" + exporter = datadog.DatadogSpanExporter( + agent_url=agent_url, service="explicit" + ) + + self.assertEqual(exporter.agent_url, agent_url) + self.assertEqual(exporter.service, "explicit") + self.assertIsNotNone(exporter.agent_writer) + + @mock.patch.dict( + "os.environ", + {"DD_TRACE_AGENT_URL": "http://agent:8126", "DD_SERVICE": "environ"}, + ) + def test_constructor_environ(self): + exporter = datadog.DatadogSpanExporter() + + self.assertEqual(exporter.agent_url, "http://agent:8126") + self.assertEqual(exporter.service, "environ") + self.assertIsNotNone(exporter.agent_writer) + + # pylint: disable=too-many-locals + @mock.patch.dict("os.environ", {"DD_SERVICE": "test-service"}) + def test_translate_to_datadog(self): + # pylint: disable=invalid-name + self.maxDiff = None + + span_names = ("test1", "test2", "test3") + trace_id = 0x6E0C63257DE34C926F9EFCD03927272E + trace_id_low = 0x6F9EFCD03927272E + span_id = 0x34BF92DEEFC58C92 + parent_id = 0x1111111111111111 + other_id = 0x2222222222222222 + + base_time = 683647322 * 10 ** 9 # in ns + start_times = ( + base_time, + base_time + 150 * 10 ** 6, + base_time + 300 * 10 ** 6, + ) + durations = (50 * 10 ** 6, 100 * 10 ** 6, 200 * 10 ** 6) + end_times = ( + start_times[0] + durations[0], + start_times[1] + durations[1], + start_times[2] + durations[2], + ) + + span_context = trace_api.SpanContext( + trace_id, span_id, is_remote=False + ) + parent_context = trace_api.SpanContext( + trace_id, parent_id, is_remote=False + ) + other_context = trace_api.SpanContext( + trace_id, other_id, is_remote=False + ) + + instrumentation_info = InstrumentationInfo(__name__, "0") + + otel_spans = [ + trace.Span( + name=span_names[0], + context=span_context, + parent=parent_context, + kind=trace_api.SpanKind.CLIENT, + instrumentation_info=instrumentation_info, + ), + trace.Span( + name=span_names[1], + context=parent_context, + parent=None, + instrumentation_info=instrumentation_info, + ), + trace.Span( + name=span_names[2], context=other_context, parent=None, + ), + ] + + otel_spans[0].start(start_time=start_times[0]) + otel_spans[0].end(end_time=end_times[0]) + + otel_spans[1].start(start_time=start_times[1]) + otel_spans[1].end(end_time=end_times[1]) + + otel_spans[2].start(start_time=start_times[2]) + otel_spans[2].end(end_time=end_times[2]) + + # pylint: disable=protected-access + exporter = datadog.DatadogSpanExporter() + datadog_spans = [ + span.to_dict() + for span in exporter._translate_to_datadog(otel_spans) + ] + + expected_spans = [ + dict( + trace_id=trace_id_low, + parent_id=parent_id, + span_id=span_id, + name="tests.test_datadog_exporter.CLIENT", + resource=span_names[0], + start=start_times[0], + duration=durations[0], + error=0, + service="test-service", + ), + dict( + trace_id=trace_id_low, + parent_id=0, + span_id=parent_id, + name="tests.test_datadog_exporter.INTERNAL", + resource=span_names[1], + start=start_times[1], + duration=durations[1], + error=0, + service="test-service", + ), + dict( + trace_id=trace_id_low, + parent_id=0, + span_id=other_id, + name=span_names[2], + resource=span_names[2], + start=start_times[2], + duration=durations[2], + error=0, + service="test-service", + ), + ] + + self.assertEqual(datadog_spans, expected_spans) + + @mock.patch.dict("os.environ", {"DD_SERVICE": "test-service"}) + def test_export(self): + """Test that agent and/or collector are invoked""" + # create and save span to be used in tests + context = trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=0x00000000DEADBEF0, + is_remote=False, + ) + + test_span = trace.Span("test_span", context=context) + test_span.start() + test_span.end() + + self.exporter.export((test_span,)) + + self.assertEqual(self.exporter.agent_writer.write.call_count, 1) + + def test_resources(self): + test_attributes = [ + {}, + {"http.method": "GET", "http.route": "/foo"}, + {"http.method": "GET", "http.path": "/foo"}, + ] + + for index, test in enumerate(test_attributes): + with self.tracer.start_span(str(index), attributes=test): + pass + + datadog_spans = get_spans(self.tracer, self.exporter) + + self.assertEqual(len(datadog_spans), 3) + + actual = [span["resource"] for span in datadog_spans] + expected = ["0", "GET /foo", "GET /foo"] + + self.assertEqual(actual, expected) + + def test_span_types(self): + test_instrumentations = [ + "opentelemetry.ext.aiohttp-client", + "opentelemetry.ext.dbapi", + "opentelemetry.ext.django", + "opentelemetry.ext.flask", + "opentelemetry.ext.grpc", + "opentelemetry.ext.jinja2", + "opentelemetry.ext.mysql", + "opentelemetry.ext.psycopg2", + "opentelemetry.ext.pymongo", + "opentelemetry.ext.pymysql", + "opentelemetry.ext.redis", + "opentelemetry.ext.requests", + "opentelemetry.ext.sqlalchemy", + "opentelemetry.ext.wsgi", + ] + + for index, instrumentation in enumerate(test_instrumentations): + # change tracer's instrumentation info before starting span + self.tracer.instrumentation_info = InstrumentationInfo( + instrumentation, "0" + ) + with self.tracer.start_span(str(index)): + pass + + datadog_spans = get_spans(self.tracer, self.exporter) + + self.assertEqual(len(datadog_spans), 14) + + actual = [span.get("type") for span in datadog_spans] + expected = [ + "http", + "sql", + "web", + "web", + "grpc", + "template", + "sql", + "sql", + "mongodb", + "sql", + "redis", + "http", + "sql", + "web", + ] + self.assertEqual(actual, expected) + + def test_errors(self): + with self.assertRaises(ValueError): + with self.tracer.start_span("foo"): + raise ValueError("bar") + + datadog_spans = get_spans(self.tracer, self.exporter) + + self.assertEqual(len(datadog_spans), 1) + + span = datadog_spans[0] + self.assertEqual(span["error"], 1) + self.assertEqual(span["meta"]["error.msg"], "bar") + self.assertEqual(span["meta"]["error.type"], "ValueError") + + def test_shutdown(self): + span_names = ["xxx", "bar", "foo"] + + for name in span_names: + with self.tracer.start_span(name): + pass + + self.span_processor.shutdown() + + # check that spans are exported without an explicitly call to + # force_flush() + datadog_spans = get_spans(self.tracer, self.exporter) + actual = [span.get("resource") for span in datadog_spans] + self.assertListEqual(span_names, actual) + + def test_flush(self): + span_names0 = ["xxx", "bar", "foo"] + span_names1 = ["yyy", "baz", "fox"] + + for name in span_names0: + with self.tracer.start_span(name): + pass + + self.assertTrue(self.span_processor.force_flush()) + datadog_spans = get_spans(self.tracer, self.exporter, shutdown=False) + actual0 = [span.get("resource") for span in datadog_spans] + self.assertListEqual(span_names0, actual0) + + # create some more spans to check that span processor still works + for name in span_names1: + with self.tracer.start_span(name): + pass + + self.assertTrue(self.span_processor.force_flush()) + datadog_spans = get_spans(self.tracer, self.exporter) + actual1 = [span.get("resource") for span in datadog_spans] + self.assertListEqual(span_names0 + span_names1, actual1) + + def test_span_processor_lossless(self): + """Test that no spans are lost when sending max_trace_size spans""" + span_processor = datadog.DatadogExportSpanProcessor( + self.exporter, max_trace_size=128 + ) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + with tracer.start_as_current_span("root"): + for _ in range(127): + with tracer.start_span("foo"): + pass + + self.assertTrue(span_processor.force_flush()) + datadog_spans = get_spans(tracer, self.exporter) + self.assertEqual(len(datadog_spans), 128) + tracer_provider.shutdown() + + def test_span_processor_dropped_spans(self): + """Test that spans are lost when exceeding max_trace_size spans""" + span_processor = datadog.DatadogExportSpanProcessor( + self.exporter, max_trace_size=128 + ) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + with tracer.start_as_current_span("root"): + for _ in range(127): + with tracer.start_span("foo"): + pass + with self.assertLogs(level=logging.WARNING): + with tracer.start_span("one-too-many"): + pass + + self.assertTrue(span_processor.force_flush()) + datadog_spans = get_spans(tracer, self.exporter) + self.assertEqual(len(datadog_spans), 128) + tracer_provider.shutdown() + + def test_span_processor_scheduled_delay(self): + """Test that spans are exported each schedule_delay_millis""" + delay = 300 + span_processor = datadog.DatadogExportSpanProcessor( + self.exporter, schedule_delay_millis=delay + ) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + with tracer.start_span("foo"): + pass + + time.sleep(delay / (1e3 * 2)) + datadog_spans = get_spans(tracer, self.exporter, shutdown=False) + self.assertEqual(len(datadog_spans), 0) + + time.sleep(delay / (1e3 * 2) + 0.01) + datadog_spans = get_spans(tracer, self.exporter, shutdown=False) + self.assertEqual(len(datadog_spans), 1) + + tracer_provider.shutdown() diff --git a/scripts/coverage.sh b/scripts/coverage.sh index 1ff42d9e539..248e5faea8b 100755 --- a/scripts/coverage.sh +++ b/scripts/coverage.sh @@ -19,6 +19,7 @@ coverage erase cov opentelemetry-api cov opentelemetry-sdk +cov ext/opentelemetry-ext-datadog cov ext/opentelemetry-ext-flask cov ext/opentelemetry-ext-requests cov ext/opentelemetry-ext-jaeger diff --git a/tox.ini b/tox.ini index 8d4971d3208..40a2c02d709 100644 --- a/tox.ini +++ b/tox.ini @@ -56,6 +56,9 @@ envlist = py3{4,5,6,7,8}-test-ext-jaeger pypy3-test-ext-jaeger + ; opentelemetry-ext-datadog + py3{5,6,7,8}-test-ext-datadog + ; opentelemetry-ext-mysql py3{4,5,6,7,8}-test-ext-mysql pypy3-test-ext-mysql @@ -140,6 +143,7 @@ changedir = test-ext-requests: ext/opentelemetry-ext-requests/tests test-ext-jinja2: ext/opentelemetry-ext-jinja2/tests test-ext-jaeger: ext/opentelemetry-ext-jaeger/tests + test-ext-datadog: ext/opentelemetry-ext-datadog/tests test-ext-dbapi: ext/opentelemetry-ext-dbapi/tests test-ext-django: ext/opentelemetry-ext-django/tests test-ext-mysql: ext/opentelemetry-ext-mysql/tests @@ -224,6 +228,9 @@ commands_pre = jaeger: pip install {toxinidir}/ext/opentelemetry-ext-jaeger + datadog: pip install {toxinidir}/opentelemetry-sdk + datadog: pip install {toxinidir}/ext/opentelemetry-ext-datadog + opentracing-shim: pip install {toxinidir}/ext/opentelemetry-ext-opentracing-shim zipkin: pip install {toxinidir}/ext/opentelemetry-ext-zipkin