Skip to content

Commit

Permalink
impl(pubsub): add thread id attribute to publish span (#13151)
Browse files Browse the repository at this point in the history
  • Loading branch information
alevenberg committed Nov 21, 2023
1 parent 9d81890 commit 38647aa
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 84 deletions.
9 changes: 9 additions & 0 deletions google/cloud/internal/opentelemetry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/span_startoptions.h>
#include <sstream>
#include <string>
#include <thread>
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

namespace google {
Expand Down Expand Up @@ -136,6 +139,12 @@ std::string ToString(opentelemetry::trace::SpanId const& span_id) {
return std::string(span_id_array, kSize);
}

std::string CurrentThreadId() {
std::ostringstream os;
os << std::this_thread::get_id();
return std::move(os).str();
}

#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/internal/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <opentelemetry/nostd/string_view.h>
#include <opentelemetry/trace/span.h>
#include <opentelemetry/trace/tracer.h>
#include <string>
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
#include <chrono>
#include <functional>
Expand Down Expand Up @@ -216,6 +217,9 @@ std::string ToString(opentelemetry::trace::TraceId const& trace_id);

std::string ToString(opentelemetry::trace::SpanId const& span_id);

/// Gets the current thread id.
std::string CurrentThreadId();

#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

bool TracingEnabled(Options const& options);
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/pubsub/internal/tracing_message_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ auto MakeParent(Links const& links, Spans const& message_spans) {
static_cast<std::int64_t>(message_spans.size())},
{sc::kCodeFunction, "BatchSink::AsyncPublish"},
{/*sc::kMessagingOperation=*/
"messaging.operation", "publish"}},
"messaging.operation", "publish"},
{sc::kThreadId, internal::CurrentThreadId()}},
/*links*/ std::move(links), options);

// Add metadata to the message spans about the batch sink span.
Expand Down
89 changes: 48 additions & 41 deletions google/cloud/pubsub/internal/tracing_message_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,43 @@ TEST(TracingMessageBatch, Flush) {
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 1),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 1)),
SpanHasLinks(AllOf(LinkHasSpanContext(message_span->GetContext()),
SpanLinkAttributesAre(OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.link", 0)))))));
}

TEST(TracingMessageBatch, PublishSpanHasAttributes) {
namespace sc = ::opentelemetry::trace::SemanticConventions;
auto span_catcher = InstallSpanCatcher();
auto message_span = MakeSpan("test span");
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, SaveMessage(_));
EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; });
auto message_batch =
MakeTracingMessageBatch(std::move(mock), MakeTestOptions());
auto initial_spans = {message_span};
SaveMessages(initial_spans, message_batch);

auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kThreadId, _)))));
EXPECT_THAT(spans, Contains(AllOf(

SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "publish")))));
}

TEST(TracingMessageBatch, FlushOnlyIncludeSampledLink) {
namespace sc = ::opentelemetry::trace::SemanticConventions;
// Create span before the span catcher so it is not sampled.
Expand Down Expand Up @@ -178,10 +206,7 @@ TEST(TracingMessageBatch, FlushOnlyIncludeSampledLink) {
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2)),
SpanLinksAre(AllOf(LinkHasSpanContext(message_span->GetContext()),
SpanLinkAttributesAre(OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.link", 0)))))));
Expand Down Expand Up @@ -213,10 +238,7 @@ TEST(TracingMessageBatch, FlushSmallBatch) {
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2)),
SpanHasLinks(AllOf(LinkHasSpanContext(message_span1->GetContext()),
SpanLinkAttributesAre(OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.link", 0))),
Expand Down Expand Up @@ -244,16 +266,11 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) {
auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Contains(AllOf(
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount,
kDefaultMaxLinks),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
SpanLinksSizeIs(128))));
Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, kDefaultMaxLinks)),
SpanLinksSizeIs(128))));
}

TEST(TracingMessageBatch, FlushLargeBatch) {
Expand All @@ -275,16 +292,10 @@ TEST(TracingMessageBatch, FlushLargeBatch) {
end_spans(make_ready_future());

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Contains(AllOf(
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, batch_size),
OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation,
"publish")))));
EXPECT_THAT(spans, Contains(AllOf(
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, batch_size)))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #0"), SpanKindIsClient(),
SpanLinksSizeIs(kDefaultMaxLinks))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #1"), SpanKindIsClient(),
Expand All @@ -311,15 +322,11 @@ TEST(TracingMessageBatch, FlushBatchWithCustomLimit) {
end_spans(make_ready_future());

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, kBatchSize),
OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish")))));
EXPECT_THAT(spans, Contains(AllOf(
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, kBatchSize)))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #0"), SpanKindIsClient(),
SpanLinksSizeIs(kMaxLinks))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #1"), SpanKindIsClient(),
Expand Down
19 changes: 6 additions & 13 deletions google/cloud/storage/internal/async/reader_connection_tracing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include "google/cloud/internal/opentelemetry.h"
#include <opentelemetry/trace/semantic_conventions.h>
#include <cstdint>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

namespace google {
Expand All @@ -31,12 +29,6 @@ namespace {

namespace sc = ::opentelemetry::trace::SemanticConventions;

std::string CurrentThreadId() {
std::ostringstream os;
os << std::this_thread::get_id();
return std::move(os).str();
}

class AsyncReaderConnectionTracing
: public storage_experimental::AsyncReaderConnection {
public:
Expand All @@ -47,9 +39,10 @@ class AsyncReaderConnectionTracing

void Cancel() override {
auto scope = opentelemetry::trace::Scope(span_);
span_->AddEvent("gl-cpp.cancel", {
{sc::kThreadId, CurrentThreadId()},
});
span_->AddEvent("gl-cpp.cancel",
{
{sc::kThreadId, internal::CurrentThreadId()},
});
return impl_->Cancel();
}

Expand All @@ -63,7 +56,7 @@ class AsyncReaderConnectionTracing
{
{sc::kMessageType, "RECEIVED"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{sc::kThreadId, internal::CurrentThreadId()},
});
return internal::EndSpan(*span, absl::get<Status>(std::move(r)));
}
Expand All @@ -72,7 +65,7 @@ class AsyncReaderConnectionTracing
{
{sc::kMessageType, "RECEIVED"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{sc::kThreadId, internal::CurrentThreadId()},
{"message.starting_offset", payload.offset()},
});
return r;
Expand Down
54 changes: 25 additions & 29 deletions google/cloud/storage/internal/async/writer_connection_tracing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
#include "google/cloud/internal/opentelemetry.h"
#include <opentelemetry/trace/semantic_conventions.h>
#include <cstdint>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

namespace google {
Expand All @@ -30,12 +28,6 @@ namespace {

namespace sc = ::opentelemetry::trace::SemanticConventions;

std::string CurrentThreadId() {
std::ostringstream os;
os << std::this_thread::get_id();
return std::move(os).str();
}

class AsyncWriterConnectionTracing
: public storage_experimental::AsyncWriterConnection {
public:
Expand All @@ -46,9 +38,10 @@ class AsyncWriterConnectionTracing

void Cancel() override {
auto scope = opentelemetry::trace::Scope(span_);
span_->AddEvent("gl-cpp.cancel", {
{sc::kThreadId, CurrentThreadId()},
});
span_->AddEvent("gl-cpp.cancel",
{
{sc::kThreadId, internal::CurrentThreadId()},
});
return impl_->Cancel();
}

Expand All @@ -68,12 +61,13 @@ class AsyncWriterConnectionTracing
auto size = static_cast<std::uint64_t>(p.size());
return impl_->Write(std::move(p))
.then([count = ++sent_count_, span = span_, size](auto f) {
span->AddEvent("gl-cpp.write", {
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{"gl-cpp.size", size},
});
span->AddEvent("gl-cpp.write",
{
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
{"gl-cpp.size", size},
});
auto status = f.get();
if (!status.ok()) return internal::EndSpan(*span, std::move(status));
return status;
Expand All @@ -90,7 +84,7 @@ class AsyncWriterConnectionTracing
{
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{sc::kThreadId, internal::CurrentThreadId()},
{"gl-cpp.size", size},
});
return internal::EndSpan(*span, f.get());
Expand All @@ -102,12 +96,13 @@ class AsyncWriterConnectionTracing
auto size = static_cast<std::uint64_t>(p.size());
return impl_->Flush(std::move(p))
.then([count = ++sent_count_, span = span_, size](auto f) {
span->AddEvent("gl-cpp.flush", {
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{"gl-cpp.size", size},
});
span->AddEvent("gl-cpp.flush",
{
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
{"gl-cpp.size", size},
});
auto status = f.get();
if (!status.ok()) return internal::EndSpan(*span, std::move(status));
return status;
Expand All @@ -117,11 +112,12 @@ class AsyncWriterConnectionTracing
future<StatusOr<std::int64_t>> Query() override {
internal::OTelScope scope(span_);
return impl_->Query().then([count = ++recv_count_, span = span_](auto f) {
span->AddEvent("gl-cpp.query", {
{sc::kMessageType, "RECEIVE"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
});
span->AddEvent("gl-cpp.query",
{
{sc::kMessageType, "RECEIVE"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
});
auto response = f.get();
if (!response) return internal::EndSpan(*span, std::move(response));
return response;
Expand Down

0 comments on commit 38647aa

Please sign in to comment.