Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite gRPC server interceptor #1171

Merged
merged 17 commits into from
Oct 29, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import server_interceptor
from opentelemetry.instrumentation.grpc.grpcext import intercept_server
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
Expand Down Expand Up @@ -73,8 +72,9 @@ def SayHello(self, request, context):

def serve():

server = grpc.server(futures.ThreadPoolExecutor())
server = intercept_server(server, server_interceptor())
server = grpc.server(
futures.ThreadPoolExecutor(), interceptors=[server_interceptor()],
)

helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import server_interceptor
from opentelemetry.instrumentation.grpc.grpcext import intercept_server
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
Expand Down Expand Up @@ -162,8 +161,10 @@ def RouteChat(self, request_iterator, context):


def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
server = intercept_server(server, server_interceptor())
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[server_interceptor()],
)

route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
RouteGuideServicer(), server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ def run():
import grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor
from opentelemetry.instrumentation.grpc.grpcext import intercept_server
from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
Expand All @@ -94,10 +93,10 @@ def run():
trace.get_tracer_provider().add_span_processor(
SimpleExportSpanProcessor(ConsoleSpanExporter())
)

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()


class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)
Expand All @@ -106,7 +105,6 @@ def SayHello(self, request, context):
def serve():

server = grpc.server(futures.ThreadPoolExecutor())
server = intercept_server(server, server_interceptor())

helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:50051")
Expand All @@ -117,18 +115,25 @@ def serve():
if __name__ == "__main__":
logging.basicConfig()
serve()

You can also add the instrumentor manually, rather than using
:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorServer`:

.. code-block:: python

from opentelemetry.instrumentation.grpc import server_interceptor

server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])

"""
from contextlib import contextmanager
from functools import partial

import grpc
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry import trace
from opentelemetry.instrumentation.grpc.grpcext import (
intercept_channel,
intercept_server,
)
from opentelemetry.instrumentation.grpc.grpcext import intercept_channel
from opentelemetry.instrumentation.grpc.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
Expand All @@ -140,15 +145,33 @@ def serve():


class GrpcInstrumentorServer(BaseInstrumentor):
"""
Globally instrument the grpc server.

Usage::

grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()

"""

# pylint:disable=attribute-defined-outside-init

def _instrument(self, **kwargs):
_wrap("grpc", "server", self.wrapper_fn)
self._original_func = grpc.server
aabmass marked this conversation as resolved.
Show resolved Hide resolved

def _uninstrument(self, **kwargs):
unwrap(grpc, "server")
def server(*args, **kwargs):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(0, server_interceptor())
else:
kwargs["interceptors"] = [server_interceptor()]
return self._original_func(*args, **kwargs)

def wrapper_fn(self, original_func, instance, args, kwargs):
server = original_func(*args, **kwargs)
return intercept_server(server, server_interceptor())
grpc.server = server

def _uninstrument(self, **kwargs):
grpc.server = self._original_func


class GrpcInstrumentorClient(BaseInstrumentor):
Expand Down
Loading