Skip to content

Commit

Permalink
Add links to nack spans in the subscribe span in
Browse files Browse the repository at this point in the history
dispatcher.modify_ack_deadlin()
  • Loading branch information
mukund-ananthu committed Sep 24, 2024
1 parent a791570 commit 3206c19
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
8 changes: 8 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ def modify_ack_deadline(
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
subscribe_links: List[trace.Link] = []
subscribe_spans: List[trace.Span] = []
for ack_req in ack_reqs_dict.values():
if ack_req.opentelemetry_data and math.isclose(ack_req.seconds, 0):
subscribe_span: Optional[
Expand All @@ -468,6 +469,7 @@ def modify_ack_deadline(
subscribe_links.append(
trace.Link(subscribe_span.get_span_context())
)
subscribe_spans.append(subscribe_span)
nack_span: Optional[trace.Span] = None
if subscription_id and project_id and len(subscribe_links) > 0:
nack_span = start_nack_span(
Expand All @@ -476,6 +478,12 @@ def modify_ack_deadline(
project_id,
subscribe_links,
)
if (
nack_span and nack_span.get_span_context().trace_flags.sampled
): # pragma: NO COVER
nack_span_context: trace.SpanContext = nack_span.get_span_context()
for subscribe_span in subscribe_spans:
subscribe_span.add_link(nack_span_context)
requests_to_retry: List[requests.ModAckRequest]
requests_completed: Optional[List[requests.ModAckRequest]] = None
if default_deadline is None:
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,10 @@ def test_drop_ordered_messages():
manager.maybe_resume_consumer.assert_called_once()


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_opentelemetry_nack(span_exporter):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
Expand Down Expand Up @@ -1016,6 +1020,14 @@ def test_opentelemetry_nack(span_exporter):
assert subscribe_span.events[0].name == "nack start"
assert subscribe_span.events[1].name == "nack end"

# This subscribe span is sampled, so we expect it to be linked to the nack
# span.
assert len(spans[1].links) == 1
assert spans[1].links[0].context == nack_span.context
# This subscribe span is not sampled, so we expect it to not be linked to
# the nack span
assert len(spans[2].links) == 0

assert nack_span.name == "subscriptionID nack"
assert nack_span.kind == trace.SpanKind.CLIENT
assert nack_span.parent is None
Expand Down

0 comments on commit 3206c19

Please sign in to comment.