diff --git a/google/cloud/bigtable/internal/data_tracing_connection.cc b/google/cloud/bigtable/internal/data_tracing_connection.cc index 2b3683f93eb6..e7c4942135b6 100644 --- a/google/cloud/bigtable/internal/data_tracing_connection.cc +++ b/google/cloud/bigtable/internal/data_tracing_connection.cc @@ -65,7 +65,7 @@ class DataTracingConnection : public bigtable::DataConnection { future AsyncApply(std::string const& table_name, bigtable::SingleRowMutation mut) override { auto span = internal::MakeSpan("bigtable::Table::AsyncApply"); - auto scope = opentelemetry::trace::Scope(span); + internal::OTelScope scope(span); return internal::EndSpan(std::move(span), child_->AsyncApply(table_name, std::move(mut))); } @@ -82,12 +82,18 @@ class DataTracingConnection : public bigtable::DataConnection { future> AsyncBulkApply( std::string const& table_name, bigtable::BulkMutation mut) override { auto span = internal::MakeSpan("bigtable::Table::AsyncBulkApply"); - auto scope = opentelemetry::trace::Scope(span); + internal::OTelScope scope(span); auto total_mutations = mut.size(); - auto failures = child_->AsyncBulkApply(table_name, std::move(mut)); - return failures.then([s = std::move(span), total_mutations](auto f) { - return EndBulkApplySpan(*s, total_mutations, f.get()); - }); + return child_->AsyncBulkApply(table_name, std::move(mut)) + .then([s = std::move(span), total_mutations](auto f) { + return EndBulkApplySpan(*s, total_mutations, f.get()); + }) + .then([oc = opentelemetry::context::RuntimeContext::GetCurrent()]( + auto f) { + auto t = f.get(); + internal::DetachOTelContext(oc); + return t; + }); } bigtable::RowReader ReadRowsFull(bigtable::ReadRowsParams params) override { @@ -123,7 +129,7 @@ class DataTracingConnection : public bigtable::DataConnection { bigtable::Filter filter, std::vector true_mutations, std::vector false_mutations) override { auto span = internal::MakeSpan("bigtable::Table::AsyncCheckAndMutateRow"); - auto scope = opentelemetry::trace::Scope(span); + internal::OTelScope scope(span); return internal::EndSpan( std::move(span), child_->AsyncCheckAndMutateRow( @@ -141,7 +147,7 @@ class DataTracingConnection : public bigtable::DataConnection { future>> AsyncSampleRows( std::string const& table_name) override { auto span = internal::MakeSpan("bigtable::Table::AsyncSampleRows"); - auto scope = opentelemetry::trace::Scope(span); + internal::OTelScope scope(span); return internal::EndSpan(std::move(span), child_->AsyncSampleRows(table_name)); } @@ -157,7 +163,7 @@ class DataTracingConnection : public bigtable::DataConnection { future> AsyncReadModifyWriteRow( google::bigtable::v2::ReadModifyWriteRowRequest request) override { auto span = internal::MakeSpan("bigtable::Table::AsyncReadModifyWriteRow"); - auto scope = opentelemetry::trace::Scope(span); + internal::OTelScope scope(span); return internal::EndSpan( std::move(span), child_->AsyncReadModifyWriteRow(std::move(request))); } @@ -168,10 +174,14 @@ class DataTracingConnection : public bigtable::DataConnection { bigtable::RowSet row_set, std::int64_t rows_limit, bigtable::Filter filter) override { auto span = internal::MakeSpan("bigtable::Table::AsyncReadRows"); - auto scope = opentelemetry::trace::Scope(span); - auto traced_on_finish = [span, on_finish](Status const& status) { - return on_finish(internal::EndSpan(*span, status)); - }; + internal::OTelScope scope(span); + auto traced_on_finish = + [span, on_finish, + oc = opentelemetry::context::RuntimeContext::GetCurrent()]( + Status const& status) { + internal::DetachOTelContext(oc); + return on_finish(internal::EndSpan(*span, status)); + }; child_->AsyncReadRows(table_name, std::move(on_row), std::move(traced_on_finish), std::move(row_set), std::move(rows_limit), std::move(filter)); @@ -181,11 +191,17 @@ class DataTracingConnection : public bigtable::DataConnection { std::string const& table_name, std::string row_key, bigtable::Filter filter) override { auto span = internal::MakeSpan("bigtable::Table::AsyncReadRow"); - auto scope = opentelemetry::trace::Scope(span); + internal::OTelScope scope(span); return child_ ->AsyncReadRow(table_name, std::move(row_key), std::move(filter)) .then([s = std::move(span)](auto f) { return EndReadRowSpan(*s, f.get()); + }) + .then([oc = opentelemetry::context::RuntimeContext::GetCurrent()]( + auto f) { + auto t = f.get(); + internal::DetachOTelContext(oc); + return t; }); } diff --git a/google/cloud/bigtable/internal/data_tracing_connection_test.cc b/google/cloud/bigtable/internal/data_tracing_connection_test.cc index c0f7d4883267..bf414d71141d 100644 --- a/google/cloud/bigtable/internal/data_tracing_connection_test.cc +++ b/google/cloud/bigtable/internal/data_tracing_connection_test.cc @@ -31,6 +31,8 @@ using ::google::cloud::bigtable_mocks::MockDataConnection; using ::google::cloud::testing_util::InstallSpanCatcher; using ::google::cloud::testing_util::IsOkAndHolds; using ::google::cloud::testing_util::OTelAttribute; +using ::google::cloud::testing_util::OTelContextCaptured; +using ::google::cloud::testing_util::PromiseWithOTelContext; using ::google::cloud::testing_util::SpanHasAttributes; using ::google::cloud::testing_util::SpanHasInstrumentationScope; using ::google::cloud::testing_util::SpanKindIsClient; @@ -56,6 +58,21 @@ bigtable::SingleRowMutation Mutation() { "row", {bigtable::SetCell("fam", "col", ms(0), "val")}); } +auto expect_context = [](auto& p) { + return [&p] { + EXPECT_TRUE(ThereIsAnActiveSpan()); + EXPECT_TRUE(OTelContextCaptured()); + return p.get_future(); + }; +}; + +auto expect_no_context = [](auto f) { + auto t = f.get(); + EXPECT_FALSE(ThereIsAnActiveSpan()); + EXPECT_FALSE(OTelContextCaptured()); + return t; +}; + TEST(DataTracingConnection, Options) { struct TestOption { using Type = int; @@ -94,15 +111,15 @@ TEST(DataTracingConnection, Apply) { TEST(DataTracingConnection, AsyncApply) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncApply).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(internal::AbortedError("fail")); - }); + EXPECT_CALL(*mock, AsyncApply).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto status = under_test->AsyncApply(kTableName, Mutation()); + auto status = + under_test->AsyncApply(kTableName, Mutation()).then(expect_no_context); + p.set_value(internal::AbortedError("fail")); EXPECT_THAT(status.get(), StatusIs(StatusCode::kAborted)); EXPECT_THAT( @@ -171,15 +188,15 @@ TEST(DataTracingConnection, BulkApplyFailure) { TEST(DataTracingConnection, AsyncBulkApplySuccess) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncBulkApply).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(std::vector{}); - }); + EXPECT_CALL(*mock, AsyncBulkApply).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto failures = under_test->AsyncBulkApply(kTableName, Mutation()); + auto failures = under_test->AsyncBulkApply(kTableName, Mutation()) + .then(expect_no_context); + p.set_value(std::vector{}); EXPECT_THAT(failures.get(), IsEmpty()); EXPECT_THAT( @@ -196,19 +213,19 @@ TEST(DataTracingConnection, AsyncBulkApplySuccess) { TEST(DataTracingConnection, AsyncBulkApplyFailure) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncBulkApply).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(std::vector{ - {internal::AbortedError("fail"), 1}, - {internal::AbortedError("fail"), 2}}); - }); + EXPECT_CALL(*mock, AsyncBulkApply).WillOnce(expect_context(p)); bigtable::BulkMutation mut; for (auto i = 0; i != 10; ++i) mut.push_back(Mutation()); auto under_test = MakeDataTracingConnection(mock); - auto failures = under_test->AsyncBulkApply(kTableName, std::move(mut)); + auto failures = under_test->AsyncBulkApply(kTableName, std::move(mut)) + .then(expect_no_context); + p.set_value(std::vector{ + {internal::AbortedError("fail"), 1}, + {internal::AbortedError("fail"), 2}}); EXPECT_THAT(failures.get(), SizeIs(2)); EXPECT_THAT( @@ -376,17 +393,18 @@ TEST(DataTracingConnection, CheckAndMutateRow) { TEST(DataTracingConnection, AsyncCheckAndMutateRow) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncCheckAndMutateRow).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future>( - internal::AbortedError("fail")); - }); + EXPECT_CALL(*mock, AsyncCheckAndMutateRow).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto branch = under_test->AsyncCheckAndMutateRow( - kTableName, "row", bigtable::Filter::PassAllFilter(), {}, {}); + auto branch = + under_test + ->AsyncCheckAndMutateRow(kTableName, "row", + bigtable::Filter::PassAllFilter(), {}, {}) + .then(expect_no_context); + p.set_value(internal::AbortedError("fail")); EXPECT_THAT(branch.get(), StatusIs(StatusCode::kAborted)); EXPECT_THAT( @@ -401,6 +419,7 @@ TEST(DataTracingConnection, AsyncCheckAndMutateRow) { TEST(DataTracingConnection, SampleRows) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext> p; auto mock = std::make_shared(); EXPECT_CALL(*mock, SampleRows).WillOnce([] { @@ -424,16 +443,15 @@ TEST(DataTracingConnection, SampleRows) { TEST(DataTracingConnection, AsyncSampleRows) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext>> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncSampleRows).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future>>( - internal::AbortedError("fail")); - }); + EXPECT_CALL(*mock, AsyncSampleRows).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto samples = under_test->AsyncSampleRows(kTableName); + auto samples = + under_test->AsyncSampleRows(kTableName).then(expect_no_context); + p.set_value(internal::AbortedError("fail")); EXPECT_THAT(samples.get(), StatusIs(StatusCode::kAborted)); EXPECT_THAT( @@ -471,16 +489,15 @@ TEST(DataTracingConnection, ReadModifyWriteRow) { TEST(DataTracingConnection, AsyncReadModifyWriteRow) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncReadModifyWriteRow).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future>( - internal::AbortedError("fail")); - }); + EXPECT_CALL(*mock, AsyncReadModifyWriteRow).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto samples = under_test->AsyncReadModifyWriteRow({}); + auto samples = + under_test->AsyncReadModifyWriteRow({}).then(expect_no_context); + p.set_value(internal::AbortedError("fail")); EXPECT_THAT(samples.get(), StatusIs(StatusCode::kAborted)); EXPECT_THAT( @@ -504,6 +521,7 @@ TEST(DataTracingConnection, AsyncReadRows) { bigtable::RowSet const&, std::int64_t, bigtable::Filter const&) { EXPECT_TRUE(ThereIsAnActiveSpan()); + EXPECT_TRUE(OTelContextCaptured()); // Invoke the callbacks. on_row(bigtable::Row("r1", {})).get(); on_row(bigtable::Row("r2", {})).get(); @@ -523,6 +541,8 @@ TEST(DataTracingConnection, AsyncReadRows) { MockFunction on_finish; EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) { + EXPECT_FALSE(ThereIsAnActiveSpan()); + EXPECT_FALSE(OTelContextCaptured()); EXPECT_THAT(status, StatusIs(StatusCode::kAborted)); }); @@ -543,17 +563,17 @@ TEST(DataTracingConnection, AsyncReadRows) { TEST(DataTracingConnection, AsyncReadRowFound) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext>> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future( - make_status_or>({true, {"row", {}}})); - }); + EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto row = under_test->AsyncReadRow(kTableName, "row", - bigtable::Filter::PassAllFilter()); + auto row = + under_test + ->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter()) + .then(expect_no_context); + p.set_value(std::pair({true, {"row", {}}})); EXPECT_THAT(row.get(), IsOkAndHolds(Pair(true, _))); EXPECT_THAT(span_catcher->GetSpans(), @@ -568,17 +588,17 @@ TEST(DataTracingConnection, AsyncReadRowFound) { TEST(DataTracingConnection, AsyncReadRowNotFound) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext>> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future( - make_status_or>({false, {"row", {}}})); - }); + EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto row = under_test->AsyncReadRow(kTableName, "row", - bigtable::Filter::PassAllFilter()); + auto row = + under_test + ->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter()) + .then(expect_no_context); + p.set_value(std::pair({false, {"row", {}}})); EXPECT_THAT(row.get(), IsOkAndHolds(Pair(false, _))); EXPECT_THAT(span_catcher->GetSpans(), @@ -593,17 +613,17 @@ TEST(DataTracingConnection, AsyncReadRowNotFound) { TEST(DataTracingConnection, AsyncReadRowFailure) { auto span_catcher = InstallSpanCatcher(); + PromiseWithOTelContext>> p; auto mock = std::make_shared(); - EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] { - EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future>>( - internal::AbortedError("fail")); - }); + EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p)); auto under_test = MakeDataTracingConnection(mock); - auto row = under_test->AsyncReadRow(kTableName, "row", - bigtable::Filter::PassAllFilter()); + auto row = + under_test + ->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter()) + .then(expect_no_context); + p.set_value(internal::AbortedError("fail")); EXPECT_THAT(row.get(), StatusIs(StatusCode::kAborted)); EXPECT_THAT(