Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(otel): avoid crashes in tracing wrappers for streams #14477

Merged
merged 11 commits into from
Jul 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingRead) {
"google.test.admin.database.v1.GoldenKitchenSink/StreamingRead"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail"),
SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _),
OTelAttribute<std::string>("gl-cpp.status_code", kErrorCode)))));
}

Expand Down Expand Up @@ -370,7 +369,6 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingWrite) {
"google.test.admin.database.v1.GoldenKitchenSink/StreamingWrite"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail"),
SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _),
OTelAttribute<std::string>("gl-cpp.status_code", kErrorCode)))));
}

Expand Down Expand Up @@ -408,7 +406,6 @@ TEST(GoldenKitchenSinkTracingStubTest, AsyncStreamingReadWrite) {
"StreamingReadWrite"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail"),
SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _),
OTelAttribute<std::string>("gl-cpp.status_code", kErrorCode)))));
}

Expand Down
10 changes: 8 additions & 2 deletions google/cloud/internal/async_read_write_stream_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AsyncStreamingReadWriteRpcTracing
EndSpan(*ss);
auto started = f.get();
span_->SetAttribute("gl-cpp.stream_started", started);
started_ = started;
return started;
});
}
Expand Down Expand Up @@ -111,15 +112,20 @@ class AsyncStreamingReadWriteRpcTracing

private:
Status End(Status status) {
if (!context_) return status;
return EndSpan(*std::move(context_), *std::move(span_), std::move(status));
if (!span_) return status;
if (started_) {
return EndSpan(*std::move(context_), *std::move(span_),
std::move(status));
}
return EndSpan(*std::move(span_), std::move(status));
}

std::shared_ptr<grpc::ClientContext> context_;
std::unique_ptr<AsyncStreamingReadWriteRpc<Request, Response>> impl_;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
int read_count_ = 0;
int write_count_ = 0;
bool started_ = false;
};

} // namespace internal
Expand Down
68 changes: 66 additions & 2 deletions google/cloud/internal/async_read_write_stream_tracing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ TEST(AsyncStreamingReadWriteRpcTracing, WritesDone) {
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadWriteRpcTracing, Finish) {
TEST(AsyncStreamingReadWriteRpcTracing, FinishWithoutStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
Expand All @@ -272,6 +272,31 @@ TEST(AsyncStreamingReadWriteRpcTracing, Finish) {
TestedStream stream(context(), std::move(mock), span);
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans,
UnorderedElementsAre(
AllOf(SpanNamed("span"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError,
"fail"),
Not(SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _)))),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadWriteRpcTracing, FinishWithStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();

EXPECT_CALL(*mock, Start).WillOnce([] { return make_ready_future(true); });
EXPECT_CALL(*mock, Finish)
.WillOnce(Return(make_ready_future(internal::AbortedError("fail"))));

TestedStream stream(context(), std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Expand All @@ -280,7 +305,8 @@ TEST(AsyncStreamingReadWriteRpcTracing, Finish) {
SpanNamed("span"),
SpanHasAttributes(OTelAttribute<std::string>("grpc.peer", _)),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail")),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
AllOf(SpanNamed("Finish"), SpanWithParent(span)),
AllOf(SpanNamed("Start"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadWriteRpcTracing, GetRequestMetadata) {
Expand Down Expand Up @@ -321,6 +347,44 @@ TEST(AsyncStreamingReadWriteRpcTracing, SpanEndsOnDestruction) {
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadWriteRpcTracing,
UnstartedStreamShouldNotExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();

{
auto mock = std::make_unique<MockStream>();
auto span = MakeSpan("span");
auto context = std::make_shared<grpc::ClientContext>();
TestedStream stream(context, std::move(mock), span);
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadWriteRpcTracing, StartedStreamShouldExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();
{
auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();
auto context = std::make_shared<grpc::ClientContext>();
EXPECT_CALL(*mock, Start).WillOnce([context] {
SetServerMetadata(*context, RpcMetadata{{{"hk", "hv"}}, {{"tk", "tv"}}});
return make_ready_future(true);
});

TestedStream stream(context, std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, UnorderedElementsAre(
SpanNamed("Start"),
AllOf(SpanNamed("span"),
SpanHasAttributes(OTelAttribute<std::string>(
"rpc.grpc.response.metadata.hk", "hv")))));
}

} // namespace
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
10 changes: 8 additions & 2 deletions google/cloud/internal/async_streaming_read_rpc_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class AsyncStreamingReadRpcTracing : public AsyncStreamingReadRpc<Response> {
EndSpan(*ss);
auto started = f.get();
span_->SetAttribute("gl-cpp.stream_started", started);
started_ = started;
return started;
});
}
Expand Down Expand Up @@ -88,14 +89,19 @@ class AsyncStreamingReadRpcTracing : public AsyncStreamingReadRpc<Response> {

private:
Status End(Status status) {
if (!context_) return status;
return EndSpan(*std::move(context_), *std::move(span_), std::move(status));
if (!span_) return status;
if (started_) {
return EndSpan(*std::move(context_), *std::move(span_),
std::move(status));
}
return EndSpan(*std::move(span_), std::move(status));
}

std::shared_ptr<grpc::ClientContext> context_;
std::unique_ptr<AsyncStreamingReadRpc<Response>> impl_;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
int read_count_ = 0;
bool started_ = false;
};

} // namespace internal
Expand Down
66 changes: 64 additions & 2 deletions google/cloud/internal/async_streaming_read_rpc_tracing_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TEST(AsyncStreamingReadRpcTracing, Read) {
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadRpcTracing, Finish) {
TEST(AsyncStreamingReadRpcTracing, FinishWithoutStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
Expand All @@ -161,6 +161,30 @@ TEST(AsyncStreamingReadRpcTracing, Finish) {
TestedStream stream(context(), std::move(mock), span);
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans,
UnorderedElementsAre(
AllOf(SpanNamed("span"),
SpanWithStatus(opentelemetry::trace::StatusCode::kError,
"fail"),
Not(SpanHasAttributes(
OTelAttribute<std::string>("grpc.peer", _)))),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadRpcTracing, FinishWithStart) {
auto span_catcher = testing_util::InstallSpanCatcher();

auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();
EXPECT_CALL(*mock, Start).WillOnce([] { return make_ready_future(true); });
EXPECT_CALL(*mock, Finish)
.WillOnce(Return(make_ready_future(internal::AbortedError("fail"))));

TestedStream stream(context(), std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
EXPECT_THAT(stream.Finish().get(), StatusIs(StatusCode::kAborted, "fail"));

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Expand All @@ -169,7 +193,8 @@ TEST(AsyncStreamingReadRpcTracing, Finish) {
SpanNamed("span"),
SpanHasAttributes(OTelAttribute<std::string>("grpc.peer", _)),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, "fail")),
AllOf(SpanNamed("Finish"), SpanWithParent(span))));
AllOf(SpanNamed("Finish"), SpanWithParent(span)),
AllOf(SpanNamed("Start"), SpanWithParent(span))));
}

TEST(AsyncStreamingReadRpcTracing, GetRequestMetadata) {
Expand Down Expand Up @@ -199,6 +224,43 @@ TEST(AsyncStreamingReadRpcTracing, SpanEndsOnDestruction) {
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadRpcTracing, UnstartedStreamShouldNotExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();

{
auto mock = std::make_unique<MockStream>();
auto span = MakeSpan("span");
auto context = std::make_shared<grpc::ClientContext>();
TestedStream stream(context, std::move(mock), span);
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, ElementsAre(SpanNamed("span")));
}

TEST(AsyncStreamingReadRpcTracing, StartedStreamShouldExtractMetadata) {
auto span_catcher = testing_util::InstallSpanCatcher();
{
auto span = MakeSpan("span");
auto mock = std::make_unique<MockStream>();
auto context = std::make_shared<grpc::ClientContext>();
EXPECT_CALL(*mock, Start).WillOnce([context] {
SetServerMetadata(*context, RpcMetadata{{{"hk", "hv"}}, {{"tk", "tv"}}});
return make_ready_future(true);
});

TestedStream stream(context, std::move(mock), span);
EXPECT_TRUE(stream.Start().get());
}

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, UnorderedElementsAre(
SpanNamed("Start"),
AllOf(SpanNamed("span"),
SpanHasAttributes(OTelAttribute<std::string>(
"rpc.grpc.response.metadata.hk", "hv")))));
}

} // namespace
} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
20 changes: 13 additions & 7 deletions google/cloud/internal/async_streaming_write_rpc_tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ class AsyncStreamingWriteRpcTracing
impl_(std::move(impl)),
span_(std::move(span)) {}
~AsyncStreamingWriteRpcTracing() override {
if (context_) {
(void)EndSpan(*std::move(context_), *std::move(span_), Status());
}
(void)End(make_status_or<Response>({}));
}

void Cancel() override {
Expand All @@ -58,6 +56,7 @@ class AsyncStreamingWriteRpcTracing
EndSpan(*ss);
auto started = f.get();
span_->SetAttribute("gl-cpp.stream_started", started);
started_ = started;
return started;
});
}
Expand Down Expand Up @@ -94,10 +93,7 @@ class AsyncStreamingWriteRpcTracing
return impl_->Finish().then(
[this, fs = std::move(finish_span)](future<StatusOr<Response>> f) {
EndSpan(*fs);
auto response = f.get();
if (!context_) return response;
return EndSpan(*std::move(context_), *std::move(span_),
std::move(response));
return End(f.get());
});
}

Expand All @@ -106,10 +102,20 @@ class AsyncStreamingWriteRpcTracing
}

private:
StatusOr<Response> End(StatusOr<Response> status) {
if (!span_) return status;
if (started_) {
return EndSpan(*std::move(context_), *std::move(span_),
std::move(status));
}
return EndSpan(*std::move(span_), std::move(status));
}

std::shared_ptr<grpc::ClientContext> context_;
std::unique_ptr<AsyncStreamingWriteRpc<Request, Response>> impl_;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
int write_count_ = 0;
bool started_ = false;
};

} // namespace internal
Expand Down
Loading
Loading