diff --git a/source/extensions/config_subscription/grpc/xds_mux/BUILD b/source/extensions/config_subscription/grpc/xds_mux/BUILD index a9e7f75fd70a..a0fa33be7558 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/BUILD +++ b/source/extensions/config_subscription/grpc/xds_mux/BUILD @@ -70,6 +70,7 @@ envoy_cc_extension( "//source/common/memory:utils_lib", "//source/extensions/config_subscription/grpc:eds_resources_cache_lib", "//source/extensions/config_subscription/grpc:grpc_mux_context_lib", + "//source/extensions/config_subscription/grpc:grpc_mux_failover_lib", "//source/extensions/config_subscription/grpc:grpc_stream_lib", "//source/extensions/config_subscription/grpc:pausable_ack_queue_lib", "//source/extensions/config_subscription/grpc:watch_map_lib", diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc index 544a813e7a26..b564b0b929db 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc @@ -39,28 +39,49 @@ using AllMuxes = ThreadSafeSingleton; template GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_factory, - GrpcMuxContext& grpc_mux_content, bool skip_subsequent_node) - : grpc_stream_(this, std::move(grpc_mux_content.async_client_), - grpc_mux_content.service_method_, grpc_mux_content.dispatcher_, - grpc_mux_content.scope_, std::move(grpc_mux_content.backoff_strategy_), - grpc_mux_content.rate_limit_settings_), + GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) + : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), subscription_state_factory_(std::move(subscription_state_factory)), - skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_content.local_info_), + skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_context.local_info_), dynamic_update_callback_handle_( - grpc_mux_content.local_info_.contextProvider().addDynamicContextUpdateCallback( + grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback( [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); return absl::OkStatus(); })), - config_validators_(std::move(grpc_mux_content.config_validators_)), - xds_config_tracker_(grpc_mux_content.xds_config_tracker_), - xds_resources_delegate_(grpc_mux_content.xds_resources_delegate_), - eds_resources_cache_(std::move(grpc_mux_content.eds_resources_cache_)), - target_xds_authority_(grpc_mux_content.target_xds_authority_) { - THROW_IF_NOT_OK(Config::Utility::checkLocalInfo("ads", grpc_mux_content.local_info_)); + config_validators_(std::move(grpc_mux_context.config_validators_)), + xds_config_tracker_(grpc_mux_context.xds_config_tracker_), + xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_), + eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)), + target_xds_authority_(grpc_mux_context.target_xds_authority_) { + THROW_IF_NOT_OK(Config::Utility::checkLocalInfo("ads", grpc_mux_context.local_info_)); AllMuxes::get().insert(this); } +template +std::unique_ptr> +GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { + if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { + return std::make_unique>( + /*primary_stream_creator=*/ + [&grpc_mux_context](GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { + return std::make_unique>( + callbacks, std::move(grpc_mux_context.async_client_), + grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, + grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_); + }, + /*failover_stream_creator=*/ + // TODO(adisuissa): implement when failover is fully plumbed. + absl::nullopt, + /*grpc_mux_callbacks=*/*this, + /*dispatch=*/grpc_mux_context.dispatcher_); + } + return std::make_unique>( + this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, + grpc_mux_context.dispatcher_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_); +} template GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); } @@ -231,7 +252,7 @@ template void GrpcMuxImpl:: } started_ = true; ENVOY_LOG(debug, "GrpcMuxImpl now trying to establish a stream"); - grpc_stream_.establishNewStream(); + grpc_stream_->establishNewStream(); } template diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 8af1675ba5cb..2e0ce7407862 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -22,7 +22,7 @@ #include "source/common/config/api_version.h" #include "source/common/grpc/common.h" #include "source/extensions/config_subscription/grpc/grpc_mux_context.h" -#include "source/extensions/config_subscription/grpc/grpc_stream.h" +#include "source/extensions/config_subscription/grpc/grpc_mux_failover.h" #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h" #include "source/extensions/config_subscription/grpc/watch_map.h" #include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h" @@ -107,7 +107,14 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, return makeOptRefFromPtr(eds_resources_cache_.get()); } - GrpcStream& grpcStreamForTest() { return grpc_stream_; } + GrpcStreamInterface& grpcStreamForTest() { + // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, + // return grpc_stream_.currentStreamForTest() directly (defined in GrpcMuxFailover). + if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { + return dynamic_cast*>(grpc_stream_.get())->currentStreamForTest(); + } + return *grpc_stream_.get(); + } protected: class WatchImpl : public Envoy::Config::GrpcMuxWatch { @@ -137,10 +144,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, }; void sendGrpcMessage(RQ& msg_proto, S& sub_state); - void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_.maybeUpdateQueueSizeStat(size); } - bool grpcStreamAvailable() { return grpc_stream_.grpcStreamAvailable(); } - bool rateLimitAllowsDrain() { return grpc_stream_.checkRateLimitAllowsDrain(); } - void sendMessage(RQ& msg_proto) { grpc_stream_.sendMessage(msg_proto); } + void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_->maybeUpdateQueueSizeStat(size); } + bool grpcStreamAvailable() { return grpc_stream_->grpcStreamAvailable(); } + bool rateLimitAllowsDrain() { return grpc_stream_->checkRateLimitAllowsDrain(); } + void sendMessage(RQ& msg_proto) { grpc_stream_->sendMessage(msg_proto); } S& subscriptionStateFor(const std::string& type_url); WatchMap& watchMapFor(const std::string& type_url); @@ -157,6 +164,12 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, const LocalInfo::LocalInfo& localInfo() const { return local_info_; } private: + // Helper function to create the grpc_stream_ object. + // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support + // is deprecated. + std::unique_ptr> + createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check // whether we *want* to send a (Delta)DiscoveryRequest). bool canSendDiscoveryRequest(const std::string& type_url); @@ -172,7 +185,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, // Invoked when dynamic context parameters change for a resource type. void onDynamicContextUpdate(absl::string_view resource_type_url); - GrpcStream grpc_stream_; + // Multiplexes the stream to the primary and failover sources. + // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, + // convert from unique_ptr to GrpcMuxFailover directly. + std::unique_ptr> grpc_stream_; // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All // of our different resource types' ACKs are mixed together in this queue. See class for diff --git a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc index 2e9ea564397c..123ffd52a56d 100644 --- a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc @@ -26,6 +26,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -48,7 +49,7 @@ namespace { // We test some mux specific stuff below, other unit test coverage for singleton use of GrpcMuxImpl // is provided in [grpc_]subscription_impl_test.cc. -class GrpcMuxImplTestBase : public testing::Test { +class GrpcMuxImplTestBase : public testing::TestWithParam { public: GrpcMuxImplTestBase() : async_client_(new Grpc::MockAsyncClient()), @@ -56,8 +57,13 @@ class GrpcMuxImplTestBase : public testing::Test { control_plane_stats_(Utility::generateControlPlaneStats(*stats_.rootScope())), control_plane_connected_state_( stats_.gauge("control_plane.connected_state", Stats::Gauge::ImportMode::NeverImport)), - control_plane_pending_requests_(stats_.gauge("control_plane.pending_requests", - Stats::Gauge::ImportMode::NeverImport)) {} + control_plane_pending_requests_( + stats_.gauge("control_plane.pending_requests", Stats::Gauge::ImportMode::NeverImport)) { + // Once "envoy.restart_features.xds_failover_support" is deprecated, the + // test should no longer be parameterized. + scoped_runtime_.mergeValues( + {{"envoy.restart_features.xds_failover_support", GetParam() ? "true" : "false"}}); + } void setup() { setup(rate_limit_settings_); } @@ -123,6 +129,7 @@ class GrpcMuxImplTestBase : public testing::Test { return grpc_mux_->addWatch(type_url, resources, callbacks, resource_decoder, {}); } + TestScopedRuntime scoped_runtime_; NiceMock dispatcher_; NiceMock random_; Grpc::MockAsyncClient* async_client_; @@ -147,8 +154,10 @@ class GrpcMuxImplTest : public GrpcMuxImplTestBase { Event::SimulatedTimeSystem time_system_; }; +INSTANTIATE_TEST_SUITE_P(GrpcMuxImpl, GrpcMuxImplTest, ::testing::Bool()); + // Validate behavior when multiple type URL watches are maintained, watches are created/destroyed. -TEST_F(GrpcMuxImplTest, MultipleTypeUrlStreams) { +TEST_P(GrpcMuxImplTest, MultipleTypeUrlStreams) { setup(); InSequence s; @@ -169,7 +178,7 @@ TEST_F(GrpcMuxImplTest, MultipleTypeUrlStreams) { } // Validate behavior when multiple type URL watches are maintained and the stream is reset. -TEST_F(GrpcMuxImplTest, ResetStream) { +TEST_P(GrpcMuxImplTest, ResetStream) { InSequence s; auto* timer = new Event::MockTimer(&dispatcher_); @@ -213,7 +222,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) { } // Validate pause-resume behavior. -TEST_F(GrpcMuxImplTest, PauseResume) { +TEST_P(GrpcMuxImplTest, PauseResume) { setup(); InSequence s; GrpcMuxWatchPtr foo1; @@ -248,7 +257,7 @@ TEST_F(GrpcMuxImplTest, PauseResume) { } // Validate behavior when type URL mismatches occur. -TEST_F(GrpcMuxImplTest, TypeUrlMismatch) { +TEST_P(GrpcMuxImplTest, TypeUrlMismatch) { setup(); auto invalid_response = std::make_unique(); @@ -289,7 +298,7 @@ TEST_F(GrpcMuxImplTest, TypeUrlMismatch) { expectSendMessage("type_url_foo", {}, ""); } -TEST_F(GrpcMuxImplTest, RpcErrorMessageTruncated) { +TEST_P(GrpcMuxImplTest, RpcErrorMessageTruncated) { setup(); auto invalid_response = std::make_unique(); InSequence s; @@ -351,7 +360,7 @@ resourceWithEmptyTtl(envoy::config::endpoint::v3::ClusterLoadAssignment& cla) { return resource; } // Validates the behavior when the TTL timer expires. -TEST_F(GrpcMuxImplTest, ResourceTTL) { +TEST_P(GrpcMuxImplTest, ResourceTTL) { setup(); time_system_.setSystemTime(std::chrono::seconds(0)); @@ -487,7 +496,7 @@ TEST_F(GrpcMuxImplTest, ResourceTTL) { } // Checks that the control plane identifier is logged -TEST_F(GrpcMuxImplTest, LogsControlPlaneIndentifier) { +TEST_P(GrpcMuxImplTest, LogsControlPlaneIndentifier) { setup(); std::string type_url = "foo"; @@ -521,7 +530,7 @@ TEST_F(GrpcMuxImplTest, LogsControlPlaneIndentifier) { } // Validate behavior when watches has an unknown resource name. -TEST_F(GrpcMuxImplTest, WildcardWatch) { +TEST_P(GrpcMuxImplTest, WildcardWatch) { setup(); const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -553,7 +562,7 @@ TEST_F(GrpcMuxImplTest, WildcardWatch) { } // Validate behavior when watches specify resources (potentially overlapping). -TEST_F(GrpcMuxImplTest, WatchDemux) { +TEST_P(GrpcMuxImplTest, WatchDemux) { setup(); // We will not require InSequence here: an update that causes multiple onConfigUpdates // causes them in an indeterminate order, based on the whims of the hash map. @@ -643,7 +652,7 @@ TEST_F(GrpcMuxImplTest, WatchDemux) { } // Validate behavior when we have multiple watchers that send empty updates. -TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) { +TEST_P(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) { setup(); InSequence s; const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -666,7 +675,7 @@ TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) { } // Validate behavior when we have Single Watcher that sends Empty updates. -TEST_F(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) { +TEST_P(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) { setup(); const std::string& type_url = Config::TypeUrl::get().Cluster; NiceMock foo_callbacks; @@ -696,8 +705,11 @@ class GrpcMuxImplTestWithMockTimeSystem : public GrpcMuxImplTestBase { Event::DelegatingTestTimeSystem mock_time_system_; }; +INSTANTIATE_TEST_SUITE_P(GrpcMuxImplTestWithMockTimeSystem, GrpcMuxImplTestWithMockTimeSystem, + ::testing::Bool()); + // Verifies that rate limiting is not enforced with defaults. -TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithDefaultSettings) { +TEST_P(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithDefaultSettings) { auto ttl_timer = new Event::MockTimer(&dispatcher_); // Retry timer, @@ -735,7 +747,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithDefaultSettings) { } // Verifies that default rate limiting is enforced with empty RateLimitSettings. -TEST_F(GrpcMuxImplTest, TooManyRequestsWithEmptyRateLimitSettings) { +TEST_P(GrpcMuxImplTest, TooManyRequestsWithEmptyRateLimitSettings) { // Validate that request drain timer is created. auto ttl_timer = new Event::MockTimer(&dispatcher_); @@ -792,7 +804,7 @@ TEST_F(GrpcMuxImplTest, TooManyRequestsWithEmptyRateLimitSettings) { } // Verifies that rate limiting is enforced with custom RateLimitSettings. -TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) { +TEST_P(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) { // Validate that request drain timer is created. // TTL timer. @@ -845,7 +857,7 @@ TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) { } // Verifies that a message with no resources is accepted. -TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) { +TEST_P(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) { setup(); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); @@ -885,7 +897,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsEmptyResources) { // are accompanied by interesting ones. // Note: this was previously "rejects", not "accepts". See // https://github.com/envoyproxy/envoy/pull/8350#discussion_r328218220 for discussion. -TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsResources) { +TEST_P(GrpcMuxImplTest, UnwatchedTypeAcceptsResources) { setup(); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -908,7 +920,7 @@ TEST_F(GrpcMuxImplTest, UnwatchedTypeAcceptsResources) { grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); } -TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { +TEST_P(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { EXPECT_CALL(local_info_, clusterName()).WillOnce(ReturnRef(EMPTY_STRING)); GrpcMuxContext grpc_mux_context{ /*async_client_=*/std::unique_ptr(async_client_), @@ -934,7 +946,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { "--service-node and --service-cluster options."); } -TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { +TEST_P(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { EXPECT_CALL(local_info_, nodeName()).WillOnce(ReturnRef(EMPTY_STRING)); GrpcMuxContext grpc_mux_context{ /*async_client_=*/std::unique_ptr(async_client_), @@ -961,7 +973,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { } // Validate that a valid resource decoder is used after removing a subscription. -TEST_F(GrpcMuxImplTest, ValidResourceDecoderAfterRemoval) { +TEST_P(GrpcMuxImplTest, ValidResourceDecoderAfterRemoval) { setup(); const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; @@ -1039,7 +1051,7 @@ TEST_F(GrpcMuxImplTest, ValidResourceDecoderAfterRemoval) { } // Validate behavior when dynamic context parameters are updated. -TEST_F(GrpcMuxImplTest, DynamicContextParameters) { +TEST_P(GrpcMuxImplTest, DynamicContextParameters) { setup(); InSequence s; auto foo = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_, resource_decoder_, {}); @@ -1061,7 +1073,7 @@ TEST_F(GrpcMuxImplTest, DynamicContextParameters) { expectSendMessage("foo", {}, "", false); } -TEST_F(GrpcMuxImplTest, AllMuxesStateTest) { +TEST_P(GrpcMuxImplTest, AllMuxesStateTest) { setup(); GrpcMuxContext grpc_mux_context{ /*async_client_=*/std::unique_ptr(), @@ -1089,20 +1101,20 @@ TEST_F(GrpcMuxImplTest, AllMuxesStateTest) { } // Validates that the EDS cache getter returns the cache. -TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEds) { +TEST_P(GrpcMuxImplTest, EdsResourcesCacheForEds) { eds_resources_cache_ = new NiceMock(); setup(); EXPECT_TRUE(grpc_mux_->edsResourcesCache().has_value()); } // Validates that the EDS cache getter returns empty if there is no cache. -TEST_F(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { +TEST_P(GrpcMuxImplTest, EdsResourcesCacheForEdsNoCache) { setup(); EXPECT_FALSE(grpc_mux_->edsResourcesCache().has_value()); } // Validate that an EDS resource is cached if there's a cache. -TEST_F(GrpcMuxImplTest, CacheEdsResource) { +TEST_P(GrpcMuxImplTest, CacheEdsResource) { // Create the cache that will also be passed to the GrpcMux object via setup(). eds_resources_cache_ = new NiceMock(); setup(); @@ -1144,7 +1156,7 @@ TEST_F(GrpcMuxImplTest, CacheEdsResource) { // Validate that an update to an EDS resource watcher is reflected in the cache, // if there's a cache. -TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) { +TEST_P(GrpcMuxImplTest, UpdateCacheEdsResource) { // Create the cache that will also be passed to the GrpcMux object via setup(). eds_resources_cache_ = new NiceMock(); setup(); @@ -1191,7 +1203,7 @@ TEST_F(GrpcMuxImplTest, UpdateCacheEdsResource) { // Validate that adding and removing watchers reflects on the cache changes, // if there's a cache. -TEST_F(GrpcMuxImplTest, AddRemoveSubscriptions) { +TEST_P(GrpcMuxImplTest, AddRemoveSubscriptions) { // Create the cache that will also be passed to the GrpcMux object via setup(). eds_resources_cache_ = new NiceMock(); setup();