Commit

xds: apply node identifier optimization (#7876)
Omit the node identifier from subsequent discovery requests on the same stream.
Restricted to non-incremental xDS for tractability.

Risk Level: low, affects xDS protocol but guarded by an option
Testing: Unit/integration tests are updated
Docs Changes: xDS spec clarification
Release Notes: omit the node identifier from subsequent discovery requests

Fixes: #7860

Signed-off-by: Kuat Yessenov <kuat@google.com>
kyessenov authored and htuch committed Aug 13, 2019
1 parent 1dc418f commit 8e07e82
Showing 26 changed files with 119 additions and 79 deletions.
3 changes: 3 additions & 0 deletions api/envoy/api/v2/core/config_source.proto
Expand Up @@ -65,6 +65,9 @@ message ApiConfigSource {
// For GRPC APIs, the rate limit settings. If present, discovery requests made by Envoy will be
// rate limited.
RateLimitSettings rate_limit_settings = 6;

// Skip the node identifier in subsequent discovery requests for streaming gRPC config types.
bool set_node_on_first_message_only = 7;
}

// Aggregated Discovery Service (ADS) options. This is currently empty, but when
7 changes: 7 additions & 0 deletions api/xds_protocol.rst
Expand Up @@ -135,6 +135,13 @@ versioning across resource types. When ADS is not used, even each
resource of a given resource type may have a distinct version, since the
Envoy API allows distinct EDS/RDS resources to point at different :ref:`ConfigSources <envoy_api_msg_core.ConfigSource>`.

Only the first request on a stream is guaranteed to carry the node identifier.
Subsequent discovery requests on the same stream may carry an empty node
identifier. This holds true regardless of whether the discovery responses on
the same stream were accepted. If the node identifier is present more than once
on a stream, it should always be identical. As a result, it is sufficient to
check only the first message for the node identifier.

.. _xds_protocol_resource_update:

Resource Update
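
The spec paragraph added above implies that a management server can remember the node identifier from the first request on a stream and ignore the field afterwards. A minimal sketch of that idea follows. It is not taken from this commit or from any real control plane: `Node`, `DiscoveryRequest`, and `StreamState` are hypothetical, simplified stand-ins for the protobuf messages and per-stream server state.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical, simplified stand-ins for the real protobuf messages.
struct Node {
  std::string id;
  std::string cluster;
};

struct DiscoveryRequest {
  std::optional<Node> node; // std::nullopt models an omitted node identifier.
  std::string type_url;
};

// Hypothetical per-stream state a management server might keep.
class StreamState {
public:
  // Records the node from the first request on the stream and returns the
  // remembered node for every request. Returns nullptr if the first request
  // did not carry a node identifier.
  const Node* onRequest(const DiscoveryRequest& request) {
    if (first_request_) {
      first_request_ = false;
      // Only the first message is guaranteed to carry the node identifier.
      node_ = request.node;
    }
    // Later requests may omit the node, so the field is not consulted again.
    return node_.has_value() ? &*node_ : nullptr;
  }

private:
  bool first_request_{true};
  std::optional<Node> node_;
};
```

One `StreamState` would be created per stream, so a reconnect starts with fresh state and reads the node from the first request of the new stream.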
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Expand Up @@ -6,6 +6,7 @@ Version history
* access log: added :ref:`buffering <envoy_api_field_config.accesslog.v2.CommonGrpcAccessLogConfig.buffer_size_bytes>` and :ref:`periodical flushing <envoy_api_field_config.accesslog.v2.CommonGrpcAccessLogConfig.buffer_flush_interval>` support to gRPC access logger. Defaults to 16KB buffer and flushing every 1 second.
* admin: added ability to configure listener :ref:`socket options <envoy_api_field_config.bootstrap.v2.Admin.socket_options>`.
* admin: added config dump support for Secret Discovery Service :ref:`SecretConfigDump <envoy_api_msg_admin.v2alpha.SecretsConfigDump>`.
* api: added :ref:`set_node_on_first_message_only <envoy_api_field_core.ApiConfigSource.set_node_on_first_message_only>` option to omit the node identifier from the subsequent discovery requests on the same stream.
* config: enforcing that terminal filters (e.g. HttpConnectionManager for L4, router for L7) be the last in their respective filter chains.
* buffer filter: the buffer filter populates content-length header if not present, behavior can be disabled using the runtime feature `envoy.reloadable_features.buffer_filter_populate_content_length`.
* config: added access log :ref:`extension filter<envoy_api_field_config.filter.accesslog.v2.AccessLogFilter.extension_filter>`.
1 change: 1 addition & 0 deletions source/common/config/delta_subscription_impl.h
Expand Up @@ -20,6 +20,7 @@ namespace Config {
* Manages the logic of a (non-aggregated) delta xDS subscription.
* TODO(fredlas) add aggregation support. The plan is for that to happen in XdsGrpcContext,
* which this class will then "have a" rather than "be a".
* TODO(kyessenov) implement skip_subsequent_node for delta xDS subscription.
*/
class DeltaSubscriptionImpl : public Subscription,
public GrpcStreamCallbacks<envoy::api::v2::DeltaDiscoveryResponse>,
11 changes: 8 additions & 3 deletions source/common/config/grpc_mux_impl.cc
Expand Up @@ -12,11 +12,11 @@ GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings)
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node)
: grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope,
rate_limit_settings),

local_info_(local_info) {
local_info_(local_info), skip_subsequent_node_(skip_subsequent_node),
first_stream_request_(true) {
Config::Utility::checkLocalInfo("ads", local_info);
}

Expand Down Expand Up @@ -57,8 +57,12 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
}
}

if (skip_subsequent_node_ && !first_stream_request_) {
request.clear_node();
}
ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.DebugString());
grpc_stream_.sendMessage(request);
first_stream_request_ = false;

// clear error_detail after the request is sent if it exists.
if (api_state_[type_url].request_.has_error_detail()) {
Expand Down Expand Up @@ -206,6 +210,7 @@ void GrpcMuxImpl::onDiscoveryResponse(
void GrpcMuxImpl::onWriteable() { drainRequests(); }

void GrpcMuxImpl::onStreamEstablished() {
first_stream_request_ = true;
for (const auto& type_url : subscriptions_) {
queueDiscoveryRequest(type_url);
}
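
The interplay of the two flags in this file can be hard to follow across three hunks, so here is a self-contained sketch of the same state machine. It mirrors the names `skip_subsequent_node_` and `first_stream_request_` from the diff, but `NodeOnFirstMessageSender` and `SentRequest` are hypothetical illustration types, and the real gRPC stream is replaced by a vector that records whether each request would carry the node.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical record of what would have been written to the stream.
struct SentRequest {
  std::string type_url;
  bool has_node;
};

// Hypothetical model of the node-skipping logic; not the real GrpcMuxImpl.
class NodeOnFirstMessageSender {
public:
  explicit NodeOnFirstMessageSender(bool skip_subsequent_node)
      : skip_subsequent_node_(skip_subsequent_node) {}

  // A new stream resets the flag, so its first request carries the node again.
  void onStreamEstablished() { first_stream_request_ = true; }

  void sendDiscoveryRequest(const std::string& type_url) {
    bool has_node = true;
    // Equivalent of request.clear_node() in the diff above.
    if (skip_subsequent_node_ && !first_stream_request_) {
      has_node = false;
    }
    sent_.push_back({type_url, has_node});
    first_stream_request_ = false;
  }

  const std::vector<SentRequest>& sent() const { return sent_; }

private:
  const bool skip_subsequent_node_;
  bool first_stream_request_{true};
  std::vector<SentRequest> sent_;
};
```

With the option enabled, only the first request after construction or after `onStreamEstablished()` is recorded with `has_node == true`. With the option disabled, every request keeps the node, which matches the behavior before this commit.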
4 changes: 3 additions & 1 deletion source/common/config/grpc_mux_impl.h
Expand Up @@ -28,7 +28,7 @@ class GrpcMuxImpl : public GrpcMux,
GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Grpc::RawAsyncClientPtr async_client,
Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings);
const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node);
~GrpcMuxImpl() override;

void start() override;
Expand Down Expand Up @@ -104,6 +104,8 @@ class GrpcMuxImpl : public GrpcMux,

GrpcStream<envoy::api::v2::DiscoveryRequest, envoy::api::v2::DiscoveryResponse> grpc_stream_;
const LocalInfo::LocalInfo& local_info_;
const bool skip_subsequent_node_;
bool first_stream_request_;
std::unordered_map<std::string, ApiState> api_state_;
// Envoy's dependency ordering.
std::list<std::string> subscriptions_;
7 changes: 4 additions & 3 deletions source/common/config/grpc_subscription_impl.h
Expand Up @@ -19,9 +19,10 @@ class GrpcSubscriptionImpl : public Config::Subscription {
const Protobuf::MethodDescriptor& service_method, absl::string_view type_url,
SubscriptionCallbacks& callbacks, SubscriptionStats stats,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings,
std::chrono::milliseconds init_fetch_timeout)
: callbacks_(callbacks), grpc_mux_(local_info, std::move(async_client), dispatcher,
service_method, random, scope, rate_limit_settings),
std::chrono::milliseconds init_fetch_timeout, bool skip_subsequent_node)
: callbacks_(callbacks),
grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope,
rate_limit_settings, skip_subsequent_node),
grpc_mux_subscription_(grpc_mux_, callbacks_, stats, type_url, dispatcher,
init_fetch_timeout) {}

3 changes: 2 additions & 1 deletion source/common/config/subscription_factory_impl.cc
Expand Up @@ -56,7 +56,8 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
->create(),
dispatcher_, random_, sotwGrpcMethod(type_url), type_url, callbacks, stats, scope,
Utility::parseRateLimitSettings(api_config_source),
Utility::configSourceInitialFetchTimeout(config));
Utility::configSourceInitialFetchTimeout(config),
api_config_source.set_node_on_first_message_only());
break;
case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: {
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.clusters(), api_config_source);
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.cc
Expand Up @@ -228,7 +228,8 @@ ClusterManagerImpl::ClusterManagerImpl(
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_,
Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()));
Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()),
bootstrap.dynamic_resources().ads_config().set_node_on_first_message_only());
} else {
ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
}
5 changes: 3 additions & 2 deletions test/common/config/delta_subscription_test_harness.h
Expand Up @@ -55,9 +55,10 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness {
subscription_->start(cluster_names);
}

void expectSendMessage(const std::set<std::string>& cluster_names,
const std::string& version) override {
void expectSendMessage(const std::set<std::string>& cluster_names, const std::string& version,
bool expect_node = false) override {
UNREFERENCED_PARAMETER(version);
UNREFERENCED_PARAMETER(expect_node);
expectSendMessage(cluster_names, {}, Grpc::Status::GrpcStatus::Ok, "", {});
}

5 changes: 3 additions & 2 deletions test/common/config/filesystem_subscription_test_harness.h
Expand Up @@ -58,10 +58,11 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness {
}
}

void expectSendMessage(const std::set<std::string>& cluster_names,
const std::string& version) override {
void expectSendMessage(const std::set<std::string>& cluster_names, const std::string& version,
bool expect_node) override {
UNREFERENCED_PARAMETER(cluster_names);
UNREFERENCED_PARAMETER(version);
UNREFERENCED_PARAMETER(expect_node);
}

void deliverConfigUpdate(const std::vector<std::string>& cluster_names,
51 changes: 26 additions & 25 deletions test/common/config/grpc_mux_impl_test.cc
Expand Up @@ -52,24 +52,26 @@ class GrpcMuxImplTestBase : public testing::Test {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, rate_limit_settings_);
random_, stats_, rate_limit_settings_, true);
}

void setup(const RateLimitSettings& custom_rate_limit_settings) {
grpc_mux_ = std::make_unique<GrpcMuxImpl>(
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, custom_rate_limit_settings);
random_, stats_, custom_rate_limit_settings, true);
}

void expectSendMessage(const std::string& type_url,
const std::vector<std::string>& resource_names, const std::string& version,
const std::string& nonce = "",
bool first = false, const std::string& nonce = "",
const Protobuf::int32 error_code = Grpc::Status::GrpcStatus::Ok,
const std::string& error_message = "") {
envoy::api::v2::DiscoveryRequest expected_request;
expected_request.mutable_node()->CopyFrom(local_info_.node());
if (first) {
expected_request.mutable_node()->CopyFrom(local_info_.node());
}
for (const auto& resource : resource_names) {
expected_request.add_resource_names(resource);
}
Expand Down Expand Up @@ -111,7 +113,7 @@ TEST_F(GrpcMuxImplTest, MultipleTypeUrlStreams) {
auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_);
auto bar_sub = grpc_mux_->subscribe("bar", {}, callbacks_);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("bar", {}, "");
grpc_mux_->start();
EXPECT_EQ(1, control_plane_connected_state_.value());
Expand Down Expand Up @@ -142,7 +144,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
auto bar_sub = grpc_mux_->subscribe("bar", {}, callbacks_);
auto baz_sub = grpc_mux_->subscribe("baz", {"z"}, callbacks_);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("bar", {}, "");
expectSendMessage("baz", {"z"}, "");
grpc_mux_->start();
Expand All @@ -156,7 +158,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::GrpcStatus::Canceled, "");
EXPECT_EQ(0, control_plane_connected_state_.value());
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
expectSendMessage("bar", {}, "");
expectSendMessage("baz", {"z"}, "");
timer_cb();
Expand All @@ -173,7 +175,7 @@ TEST_F(GrpcMuxImplTest, PauseResume) {
grpc_mux_->pause("foo");
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
grpc_mux_->start();
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
grpc_mux_->resume("foo");
grpc_mux_->pause("bar");
expectSendMessage("foo", {"z", "x", "y"}, "");
Expand All @@ -196,7 +198,7 @@ TEST_F(GrpcMuxImplTest, TypeUrlMismatch) {
auto foo_sub = grpc_mux_->subscribe("foo", {"x", "y"}, callbacks_);

EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage("foo", {"x", "y"}, "");
expectSendMessage("foo", {"x", "y"}, "", true);
grpc_mux_->start();

{
Expand All @@ -215,7 +217,7 @@ TEST_F(GrpcMuxImplTest, TypeUrlMismatch) {
e->what()));
}));

expectSendMessage("foo", {"x", "y"}, "", "", Grpc::Status::GrpcStatus::Internal,
expectSendMessage("foo", {"x", "y"}, "", false, "", Grpc::Status::GrpcStatus::Internal,
fmt::format("bar does not match foo type URL in DiscoveryResponse {}",
invalid_response->DebugString()));
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(invalid_response));
Expand All @@ -231,7 +233,7 @@ TEST_F(GrpcMuxImplTest, WildcardWatch) {
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
auto foo_sub = grpc_mux_->subscribe(type_url, {}, callbacks_);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {}, "");
expectSendMessage(type_url, {}, "", true);
grpc_mux_->start();

{
Expand Down Expand Up @@ -267,7 +269,7 @@ TEST_F(GrpcMuxImplTest, WatchDemux) {
auto bar_sub = grpc_mux_->subscribe(type_url, {"y", "z"}, bar_callbacks);
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
// Should dedupe the "x" resource.
expectSendMessage(type_url, {"y", "z", "x"}, "");
expectSendMessage(type_url, {"y", "z", "x"}, "", true);
grpc_mux_->start();

{
Expand Down Expand Up @@ -345,7 +347,7 @@ TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) {
auto foo_sub = grpc_mux_->subscribe(type_url, {"x", "y"}, foo_callbacks);

EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x", "y"}, "");
expectSendMessage(type_url, {"x", "y"}, "", true);
grpc_mux_->start();

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
Expand All @@ -368,7 +370,7 @@ TEST_F(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) {
auto foo_sub = grpc_mux_->subscribe(type_url, {}, foo_callbacks);

EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {}, "");
expectSendMessage(type_url, {}, "", true);
grpc_mux_->start();

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
Expand Down Expand Up @@ -422,7 +424,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithDefaultSettings) {
};

auto foo_sub = grpc_mux_->subscribe("foo", {"x"}, callbacks_);
expectSendMessage("foo", {"x"}, "");
expectSendMessage("foo", {"x"}, "", true);
grpc_mux_->start();

// Exhausts the limit.
Expand Down Expand Up @@ -475,7 +477,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithEmptyRateLimitSetti
};

auto foo_sub = grpc_mux_->subscribe("foo", {"x"}, callbacks_);
expectSendMessage("foo", {"x"}, "");
expectSendMessage("foo", {"x"}, "", true);
grpc_mux_->start();

// Validate that drain_request_timer is enabled when there are no tokens.
Expand Down Expand Up @@ -531,7 +533,7 @@ TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) {
};

auto foo_sub = grpc_mux_->subscribe("foo", {"x"}, callbacks_);
expectSendMessage("foo", {"x"}, "");
expectSendMessage("foo", {"x"}, "", true);
grpc_mux_->start();

// Validate that rate limit is not enforced for 100 requests.
Expand Down Expand Up @@ -565,7 +567,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) {
grpc_mux_->start();
{
// subscribe and unsubscribe to simulate a cluster added and removed
expectSendMessage(type_url, {"y"}, "");
expectSendMessage(type_url, {"y"}, "", true);
auto temp_sub = grpc_mux_->subscribe(type_url, {"y"}, callbacks_);
expectSendMessage(type_url, {}, "");
}
Expand All @@ -581,11 +583,11 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) {
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response));

// when we add the new subscription version should be 1 and nonce should be bar
expectSendMessage(type_url, {"x"}, "1", "bar");
expectSendMessage(type_url, {"x"}, "1", false, "bar");

// simulate a new cluster x is added. add CLA subscription for it.
auto sub = grpc_mux_->subscribe(type_url, {"x"}, callbacks_);
expectSendMessage(type_url, {}, "1", "bar");
expectSendMessage(type_url, {}, "1", false, "bar");
}

// Verifies that a message with some resources is rejected when there are no watches.
Expand All @@ -598,7 +600,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeRejectsResources) {

grpc_mux_->start();
// subscribe and unsubscribe (by not keeping the return watch) so that the type is known to envoy
expectSendMessage(type_url, {"y"}, "");
expectSendMessage(type_url, {"y"}, "", true);
expectSendMessage(type_url, {}, "");
grpc_mux_->subscribe(type_url, {"y"}, callbacks_);

Expand All @@ -615,7 +617,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeRejectsResources) {
response->add_resources()->PackFrom(load_assignment);

// The message should be rejected.
expectSendMessage(type_url, {}, "", "bar");
expectSendMessage(type_url, {}, "", false, "bar");
EXPECT_LOG_CONTAINS("warning", "Ignoring unwatched type URL " + type_url,
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)));
}
Expand All @@ -627,7 +629,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, rate_limit_settings_),
random_, stats_, rate_limit_settings_, true),
EnvoyException,
"ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via "
"--service-node and --service-cluster options.");
Expand All @@ -640,12 +642,11 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
random_, stats_, rate_limit_settings_),
random_, stats_, rate_limit_settings_, true),
EnvoyException,
"ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via "
"--service-node and --service-cluster options.");
}

} // namespace
} // namespace Config
} // namespace Envoy
4 changes: 2 additions & 2 deletions test/common/config/grpc_subscription_impl_test.cc
Expand Up @@ -28,7 +28,7 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
// Retry and succeed.
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));

expectSendMessage({"cluster2"}, "");
expectSendMessage({"cluster2"}, "", true);
timer_cb_();
EXPECT_TRUE(statsAre(3, 0, 0, 1, 0, 0));
verifyControlPlaneStats(1);
Expand All @@ -49,7 +49,7 @@ TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) {

// Retry and succeed.
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
expectSendMessage({"cluster0", "cluster1"}, "");
expectSendMessage({"cluster0", "cluster1"}, "", true);
timer_cb_();
EXPECT_TRUE(statsAre(2, 0, 0, 1, 0, 0));
}