Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscription: making exception free #34554

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ class SubscriptionFactory {
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param options subscription options.
*
* @return SubscriptionPtr subscription object corresponding for config and type_url.
* @return SubscriptionPtr subscription object corresponding for config and type_url or error
* status.
*/
virtual SubscriptionPtr subscriptionFromConfigSource(
virtual absl::StatusOr<SubscriptionPtr> subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) PURE;
Expand All @@ -73,9 +74,10 @@ class SubscriptionFactory {
* CollectionSubscription object.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
*
* @return SubscriptionPtr subscription object corresponding for collection_locator.
* @return SubscriptionPtr subscription object corresponding for collection_locator or error
* status.
*/
virtual SubscriptionPtr
virtual absl::StatusOr<SubscriptionPtr>
collectionSubscriptionFromUrl(const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
Expand Down
3 changes: 2 additions & 1 deletion envoy/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,9 @@ class Loader {
* the constructor is finished, with the exception of dynamic RTDS layers,
* which require ClusterManager.
* @param cm cluster manager reference.
* @return a status indicating if initialization was successful.
*/
virtual void initialize(Upstream::ClusterManager& cm) PURE;
virtual absl::Status initialize(Upstream::ClusterManager& cm) PURE;

/**
* @return const Snapshot& the current snapshot. This reference is safe to use for the duration of
Expand Down
69 changes: 39 additions & 30 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ SubscriptionFactoryImpl::SubscriptionFactoryImpl(
validation_visitor_(validation_visitor), api_(api), server_(server),
xds_resources_delegate_(xds_resources_delegate), xds_config_tracker_(xds_config_tracker) {}

SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
absl::StatusOr<SubscriptionPtr> SubscriptionFactoryImpl::subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) {
THROW_IF_NOT_OK(Config::Utility::checkLocalInfo(type_url, local_info_));
RETURN_IF_NOT_OK(Config::Utility::checkLocalInfo(type_url, local_info_));
SubscriptionStats stats = Utility::generateStats(scope);

std::string subscription_type = "";
Expand All @@ -50,29 +50,29 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(

switch (config.config_source_specifier_case()) {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kPath: {
THROW_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(config.path(), api_));
RETURN_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(config.path(), api_));
subscription_type = "envoy.config_subscription.filesystem";
break;
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kPathConfigSource: {
THROW_IF_NOT_OK(
RETURN_IF_NOT_OK(
Utility::checkFilesystemSubscriptionBackingPath(config.path_config_source().path(), api_));
subscription_type = "envoy.config_subscription.filesystem";
break;
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kApiConfigSource: {
const envoy::config::core::v3::ApiConfigSource& api_config_source = config.api_config_source();
THROW_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source));
THROW_IF_NOT_OK(Utility::checkTransportVersion(api_config_source));
RETURN_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source));
RETURN_IF_NOT_OK(Utility::checkTransportVersion(api_config_source));
switch (api_config_source.api_type()) {
PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC:
throwEnvoyExceptionOrPanic("Unsupported config source AGGREGATED_GRPC");
return absl::InvalidArgumentError("Unsupported config source AGGREGATED_GRPC");
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC:
throwEnvoyExceptionOrPanic("Unsupported config source AGGREGATED_DELTA_GRPC");
return absl::InvalidArgumentError("Unsupported config source AGGREGATED_DELTA_GRPC");
case envoy::config::core::v3::ApiConfigSource::DEPRECATED_AND_UNAVAILABLE_DO_NOT_USE:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"REST_LEGACY no longer a supported ApiConfigSource. "
"Please specify an explicit supported api_type in the following config:\n" +
config.DebugString());
Expand All @@ -87,7 +87,7 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
break;
}
if (subscription_type.empty()) {
throwEnvoyExceptionOrPanic("Invalid API config source API type");
return absl::InvalidArgumentError("Invalid API config source API type");
}
break;
}
Expand All @@ -96,32 +96,32 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
break;
}
default:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"Missing config source specifier in envoy::config::core::v3::ConfigSource");
}
ConfigSubscriptionFactory* factory =
Registry::FactoryRegistry<ConfigSubscriptionFactory>::getFactory(subscription_type);
if (factory == nullptr) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"Didn't find a registered config subscription factory implementation for name: '{}'",
subscription_type));
}
return factory->create(data);
}

SubscriptionPtr createFromFactoryOrThrow(ConfigSubscriptionFactory::SubscriptionData& data,
absl::string_view subscription_type) {
absl::StatusOr<SubscriptionPtr> createFromFactory(ConfigSubscriptionFactory::SubscriptionData& data,
absl::string_view subscription_type) {
ConfigSubscriptionFactory* factory =
Registry::FactoryRegistry<ConfigSubscriptionFactory>::getFactory(subscription_type);
if (factory == nullptr) {
throwEnvoyExceptionOrPanic(fmt::format(
return absl::InvalidArgumentError(fmt::format(
"Didn't find a registered config subscription factory implementation for name: '{}'",
subscription_type));
}
return factory->create(data);
}

SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
absl::StatusOr<SubscriptionPtr> SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config, absl::string_view resource_type,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
Expand All @@ -148,59 +148,68 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
switch (collection_locator.scheme()) {
case xds::core::v3::ResourceLocator::FILE: {
const std::string path = Http::Utility::localPathFromFilePath(collection_locator.id());
THROW_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(path, api_));
RETURN_IF_NOT_OK(Utility::checkFilesystemSubscriptionBackingPath(path, api_));
factory_config.set_path(path);
return createFromFactoryOrThrow(data, "envoy.config_subscription.filesystem_collection");
auto ptr_or_error = createFromFactory(data, "envoy.config_subscription.filesystem_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
case xds::core::v3::ResourceLocator::XDSTP: {
if (resource_type != collection_locator.resource_type()) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("xdstp:// type does not match {} in {}", resource_type,
Config::XdsResourceIdentifier::encodeUrl(collection_locator)));
}
switch (config.config_source_specifier_case()) {
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kApiConfigSource: {
const envoy::config::core::v3::ApiConfigSource& api_config_source =
config.api_config_source();
THROW_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.primaryClusters(),
api_config_source));
RETURN_IF_NOT_OK(Utility::checkApiConfigSourceSubscriptionBackingCluster(
cm_.primaryClusters(), api_config_source));
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
std::string type_url = TypeUtil::descriptorFullNameToTypeUrl(resource_type);
data.type_url_ = type_url;
return createFromFactoryOrThrow(data, "envoy.config_subscription.delta_grpc_collection");
auto ptr_or_error =
createFromFactory(data, "envoy.config_subscription.delta_grpc_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC:
FALLTHRU;
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
return createFromFactoryOrThrow(data,
"envoy.config_subscription.aggregated_grpc_collection");
auto ptr_or_error =
createFromFactory(data, "envoy.config_subscription.aggregated_grpc_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
default:
throwEnvoyExceptionOrPanic(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
return absl::InvalidArgumentError(fmt::format("Unknown xdstp:// transport API type in {}",
api_config_source.DebugString()));
}
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds: {
// TODO(adisuissa): verify that the ADS is set up in delta-xDS mode.
// All Envoy collections currently are xDS resource graph roots and require node context
// parameters.
options.add_xdstp_node_context_params_ = true;
return createFromFactoryOrThrow(data, "envoy.config_subscription.ads_collection");
auto ptr_or_error = createFromFactory(data, "envoy.config_subscription.ads_collection");
RETURN_IF_NOT_OK(ptr_or_error.status());
return std::move(ptr_or_error.value());
}
default:
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
"Missing or not supported config source specifier in "
"envoy::config::core::v3::ConfigSource for a collection. Only ADS and "
"gRPC in delta-xDS mode are supported.");
}
}
default:
// TODO(htuch): Implement HTTP semantics for collection ResourceLocators.
throwEnvoyExceptionOrPanic("Unsupported code path");
return absl::InvalidArgumentError("Unsupported code path");
}
}

Expand Down
11 changes: 5 additions & 6 deletions source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::Loggable<Log
XdsConfigTrackerOptRef xds_config_tracker);

// Config::SubscriptionFactory
SubscriptionPtr subscriptionFromConfigSource(const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder,
const SubscriptionOptions& options) override;
SubscriptionPtr
absl::StatusOr<SubscriptionPtr> subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) override;
absl::StatusOr<SubscriptionPtr>
collectionSubscriptionFromUrl(const xds::core::v3::ResourceLocator& collection_locator,
const envoy::config::core::v3::ConfigSource& config,
absl::string_view resource_type, Stats::Scope& scope,
Expand Down
7 changes: 5 additions & 2 deletions source/common/filter/config_discovery_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ FilterConfigSubscription::FilterConfigSubscription(
filter_config_provider_manager_(filter_config_provider_manager),
subscription_id_(subscription_id) {
const auto resource_name = getResourceName();
subscription_ = cluster_manager.subscriptionFactory().subscriptionFromConfigSource(
config_source, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
subscription_ =
THROW_OR_RETURN_VALUE(cluster_manager.subscriptionFactory().subscriptionFromConfigSource(
config_source, Grpc::Common::typeUrl(resource_name), *scope_, *this,
resource_decoder_, {}),
Config::SubscriptionPtr);
}

void FilterConfigSubscription::start() {
Expand Down
12 changes: 8 additions & 4 deletions source/common/listener_manager/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ LdsApiImpl::LdsApiImpl(const envoy::config::core::v3::ConfigSource& lds_config,
init_target_("LDS", [this]() { subscription_->start({}); }) {
const auto resource_name = getResourceName();
if (lds_resources_locator == nullptr) {
subscription_ = cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
subscription_ = THROW_OR_RETURN_VALUE(cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name),
*scope_, *this, resource_decoder_, {}),
Config::SubscriptionPtr);
} else {
subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_);
subscription_ = THROW_OR_RETURN_VALUE(
cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_),
Config::SubscriptionPtr);
}
init_manager.add(init_target_);
}
Expand Down
5 changes: 3 additions & 2 deletions source/common/rds/rds_route_config_subscription.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription(
manager_identifier_(manager_identifier), config_update_info_(std::move(config_update)),
resource_decoder_(std::move(resource_decoder)) {
const auto resource_type = route_config_provider_manager_.protoTraits().resourceType();
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
config_source, Envoy::Grpc::Common::typeUrl(resource_type), *scope_, *this,
resource_decoder_, {});
resource_decoder_, {}),
Envoy::Config::SubscriptionPtr);
local_init_manager_.add(local_init_target_);
}

Expand Down
10 changes: 6 additions & 4 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,20 @@ ScopedRdsConfigSubscription::ScopedRdsConfigSubscription(
route_config_provider_manager_(route_config_provider_manager) {
const auto resource_name = getResourceName();
if (scoped_rds.srds_resources_locator().empty()) {
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
scoped_rds.scoped_rds_config_source(), Grpc::Common::typeUrl(resource_name), *scope_,
*this, resource_decoder_, {});
*this, resource_decoder_, {}),
Envoy::Config::SubscriptionPtr);
} else {
const auto srds_resources_locator = THROW_OR_RETURN_VALUE(
Envoy::Config::XdsResourceIdentifier::decodeUrl(scoped_rds.srds_resources_locator()),
xds::core::v3::ResourceLocator);
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().collectionSubscriptionFromUrl(
srds_resources_locator, scoped_rds.scoped_rds_config_source(), resource_name, *scope_,
*this, resource_decoder_);
*this, resource_decoder_),
Envoy::Config::SubscriptionPtr);
}

// TODO(tony612): consider not using the callback here.
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/vhds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info,
const auto resource_name = getResourceName();
Envoy::Config::SubscriptionOptions options;
options.use_namespace_matching_ = true;
subscription_ =
subscription_ = THROW_OR_RETURN_VALUE(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
config_update_info_->protobufConfigurationCast().vhds().config_source(),
Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, options);
Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, options),
Envoy::Config::SubscriptionPtr);
}

void VhdsSubscription::updateOnDemand(const std::string& with_route_config_name_prefix) {
Expand Down
12 changes: 8 additions & 4 deletions source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,13 @@ absl::Status LoaderImpl::initLayers(Event::Dispatcher& dispatcher,
return loadNewSnapshot();
}

void LoaderImpl::initialize(Upstream::ClusterManager& cm) {
absl::Status LoaderImpl::initialize(Upstream::ClusterManager& cm) {
cm_ = &cm;

for (const auto& s : subscriptions_) {
s->createSubscription();
RETURN_IF_NOT_OK(s->createSubscription());
}
return absl::OkStatus();
}

void LoaderImpl::startRtdsSubscriptions(ReadyCallback on_done) {
Expand All @@ -632,11 +633,14 @@ RtdsSubscription::RtdsSubscription(
stats_scope_(store_.createScope("runtime")), resource_name_(rtds_layer.name()),
init_target_("RTDS " + resource_name_, [this]() { start(); }) {}

void RtdsSubscription::createSubscription() {
absl::Status RtdsSubscription::createSubscription() {
const auto resource_name = getResourceName();
subscription_ = parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
auto subscription_or_error = parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
config_source_, Grpc::Common::typeUrl(resource_name), *stats_scope_, *this, resource_decoder_,
{});
RETURN_IF_NOT_OK(subscription_or_error.status());
subscription_ = std::move(*subscription_or_error);
return absl::OkStatus();
}

absl::Status
Expand Down
Loading