Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenTelemetry support for Subscribe Side #1252

Merged
merged 56 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
04c3881
Add SubscriberOptions and OpenTelemetry Subscriber option
mukund-ananthu Sep 16, 2024
0a679a5
Set / Override OpenTelemetry Subscriber settings based on Python Version
mukund-ananthu Sep 16, 2024
40a2996
Add OpenTelemetryContextGetter used for trace propagation
mukund-ananthu Sep 16, 2024
b10f2f6
Add Subscribe Span and SubscribeOpenTelemetry class
mukund-ananthu Sep 18, 2024
95a1d32
Add receipt modack start and end events to subscribe span
mukund-ananthu Sep 18, 2024
c556f99
Modify SubscriberMessage to include opentelemetry data and include
mukund-ananthu Sep 18, 2024
f52fbac
Add ack, modack, nack start events to subscribe span
mukund-ananthu Sep 18, 2024
b5ab6bb
Add Open Telemetry data to Ack, Modack, Nack request objects
mukund-ananthu Sep 18, 2024
a695c99
Add ack result, ack end event to subscribe span and end it
mukund-ananthu Sep 18, 2024
694cfde
Add tests for ack and ack_with_response methods of Subscriber Message
mukund-ananthu Sep 18, 2024
27edc0f
Add tests for nack and nack_with_response methods of Subscriber Message
mukund-ananthu Sep 18, 2024
c1f0ad8
Add tests for modify_ack_deadline and modify_ack_deadline_with_response
mukund-ananthu Sep 18, 2024
e97b83c
Add tests for set_subscribe_span_result method of Subscribe
mukund-ananthu Sep 18, 2024
8189b12
Add test for dispatcher ack method
mukund-ananthu Sep 18, 2024
48ffe54
Add test for retry_acks method in dispatcher
mukund-ananthu Sep 18, 2024
d40b676
Pass the OpenTelemetry data to the dispatcher when message is acked,
mukund-ananthu Sep 18, 2024
383ac2c
Record nack and modack end events, set their results and end them in
mukund-ananthu Sep 18, 2024
5ce0cf8
Record modack, nack ends after successful retries
mukund-ananthu Sep 18, 2024
5b21c93
Fix mypy errors
mukund-ananthu Sep 19, 2024
1b2020d
Add modack events for lease management and receipt modack
mukund-ananthu Sep 19, 2024
26a86ec
Add drop event to subscribe span
mukund-ananthu Sep 19, 2024
dbe1e50
Fix mypy errors
mukund-ananthu Sep 20, 2024
ef0c391
Add subscriber concurrency control span
mukund-ananthu Sep 20, 2024
b71cf15
Add subscribe scheduler span
mukund-ananthu Sep 20, 2024
da797cf
End subscribe scheduler span
mukund-ananthu Sep 20, 2024
a005057
Add subscription_id property to SubscribeOpenTelemetry
mukund-ananthu Sep 20, 2024
47c9d9a
Start process span
mukund-ananthu Sep 20, 2024
8faa3cf
Add end process span to SubcribeOpenTelemetry
mukund-ananthu Sep 20, 2024
4189b22
End process span when ack and nack occur
mukund-ananthu Sep 20, 2024
06ce105
End process span and add event when messages are dropped
mukund-ananthu Sep 20, 2024
5fa4773
Fix the trace.Link from process span to publish span
mukund-ananthu Sep 21, 2024
f30c2c4
Add null check before checking if Subscriber message has opentelemetry
mukund-ananthu Sep 21, 2024
3ae2418
Add tests to verify process span links and subscribe span parent
mukund-ananthu Sep 21, 2024
d1c870d
Move adding dropped event and ending subscriber span from
mukund-ananthu Sep 22, 2024
cb1f974
Move ack called subscribe span event from message.ack() to
mukund-ananthu Sep 22, 2024
7d6ea08
Move nack start event from message.nack and message.nack_with_response
mukund-ananthu Sep 22, 2024
e07fa4d
Move modack start event from message.modify_ack_deadline/with response
mukund-ananthu Sep 22, 2024
c384466
Rename the "dropped" event with "expired"
mukund-ananthu Sep 22, 2024
9ba47ff
Add modack span
mukund-ananthu Sep 22, 2024
2410a4e
Add is_receipt_modack attribute to modack span
mukund-ananthu Sep 23, 2024
b66fd19
Add ack span
mukund-ananthu Sep 23, 2024
54a88ba
Add ack span for dispatcher._retry_ack
mukund-ananthu Sep 23, 2024
28a192a
Add nack span
mukund-ananthu Sep 23, 2024
c8c5eb9
Add nack span for dispatcher._retry_modacks
mukund-ananthu Sep 23, 2024
6ec7c35
Fix modacks
mukund-ananthu Sep 24, 2024
7936911
Fix mypy errors
mukund-ananthu Sep 24, 2024
5874a96
Add links to subscribe span of the sampled ack_spans in dispatcher.ack
mukund-ananthu Sep 24, 2024
a791570
Add links to ack_span in the subscribe_spans in dispatcher.retry_ack()
mukund-ananthu Sep 24, 2024
3206c19
Add links to nack spans in the subscribe span in
mukund-ananthu Sep 24, 2024
14250a1
Add links to nack_spans in subscribe_span in dispatcher.retry_nacks()
mukund-ananthu Sep 24, 2024
815f2a3
Add attributes to the links from subscribe_span to ack_span
mukund-ananthu Sep 24, 2024
94feeec
Add attributes to links in subscribe_spans to nack_spans
mukund-ananthu Sep 24, 2024
5dd8fb8
Add links to modack spans in subscribe span
mukund-ananthu Sep 24, 2024
b4fdd40
Create nack span even if there are no sampled subscribe span
mukund-ananthu Sep 24, 2024
ad13b59
Add a note stating that OpenTelemetry traces are subject to change
mukund-ananthu Sep 24, 2024
3977a6a
Split the subscription name at one place
mukund-ananthu Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion google/cloud/pubsub_v1/open_telemetry/context_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from opentelemetry.propagators.textmap import Setter
from typing import Optional, List

from opentelemetry.propagators.textmap import Setter, Getter

from google.pubsub_v1 import PubsubMessage

Expand All @@ -37,3 +39,17 @@ def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
None
"""
carrier.attributes["googclient_" + key] = value


class OpenTelemetryContextGetter(Getter):
"""
Used by Open Telemetry for context propagation.
"""

def get(self, carrier: PubsubMessage, key: str) -> Optional[List[str]]:
if ("googclient_" + key) not in carrier.attributes:
return None
return [carrier.attributes["googclient_" + key]]

def keys(self, carrier: PubsubMessage) -> List[str]:
return list(map(str, carrier.attributes.keys()))
280 changes: 280 additions & 0 deletions google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
# Copyright 2024, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, List
from datetime import datetime

from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.propagation import set_span_in_context

from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
OpenTelemetryContextGetter,
)
from google.pubsub_v1.types import PubsubMessage

_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"


class SubscribeOpenTelemetry:
def __init__(self, message: PubsubMessage):
self._message: PubsubMessage = message

# subscribe span will be initialized by the `start_subscribe_span`
# method.
self._subscribe_span: Optional[trace.Span] = None

# subscriber concurrency control span will be initialized by the
# `start_subscribe_concurrency_control_span` method.
self._concurrency_control_span: Optional[trace.Span] = None

# scheduler span will be initialized by the
# `start_subscribe_scheduler_span` method.
self._scheduler_span: Optional[trace.Span] = None

# This will be set by `start_subscribe_span` method and will be used
# for other spans, such as process span.
self._subscription_id: Optional[str] = None

# This will be set by `start_process_span` method.
self._process_span: Optional[trace.Span] = None

# This will be set by `start_subscribe_span` method, if a publisher create span
# context was extracted from trace propagation. And will be used by spans like
# proces span to add links to the publisher create span.
self._publisher_create_span_context: Optional[context.Context] = None

# This will be set by `start_subscribe_span` method and will be used
# for other spans, such as modack span.
self._project_id: Optional[str] = None

@property
def subscription_id(self) -> Optional[str]:
return self._subscription_id

@property
def project_id(self) -> Optional[str]:
return self._project_id

@property
def subscribe_span(self) -> Optional[trace.Span]:
return self._subscribe_span

def start_subscribe_span(
self,
subscription: str,
exactly_once_enabled: bool,
ack_id: str,
delivery_attempt: int,
) -> None:
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
parent_span_context = TraceContextTextMapPropagator().extract(
carrier=self._message,
getter=OpenTelemetryContextGetter(),
)
self._publisher_create_span_context = parent_span_context
split_subscription: List[str] = subscription.split("/")
assert len(split_subscription) == 4
subscription_short_name = split_subscription[3]
self._project_id = split_subscription[1]
self._subscription_id = subscription_short_name
with tracer.start_as_current_span(
name=f"{subscription_short_name} subscribe",
context=parent_span_context if parent_span_context else None,
kind=trace.SpanKind.CONSUMER,
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.destination.name": subscription_short_name,
"gcp.project_id": subscription.split("/")[1],
"messaging.message.id": self._message.message_id,
"messaging.message.body.size": len(self._message.data),
"messaging.gcp_pubsub.message.ack_id": ack_id,
"messaging.gcp_pubsub.message.ordering_key": self._message.ordering_key,
"messaging.gcp_pubsub.message.exactly_once_delivery": exactly_once_enabled,
"code.function": "_on_response",
"messaging.gcp_pubsub.message.delivery_attempt": delivery_attempt,
},
end_on_exit=False,
) as subscribe_span:
self._subscribe_span = subscribe_span

def add_subscribe_span_event(self, event: str) -> None:
assert self._subscribe_span is not None
self._subscribe_span.add_event(
name=event,
attributes={
"timestamp": str(datetime.now()),
},
)

def end_subscribe_span(self) -> None:
assert self._subscribe_span is not None
self._subscribe_span.end()

def set_subscribe_span_result(self, result: str) -> None:
assert self._subscribe_span is not None
self._subscribe_span.set_attribute(
key="messaging.gcp_pubsub.result",
value=result,
)

def start_subscribe_concurrency_control_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name="subscriber concurrency control",
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._subscribe_span),
end_on_exit=False,
) as concurrency_control_span:
self._concurrency_control_span = concurrency_control_span

def end_subscribe_concurrency_control_span(self) -> None:
assert self._concurrency_control_span is not None
self._concurrency_control_span.end()

def start_subscribe_scheduler_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name="subscriber scheduler",
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._subscribe_span),
end_on_exit=False,
) as scheduler_span:
self._scheduler_span = scheduler_span

def end_subscribe_scheduler_span(self) -> None:
assert self._scheduler_span is not None
self._scheduler_span.end()

def start_process_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
publish_create_span_link: Optional[trace.Link] = None
if self._publisher_create_span_context:
publish_create_span: trace.Span = trace.get_current_span(
self._publisher_create_span_context
)
span_context: Optional[
trace.SpanContext
] = publish_create_span.get_span_context()
publish_create_span_link = (
trace.Link(span_context) if span_context else None
)

with tracer.start_as_current_span(
name=f"{self._subscription_id} process",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
},
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._subscribe_span),
links=[publish_create_span_link] if publish_create_span_link else None,
end_on_exit=False,
) as process_span:
self._process_span = process_span

def end_process_span(self) -> None:
assert self._process_span is not None
self._process_span.end()

def add_process_span_event(self, event: str) -> None:
assert self._process_span is not None
self._process_span.add_event(
name=event,
attributes={
"timestamp": str(datetime.now()),
},
)


def start_modack_span(
subscribe_span_links: List[trace.Link],
subscription_id: Optional[str],
message_count: int,
deadline: float,
project_id: Optional[str],
code_function: str,
receipt_modack: bool,
) -> trace.Span:
assert subscription_id is not None
assert project_id is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=f"{subscription_id} modack",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.batch.message_count": message_count,
"messaging.gcp_pubsub.message.ack_deadline": deadline,
"messaging.destination.name": subscription_id,
"gcp.project_id": project_id,
"messaging.operation.name": "modack",
"code.function": code_function,
"messaging.gcp_pubsub.is_receipt_modack": receipt_modack,
},
links=subscribe_span_links,
kind=trace.SpanKind.CLIENT,
end_on_exit=False,
) as modack_span:
return modack_span


def start_ack_span(
subscription_id: str,
message_count: int,
project_id: str,
links: List[trace.Link],
) -> trace.Span:
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=f"{subscription_id} ack",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.batch.message_count": message_count,
"messaging.operation": "ack",
"gcp.project_id": project_id,
"messaging.destination.name": subscription_id,
"code.function": "ack",
},
kind=trace.SpanKind.CLIENT,
links=links,
end_on_exit=False,
) as ack_span:
return ack_span


def start_nack_span(
subscription_id: str,
message_count: int,
project_id: str,
links: List[trace.Link],
) -> trace.Span:
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=f"{subscription_id} nack",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.batch.message_count": message_count,
"messaging.operation": "nack",
"gcp.project_id": project_id,
"messaging.destination.name": subscription_id,
"code.function": "modify_ack_deadline",
},
kind=trace.SpanKind.CLIENT,
links=links,
end_on_exit=False,
) as nack_span:
return nack_span
Loading