From 3206c192bf567be0d087c011c20c2f15308b703d Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Tue, 24 Sep 2024 15:25:59 +0000 Subject: [PATCH] Add links to nack spans in the subscribe span in dispatcher.modify_ack_deadlin() --- .../pubsub_v1/subscriber/_protocol/dispatcher.py | 8 ++++++++ tests/unit/pubsub_v1/subscriber/test_dispatcher.py | 12 ++++++++++++ 2 files changed, 20 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 3824aab3d..a120d9223 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -456,6 +456,7 @@ def modify_ack_deadline( for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) } subscribe_links: List[trace.Link] = [] + subscribe_spans: List[trace.Span] = [] for ack_req in ack_reqs_dict.values(): if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0): subscribe_span: Optional[ @@ -468,6 +469,7 @@ def modify_ack_deadline( subscribe_links.append( trace.Link(subscribe_span.get_span_context()) ) + subscribe_spans.append(subscribe_span) nack_span: Optional[trace.Span] = None if subscription_id and project_id and len(subscribe_links) > 0: nack_span = start_nack_span( @@ -476,6 +478,12 @@ def modify_ack_deadline( project_id, subscribe_links, ) + if ( + nack_span and nack_span.get_span_context().trace_flags.sampled + ): # pragma: NO COVER + nack_span_context: trace.SpanContext = nack_span.get_span_context() + for subscribe_span in subscribe_spans: + subscribe_span.add_link(nack_span_context) requests_to_retry: List[requests.ModAckRequest] requests_completed: Optional[List[requests.ModAckRequest]] = None if default_deadline is None: diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index b752ef426..babf7ecb3 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -945,6 +945,10 @@ def test_drop_ordered_messages(): manager.maybe_resume_consumer.assert_called_once() +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) def test_opentelemetry_nack(span_exporter): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True @@ -1016,6 +1020,14 @@ def test_opentelemetry_nack(span_exporter): assert subscribe_span.events[0].name == "nack start" assert subscribe_span.events[1].name == "nack end" + # This subscribe span is sampled, so we expect it to be linked to the nack + # span. + assert len(spans[1].links) == 1 + assert spans[1].links[0].context == nack_span.context + # This subscribe span is not sampled, so we expect it to not be linked to + # the nack span + assert len(spans[2].links) == 0 + assert nack_span.name == "subscriptionID nack" assert nack_span.kind == trace.SpanKind.CLIENT assert nack_span.parent is None