Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigtable): async context propagation for tracing #13156

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions google/cloud/bigtable/internal/data_tracing_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class DataTracingConnection : public bigtable::DataConnection {
future<Status> AsyncApply(std::string const& table_name,
bigtable::SingleRowMutation mut) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncApply");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(std::move(span),
child_->AsyncApply(table_name, std::move(mut)));
}
Expand All @@ -82,12 +82,18 @@ class DataTracingConnection : public bigtable::DataConnection {
future<std::vector<bigtable::FailedMutation>> AsyncBulkApply(
std::string const& table_name, bigtable::BulkMutation mut) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncBulkApply");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
auto total_mutations = mut.size();
auto failures = child_->AsyncBulkApply(table_name, std::move(mut));
return failures.then([s = std::move(span), total_mutations](auto f) {
return EndBulkApplySpan(*s, total_mutations, f.get());
});
return child_->AsyncBulkApply(table_name, std::move(mut))
.then([s = std::move(span), total_mutations](auto f) {
return EndBulkApplySpan(*s, total_mutations, f.get());
})
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent()](
auto f) {
auto t = f.get();
internal::DetachOTelContext(oc);
return t;
});
}

bigtable::RowReader ReadRowsFull(bigtable::ReadRowsParams params) override {
Expand Down Expand Up @@ -123,7 +129,7 @@ class DataTracingConnection : public bigtable::DataConnection {
bigtable::Filter filter, std::vector<bigtable::Mutation> true_mutations,
std::vector<bigtable::Mutation> false_mutations) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncCheckAndMutateRow");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(
std::move(span),
child_->AsyncCheckAndMutateRow(
Expand All @@ -141,7 +147,7 @@ class DataTracingConnection : public bigtable::DataConnection {
future<StatusOr<std::vector<bigtable::RowKeySample>>> AsyncSampleRows(
std::string const& table_name) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncSampleRows");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(std::move(span),
child_->AsyncSampleRows(table_name));
}
Expand All @@ -157,7 +163,7 @@ class DataTracingConnection : public bigtable::DataConnection {
future<StatusOr<bigtable::Row>> AsyncReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncReadModifyWriteRow");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return internal::EndSpan(
std::move(span), child_->AsyncReadModifyWriteRow(std::move(request)));
}
Expand All @@ -168,10 +174,14 @@ class DataTracingConnection : public bigtable::DataConnection {
bigtable::RowSet row_set, std::int64_t rows_limit,
bigtable::Filter filter) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncReadRows");
auto scope = opentelemetry::trace::Scope(span);
auto traced_on_finish = [span, on_finish](Status const& status) {
return on_finish(internal::EndSpan(*span, status));
};
internal::OTelScope scope(span);
auto traced_on_finish =
[span, on_finish,
oc = opentelemetry::context::RuntimeContext::GetCurrent()](
Status const& status) {
internal::DetachOTelContext(oc);
return on_finish(internal::EndSpan(*span, status));
};
child_->AsyncReadRows(table_name, std::move(on_row),
std::move(traced_on_finish), std::move(row_set),
std::move(rows_limit), std::move(filter));
Expand All @@ -181,11 +191,17 @@ class DataTracingConnection : public bigtable::DataConnection {
std::string const& table_name, std::string row_key,
bigtable::Filter filter) override {
auto span = internal::MakeSpan("bigtable::Table::AsyncReadRow");
auto scope = opentelemetry::trace::Scope(span);
internal::OTelScope scope(span);
return child_
->AsyncReadRow(table_name, std::move(row_key), std::move(filter))
.then([s = std::move(span)](auto f) {
return EndReadRowSpan(*s, f.get());
})
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent()](
auto f) {
auto t = f.get();
internal::DetachOTelContext(oc);
return t;
});
}

Expand Down
134 changes: 77 additions & 57 deletions google/cloud/bigtable/internal/data_tracing_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ using ::google::cloud::bigtable_mocks::MockDataConnection;
using ::google::cloud::testing_util::InstallSpanCatcher;
using ::google::cloud::testing_util::IsOkAndHolds;
using ::google::cloud::testing_util::OTelAttribute;
using ::google::cloud::testing_util::OTelContextCaptured;
using ::google::cloud::testing_util::PromiseWithOTelContext;
using ::google::cloud::testing_util::SpanHasAttributes;
using ::google::cloud::testing_util::SpanHasInstrumentationScope;
using ::google::cloud::testing_util::SpanKindIsClient;
Expand All @@ -56,6 +58,21 @@ bigtable::SingleRowMutation Mutation() {
"row", {bigtable::SetCell("fam", "col", ms(0), "val")});
}

auto expect_context = [](auto& p) {
return [&p] {
EXPECT_TRUE(ThereIsAnActiveSpan());
EXPECT_TRUE(OTelContextCaptured());
return p.get_future();
};
};

auto expect_no_context = [](auto f) {
auto t = f.get();
EXPECT_FALSE(ThereIsAnActiveSpan());
EXPECT_FALSE(OTelContextCaptured());
return t;
};

TEST(DataTracingConnection, Options) {
struct TestOption {
using Type = int;
Expand Down Expand Up @@ -94,15 +111,15 @@ TEST(DataTracingConnection, Apply) {

TEST(DataTracingConnection, AsyncApply) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<Status> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncApply).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncApply).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto status = under_test->AsyncApply(kTableName, Mutation());
auto status =
under_test->AsyncApply(kTableName, Mutation()).then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(status.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand Down Expand Up @@ -171,15 +188,15 @@ TEST(DataTracingConnection, BulkApplyFailure) {

TEST(DataTracingConnection, AsyncBulkApplySuccess) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<std::vector<bigtable::FailedMutation>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(std::vector<bigtable::FailedMutation>{});
});
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto failures = under_test->AsyncBulkApply(kTableName, Mutation());
auto failures = under_test->AsyncBulkApply(kTableName, Mutation())
.then(expect_no_context);
p.set_value(std::vector<bigtable::FailedMutation>{});
EXPECT_THAT(failures.get(), IsEmpty());

EXPECT_THAT(
Expand All @@ -196,19 +213,19 @@ TEST(DataTracingConnection, AsyncBulkApplySuccess) {

TEST(DataTracingConnection, AsyncBulkApplyFailure) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<std::vector<bigtable::FailedMutation>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(std::vector<bigtable::FailedMutation>{
{internal::AbortedError("fail"), 1},
{internal::AbortedError("fail"), 2}});
});
EXPECT_CALL(*mock, AsyncBulkApply).WillOnce(expect_context(p));

bigtable::BulkMutation mut;
for (auto i = 0; i != 10; ++i) mut.push_back(Mutation());
auto under_test = MakeDataTracingConnection(mock);
auto failures = under_test->AsyncBulkApply(kTableName, std::move(mut));
auto failures = under_test->AsyncBulkApply(kTableName, std::move(mut))
.then(expect_no_context);
p.set_value(std::vector<bigtable::FailedMutation>{
{internal::AbortedError("fail"), 1},
{internal::AbortedError("fail"), 2}});
EXPECT_THAT(failures.get(), SizeIs(2));

EXPECT_THAT(
Expand Down Expand Up @@ -376,17 +393,18 @@ TEST(DataTracingConnection, CheckAndMutateRow) {

TEST(DataTracingConnection, AsyncCheckAndMutateRow) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<bigtable::MutationBranch>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncCheckAndMutateRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<bigtable::MutationBranch>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncCheckAndMutateRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto branch = under_test->AsyncCheckAndMutateRow(
kTableName, "row", bigtable::Filter::PassAllFilter(), {}, {});
auto branch =
under_test
->AsyncCheckAndMutateRow(kTableName, "row",
bigtable::Filter::PassAllFilter(), {}, {})
.then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(branch.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand All @@ -401,6 +419,7 @@ TEST(DataTracingConnection, AsyncCheckAndMutateRow) {

TEST(DataTracingConnection, SampleRows) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<bigtable::MutationBranch>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, SampleRows).WillOnce([] {
Expand All @@ -424,16 +443,15 @@ TEST(DataTracingConnection, SampleRows) {

TEST(DataTracingConnection, AsyncSampleRows) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::vector<bigtable::RowKeySample>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncSampleRows).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<std::vector<bigtable::RowKeySample>>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncSampleRows).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto samples = under_test->AsyncSampleRows(kTableName);
auto samples =
under_test->AsyncSampleRows(kTableName).then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(samples.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand Down Expand Up @@ -471,16 +489,15 @@ TEST(DataTracingConnection, ReadModifyWriteRow) {

TEST(DataTracingConnection, AsyncReadModifyWriteRow) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<bigtable::Row>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadModifyWriteRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<bigtable::Row>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncReadModifyWriteRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto samples = under_test->AsyncReadModifyWriteRow({});
auto samples =
under_test->AsyncReadModifyWriteRow({}).then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(samples.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand All @@ -504,6 +521,7 @@ TEST(DataTracingConnection, AsyncReadRows) {
bigtable::RowSet const&, std::int64_t,
bigtable::Filter const&) {
EXPECT_TRUE(ThereIsAnActiveSpan());
EXPECT_TRUE(OTelContextCaptured());
// Invoke the callbacks.
on_row(bigtable::Row("r1", {})).get();
on_row(bigtable::Row("r2", {})).get();
Expand All @@ -523,6 +541,8 @@ TEST(DataTracingConnection, AsyncReadRows) {

MockFunction<void(Status)> on_finish;
EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) {
EXPECT_FALSE(ThereIsAnActiveSpan());
EXPECT_FALSE(OTelContextCaptured());
EXPECT_THAT(status, StatusIs(StatusCode::kAborted));
});

Expand All @@ -543,17 +563,17 @@ TEST(DataTracingConnection, AsyncReadRows) {

TEST(DataTracingConnection, AsyncReadRowFound) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::pair<bool, bigtable::Row>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(
make_status_or<std::pair<bool, bigtable::Row>>({true, {"row", {}}}));
});
EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto row = under_test->AsyncReadRow(kTableName, "row",
bigtable::Filter::PassAllFilter());
auto row =
under_test
->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter())
.then(expect_no_context);
p.set_value(std::pair<bool, bigtable::Row>({true, {"row", {}}}));
EXPECT_THAT(row.get(), IsOkAndHolds(Pair(true, _)));

EXPECT_THAT(span_catcher->GetSpans(),
Expand All @@ -568,17 +588,17 @@ TEST(DataTracingConnection, AsyncReadRowFound) {

TEST(DataTracingConnection, AsyncReadRowNotFound) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::pair<bool, bigtable::Row>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future(
make_status_or<std::pair<bool, bigtable::Row>>({false, {"row", {}}}));
});
EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto row = under_test->AsyncReadRow(kTableName, "row",
bigtable::Filter::PassAllFilter());
auto row =
under_test
->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter())
.then(expect_no_context);
p.set_value(std::pair<bool, bigtable::Row>({false, {"row", {}}}));
EXPECT_THAT(row.get(), IsOkAndHolds(Pair(false, _)));

EXPECT_THAT(span_catcher->GetSpans(),
Expand All @@ -593,17 +613,17 @@ TEST(DataTracingConnection, AsyncReadRowNotFound) {

TEST(DataTracingConnection, AsyncReadRowFailure) {
auto span_catcher = InstallSpanCatcher();
PromiseWithOTelContext<StatusOr<std::pair<bool, bigtable::Row>>> p;

auto mock = std::make_shared<MockDataConnection>();
EXPECT_CALL(*mock, AsyncReadRow).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future<StatusOr<std::pair<bool, bigtable::Row>>>(
internal::AbortedError("fail"));
});
EXPECT_CALL(*mock, AsyncReadRow).WillOnce(expect_context(p));

auto under_test = MakeDataTracingConnection(mock);
auto row = under_test->AsyncReadRow(kTableName, "row",
bigtable::Filter::PassAllFilter());
auto row =
under_test
->AsyncReadRow(kTableName, "row", bigtable::Filter::PassAllFilter())
.then(expect_no_context);
p.set_value(internal::AbortedError("fail"));
EXPECT_THAT(row.get(), StatusIs(StatusCode::kAborted));

EXPECT_THAT(
Expand Down