Skip to content

Commit

Permalink
xds-failover: add unified gRPC mux support (envoyproxy#34534)
Browse files Browse the repository at this point in the history
This PR introduces xDS-Failover support for unified gRPC mux implementations.
This is similar to envoyproxy#34437, and another part of envoyproxy#34417.

Risk Level: medium if the runtime flag is set to true and unified-gRPC-mux is used.
Testing: Updated the test cases (parameterized).

Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa authored Jun 6, 2024
1 parent ad0a100 commit b6b585b
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 50 deletions.
1 change: 1 addition & 0 deletions source/extensions/config_subscription/grpc/xds_mux/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ envoy_cc_extension(
"//source/common/memory:utils_lib",
"//source/extensions/config_subscription/grpc:eds_resources_cache_lib",
"//source/extensions/config_subscription/grpc:grpc_mux_context_lib",
"//source/extensions/config_subscription/grpc:grpc_mux_failover_lib",
"//source/extensions/config_subscription/grpc:grpc_stream_lib",
"//source/extensions/config_subscription/grpc:pausable_ack_queue_lib",
"//source/extensions/config_subscription/grpc:watch_map_lib",
Expand Down
49 changes: 35 additions & 14 deletions source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,49 @@ using AllMuxes = ThreadSafeSingleton<AllMuxesState>;

template <class S, class F, class RQ, class RS>
GrpcMuxImpl<S, F, RQ, RS>::GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory,
GrpcMuxContext& grpc_mux_content, bool skip_subsequent_node)
: grpc_stream_(this, std::move(grpc_mux_content.async_client_),
grpc_mux_content.service_method_, grpc_mux_content.dispatcher_,
grpc_mux_content.scope_, std::move(grpc_mux_content.backoff_strategy_),
grpc_mux_content.rate_limit_settings_),
GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
: grpc_stream_(createGrpcStreamObject(grpc_mux_context)),
subscription_state_factory_(std::move(subscription_state_factory)),
skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_content.local_info_),
skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_context.local_info_),
dynamic_update_callback_handle_(
grpc_mux_content.local_info_.contextProvider().addDynamicContextUpdateCallback(
grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
onDynamicContextUpdate(resource_type_url);
return absl::OkStatus();
})),
config_validators_(std::move(grpc_mux_content.config_validators_)),
xds_config_tracker_(grpc_mux_content.xds_config_tracker_),
xds_resources_delegate_(grpc_mux_content.xds_resources_delegate_),
eds_resources_cache_(std::move(grpc_mux_content.eds_resources_cache_)),
target_xds_authority_(grpc_mux_content.target_xds_authority_) {
THROW_IF_NOT_OK(Config::Utility::checkLocalInfo("ads", grpc_mux_content.local_info_));
config_validators_(std::move(grpc_mux_context.config_validators_)),
xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
target_xds_authority_(grpc_mux_context.target_xds_authority_) {
THROW_IF_NOT_OK(Config::Utility::checkLocalInfo("ads", grpc_mux_context.local_info_));
AllMuxes::get().insert(this);
}

template <class S, class F, class RQ, class RS>
std::unique_ptr<GrpcStreamInterface<RQ, RS>>
GrpcMuxImpl<S, F, RQ, RS>::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) {
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
return std::make_unique<GrpcMuxFailover<RQ, RS>>(
/*primary_stream_creator=*/
[&grpc_mux_context](GrpcStreamCallbacks<RS>* callbacks) -> GrpcStreamInterfacePtr<RQ, RS> {
return std::make_unique<GrpcStream<RQ, RS>>(
callbacks, std::move(grpc_mux_context.async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
grpc_mux_context.rate_limit_settings_);
},
/*failover_stream_creator=*/
// TODO(adisuissa): implement when failover is fully plumbed.
absl::nullopt,
/*grpc_mux_callbacks=*/*this,
/*dispatch=*/grpc_mux_context.dispatcher_);
}
return std::make_unique<GrpcStream<RQ, RS>>(
this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_,
grpc_mux_context.dispatcher_, grpc_mux_context.scope_,
std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_);
}
template <class S, class F, class RQ, class RS> GrpcMuxImpl<S, F, RQ, RS>::~GrpcMuxImpl() {
AllMuxes::get().erase(this);
}
Expand Down Expand Up @@ -231,7 +252,7 @@ template <class S, class F, class RQ, class RS> void GrpcMuxImpl<S, F, RQ, RS>::
}
started_ = true;
ENVOY_LOG(debug, "GrpcMuxImpl now trying to establish a stream");
grpc_stream_.establishNewStream();
grpc_stream_->establishNewStream();
}

template <class S, class F, class RQ, class RS>
Expand Down
30 changes: 23 additions & 7 deletions source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "source/common/config/api_version.h"
#include "source/common/grpc/common.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
#include "source/extensions/config_subscription/grpc/grpc_stream.h"
#include "source/extensions/config_subscription/grpc/grpc_mux_failover.h"
#include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
#include "source/extensions/config_subscription/grpc/watch_map.h"
#include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
Expand Down Expand Up @@ -107,7 +107,14 @@ class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
return makeOptRefFromPtr(eds_resources_cache_.get());
}

GrpcStream<RQ, RS>& grpcStreamForTest() { return grpc_stream_; }
GrpcStreamInterface<RQ, RS>& grpcStreamForTest() {
// TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
// return grpc_stream_.currentStreamForTest() directly (defined in GrpcMuxFailover).
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
return dynamic_cast<GrpcMuxFailover<RQ, RS>*>(grpc_stream_.get())->currentStreamForTest();
}
return *grpc_stream_.get();
}

protected:
class WatchImpl : public Envoy::Config::GrpcMuxWatch {
Expand Down Expand Up @@ -137,10 +144,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
};

void sendGrpcMessage(RQ& msg_proto, S& sub_state);
void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_.maybeUpdateQueueSizeStat(size); }
bool grpcStreamAvailable() { return grpc_stream_.grpcStreamAvailable(); }
bool rateLimitAllowsDrain() { return grpc_stream_.checkRateLimitAllowsDrain(); }
void sendMessage(RQ& msg_proto) { grpc_stream_.sendMessage(msg_proto); }
void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_->maybeUpdateQueueSizeStat(size); }
bool grpcStreamAvailable() { return grpc_stream_->grpcStreamAvailable(); }
bool rateLimitAllowsDrain() { return grpc_stream_->checkRateLimitAllowsDrain(); }
void sendMessage(RQ& msg_proto) { grpc_stream_->sendMessage(msg_proto); }

S& subscriptionStateFor(const std::string& type_url);
WatchMap& watchMapFor(const std::string& type_url);
Expand All @@ -157,6 +164,12 @@ class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
const LocalInfo::LocalInfo& localInfo() const { return local_info_; }

private:
// Helper function to create the grpc_stream_ object.
// TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support
// is deprecated.
std::unique_ptr<GrpcStreamInterface<RQ, RS>>
createGrpcStreamObject(GrpcMuxContext& grpc_mux_context);

// Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
// whether we *want* to send a (Delta)DiscoveryRequest).
bool canSendDiscoveryRequest(const std::string& type_url);
Expand All @@ -172,7 +185,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
// Invoked when dynamic context parameters change for a resource type.
void onDynamicContextUpdate(absl::string_view resource_type_url);

GrpcStream<RQ, RS> grpc_stream_;
// Multiplexes the stream to the primary and failover sources.
// TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
// convert from unique_ptr<GrpcStreamInterface> to GrpcMuxFailover directly.
std::unique_ptr<GrpcStreamInterface<RQ, RS>> grpc_stream_;

// Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All
// of our different resource types' ACKs are mixed together in this queue. See class for
Expand Down
Loading

0 comments on commit b6b585b

Please sign in to comment.