From 21a5716844d57029cab2aa8114d0eb1219924db6 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Tue, 24 Sep 2024 14:50:29 +0000 Subject: [PATCH] Add links to subscribe span of the sampled ack_spans in dispatcher.ack --- google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py | 7 +++++++ tests/unit/pubsub_v1/subscriber/test_dispatcher.py | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 3b2f7918a..f0a61f25f 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -255,6 +255,7 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: } subscribe_links: List[trace.Link] = [] + subscribe_spans: List[trace.Span] = [] for ack_req in ack_reqs_dict.values(): if ack_req.opentelemetry_data: subscribe_span: Optional[ @@ -267,6 +268,7 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: subscribe_links.append( trace.Link(subscribe_span.get_span_context()) ) + subscribe_spans.append(subscribe_span) ack_span: Optional[trace.Span] = None if subscription_id and project_id: ack_span = start_ack_span( @@ -275,6 +277,11 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: project_id, subscribe_links, ) + if ack_span and ack_span.get_span_context().trace_flags.sampled: + ack_span_context: trace.SpanContext = ack_span.get_span_context() + for subscribe_span in subscribe_spans: + subscribe_span.add_link(ack_span_context) + requests_completed, requests_to_retry = self._manager.send_unary_ack( ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)), ack_reqs_dict=ack_reqs_dict, diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index bf557ea2f..ee0eae402 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -464,6 +464,14 @@ def test_opentelemetry_ack(span_exporter): assert subscribe_span.events[0].name == "ack start" assert subscribe_span.events[1].name == "ack end" + # This subscribe span is sampled, so we expect it to be linked to the ack + # span. + assert len(spans[1].links) == 1 + assert spans[1].links[0].context == ack_span.context + # This subscribe span is not sampled, so we expect it to not be linked to + # the ack span + assert len(spans[2].links) == 0 + assert ack_span.name == "subscriptionID ack" assert ack_span.kind == trace.SpanKind.CLIENT assert ack_span.parent is None