Skip to content

Commit

Permalink
grpc: propagate tracing headers when using google grpc streams (#34395)
Browse files Browse the repository at this point in the history
Following up on #33665, which added tracing to Envoy gRPC streams, this PR adds the same support for Google gRPC streams, for completeness with respect to #21119.

Risk Level: Low
Testing: The same manual tests described in #33665. In addition, the ext_proc integration tests now also exercise Google gRPC.

Signed-off-by: Fernando Cainelli <fernando.cainelli-external@getyourguide.com>
  • Loading branch information
cainelli authored Jun 14, 2024
1 parent 3a0c2a4 commit 2b422e9
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 21 deletions.
5 changes: 3 additions & 2 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
- area: grpc
change: |
Changes in ``AsyncStreamImpl`` now propagate tracing context headers in bidirectional streams when using
:ref:`Envoy gRPC client <envoy_v3_api_field_config.core.v3.GrpcService.envoy_grpc>`. Previously, tracing context headers
Changes in ``AsyncStreamImpl`` and ``GoogleAsyncStreamImpl`` now propagate tracing context headers in bidirectional streams when using
:ref:`Envoy gRPC client <envoy_v3_api_field_config.core.v3.GrpcService.envoy_grpc>` or
:ref:`Google C++ gRPC client <envoy_v3_api_field_config.core.v3.GrpcService.google_grpc>`. Previously, tracing context headers
were not being set when calling external services such as ``ext_proc``.
- area: http
change: |
Expand Down
36 changes: 35 additions & 1 deletion source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,26 @@ GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent,
service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks),
options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(),
Network::ConnectionInfoProviderSharedPtr{},
StreamInfo::FilterState::LifeSpan::FilterChain) {}
StreamInfo::FilterState::LifeSpan::FilterChain) {
// TODO(cainelli): add a common library for tracing tags between gRPC implementations.
if (options.parent_span_ != nullptr) {
const std::string child_span_name =
options.child_span_name_.empty()
? absl::StrCat("async ", service_full_name, ".", method_name, " egress")
: options.child_span_name_;
current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
parent.timeSource().systemTime());
current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
} else {
current_span_ = std::make_unique<Tracing::NullSpan>();
}

if (options.sampled_.has_value()) {
current_span_->setSampled(options.sampled_.value());
}
}

GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() {
ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct");
Expand All @@ -194,6 +213,13 @@ void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
// request headers should not be stored in stream_info.
// Maybe put it to parent_context?
parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info);
Tracing::HttpTraceContext trace_context(*initial_metadata);
Tracing::UpstreamContext upstream_context(nullptr, // host_
nullptr, // cluster_
Tracing::ServiceType::GoogleGrpc, // service_type_
true // async_client_span_
);
current_span_->injectContext(trace_context, upstream_context);
callbacks_.onCreateInitialMetadata(*initial_metadata);
initial_metadata->iterate([this](const Http::HeaderEntry& header) {
ctxt_.AddMetadata(std::string(header.key().getStringView()),
Expand Down Expand Up @@ -226,6 +252,11 @@ void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status,
parent_.stats_.streams_closed_[grpc_status]->inc();
}
ENVOY_LOG(debug, "notifyRemoteClose {} {}", grpc_status, message);
current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(grpc_status));
if (grpc_status != Status::WellKnownGrpcStatus::Ok) {
current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
}
current_span_->finishSpan();
callbacks_.onReceiveTrailingMetadata(trailing_metadata ? std::move(trailing_metadata)
: Http::ResponseTrailerMapImpl::create());
callbacks_.onRemoteClose(grpc_status, message);
Expand All @@ -241,6 +272,9 @@ void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool e

// Closes the stream: tags the tracing span with a "canceled" status, finishes the span, and then
// queues an empty end-of-stream write.
void GoogleAsyncStreamImpl::closeStream() {
// Record the cancellation on the span and finish it before the final write is queued.
current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
current_span_->finishSpan();

// Empty EOS write queued.
write_pending_queue_.emplace();
writeQueued();
}
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
// freed.
uint32_t inflight_tags_{};

Tracing::SpanPtr current_span_;
// This is unused.
StreamInfo::StreamInfoImpl unused_stream_info_;

Expand Down
21 changes: 19 additions & 2 deletions test/common/grpc/google_async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,30 @@ TEST_F(EnvoyGoogleAsyncClientImplTest, ThreadSafe) {
TEST_F(EnvoyGoogleAsyncClientImplTest, StreamHttpStartFail) {
initialize();

Tracing::MockSpan parent_span;
Tracing::MockSpan* child_span{new Tracing::MockSpan()};

EXPECT_CALL(parent_span, spawnChild_(_, "async helloworld.Greeter.SayHello egress", _))
.WillOnce(Return(child_span));
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("test_cluster")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamAddress), Eq("fake_address")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().GrpcStatusCode), Eq("14")));
EXPECT_CALL(*child_span, injectContext(_, _));
EXPECT_CALL(*child_span, finishSpan());
EXPECT_CALL(*child_span, setSampled(true));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True)));

EXPECT_CALL(*stub_factory_.stub_, PrepareCall_(_, _, _)).WillOnce(Return(nullptr));
MockAsyncStreamCallbacks<helloworld::HelloReply> grpc_callbacks;
EXPECT_CALL(grpc_callbacks, onCreateInitialMetadata(_));
EXPECT_CALL(grpc_callbacks, onReceiveTrailingMetadata_(_));
EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, ""));
auto grpc_stream =
grpc_client_->start(*method_descriptor_, grpc_callbacks, Http::AsyncClient::StreamOptions());
Http::AsyncClient::StreamOptions stream_options;
stream_options.setParentSpan(parent_span).setSampled(true);

auto grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, stream_options);
EXPECT_TRUE(grpc_stream == nullptr);
}

Expand Down
42 changes: 27 additions & 15 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,6 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) {
}

TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) {
if (!IsEnvoyGrpc()) {
GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC";
}
initializeConfig();
config_helper_.addConfigModifier([&](HttpConnectionManager& cm) {
test::integration::filters::ExpectSpan ext_proc_span;
Expand All @@ -641,9 +638,13 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) {
ext_proc_span.set_context_injected(true);
ext_proc_span.set_sampled(true);
ext_proc_span.mutable_tags()->insert({"grpc.status_code", "0"});
ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"});
ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"});

if (IsEnvoyGrpc()) {
ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"});
} else {
ext_proc_span.mutable_tags()->insert(
{"upstream_address", grpc_upstreams_[0]->localAddress()->asString()});
}
test::integration::filters::TracerTestConfig test_config;
test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span);

Expand All @@ -659,6 +660,9 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) {
waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg);

processor_stream_->startGrpcStream();
EXPECT_FALSE(processor_stream_->headers().get(LowerCaseString("traceparent")).empty())
<< "expected traceparent header";

processor_stream_->finishGrpcStream(Grpc::Status::Ok);
handleUpstreamRequest();
verifyDownstreamResponse(*response, 200);
Expand Down Expand Up @@ -697,9 +701,6 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStream) {
}

TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) {
if (!IsEnvoyGrpc()) {
GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC";
}
initializeConfig();
config_helper_.addConfigModifier([&](HttpConnectionManager& cm) {
test::integration::filters::ExpectSpan ext_proc_span;
Expand All @@ -709,8 +710,13 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) {
ext_proc_span.set_sampled(true);
ext_proc_span.mutable_tags()->insert({"grpc.status_code", "2"});
ext_proc_span.mutable_tags()->insert({"error", "true"});
ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"});
ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"});
if (IsEnvoyGrpc()) {
ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"});
} else {
ext_proc_span.mutable_tags()->insert(
{"upstream_address", grpc_upstreams_[0]->localAddress()->asString()});
}

test::integration::filters::TracerTestConfig test_config;
test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span);
Expand All @@ -725,6 +731,9 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) {

ProcessingRequest request_headers_msg;
waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg);
EXPECT_FALSE(processor_stream_->headers().get(LowerCaseString("traceparent")).empty())
<< "expected traceparent header";

// Fail the stream immediately
processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "500"}}, true);
verifyDownstreamResponse(*response, 500);
Expand Down Expand Up @@ -2355,10 +2364,6 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeout) {
}

TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) {
if (!IsEnvoyGrpc()) {
GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC";
}

// ensure 200 ms timeout
proto_config_.mutable_message_timeout()->set_nanos(200000000);
initializeConfig();
Expand All @@ -2371,9 +2376,13 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) {
ext_proc_span.set_sampled(true);
ext_proc_span.mutable_tags()->insert({"status", "canceled"});
ext_proc_span.mutable_tags()->insert({"error", ""}); // not an error
ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"});
ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"});

if (IsEnvoyGrpc()) {
ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"});
} else {
ext_proc_span.mutable_tags()->insert(
{"upstream_address", grpc_upstreams_[0]->localAddress()->asString()});
}
test::integration::filters::TracerTestConfig test_config;
test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span);

Expand All @@ -2391,6 +2400,9 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) {
return false;
});

EXPECT_FALSE(processor_stream_->headers().get(LowerCaseString("traceparent")).empty())
<< "expected traceparent header";

// We should immediately have an error response now
verifyDownstreamResponse(*response, 500);
}
Expand Down
8 changes: 7 additions & 1 deletion test/extensions/filters/http/ext_proc/tracer_test_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

// Returns a reference to the Tracing::TraceContextHandler for the "traceparent" key.
// The handler is built through CONSTRUCT_ON_FIRST_USE (presumably created once, on the first
// call, and reused afterwards -- confirm against the macro definition).
const Tracing::TraceContextHandler& traceParentHeader() {
CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "traceparent");
}

struct ExpectedSpan {
std::string operation_name;
bool sampled;
Expand Down Expand Up @@ -58,7 +62,9 @@ class Span : public Tracing::Span {
void setOperation(absl::string_view operation_name) { operation_name_ = operation_name; }
void setSampled(bool do_sample) { sampled_ = do_sample; }

void injectContext(Tracing::TraceContext&, const Tracing::UpstreamContext&) {
void injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) {
std::string traceparent_header_value = "1";
traceParentHeader().setRefKey(trace_context, traceparent_header_value);
context_injected_ = true;
}
void setBaggage(absl::string_view, absl::string_view) { /* not implemented */
Expand Down

0 comments on commit 2b422e9

Please sign in to comment.