diff --git a/google/cloud/internal/opentelemetry.cc b/google/cloud/internal/opentelemetry.cc index c966f4875efec..8505e8f01b99d 100644 --- a/google/cloud/internal/opentelemetry.cc +++ b/google/cloud/internal/opentelemetry.cc @@ -18,6 +18,9 @@ #ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY #include #include +#include +#include +#include #endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY namespace google { @@ -136,6 +139,12 @@ std::string ToString(opentelemetry::trace::SpanId const& span_id) { return std::string(span_id_array, kSize); } +std::string CurrentThreadId() { + std::ostringstream os; + os << std::this_thread::get_id(); + return std::move(os).str(); +} + #endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY #ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY diff --git a/google/cloud/internal/opentelemetry.h b/google/cloud/internal/opentelemetry.h index e96b057c1f9e3..7805b0c43a1ca 100644 --- a/google/cloud/internal/opentelemetry.h +++ b/google/cloud/internal/opentelemetry.h @@ -27,6 +27,7 @@ #include #include #include +#include #endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY #include #include @@ -216,6 +217,9 @@ std::string ToString(opentelemetry::trace::TraceId const& trace_id); std::string ToString(opentelemetry::trace::SpanId const& span_id); +/// Gets the current thread id. +std::string CurrentThreadId(); + #endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY bool TracingEnabled(Options const& options); diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 46abd9499cfe2..21e4560b82840 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -70,7 +70,8 @@ auto MakeParent(Links const& links, Spans const& message_spans) { static_cast(message_spans.size())}, {sc::kCodeFunction, "BatchSink::AsyncPublish"}, {/*sc::kMessagingOperation=*/ - "messaging.operation", "publish"}}, + "messaging.operation", "publish"}, + {sc::kThreadId, internal::CurrentThreadId()}}, /*links*/ std::move(links), options); // Add metadata to the message spans about the batch sink span. diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 341e29e20a4ef..91c1cf7fa0037 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -140,15 +140,43 @@ TEST(TracingMessageBatch, Flush) { SpanHasInstrumentationScope(), SpanKindIsProducer(), SpanNamed("publish"), SpanHasAttributes( - OTelAttribute(sc::kMessagingBatchMessageCount, 1), - OTelAttribute(sc::kCodeFunction, - "BatchSink::AsyncPublish"), - OTelAttribute(sc::kMessagingOperation, "publish")), + OTelAttribute(sc::kMessagingBatchMessageCount, 1)), SpanHasLinks(AllOf(LinkHasSpanContext(message_span->GetContext()), SpanLinkAttributesAre(OTelAttribute( "messaging.gcp_pubsub.message.link", 0))))))); } +TEST(TracingMessageBatch, PublishSpanHasAttributes) { + namespace sc = ::opentelemetry::trace::SemanticConventions; + auto span_catcher = InstallSpanCatcher(); + auto message_span = MakeSpan("test span"); + auto mock = std::make_unique(); + EXPECT_CALL(*mock, SaveMessage(_)); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); + auto message_batch = + MakeTracingMessageBatch(std::move(mock), MakeTestOptions()); + auto initial_spans = {message_span}; + SaveMessages(initial_spans, message_batch); + + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); + + auto spans = span_catcher->GetSpans(); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("publish"), + SpanHasAttributes(OTelAttribute( + sc::kThreadId, _))))); + EXPECT_THAT(spans, Contains(AllOf( + + SpanNamed("publish"), + SpanHasAttributes(OTelAttribute( + sc::kCodeFunction, "BatchSink::AsyncPublish"))))); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("publish"), + SpanHasAttributes(OTelAttribute( + sc::kMessagingOperation, "publish"))))); +} + TEST(TracingMessageBatch, FlushOnlyIncludeSampledLink) { namespace sc = ::opentelemetry::trace::SemanticConventions; // Create span before the span catcher so it is not sampled. @@ -178,10 +206,7 @@ TEST(TracingMessageBatch, FlushOnlyIncludeSampledLink) { SpanHasInstrumentationScope(), SpanKindIsProducer(), SpanNamed("publish"), SpanHasAttributes( - OTelAttribute(sc::kMessagingBatchMessageCount, 2), - OTelAttribute(sc::kCodeFunction, - "BatchSink::AsyncPublish"), - OTelAttribute(sc::kMessagingOperation, "publish")), + OTelAttribute(sc::kMessagingBatchMessageCount, 2)), SpanLinksAre(AllOf(LinkHasSpanContext(message_span->GetContext()), SpanLinkAttributesAre(OTelAttribute( "messaging.gcp_pubsub.message.link", 0))))))); @@ -213,10 +238,7 @@ TEST(TracingMessageBatch, FlushSmallBatch) { SpanHasInstrumentationScope(), SpanKindIsProducer(), SpanNamed("publish"), SpanHasAttributes( - OTelAttribute(sc::kMessagingBatchMessageCount, 2), - OTelAttribute(sc::kCodeFunction, - "BatchSink::AsyncPublish"), - OTelAttribute(sc::kMessagingOperation, "publish")), + OTelAttribute(sc::kMessagingBatchMessageCount, 2)), SpanHasLinks(AllOf(LinkHasSpanContext(message_span1->GetContext()), SpanLinkAttributesAre(OTelAttribute( "messaging.gcp_pubsub.message.link", 0))), @@ -244,16 +266,11 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { auto spans = span_catcher->GetSpans(); EXPECT_THAT( spans, - Contains(AllOf( - SpanHasInstrumentationScope(), SpanKindIsProducer(), - SpanNamed("publish"), - SpanHasAttributes( - OTelAttribute(sc::kMessagingBatchMessageCount, - kDefaultMaxLinks), - OTelAttribute(sc::kCodeFunction, - "BatchSink::AsyncPublish"), - OTelAttribute(sc::kMessagingOperation, "publish")), - SpanLinksSizeIs(128)))); + Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsProducer(), + SpanNamed("publish"), + SpanHasAttributes(OTelAttribute( + sc::kMessagingBatchMessageCount, kDefaultMaxLinks)), + SpanLinksSizeIs(128)))); } TEST(TracingMessageBatch, FlushLargeBatch) { @@ -275,16 +292,10 @@ TEST(TracingMessageBatch, FlushLargeBatch) { end_spans(make_ready_future()); auto spans = span_catcher->GetSpans(); - EXPECT_THAT( - spans, - Contains(AllOf( - SpanNamed("publish"), - SpanHasAttributes(OTelAttribute( - sc::kMessagingBatchMessageCount, batch_size), - OTelAttribute( - sc::kCodeFunction, "BatchSink::AsyncPublish"), - OTelAttribute(sc::kMessagingOperation, - "publish"))))); + EXPECT_THAT(spans, Contains(AllOf( + SpanNamed("publish"), + SpanHasAttributes(OTelAttribute( + sc::kMessagingBatchMessageCount, batch_size))))); EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #0"), SpanKindIsClient(), SpanLinksSizeIs(kDefaultMaxLinks)))); EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #1"), SpanKindIsClient(), @@ -311,15 +322,11 @@ TEST(TracingMessageBatch, FlushBatchWithCustomLimit) { end_spans(make_ready_future()); auto spans = span_catcher->GetSpans(); - EXPECT_THAT( - spans, - Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsProducer(), - SpanNamed("publish"), - SpanHasAttributes( - OTelAttribute( - sc::kMessagingBatchMessageCount, kBatchSize), - OTelAttribute( - sc::kCodeFunction, "BatchSink::AsyncPublish"))))); + EXPECT_THAT(spans, Contains(AllOf( + SpanHasInstrumentationScope(), SpanKindIsProducer(), + SpanNamed("publish"), + SpanHasAttributes(OTelAttribute( + sc::kMessagingBatchMessageCount, kBatchSize))))); EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #0"), SpanKindIsClient(), SpanLinksSizeIs(kMaxLinks)))); EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #1"), SpanKindIsClient(), diff --git a/google/cloud/storage/internal/async/reader_connection_tracing.cc b/google/cloud/storage/internal/async/reader_connection_tracing.cc index 1880161fb0fcc..6ae14c182984d 100644 --- a/google/cloud/storage/internal/async/reader_connection_tracing.cc +++ b/google/cloud/storage/internal/async/reader_connection_tracing.cc @@ -18,9 +18,7 @@ #include "google/cloud/internal/opentelemetry.h" #include #include -#include #include -#include #include namespace google { @@ -31,12 +29,6 @@ namespace { namespace sc = ::opentelemetry::trace::SemanticConventions; -std::string CurrentThreadId() { - std::ostringstream os; - os << std::this_thread::get_id(); - return std::move(os).str(); -} - class AsyncReaderConnectionTracing : public storage_experimental::AsyncReaderConnection { public: @@ -47,9 +39,10 @@ class AsyncReaderConnectionTracing void Cancel() override { auto scope = opentelemetry::trace::Scope(span_); - span_->AddEvent("gl-cpp.cancel", { - {sc::kThreadId, CurrentThreadId()}, - }); + span_->AddEvent("gl-cpp.cancel", + { + {sc::kThreadId, internal::CurrentThreadId()}, + }); return impl_->Cancel(); } @@ -63,7 +56,7 @@ class AsyncReaderConnectionTracing { {sc::kMessageType, "RECEIVED"}, {sc::kMessageId, count}, - {sc::kThreadId, CurrentThreadId()}, + {sc::kThreadId, internal::CurrentThreadId()}, }); return internal::EndSpan(*span, absl::get(std::move(r))); } @@ -72,7 +65,7 @@ class AsyncReaderConnectionTracing { {sc::kMessageType, "RECEIVED"}, {sc::kMessageId, count}, - {sc::kThreadId, CurrentThreadId()}, + {sc::kThreadId, internal::CurrentThreadId()}, {"message.starting_offset", payload.offset()}, }); return r; diff --git a/google/cloud/storage/internal/async/writer_connection_tracing.cc b/google/cloud/storage/internal/async/writer_connection_tracing.cc index 2c3cd2a52e90a..fc5590098f573 100644 --- a/google/cloud/storage/internal/async/writer_connection_tracing.cc +++ b/google/cloud/storage/internal/async/writer_connection_tracing.cc @@ -17,9 +17,7 @@ #include "google/cloud/internal/opentelemetry.h" #include #include -#include #include -#include #include namespace google { @@ -30,12 +28,6 @@ namespace { namespace sc = ::opentelemetry::trace::SemanticConventions; -std::string CurrentThreadId() { - std::ostringstream os; - os << std::this_thread::get_id(); - return std::move(os).str(); -} - class AsyncWriterConnectionTracing : public storage_experimental::AsyncWriterConnection { public: @@ -46,9 +38,10 @@ class AsyncWriterConnectionTracing void Cancel() override { auto scope = opentelemetry::trace::Scope(span_); - span_->AddEvent("gl-cpp.cancel", { - {sc::kThreadId, CurrentThreadId()}, - }); + span_->AddEvent("gl-cpp.cancel", + { + {sc::kThreadId, internal::CurrentThreadId()}, + }); return impl_->Cancel(); } @@ -68,12 +61,13 @@ class AsyncWriterConnectionTracing auto size = static_cast(p.size()); return impl_->Write(std::move(p)) .then([count = ++sent_count_, span = span_, size](auto f) { - span->AddEvent("gl-cpp.write", { - {sc::kMessageType, "SENT"}, - {sc::kMessageId, count}, - {sc::kThreadId, CurrentThreadId()}, - {"gl-cpp.size", size}, - }); + span->AddEvent("gl-cpp.write", + { + {sc::kMessageType, "SENT"}, + {sc::kMessageId, count}, + {sc::kThreadId, internal::CurrentThreadId()}, + {"gl-cpp.size", size}, + }); auto status = f.get(); if (!status.ok()) return internal::EndSpan(*span, std::move(status)); return status; @@ -90,7 +84,7 @@ class AsyncWriterConnectionTracing { {sc::kMessageType, "SENT"}, {sc::kMessageId, count}, - {sc::kThreadId, CurrentThreadId()}, + {sc::kThreadId, internal::CurrentThreadId()}, {"gl-cpp.size", size}, }); return internal::EndSpan(*span, f.get()); @@ -102,12 +96,13 @@ class AsyncWriterConnectionTracing auto size = static_cast(p.size()); return impl_->Flush(std::move(p)) .then([count = ++sent_count_, span = span_, size](auto f) { - span->AddEvent("gl-cpp.flush", { - {sc::kMessageType, "SENT"}, - {sc::kMessageId, count}, - {sc::kThreadId, CurrentThreadId()}, - {"gl-cpp.size", size}, - }); + span->AddEvent("gl-cpp.flush", + { + {sc::kMessageType, "SENT"}, + {sc::kMessageId, count}, + {sc::kThreadId, internal::CurrentThreadId()}, + {"gl-cpp.size", size}, + }); auto status = f.get(); if (!status.ok()) return internal::EndSpan(*span, std::move(status)); return status; @@ -117,11 +112,12 @@ class AsyncWriterConnectionTracing future> Query() override { internal::OTelScope scope(span_); return impl_->Query().then([count = ++recv_count_, span = span_](auto f) { - span->AddEvent("gl-cpp.query", { - {sc::kMessageType, "RECEIVE"}, - {sc::kMessageId, count}, - {sc::kThreadId, CurrentThreadId()}, - }); + span->AddEvent("gl-cpp.query", + { + {sc::kMessageType, "RECEIVE"}, + {sc::kMessageId, count}, + {sc::kThreadId, internal::CurrentThreadId()}, + }); auto response = f.get(); if (!response) return internal::EndSpan(*span, std::move(response)); return response;