Skip to content

Commit

Permalink
fix(bigtable): async context propagation for tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc committed Nov 17, 2023
1 parent 37619f6 commit 0a288f4
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 71 deletions.
44 changes: 30 additions & 14 deletions google/cloud/bigtable/internal/data_tracing_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class DataTracingConnection : public bigtable::DataConnection {
future<Status> AsyncApply(std::string const& table_name,
bigtable::SingleRowMutation mut) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncApply");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(std::move(span),
child_->AsyncApply(table_name, std::move(mut)));
}
Expand All @@ -82,12 +82,18 @@ class DataTracingConnection : public bigtable::DataConnection {
future<std::vector<bigtable::FailedMutation>> AsyncBulkApply(
std::string const& table_name, bigtable::BulkMutation mut) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncBulkApply");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
auto total_mutations = mut.size();
auto failures = child_->AsyncBulkApply(table_name, std::move(mut));
return failures.then([s = std::move(span), total_mutations](auto f) {
return EndBulkApplySpan(*s, total_mutations, f.get());
});
return child_->AsyncBulkApply(table_name, std::move(mut))
.then([s = std::move(span), total_mutations](auto f) {
return EndBulkApplySpan(*s, total_mutations, f.get());
})
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent()](
auto f) {
auto t = f.get();
internal::DetachOTelContext(oc);
return t;
});
}

bigtable::RowReader ReadRowsFull(bigtable::ReadRowsParams params) override {
Expand Down Expand Up @@ -123,7 +129,7 @@ class DataTracingConnection : public bigtable::DataConnection {
bigtable::Filter filter, std::vector<bigtable::Mutation> true_mutations,
std::vector<bigtable::Mutation> false_mutations) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncCheckAndMutateRow");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(
std::move(span),
child_->AsyncCheckAndMutateRow(
Expand All @@ -141,7 +147,7 @@ class DataTracingConnection : public bigtable::DataConnection {
future<StatusOr<std::vector<bigtable::RowKeySample>>> AsyncSampleRows(
std::string const& table_name) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncSampleRows");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(std::move(span),
child_->AsyncSampleRows(table_name));
}
Expand All @@ -157,7 +163,7 @@ class DataTracingConnection : public bigtable::DataConnection {
future<StatusOr<bigtable::Row>> AsyncReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncReadModifyWriteRow");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(
std::move(span), child_->AsyncReadModifyWriteRow(std::move(request)));
}
Expand All @@ -168,10 +174,14 @@ class DataTracingConnection : public bigtable::DataConnection {
bigtable::RowSet row_set, std::int64_t rows_limit,
bigtable::Filter filter) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncReadRows");
auto scope = opentelemetry::trace::Scope(span);
auto traced_on_finish = [span, on_finish](Status const& status) {
return on_finish(internal::EndSpan(*span, status));
};
internal::OTelScope scope(span);
auto traced_on_finish =
[span, on_finish,
oc = opentelemetry::context::RuntimeContext::GetCurrent()](
Status const& status) {
internal::DetachOTelContext(oc);
return on_finish(internal::EndSpan(*span, status));
};
child_->AsyncReadRows(table_name, std::move(on_row),
std::move(traced_on_finish), std::move(row_set),
std::move(rows_limit), std::move(filter));
Expand All @@ -181,11 +191,17 @@ class DataTracingConnection : public bigtable::DataConnection {
std::string const& table_name, std::string row_key,
bigtable::Filter filter) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncReadRow");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return child_
->AsyncReadRow(table_name, std::move(row_key), std::move(filter))
.then([s = std::move(span)](auto f) {
return EndReadRowSpan(*s, f.get());
})
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent()](
auto f) {
auto t = f.get();
internal::DetachOTelContext(oc);
return t;
});
}

Expand Down
134 changes: 77 additions & 57 deletions google/cloud/bigtable/internal/data_tracing_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ using ::google::cloud::bigtable_mocks::MockDataConnection;
using ::google::cloud::testing_util::InstallSpanCatcher;
using ::google::cloud::testing_util::IsOkAndHolds;
using ::google::cloud::testing_util::OTelAttribute;
using ::google::cloud::testing_util::OTelContextCaptured;
using ::google::cloud::testing_util::PromiseWithOTelContext;
using ::google::cloud::testing_util::SpanHasAttributes;
using ::google::cloud::testing_util::SpanHasInstrumentationScope;
using ::google::cloud::testing_util::SpanKindIsClient;
Expand All @@ -56,6 +58,21 @@ bigtable::SingleRowMutation Mutation() {
"row", {bigtable::SetCell("fam", "col", ms(0), "val")});
}

auto expect_context = [](auto& p) {
return [&p] {
EXPECT_TRUE(ThereIsAnActiveSpan());
EXPECT_TRUE(OTelContextCaptured());
return p.get_future();
};
};

auto expect_no_context = [](auto f) {
auto t = f.get();
EXPECT_FALSE(ThereIsAnActiveSpan());
EXPECT_FALSE(OTelContextCaptured());
return t;
};

TEST(DataTracingConnection, Options) {
struct TestOption {
using Type = int;
Expand Down Expand Up @@ -94,15 +111,15 @@ TEST(DataTracingConnection, Apply) {

TEST(DataTracingConnection, AsyncApply) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<Status> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncApply).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncApply).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto status = under_test->AsyncApply(kTableName, Mutation());
auto status =
under_test->AsyncApply(kTableName, Mutation()).then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(status.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand Down Expand Up @@ -171,15 +188,15 @@ TEST(DataTracingConnection, BulkApplyFailure) {

TEST(DataTracingConnection, AsyncBulkApplySuccess) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<std::vector<bigtable::FailedMutation>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(std::vector<bigtable::FailedMutation>{});
});
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto failures = under_test->AsyncBulkApply(kTableName, Mutation());
auto failures = under_test->AsyncBulkApply(kTableName, Mutation())
.then(expect_no_context);
p.set_value(std::vector<bigtable::FailedMutation>{});
EXPECT_THAT(failures.get(), IsEmpty());

EXPECT_THAT(
Expand All @@ -196,19 +213,19 @@ TEST(DataTracingConnection, AsyncBulkApplySuccess) {

TEST(DataTracingConnection, AsyncBulkApplyFailure) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<std::vector<bigtable::FailedMutation>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(std::vector<bigtable::FailedMutation>{
{internal::AbortedError("fail"), 1},
{internal::AbortedError("fail"), 2}});
});
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce(expect_context(p));

bigtable::BulkMutation mut;
for (auto i = 0; i != 10; ++i) mut.push_back(Mutation());
auto under_test = MakeDataTracingConnection(mock);
auto failures = under_test->AsyncBulkApply(kTableName, std::move(mut));
auto failures = under_test->AsyncBulkApply(kTableName, std::move(mut))
.then(expect_no_context);
p.set_value(std::vector<bigtable::FailedMutation>{
{internal::AbortedError("fail"), 1},
{internal::AbortedError("fail"), 2}});
EXPECT_THAT(failures.get(), SizeIs(2));

EXPECT_THAT(
Expand Down Expand Up @@ -376,17 +393,18 @@ TEST(DataTracingConnection, CheckAndMutateRow) {

TEST(DataTracingConnection, AsyncCheckAndMutateRow) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<bigtable::MutationBranch>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncCheckAndMutateRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<bigtable::MutationBranch>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncCheckAndMutateRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto branch = under_test->AsyncCheckAndMutateRow(
kTableName, "row", bigtable::Filter::PassAllFilter(), {}, {});
auto branch =
under_test
->AsyncCheckAndMutateRow(kTableName, "row",
bigtable::Filter::PassAllFilter(), {}, {})
.then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(branch.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand All @@ -401,6 +419,7 @@ TEST(DataTracingConnection, AsyncCheckAndMutateRow) {

TEST(DataTracingConnection, SampleRows) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<bigtable::MutationBranch>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, SampleRows).WillOnce([] {
Expand All @@ -424,16 +443,15 @@ TEST(DataTracingConnection, SampleRows) {

TEST(DataTracingConnection, AsyncSampleRows) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::vector<bigtable::RowKeySample>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncSampleRows).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<std::vector<bigtable::RowKeySample>>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncSampleRows).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto samples = under_test->AsyncSampleRows(kTableName);
auto samples =
under_test->AsyncSampleRows(kTableName).then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(samples.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand Down Expand Up @@ -471,16 +489,15 @@ TEST(DataTracingConnection, ReadModifyWriteRow) {

TEST(DataTracingConnection, AsyncReadModifyWriteRow) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<bigtable::Row>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadModifyWriteRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<bigtable::Row>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncReadModifyWriteRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto samples = under_test->AsyncReadModifyWriteRow({});
auto samples =
under_test->AsyncReadModifyWriteRow({}).then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(samples.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand All @@ -504,6 +521,7 @@ TEST(DataTracingConnection, AsyncReadRows) {
bigtable::RowSet const&, std::int64_t,
bigtable::Filter const&) {
EXPECT_TRUE(ThereIsAnActiveSpan());
EXPECT_TRUE(OTelContextCaptured());
// Invoke the callbacks.
on_row(bigtable::Row("r1", {})).get();
on_row(bigtable::Row("r2", {})).get();
Expand All @@ -523,6 +541,8 @@ TEST(DataTracingConnection, AsyncReadRows) {

MockFunction<void(Status)> on_finish;
EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) {
EXPECT_FALSE(ThereIsAnActiveSpan());
EXPECT_FALSE(OTelContextCaptured());
EXPECT_THAT(status, StatusIs(StatusCode::kAborted));
});

Expand All @@ -543,17 +563,17 @@ TEST(DataTracingConnection, AsyncReadRows) {

TEST(DataTracingConnection, AsyncReadRowFound) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::pair<bool, bigtable::Row>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(
make_status_or<std::pair<bool, bigtable::Row>>({true, {"row", {}}}));
});
EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto row = under_test->AsyncReadRow(kTableName, "row",
bigtable::Filter::PassAllFilter());
auto row =
under_test
->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter())
.then(expect_no_context);
p.set_value(std::pair<bool, bigtable::Row>({true, {"row", {}}}));
EXPECT_THAT(row.get(), IsOkAndHolds(Pair(true, _)));

EXPECT_THAT(span_catcher->GetSpans(),
Expand All @@ -568,17 +588,17 @@ TEST(DataTracingConnection, AsyncReadRowFound) {

TEST(DataTracingConnection, AsyncReadRowNotFound) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::pair<bool, bigtable::Row>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(
make_status_or<std::pair<bool, bigtable::Row>>({false, {"row", {}}}));
});
EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto row = under_test->AsyncReadRow(kTableName, "row",
bigtable::Filter::PassAllFilter());
auto row =
under_test
->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter())
.then(expect_no_context);
p.set_value(std::pair<bool, bigtable::Row>({false, {"row", {}}}));
EXPECT_THAT(row.get(), IsOkAndHolds(Pair(false, _)));

EXPECT_THAT(span_catcher->GetSpans(),
Expand All @@ -593,17 +613,17 @@ TEST(DataTracingConnection, AsyncReadRowNotFound) {

TEST(DataTracingConnection, AsyncReadRowFailure) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::pair<bool, bigtable::Row>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<std::pair<bool, bigtable::Row>>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto row = under_test->AsyncReadRow(kTableName, "row",
bigtable::Filter::PassAllFilter());
auto row =
under_test
->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter())
.then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(row.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand Down

0 comments on commit 0a288f4

Please sign in to comment.