Skip to content

Commit

Permalink
subscription: making exception free (envoyproxy#34554)
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored Jun 6, 2024
1 parent de78171 commit db9d7ea
Show file tree
Hide file tree
Showing 24 changed files with 150 additions and 100 deletions.
10 changes: 6 additions & 4 deletions envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ class SubscriptionFactory {
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param options subscription options.
*
* @return SubscriptionPtr subscription object corresponding for config and type_url.
* @return SubscriptionPtr subscription object corresponding for config and type_url or error
* status.
*/
virtual SubscriptionPtr subscriptionFromConfigSource(
virtual absl::StatusOr<SubscriptionPtr> subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) PURE;
Expand All @@ -73,9 +74,10 @@ class SubscriptionFactory {
* CollectionSubscription object.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
*
* @return SubscriptionPtr subscription object corresponding for collection_locator.
* @return SubscriptionPtr subscription object corresponding for collection_locator or error
* status.
*/
virtual SubscriptionPtr
virtual absl::StatusOr<SubscriptionPtr>
collectionSubscriptionFromUrl(const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
Expand Down
3 changes: 2 additions & 1 deletion envoy/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,9 @@ class Loader {
* the constructor is finished, with the exception of dynamic RTDS layers,
* which require ClusterManager.
* @param cm cluster manager reference.
* @return a status indicating if initialization was successful.
*/
virtual void initialize(Upstream::ClusterManager& cm) PURE;
virtual absl::Status initialize(Upstream::ClusterManager& cm) PURE;

/**
* @return const Snapshot& the current snapshot. This reference is safe to use for the duration of
Expand Down
69 changes: 39 additions & 30 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ SubscriptionFactoryImpl::SubscriptionFactoryImpl(
validation_visitor_(validation_visitor), api_(api), server_(server),
xds_resources_delegate_(xds_resources_delegate), xds_config_tracker_(xds_config_tracker) {}

SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
absl::StatusOr<SubscriptionPtr> SubscriptionFactoryImpl::subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) {
THROW_IF_NOT_OK(Config::Utility::checkLocalInfo(type_url, local_info_));
RETURN_IF_NOT_OK(Config::Utility::checkLocalInfo(type_url, local_info_));
SubscriptionStats stats = Utility::generateStats(scope);

std::string subscription_type = "";
Expand All @@ -50,29 +50,29 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(

switch (config.config_source_specifier_case()) {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kPath: {
THROW_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(config.path(), api_));
RETURN_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(config.path(), api_));
subscription_type = "envoy.config_subscription.filesystem";
break;
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kPathConfigSource: {
THROW_IF_NOT_OK(
RETURN_IF_NOT_OK(
Utility::checkFilesystemSubscriptionBackingPath(config.path_config_source().path(), api_));
subscription_type = "envoy.config_subscription.filesystem";
break;
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kApiConfigSource: {
const envoy::config::core::v3::ApiConfigSource& api_config_source = config.api_config_source();
THROW_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source));
THROW_IF_NOT_OK(Utility::checkTransportVersion(api_config_source));
RETURN_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source));
RETURN_IF_NOT_OK(Utility::checkTransportVersion(api_config_source));
switch (api_config_source.api_type()) {
PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC:
throwEnvoyExceptionOrPanic("Unsupported config source AGGREGATED_GRPC");
return absl::InvalidArgumentError("Unsupported config source AGGREGATED_GRPC");
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC:
throwEnvoyExceptionOrPanic("Unsupported config source AGGREGATED_DELTA_GRPC");
return absl::InvalidArgumentError("Unsupported config source AGGREGATED_DELTA_GRPC");
case envoy::config::core::v3::ApiConfigSource::DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"REST_LEGACY no longer a supported ApiConfigSource. "
"Please specify an explicit supported api_type in the following config:\n" +
config.DebugString());
Expand All @@ -87,7 +87,7 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
break;
}
if (subscription_type.empty()) {
throwEnvoyExceptionOrPanic("Invalid API config source API type");
return absl::InvalidArgumentError("Invalid API config source API type");
}
break;
}
Expand All @@ -96,32 +96,32 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
break;
}
default:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"Missing config source specifier in envoy::config::core::v3::ConfigSource");
}
ConfigSubscriptionFactory* factory =
Registry::FactoryRegistry<ConfigSubscriptionFactory>::getFactory(subscription_type);
if (factory == nullptr) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"Didn't find a registered config subscription factory implementation for name: '{}'",
subscription_type));
}
return factory->create(data);
}

SubscriptionPtr createFromFactoryOrThrow(ConfigSubscriptionFactory::SubscriptionData& data,
absl::string_view subscription_type) {
absl::StatusOr<SubscriptionPtr> createFromFactory(ConfigSubscriptionFactory::SubscriptionData& data,
absl::string_view subscription_type) {
ConfigSubscriptionFactory* factory =
Registry::FactoryRegistry<ConfigSubscriptionFactory>::getFactory(subscription_type);
if (factory == nullptr) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"Didn't find a registered config subscription factory implementation for name: '{}'",
subscription_type));
}
return factory->create(data);
}

SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
absl::StatusOr<SubscriptionPtr> SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config, absl::string_view resource_type,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
Expand All @@ -148,59 +148,68 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
switch (collection_locator.scheme()) {
case xds::core::v3::ResourceLocator::FILE: {
const std::string path = Http::Utility::localPathFromFilePath(collection_locator.id());
THROW_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(path, api_));
RETURN_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(path, api_));
factory_config.set_path(path);
return createFromFactoryOrThrow(data, "envoy.config_subscription.filesystem_collection");
auto ptr_or_error = createFromFactory(data, "envoy.config_subscription.filesystem_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
case xds::core::v3::ResourceLocator::XDSTP: {
if (resource_type != collection_locator.resource_type()) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("xdstp:// type does not match {} in {}", resource_type,
Config::XdsResourceIdentifier::encodeUrl(collection_locator)));
}
switch (config.config_source_specifier_case()) {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kApiConfigSource: {
const envoy::config::core::v3::ApiConfigSource& api_config_source =
config.api_config_source();
THROW_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source));
RETURN_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(
cm_.primaryClusters(), api_config_source));
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
std::string type_url = TypeUtil::descriptorFullNameToTypeUrl(resource_type);
data.type_url_ = type_url;
return createFromFactoryOrThrow(data, "envoy.config_subscription.delta_grpc_collection");
auto ptr_or_error =
createFromFactory(data, "envoy.config_subscription.delta_grpc_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC:
FALLTHRU;
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
return createFromFactoryOrThrow(data,
"envoy.config_subscription.aggregated_grpc_collection");
auto ptr_or_error =
createFromFactory(data, "envoy.config_subscription.aggregated_grpc_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
default:
throwEnvoyExceptionOrPanic(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
return absl::InvalidArgumentError(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
}
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds: {
// TODO(adisuissa): verify that the ADS is set up in delta-xDS mode.
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
return createFromFactoryOrThrow(data, "envoy.config_subscription.ads_collection");
auto ptr_or_error = createFromFactory(data, "envoy.config_subscription.ads_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
default:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"Missing or not supported config source specifier in "
"envoy::config::core::v3::ConfigSource for a collection. Only ADS and "
"gRPC in delta-xDS mode are supported.");
}
}
default:
// TODO(htuch): Implement HTTP semantics for collection ResourceLocators.
throwEnvoyExceptionOrPanic("Unsupported code path");
return absl::InvalidArgumentError("Unsupported code path");
}
}

Expand Down
11 changes: 5 additions & 6 deletions source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::Loggable<Log
XdsConfigTrackerOptRef xds_config_tracker);

// Config::SubscriptionFactory
SubscriptionPtr subscriptionFromConfigSource(const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder,
const SubscriptionOptions& options) override;
SubscriptionPtr
absl::StatusOr<SubscriptionPtr> subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) override;
absl::StatusOr<SubscriptionPtr>
collectionSubscriptionFromUrl(const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config,
absl::string_view resource_type, Stats::Scope& scope,
Expand Down
7 changes: 5 additions & 2 deletions source/common/filter/config_discovery_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ FilterConfigSubscription::FilterConfigSubscription(
filter_config_provider_manager_(filter_config_provider_manager),
subscription_id_(subscription_id) {
const auto resource_name = getResourceName();
subscription_ = cluster_manager.subscriptionFactory().subscriptionFromConfigSource(
config_source, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
subscription_ =
THROW_OR_RETURN_VALUE(cluster_manager.subscriptionFactory().subscriptionFromConfigSource(
config_source, Grpc::Common::typeUrl(resource_name), *scope_, *this,
resource_decoder_, {}),
Config::SubscriptionPtr);
}

void FilterConfigSubscription::start() {
Expand Down
12 changes: 8 additions & 4 deletions source/common/listener_manager/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ LdsApiImpl::LdsApiImpl(const envoy::config::core::v3::ConfigSource& lds_config,
init_target_("LDS", [this]() { subscription_->start({}); }) {
const auto resource_name = getResourceName();
if (lds_resources_locator == nullptr) {
subscription_ = cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
subscription_ = THROW_OR_RETURN_VALUE(cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name),
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
} else {
subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_);
subscription_ = THROW_OR_RETURN_VALUE(
cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_),
Config::SubscriptionPtr);
}
init_manager.add(init_target_);
}
Expand Down
5 changes: 3 additions & 2 deletions source/common/rds/rds_route_config_subscription.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription(
manager_identifier_(manager_identifier), config_update_info_(std::move(config_update)),
resource_decoder_(std::move(resource_decoder)) {
const auto resource_type = route_config_provider_manager_.protoTraits().resourceType();
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
config_source, Envoy::Grpc::Common::typeUrl(resource_type), *scope_, *this,
resource_decoder_, {});
resource_decoder_, {}),
Envoy::Config::SubscriptionPtr);
local_init_manager_.add(local_init_target_);
}

Expand Down
10 changes: 6 additions & 4 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,20 @@ ScopedRdsConfigSubscription::ScopedRdsConfigSubscription(
route_config_provider_manager_(route_config_provider_manager) {
const auto resource_name = getResourceName();
if (scoped_rds.srds_resources_locator().empty()) {
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
scoped_rds.scoped_rds_config_source(), Grpc::Common::typeUrl(resource_name), *scope_,
*this, resource_decoder_, {});
*this, resource_decoder_, {}),
Envoy::Config::SubscriptionPtr);
} else {
const auto srds_resources_locator = THROW_OR_RETURN_VALUE(
Envoy::Config::XdsResourceIdentifier::decodeUrl(scoped_rds.srds_resources_locator()),
xds::core::v3::ResourceLocator);
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().collectionSubscriptionFromUrl(
srds_resources_locator, scoped_rds.scoped_rds_config_source(), resource_name, *scope_,
*this, resource_decoder_);
*this, resource_decoder_),
Envoy::Config::SubscriptionPtr);
}

// TODO(tony612): consider not using the callback here.
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/vhds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info,
const auto resource_name = getResourceName();
Envoy::Config::SubscriptionOptions options;
options.use_namespace_matching_ = true;
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
config_update_info_->protobufConfigurationCast().vhds().config_source(),
Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, options);
Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, options),
Envoy::Config::SubscriptionPtr);
}

void VhdsSubscription::updateOnDemand(const std::string& with_route_config_name_prefix) {
Expand Down
12 changes: 8 additions & 4 deletions source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,13 @@ absl::Status LoaderImpl::initLayers(Event::Dispatcher& dispatcher,
return loadNewSnapshot();
}

void LoaderImpl::initialize(Upstream::ClusterManager& cm) {
absl::Status LoaderImpl::initialize(Upstream::ClusterManager& cm) {
cm_ = &cm;

for (const auto& s : subscriptions_) {
s->createSubscription();
RETURN_IF_NOT_OK(s->createSubscription());
}
return absl::OkStatus();
}

void LoaderImpl::startRtdsSubscriptions(ReadyCallback on_done) {
Expand All @@ -632,11 +633,14 @@ RtdsSubscription::RtdsSubscription(
stats_scope_(store_.createScope("runtime")), resource_name_(rtds_layer.name()),
init_target_("RTDS " + resource_name_, [this]() { start(); }) {}

void RtdsSubscription::createSubscription() {
absl::Status RtdsSubscription::createSubscription() {
const auto resource_name = getResourceName();
subscription_ = parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
auto subscription_or_error = parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
config_source_, Grpc::Common::typeUrl(resource_name), *stats_scope_, *this, resource_decoder_,
{});
RETURN_IF_NOT_OK(subscription_or_error.status());
subscription_ = std::move(*subscription_or_error);
return absl::OkStatus();
}

absl::Status
Expand Down
Loading

0 comments on commit db9d7ea

Please sign in to comment.