Skip to content

Commit

Permalink
Make cluster manager know about local cluster (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanDzhabarov authored Oct 14, 2016
1 parent de6e783 commit 0809f52
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 29 deletions.
50 changes: 40 additions & 10 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,21 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, Stats::Store&
loadCluster(cluster, stats, dns_resolver, ssl_context_manager, runtime, random);
}

Optional<std::string> local_cluster_name;
if (config.hasObject("local_cluster_name")) {
local_cluster_name.value(config.getString("local_cluster_name"));
if (get(local_cluster_name.value()) == nullptr) {
throw EnvoyException(
fmt::format("local cluster '{}' must be defined", local_cluster_name.value()));
}
}

tls.set(thread_local_slot_,
[this, &stats, &runtime, &random, local_zone_name, local_address](
[this, &stats, &runtime, &random, local_zone_name, local_address, local_cluster_name](
Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr {
return ThreadLocal::ThreadLocalObjectPtr{new ThreadLocalClusterManagerImpl(
*this, dispatcher, runtime, random, local_zone_name, local_address)};
*this, dispatcher, runtime, random, local_zone_name, local_address,
local_cluster_name)};
});

// To avoid threading issues, for those clusters that start with hosts already in them (like
Expand Down Expand Up @@ -209,12 +219,29 @@ Http::AsyncClient& ClusterManagerImpl::httpAsyncClientForCluster(const std::stri
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, const std::string& local_zone_name,
const std::string& local_address)
const std::string& local_address, const Optional<std::string>& local_cluster_name)
: parent_(parent), dispatcher_(dispatcher) {
// If local cluster is defined then we need to initialize it first.
if (local_cluster_name.valid()) {
auto& local_cluster = parent.primary_clusters_[local_cluster_name.value()];
thread_local_clusters_[local_cluster_name.value()].reset(
new ClusterEntry(*this, *local_cluster, runtime, random, parent.stats_, dispatcher,
local_zone_name, local_address, nullptr));
}

const HostSet* local_host_set =
local_cluster_name.valid() ? &thread_local_clusters_[local_cluster_name.value()]->host_set_
: nullptr;

for (auto& cluster : parent.primary_clusters_) {
thread_local_clusters_[cluster.first].reset(new ClusterEntry(*this, *cluster.second, runtime,
random, parent.stats_, dispatcher,
local_zone_name, local_address));
// If local cluster name is set then we already initialized this cluster.
if (local_cluster_name.valid() && local_cluster_name.value() == cluster.first) {
continue;
}

thread_local_clusters_[cluster.first].reset(
new ClusterEntry(*this, *cluster.second, runtime, random, parent.stats_, dispatcher,
local_zone_name, local_address, local_host_set));
}

for (auto& cluster : thread_local_clusters_) {
Expand Down Expand Up @@ -281,23 +308,26 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::shutdown() {
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
ThreadLocalClusterManagerImpl& parent, const Cluster& cluster, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const std::string& local_zone_name, const std::string& local_address)
const std::string& local_zone_name, const std::string& local_address,
const HostSet* local_host_set)
: parent_(parent), primary_cluster_(cluster),
http_async_client_(
cluster, stats_store, dispatcher, local_zone_name, parent.parent_, runtime, random,
Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}, local_address) {

switch (cluster.lbType()) {
case LoadBalancerType::LeastRequest: {
lb_.reset(new LeastRequestLoadBalancer(host_set_, cluster.stats(), runtime, random));
lb_.reset(
new LeastRequestLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random));
break;
}
case LoadBalancerType::Random: {
lb_.reset(new RandomLoadBalancer(host_set_, cluster.stats(), runtime, random));
lb_.reset(new RandomLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random));
break;
}
case LoadBalancerType::RoundRobin: {
lb_.reset(new RoundRobinLoadBalancer(host_set_, cluster.stats(), runtime, random));
lb_.reset(
new RoundRobinLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random));
break;
}
}
Expand Down
8 changes: 5 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class ClusterManagerImpl : public ClusterManager {
ClusterEntry(ThreadLocalClusterManagerImpl& parent, const Cluster& cluster,
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const std::string& local_zone_name, const std::string& local_address);
const std::string& local_zone_name, const std::string& local_address,
const HostSet* local_host_set);

Http::ConnectionPool::Instance* connPool(ResourcePriority priority);

Expand All @@ -90,15 +91,16 @@ class ClusterManagerImpl : public ClusterManager {
ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
const std::string& local_zone_name,
const std::string& local_address);
const std::string& local_address,
const Optional<std::string>& local_cluster_name);
void drainConnPools(HostPtr old_host, ConnPoolsContainer& container);
static void updateClusterMembership(const std::string& name, ConstHostVectorPtr hosts,
ConstHostVectorPtr healthy_hosts,
ConstHostVectorPtr local_zone_hosts,
ConstHostVectorPtr local_zone_healthy_hosts,
const std::vector<HostPtr>& hosts_added,
const std::vector<HostPtr>& hosts_removed,
ThreadLocal::Instance& tls, uint32_t thead_local_slot);
ThreadLocal::Instance& tls, uint32_t thread_local_slot);

// ThreadLocal::ThreadLocalObject
void shutdown() override;
Expand Down
7 changes: 4 additions & 3 deletions source/common/upstream/load_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ ConstHostPtr RoundRobinLoadBalancer::chooseHost() {
return hosts_to_use[rr_index_++ % hosts_to_use.size()];
}

LeastRequestLoadBalancer::LeastRequestLoadBalancer(const HostSet& host_set, ClusterStats& stats,
Runtime::Loader& runtime,
LeastRequestLoadBalancer::LeastRequestLoadBalancer(const HostSet& host_set,
const HostSet* local_host_set,
ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, stats, runtime, random) {
: LoadBalancerBase(host_set, local_host_set, stats, runtime, random) {
host_set.addMemberUpdateCb(
[this](const std::vector<HostPtr>&, const std::vector<HostPtr>& hosts_removed) -> void {
if (last_host_) {
Expand Down
22 changes: 13 additions & 9 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ namespace Upstream {
*/
class LoadBalancerBase {
protected:
LoadBalancerBase(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: stats_(stats), runtime_(runtime), random_(random), host_set_(host_set) {}
LoadBalancerBase(const HostSet& host_set, const HostSet* local_host_set, ClusterStats& stats,
Runtime::Loader& runtime, Runtime::RandomGenerator& random)
: stats_(stats), runtime_(runtime), random_(random), host_set_(host_set),
local_host_set_(local_host_set) {}

/**
* Pick the host list to use (healthy or all depending on how many in the set are not healthy).
Expand All @@ -26,16 +27,18 @@ class LoadBalancerBase {

private:
const HostSet& host_set_;
const HostSet* local_host_set_;
};

/**
* Implementation of LoadBalancer that performs RR selection across the hosts in the cluster.
*/
class RoundRobinLoadBalancer : public LoadBalancer, LoadBalancerBase {
public:
RoundRobinLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
RoundRobinLoadBalancer(const HostSet& host_set, const HostSet* local_host_set_,
ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, stats, runtime, random) {}
: LoadBalancerBase(host_set, local_host_set_, stats, runtime, random) {}

// Upstream::LoadBalancer
ConstHostPtr chooseHost() override;
Expand All @@ -59,7 +62,8 @@ class RoundRobinLoadBalancer : public LoadBalancer, LoadBalancerBase {
*/
class LeastRequestLoadBalancer : public LoadBalancer, LoadBalancerBase {
public:
LeastRequestLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
LeastRequestLoadBalancer(const HostSet& host_set, const HostSet* local_host_set_,
ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random);

// Upstream::LoadBalancer
Expand All @@ -75,9 +79,9 @@ class LeastRequestLoadBalancer : public LoadBalancer, LoadBalancerBase {
*/
class RandomLoadBalancer : public LoadBalancer, LoadBalancerBase {
public:
RandomLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, stats, runtime, random) {}
RandomLoadBalancer(const HostSet& host_set, const HostSet* local_host_set, ClusterStats& stats,
Runtime::Loader& runtime, Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, local_host_set, stats, runtime, random) {}

// Upstream::LoadBalancer
ConstHostPtr chooseHost() override;
Expand Down
59 changes: 59 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,65 @@ TEST_F(ClusterManagerImplTest, UnknownClusterType) {
EXPECT_THROW(create(loader), EnvoyException);
}

TEST_F(ClusterManagerImplTest, LocalClusterNotDefined) {
std::string json = R"EOF(
{
"local_cluster_name": "new_cluster",
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
},
{
"name": "cluster_2",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11002"}]
}]
}
)EOF";

Json::StringLoader loader(json);
EXPECT_THROW(create(loader), EnvoyException);
}

TEST_F(ClusterManagerImplTest, LocalClusterDefined) {
std::string json = R"EOF(
{
"local_cluster_name": "new_cluster",
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
},
{
"name": "cluster_2",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11002"}]
},
{
"name": "new_cluster",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11002"}]
}]
}
)EOF";

Json::StringLoader loader(json);
create(loader);
}

TEST_F(ClusterManagerImplTest, DuplicateCluster) {
std::string json = R"EOF(
{
Expand Down
6 changes: 3 additions & 3 deletions test/common/upstream/load_balancer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RoundRobinLoadBalancerTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
RoundRobinLoadBalancer lb_{cluster_, stats_, runtime_, random_};
RoundRobinLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(RoundRobinLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); }
Expand Down Expand Up @@ -196,7 +196,7 @@ class LeastRequestLoadBalancerTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
LeastRequestLoadBalancer lb_{cluster_, stats_, runtime_, random_};
LeastRequestLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(LeastRequestLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); }
Expand Down Expand Up @@ -354,7 +354,7 @@ class RandomLoadBalancerTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
RandomLoadBalancer lb_{cluster_, stats_, runtime_, random_};
RandomLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(RandomLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); }
Expand Down
2 changes: 1 addition & 1 deletion test/common/upstream/load_balancer_simulation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DISABLED_SimulationTest : public testing::Test {
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
// TODO: make per originating host load balancer.
RandomLoadBalancer lb_{cluster_, stats_, runtime_, random_};
RandomLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(DISABLED_SimulationTest, strictlyEqualDistribution) {
Expand Down

0 comments on commit 0809f52

Please sign in to comment.