diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 540ef110ea46a..d51c0b7e8a503 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -41,8 +41,9 @@ minor_behavior_changes: # *Changes that may cause incompatibilities for some users, but should not for most* - area: grpc change: | - Changes in ``AsyncStreamImpl`` now propagate tracing context headers in bidirectional streams when using - :ref:`Envoy gRPC client `. Previously, tracing context headers + Changes in ``AsyncStreamImpl`` and ``GoogleAsyncStreamImpl`` now propagate tracing context headers in bidirectional streams when using + :ref:`Envoy gRPC client ` or + :ref:`Google C++ gRPC client `. Previously, tracing context headers were not being set when calling external services such as ``ext_proc``. - area: http change: | diff --git a/source/common/grpc/google_async_client_impl.cc b/source/common/grpc/google_async_client_impl.cc index 1d78db8abc0f8..ef6fdb86c525a 100644 --- a/source/common/grpc/google_async_client_impl.cc +++ b/source/common/grpc/google_async_client_impl.cc @@ -169,7 +169,26 @@ GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent, service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks), options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(), Network::ConnectionInfoProviderSharedPtr{}, - StreamInfo::FilterState::LifeSpan::FilterChain) {} + StreamInfo::FilterState::LifeSpan::FilterChain) { + // TODO(cainelli): add a common library for tracing tags between gRPC implementations. + if (options.parent_span_ != nullptr) { + const std::string child_span_name = + options.child_span_name_.empty() + ? absl::StrCat("async ", service_full_name, ".", method_name, " egress") + : options.child_span_name_; + current_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name, + parent.timeSource().systemTime()); + current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_); + current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_); + current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy); + } else { + current_span_ = std::make_unique(); + } + + if (options.sampled_.has_value()) { + current_span_->setSampled(options.sampled_.value()); + } +} GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() { ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct"); @@ -194,6 +213,13 @@ void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) { // request headers should not be stored in stream_info. // Maybe put it to parent_context? parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info); + Tracing::HttpTraceContext trace_context(*initial_metadata); + Tracing::UpstreamContext upstream_context(nullptr, // host_ + nullptr, // cluster_ + Tracing::ServiceType::GoogleGrpc, // service_type_ + true // async_client_span_ + ); + current_span_->injectContext(trace_context, upstream_context); callbacks_.onCreateInitialMetadata(*initial_metadata); initial_metadata->iterate([this](const Http::HeaderEntry& header) { ctxt_.AddMetadata(std::string(header.key().getStringView()), @@ -226,6 +252,11 @@ void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status, parent_.stats_.streams_closed_[grpc_status]->inc(); } ENVOY_LOG(debug, "notifyRemoteClose {} {}", grpc_status, message); + current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(grpc_status)); + if (grpc_status != Status::WellKnownGrpcStatus::Ok) { + current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True); + } + current_span_->finishSpan(); callbacks_.onReceiveTrailingMetadata(trailing_metadata ? std::move(trailing_metadata) : Http::ResponseTrailerMapImpl::create()); callbacks_.onRemoteClose(grpc_status, message); @@ -241,6 +272,9 @@ void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool e void GoogleAsyncStreamImpl::closeStream() { // Empty EOS write queued. + current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled); + current_span_->finishSpan(); + write_pending_queue_.emplace(); writeQueued(); } diff --git a/source/common/grpc/google_async_client_impl.h b/source/common/grpc/google_async_client_impl.h index 45eceae184349..67326b01b0a02 100644 --- a/source/common/grpc/google_async_client_impl.h +++ b/source/common/grpc/google_async_client_impl.h @@ -314,6 +314,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream, // freed. uint32_t inflight_tags_{}; + Tracing::SpanPtr current_span_; // This is unused. StreamInfo::StreamInfoImpl unused_stream_info_; diff --git a/test/common/grpc/google_async_client_impl_test.cc b/test/common/grpc/google_async_client_impl_test.cc index cbc2b0b90aa2b..f75d30528f555 100644 --- a/test/common/grpc/google_async_client_impl_test.cc +++ b/test/common/grpc/google_async_client_impl_test.cc @@ -108,13 +108,30 @@ TEST_F(EnvoyGoogleAsyncClientImplTest, ThreadSafe) { TEST_F(EnvoyGoogleAsyncClientImplTest, StreamHttpStartFail) { initialize(); + Tracing::MockSpan parent_span; + Tracing::MockSpan* child_span{new Tracing::MockSpan()}; + + EXPECT_CALL(parent_span, spawnChild_(_, "async helloworld.Greeter.SayHello egress", _)) + .WillOnce(Return(child_span)); + EXPECT_CALL(*child_span, + setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy))); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("test_cluster"))); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamAddress), Eq("fake_address"))); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().GrpcStatusCode), Eq("14"))); + EXPECT_CALL(*child_span, injectContext(_, _)); + EXPECT_CALL(*child_span, finishSpan()); + EXPECT_CALL(*child_span, setSampled(true)); + EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True))); + EXPECT_CALL(*stub_factory_.stub_, PrepareCall_(_, _, _)).WillOnce(Return(nullptr)); MockAsyncStreamCallbacks grpc_callbacks; EXPECT_CALL(grpc_callbacks, onCreateInitialMetadata(_)); EXPECT_CALL(grpc_callbacks, onReceiveTrailingMetadata_(_)); EXPECT_CALL(grpc_callbacks, onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "")); - auto grpc_stream = - grpc_client_->start(*method_descriptor_, grpc_callbacks, Http::AsyncClient::StreamOptions()); + Http::AsyncClient::StreamOptions stream_options; + stream_options.setParentSpan(parent_span).setSampled(true); + + auto grpc_stream = grpc_client_->start(*method_descriptor_, grpc_callbacks, stream_options); EXPECT_TRUE(grpc_stream == nullptr); } diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index f5ab80b2b7070..9d8dfdcc0521a 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -630,9 +630,6 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStream) { } TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) { - if (!IsEnvoyGrpc()) { - GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC"; - } initializeConfig(); config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { test::integration::filters::ExpectSpan ext_proc_span; @@ -641,9 +638,13 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) { ext_proc_span.set_context_injected(true); ext_proc_span.set_sampled(true); ext_proc_span.mutable_tags()->insert({"grpc.status_code", "0"}); - ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"}); - + if (IsEnvoyGrpc()) { + ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); + } else { + ext_proc_span.mutable_tags()->insert( + {"upstream_address", grpc_upstreams_[0]->localAddress()->asString()}); + } test::integration::filters::TracerTestConfig test_config; test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span); @@ -659,6 +660,9 @@ TEST_P(ExtProcIntegrationTest, GetAndCloseStreamWithTracing) { waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg); processor_stream_->startGrpcStream(); + EXPECT_FALSE(processor_stream_->headers().get(LowerCaseString("traceparent")).empty()) + << "expected traceparent header"; + processor_stream_->finishGrpcStream(Grpc::Status::Ok); handleUpstreamRequest(); verifyDownstreamResponse(*response, 200); @@ -697,9 +701,6 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStream) { } TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) { - if (!IsEnvoyGrpc()) { - GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC"; - } initializeConfig(); config_helper_.addConfigModifier([&](HttpConnectionManager& cm) { test::integration::filters::ExpectSpan ext_proc_span; @@ -709,8 +710,13 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) { ext_proc_span.set_sampled(true); ext_proc_span.mutable_tags()->insert({"grpc.status_code", "2"}); ext_proc_span.mutable_tags()->insert({"error", "true"}); - ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"}); + if (IsEnvoyGrpc()) { + ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); + } else { + ext_proc_span.mutable_tags()->insert( + {"upstream_address", grpc_upstreams_[0]->localAddress()->asString()}); + } test::integration::filters::TracerTestConfig test_config; test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span); @@ -725,6 +731,9 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamWithTracing) { ProcessingRequest request_headers_msg; waitForFirstMessage(*grpc_upstreams_[0], request_headers_msg); + EXPECT_FALSE(processor_stream_->headers().get(LowerCaseString("traceparent")).empty()) + << "expected traceparent header"; + // Fail the stream immediately processor_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "500"}}, true); verifyDownstreamResponse(*response, 500); @@ -2355,10 +2364,6 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeout) { } TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) { - if (!IsEnvoyGrpc()) { - GTEST_SKIP() << "Tracing is currently only supported for Envoy gRPC"; - } - // ensure 200 ms timeout proto_config_.mutable_message_timeout()->set_nanos(200000000); initializeConfig(); @@ -2371,9 +2376,13 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) { ext_proc_span.set_sampled(true); ext_proc_span.mutable_tags()->insert({"status", "canceled"}); ext_proc_span.mutable_tags()->insert({"error", ""}); // not an error - ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); ext_proc_span.mutable_tags()->insert({"upstream_cluster", "ext_proc_server_0"}); - + if (IsEnvoyGrpc()) { + ext_proc_span.mutable_tags()->insert({"upstream_address", "ext_proc_server_0"}); + } else { + ext_proc_span.mutable_tags()->insert( + {"upstream_address", grpc_upstreams_[0]->localAddress()->asString()}); + } test::integration::filters::TracerTestConfig test_config; test_config.mutable_expect_spans()->Add()->CopyFrom(ext_proc_span); @@ -2391,6 +2400,9 @@ TEST_P(ExtProcIntegrationTest, RequestMessageTimeoutWithTracing) { return false; }); + EXPECT_FALSE(processor_stream_->headers().get(LowerCaseString("traceparent")).empty()) + << "expected traceparent header"; + // We should immediately have an error response now verifyDownstreamResponse(*response, 500); } diff --git a/test/extensions/filters/http/ext_proc/tracer_test_filter.cc b/test/extensions/filters/http/ext_proc/tracer_test_filter.cc index 15c7b1b348ea9..a76b91e911f89 100644 --- a/test/extensions/filters/http/ext_proc/tracer_test_filter.cc +++ b/test/extensions/filters/http/ext_proc/tracer_test_filter.cc @@ -14,6 +14,10 @@ namespace Extensions { namespace HttpFilters { namespace ExternalProcessing { +const Tracing::TraceContextHandler& traceParentHeader() { + CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "traceparent"); +} + struct ExpectedSpan { std::string operation_name; bool sampled; @@ -58,7 +62,9 @@ class Span : public Tracing::Span { void setOperation(absl::string_view operation_name) { operation_name_ = operation_name; } void setSampled(bool do_sample) { sampled_ = do_sample; } - void injectContext(Tracing::TraceContext&, const Tracing::UpstreamContext&) { + void injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) { + std::string traceparent_header_value = "1"; + traceParentHeader().setRefKey(trace_context, traceparent_header_value); context_injected_ = true; } void setBaggage(absl::string_view, absl::string_view) { /* not implemented */