Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: subscription factory dependency injection. #7200

Merged
merged 13 commits into from
Jun 11, 2019
9 changes: 9 additions & 0 deletions include/envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "subscription_factory_interface",
hdrs = ["subscription_factory.h"],
deps = [
":subscription_interface",
"@envoy_api//envoy/api/v2/core:config_source_cc",
],
)

envoy_cc_library(
name = "subscription_interface",
hdrs = ["subscription.h"],
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Subscription {
virtual void updateResources(const std::set<std::string>& update_to_these_names) PURE;
};

using SubscriptionPtr = std::unique_ptr<Subscription>;

/**
* Per subscription stats. @see stats_macros.h
*/
Expand Down
32 changes: 32 additions & 0 deletions include/envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "envoy/api/api.h"
#include "envoy/api/v2/core/config_source.pb.h"
#include "envoy/config/subscription.h"
#include "envoy/stats/scope.h"

namespace Envoy {
namespace Config {

class SubscriptionFactory {
public:
virtual ~SubscriptionFactory() {}
htuch marked this conversation as resolved.
Show resolved Hide resolved

/**
* Subscription factory interface.
*
* @param config envoy::api::v2::core::ConfigSource to construct from.
* @param type_url type URL for the resource being subscribed to.
* @param scope stats scope for any stats tracked by the subscription.
* @param callbacks the callbacks needed by all Subscription objects, to deliver config updates.
* The callbacks must not result in the deletion of the Subscription object.
* @return SubscriptionPtr subscription object corresponding for config and type_url.
*/
virtual SubscriptionPtr
subscriptionFromConfigSource(const envoy::api::v2::core::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks) PURE;
};

} // namespace Config
} // namespace Envoy
1 change: 1 addition & 0 deletions include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ envoy_cc_library(
":upstream_interface",
"//include/envoy/access_log:access_log_interface",
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/config:subscription_factory_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/http:async_client_interface",
"//include/envoy/http:conn_pool_interface",
Expand Down
11 changes: 10 additions & 1 deletion include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "envoy/api/v2/cds.pb.h"
#include "envoy/config/bootstrap/v2/bootstrap.pb.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/config/subscription_factory.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/http/async_client.h"
#include "envoy/http/conn_pool.h"
Expand Down Expand Up @@ -131,7 +132,7 @@ class ClusterManager {
* If information about the cluster needs to be kept, use the ThreadLocalCluster::info() method to
* obtain cluster information that is safe to store.
*/
virtual ThreadLocalCluster* get(const std::string& cluster) PURE;
virtual ThreadLocalCluster* get(absl::string_view cluster) PURE;

/**
* Allocate a load balanced HTTP connection pool for a cluster. This is *per-thread* so that
Expand Down Expand Up @@ -235,6 +236,14 @@ class ClusterManager {

virtual ClusterManagerFactory& clusterManagerFactory() PURE;

/**
* Obtain the subscription factory for the cluster manager. Since subscriptions may have an
* upstream component, the factory is a facet of the cluster manager.
*
* @return Config::SubscriptionFactory& the subscription factory.
*/
virtual Config::SubscriptionFactory& subscriptionFactory() PURE;

virtual std::size_t warmingClusterCount() const PURE;
};

Expand Down
5 changes: 3 additions & 2 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ envoy_cc_library(

envoy_cc_library(
name = "subscription_factory_lib",
srcs = ["subscription_factory.cc"],
hdrs = ["subscription_factory.h"],
srcs = ["subscription_factory_impl.cc"],
hdrs = ["subscription_factory_impl.h"],
deps = [
":delta_subscription_lib",
":filesystem_subscription_lib",
Expand All @@ -321,6 +321,7 @@ envoy_cc_library(
":http_subscription_lib",
":type_to_endpoint_lib",
":utility_lib",
"//include/envoy/config:subscription_factory_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/protobuf",
Expand Down
42 changes: 0 additions & 42 deletions source/common/config/subscription_factory.h

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "common/config/subscription_factory.h"
#include "common/config/subscription_factory_impl.h"

#include "common/config/delta_subscription_impl.h"
#include "common/config/filesystem_subscription_impl.h"
Expand All @@ -12,24 +12,29 @@
namespace Envoy {
namespace Config {

std::unique_ptr<Subscription> SubscriptionFactory::subscriptionFromConfigSource(
const envoy::api::v2::core::ConfigSource& config, const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
Stats::Scope& scope, absl::string_view type_url,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api,
SubscriptionCallbacks& callbacks) {
SubscriptionFactoryImpl::SubscriptionFactoryImpl(
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
: local_info_(local_info), dispatcher_(dispatcher), cm_(cm), random_(random),
validation_visitor_(validation_visitor), api_(api) {}

SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
const envoy::api::v2::core::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks) {
Config::Utility::checkLocalInfo(type_url, local_info_);
std::unique_ptr<Subscription> result;
SubscriptionStats stats = Utility::generateStats(scope);
switch (config.config_source_specifier_case()) {
case envoy::api::v2::core::ConfigSource::kPath: {
Utility::checkFilesystemSubscriptionBackingPath(config.path(), api);
Utility::checkFilesystemSubscriptionBackingPath(config.path(), api_);
result = std::make_unique<Config::FilesystemSubscriptionImpl>(
dispatcher, config.path(), callbacks, stats, validation_visitor, api);
dispatcher_, config.path(), callbacks, stats, validation_visitor_, api_);
break;
}
case envoy::api::v2::core::ConfigSource::kApiConfigSource: {
const envoy::api::v2::core::ApiConfigSource& api_config_source = config.api_config_source();
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source);
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.clusters(), api_config_source);
switch (api_config_source.api_type()) {
case envoy::api::v2::core::ApiConfigSource::UNSUPPORTED_REST_LEGACY:
throw EnvoyException(
Expand All @@ -38,29 +43,29 @@ std::unique_ptr<Subscription> SubscriptionFactory::subscriptionFromConfigSource(
config.DebugString());
case envoy::api::v2::core::ApiConfigSource::REST:
result = std::make_unique<HttpSubscriptionImpl>(
local_info, cm, api_config_source.cluster_names()[0], dispatcher, random,
local_info_, cm_, api_config_source.cluster_names()[0], dispatcher_, random_,
Utility::apiConfigSourceRefreshDelay(api_config_source),
Utility::apiConfigSourceRequestTimeout(api_config_source), restMethod(type_url),
callbacks, stats, Utility::configSourceInitialFetchTimeout(config), validation_visitor);
callbacks, stats, Utility::configSourceInitialFetchTimeout(config), validation_visitor_);
break;
case envoy::api::v2::core::ApiConfigSource::GRPC:
result = std::make_unique<GrpcSubscriptionImpl>(
local_info,
Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(),
local_info_,
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope)
->create(),
dispatcher, random, sotwGrpcMethod(type_url), type_url, callbacks, stats, scope,
dispatcher_, random_, sotwGrpcMethod(type_url), type_url, callbacks, stats, scope,
Utility::parseRateLimitSettings(api_config_source),
Utility::configSourceInitialFetchTimeout(config));
break;
case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: {
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source);
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm_.clusters(), api_config_source);
result = std::make_unique<DeltaSubscriptionImpl>(
local_info,
Config::Utility::factoryForGrpcApiConfigSource(cm.grpcAsyncClientManager(),
local_info_,
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope)
->create(),
dispatcher, deltaGrpcMethod(type_url), type_url, random, scope,
dispatcher_, deltaGrpcMethod(type_url), type_url, random_, scope,
Utility::parseRateLimitSettings(api_config_source), callbacks, stats,
Utility::configSourceInitialFetchTimeout(config));
break;
Expand All @@ -72,7 +77,7 @@ std::unique_ptr<Subscription> SubscriptionFactory::subscriptionFromConfigSource(
}
case envoy::api::v2::core::ConfigSource::kAds: {
result = std::make_unique<GrpcMuxSubscriptionImpl>(
cm.adsMux(), callbacks, stats, type_url, dispatcher,
cm_.adsMux(), callbacks, stats, type_url, dispatcher_,
Utility::configSourceInitialFetchTimeout(config));
break;
}
Expand Down
34 changes: 34 additions & 0 deletions source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "envoy/api/api.h"
#include "envoy/api/v2/core/base.pb.h"
#include "envoy/config/subscription.h"
#include "envoy/config/subscription_factory.h"
#include "envoy/stats/scope.h"
#include "envoy/upstream/cluster_manager.h"

namespace Envoy {
namespace Config {

class SubscriptionFactoryImpl : public SubscriptionFactory {
public:
SubscriptionFactoryImpl(const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api);

// Config::SubscriptionFactory
SubscriptionPtr subscriptionFromConfigSource(const envoy::api::v2::core::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks) override;

private:
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
Upstream::ClusterManager& cm_;
Runtime::RandomGenerator& random_;
ProtobufMessage::ValidationVisitor& validation_visitor_;
Api::Api& api_;
};

} // namespace Config
} // namespace Envoy
9 changes: 4 additions & 5 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void Utility::translateApiConfigSource(const std::string& cluster, uint32_t refr
Protobuf::util::TimeUtil::MillisecondsToDuration(refresh_delay_ms));
}

void Utility::checkCluster(const std::string& error_prefix, const std::string& cluster_name,
void Utility::checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm) {
Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name);
if (cluster == nullptr) {
Expand All @@ -58,15 +58,14 @@ void Utility::checkCluster(const std::string& error_prefix, const std::string& c
}
}

void Utility::checkClusterAndLocalInfo(const std::string& error_prefix,
const std::string& cluster_name,
Upstream::ClusterManager& cm,
void Utility::checkClusterAndLocalInfo(absl::string_view error_prefix,
absl::string_view cluster_name, Upstream::ClusterManager& cm,
const LocalInfo::LocalInfo& local_info) {
checkCluster(error_prefix, cluster_name, cm);
checkLocalInfo(error_prefix, local_info);
}

void Utility::checkLocalInfo(const std::string& error_prefix,
void Utility::checkLocalInfo(absl::string_view error_prefix,
const LocalInfo::LocalInfo& local_info) {
if (local_info.clusterName().empty() || local_info.nodeName().empty()) {
throw EnvoyException(
Expand Down
9 changes: 4 additions & 5 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Utility {
* @param cluster_name supplies the cluster name to check.
* @param cm supplies the cluster manager.
*/
static void checkCluster(const std::string& error_prefix, const std::string& cluster_name,
static void checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm);

/**
Expand All @@ -120,17 +120,16 @@ class Utility {
* @param cm supplies the cluster manager.
* @param local_info supplies the local info.
*/
static void checkClusterAndLocalInfo(const std::string& error_prefix,
const std::string& cluster_name,
Upstream::ClusterManager& cm,
static void checkClusterAndLocalInfo(absl::string_view error_prefix,
absl::string_view cluster_name, Upstream::ClusterManager& cm,
const LocalInfo::LocalInfo& local_info);

/**
* Check local info for API config sanity. Throws on error.
* @param error_prefix supplies the prefix to use in error messages.
* @param local_info supplies the local info.
*/
static void checkLocalInfo(const std::string& error_prefix,
static void checkLocalInfo(absl::string_view error_prefix,
const LocalInfo::LocalInfo& local_info);

/**
Expand Down
2 changes: 0 additions & 2 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/config:rds_json_lib",
"//source/common/config:subscription_factory_lib",
"//source/common/config:utility_lib",
"//source/common/init:target_lib",
"//source/common/protobuf:utility_lib",
Expand Down Expand Up @@ -181,7 +180,6 @@ envoy_cc_library(
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/config:config_provider_lib",
"//source/common/config:subscription_factory_lib",
"@envoy_api//envoy/admin/v2alpha:config_dump_cc",
"@envoy_api//envoy/api/v2:srds_cc",
],
Expand Down
12 changes: 5 additions & 7 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,11 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription(
route_config_provider_manager_(route_config_provider_manager),
manager_identifier_(manager_identifier),
validation_visitor_(factory_context_.messageValidationVisitor()) {
Envoy::Config::Utility::checkLocalInfo("rds", factory_context.localInfo());

subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource(
rds.config_source(), factory_context.localInfo(), factory_context.dispatcher(),
factory_context.clusterManager(), factory_context.random(), *scope_,
Grpc::Common::typeUrl(envoy::api::v2::RouteConfiguration().GetDescriptor()->full_name()),
factory_context.messageValidationVisitor(), factory_context.api(), *this);
subscription_ =
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
rds.config_source(),
Grpc::Common::typeUrl(envoy::api::v2::RouteConfiguration().GetDescriptor()->full_name()),
*scope_, *this);

config_update_info_ = std::make_unique<RouteConfigUpdateReceiverImpl>(
factory_context.timeSource(), factory_context.messageValidationVisitor());
Expand Down
Loading