diff --git a/api/envoy/api/v2/cds.proto b/api/envoy/api/v2/cds.proto
index 647a5015063f..e7df4a940bc6 100644
--- a/api/envoy/api/v2/cds.proto
+++ b/api/envoy/api/v2/cds.proto
@@ -583,6 +583,10 @@ message Cluster {
     // If panic mode is triggered, new hosts are still eligible for traffic; they simply do not
     // contribute to the calculation when deciding whether panic mode is enabled or not.
     bool ignore_new_hosts_until_first_hc = 5;
+
+    // If set to `true`, the cluster manager will drain all existing
+    // connections to upstream hosts whenever hosts are added or removed from the cluster.
+    bool close_connections_on_host_set_change = 6;
   }
 
   // Common configuration for all load balancer implementations.
diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst
index 60b1d5cbf208..609554b4d374 100644
--- a/docs/root/intro/version_history.rst
+++ b/docs/root/intro/version_history.rst
@@ -42,6 +42,7 @@ Version history
   certificate validation context.
 * upstream: added network filter chains to upstream connections, see :ref:`filters <envoy_api_field_Cluster.filters>`.
 * upstream: use p2c to select hosts for least-requests load balancers if all host weights are the same, even in cases where weights are not equal to 1.
+* upstream: added :ref:`an option <envoy_api_field_Cluster.CommonLbConfig.close_connections_on_host_set_change>` that allows draining HTTP and TCP connection pools on cluster membership change.
 * zookeeper: parse responses and emit latency stats.
 
 1.11.1 (August 13, 2019)
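For review context (not part of the patch itself): the new field is a member of `Cluster.CommonLbConfig`, so in a bootstrap it is set under a cluster's `common_lb_config`. A minimal sketch, written as a raw-string constant in the same style as the tests added below; the cluster name `example_cluster` and the other values are illustrative:

#include <string>

// Hypothetical config enabling the new behavior; only the last two lines are
// specific to this patch, the rest mirrors the test clusters below.
const std::string kExampleClusterYaml = R"EOF(
static_resources:
  clusters:
  - name: example_cluster
    connect_timeout: 0.250s
    lb_policy: ROUND_ROBIN
    type: STATIC
    common_lb_config:
      close_connections_on_host_set_change: true
)EOF";

The tests below feed the same shape through `parseBootstrapFromV2Yaml(yaml)`; with the flag omitted (or `false`), the old drain-only-removed-hosts behavior is kept.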
diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc
index 78bfade0a9d3..1dd78f2dabfa 100644
--- a/source/common/upstream/cluster_manager_impl.cc
+++ b/source/common/upstream/cluster_manager_impl.cc
@@ -314,12 +314,21 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
   // Now setup for cross-thread updates.
   cluster.prioritySet().addMemberUpdateCb(
       [&cluster, this](const HostVector&, const HostVector& hosts_removed) -> void {
-        // TODO(snowp): Should this be subject to merge windows?
-
-        // Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
-        // connection pools for the removed hosts.
-        if (!hosts_removed.empty()) {
-          postThreadLocalHostRemoval(cluster, hosts_removed);
+        if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
+          for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
+            // This will drain all TCP and HTTP connection pools.
+            postThreadLocalDrainConnections(cluster, host_set->hosts());
+          }
+        } else {
+          // TODO(snowp): Should this be subject to merge windows?
+
+          // Whenever hosts are removed from the cluster, we make each TLS cluster drain its
+          // connection pools for the removed hosts. If `close_connections_on_host_set_change` is
+          // enabled, this case will be covered by the first `if` statement, where all
+          // connection pools are drained.
+          if (!hosts_removed.empty()) {
+            postThreadLocalDrainConnections(cluster, hosts_removed);
+          }
         }
       });
 
@@ -712,8 +721,8 @@ Tcp::ConnectionPool::Instance* ClusterManagerImpl::tcpConnPoolForCluster(
   return entry->second->tcpConnPool(priority, context, transport_socket_options);
 }
 
-void ClusterManagerImpl::postThreadLocalHostRemoval(const Cluster& cluster,
-                                                    const HostVector& hosts_removed) {
+void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster,
+                                                         const HostVector& hosts_removed) {
   tls_->runOnAllThreads([this, name = cluster.info()->name(), hosts_removed]() {
     ThreadLocalClusterManagerImpl::removeHosts(name, hosts_removed, *tls_);
   });
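The branch above is the behavior change in a nutshell: with the flag enabled, any membership update drains the connection pools of every host at every priority; otherwise only removed hosts' pools are drained. A self-contained toy model of that decision logic, using made-up types (`ToyClusterManager`, `drain()`) rather than Envoy's real classes:

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Toy model, not Envoy code: shows which pools get drained on a membership
// update under each setting of close_connections_on_host_set_change.
struct ToyClusterManager {
  bool close_connections_on_host_set_change{false};
  std::set<std::string> hosts; // current host set (all priorities collapsed)

  void onMembershipChange(const std::vector<std::string>& added,
                          const std::vector<std::string>& removed) {
    for (const auto& h : added) hosts.insert(h);
    for (const auto& h : removed) hosts.erase(h);
    if (close_connections_on_host_set_change) {
      // Mirrors the loop over hostSetsPerPriority(): drain pools for every
      // host still in the set, on any add or removal.
      for (const auto& h : hosts) drain(h);
    } else if (!removed.empty()) {
      // Default behavior: drain only the removed hosts' pools.
      for (const auto& h : removed) drain(h);
    }
  }

  void drain(const std::string& host) { std::cout << "drain pools for " << host << "\n"; }
};

int main() {
  ToyClusterManager cm;
  cm.close_connections_on_host_set_change = true;
  cm.onMembershipChange({"127.0.0.1:80", "127.0.0.1:81"}, {}); // drains both hosts' pools
  cm.onMembershipChange({}, {"127.0.0.1:81"});                 // drains 127.0.0.1:80 as well
}

With the flag set, every current host's pools are drained on each update; with it unset, only removals trigger drains, which is exactly the contrast the two tests below exercise.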
diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h
index ae0ecb2dbd40..00b892eaba67 100644
--- a/source/common/upstream/cluster_manager_impl.h
+++ b/source/common/upstream/cluster_manager_impl.h
@@ -232,7 +232,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::upstream> {
-  void postThreadLocalHostRemoval(const Cluster& cluster, const HostVector& hosts_removed);
+  void postThreadLocalDrainConnections(const Cluster& cluster,
+                                       const HostVector& hosts_removed);
diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc
--- a/test/common/upstream/cluster_manager_impl_test.cc
+++ b/test/common/upstream/cluster_manager_impl_test.cc
+TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) {
+  const std::string yaml = R"EOF(
+  static_resources:
+    clusters:
+    - name: cluster_1
+      connect_timeout: 0.250s
+      lb_policy: ROUND_ROBIN
+      type: STATIC
+      common_lb_config:
+        close_connections_on_host_set_change: true
+  )EOF";
+
+  ReadyWatcher initialized;
+  EXPECT_CALL(initialized, ready());
+  create(parseBootstrapFromV2Yaml(yaml));
+
+  // Set up for an initialize callback.
+  cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });
+
+  std::unique_ptr<MockClusterUpdateCallbacks> callbacks(
+      new NiceMock<MockClusterUpdateCallbacks>());
+  ClusterUpdateCallbacksHandlePtr cb =
+      cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks);
+
+  EXPECT_FALSE(cluster_manager_->get("cluster_1")->info()->addedViaApi());
+
+  // Verify that we get no hosts when the HostSet is empty.
+  EXPECT_EQ(nullptr, cluster_manager_->httpConnPoolForCluster(
+                         "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
+  EXPECT_EQ(nullptr, cluster_manager_->tcpConnPoolForCluster("cluster_1", ResourcePriority::Default,
+                                                             nullptr, nullptr));
+  EXPECT_EQ(nullptr,
+            cluster_manager_->tcpConnForCluster("cluster_1", nullptr, nullptr).connection_);
+
+  Cluster& cluster = cluster_manager_->activeClusters().begin()->second;
+
+  // Set up the HostSet.
+  HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80");
+  HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:81");
+
+  HostVector hosts{host1, host2};
+  auto hosts_ptr = std::make_shared<HostVector>(hosts);
+
+  // Sending non-mergeable updates.
+  cluster.prioritySet().updateHosts(
+      0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
+      100);
+
+  EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value());
+  EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value());
+  EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value());
+
+  EXPECT_CALL(factory_, allocateConnPool_(_, _))
+      .Times(3)
+      .WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());
+
+  EXPECT_CALL(factory_, allocateTcpConnPool_(_))
+      .Times(3)
+      .WillRepeatedly(ReturnNew<Tcp::ConnectionPool::MockInstance>());
+
+  // This should provide us a CP for each of the above hosts.
+  Http::ConnectionPool::MockInstance* cp1 =
+      dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
+          "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
+
+  // Create persistent connection for host2.
+  Http::ConnectionPool::MockInstance* cp2 =
+      dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
+          "cluster_1", ResourcePriority::Default, Http::Protocol::Http2, nullptr));
+
+  Tcp::ConnectionPool::MockInstance* tcp1 =
+      dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
+          "cluster_1", ResourcePriority::Default, nullptr, nullptr));
+
+  Tcp::ConnectionPool::MockInstance* tcp2 =
+      dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
+          "cluster_1", ResourcePriority::Default, nullptr, nullptr));
+
+  EXPECT_NE(cp1, cp2);
+  EXPECT_NE(tcp1, tcp2);
+
+  EXPECT_CALL(*cp2, addDrainedCallback(_))
+      .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));
+
+  EXPECT_CALL(*cp1, addDrainedCallback(_))
+      .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));
+
+  EXPECT_CALL(*tcp1, addDrainedCallback(_))
+      .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));
+
+  EXPECT_CALL(*tcp2, addDrainedCallback(_))
+      .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));
+
+  HostVector hosts_removed;
+  hosts_removed.push_back(host2);
+
+  // This update should drain all connection pools (host1, host2).
+  cluster.prioritySet().updateHosts(
+      0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, {},
+      hosts_removed, 100);
+
+  // Recreate connection pool for host1.
+  cp1 = dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
+      "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
+
+  tcp1 = dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
+      "cluster_1", ResourcePriority::Default, nullptr, nullptr));
+
+  HostSharedPtr host3 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82");
+
+  HostVector hosts_added;
+  hosts_added.push_back(host3);
+
+  EXPECT_CALL(*cp1, addDrainedCallback(_))
+      .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));
+
+  EXPECT_CALL(*tcp1, addDrainedCallback(_))
+      .WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));
+
+  // Adding host3 should drain connection pool for host1.
+  cluster.prioritySet().updateHosts(
+      0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr,
+      hosts_added, {}, 100);
+}
+
+TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) {
+  const std::string yaml = R"EOF(
+  static_resources:
+    clusters:
+    - name: cluster_1
+      connect_timeout: 0.250s
+      lb_policy: ROUND_ROBIN
+      type: STATIC
+  )EOF";
+
+  ReadyWatcher initialized;
+  EXPECT_CALL(initialized, ready());
+  create(parseBootstrapFromV2Yaml(yaml));
+
+  // Set up for an initialize callback.
+  cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });
+
+  std::unique_ptr<MockClusterUpdateCallbacks> callbacks(
+      new NiceMock<MockClusterUpdateCallbacks>());
+  ClusterUpdateCallbacksHandlePtr cb =
+      cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks);
+
+  Cluster& cluster = cluster_manager_->activeClusters().begin()->second;
+
+  // Set up the HostSet.
+  HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80");
+
+  HostVector hosts{host1};
+  auto hosts_ptr = std::make_shared<HostVector>(hosts);
+
+  // Sending non-mergeable updates.
+  cluster.prioritySet().updateHosts(
+      0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
+      100);
+
+  EXPECT_CALL(factory_, allocateConnPool_(_, _))
+      .Times(1)
+      .WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());
+
+  EXPECT_CALL(factory_, allocateTcpConnPool_(_))
+      .Times(1)
+      .WillRepeatedly(ReturnNew<Tcp::ConnectionPool::MockInstance>());
+
+  // This should provide us a CP for the single host above.
+  Http::ConnectionPool::MockInstance* cp1 =
+      dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
+          "cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
+
+  Tcp::ConnectionPool::MockInstance* tcp1 =
+      dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
+          "cluster_1", ResourcePriority::Default, nullptr, nullptr));
+
+  HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82");
+  HostVector hosts_added;
+  hosts_added.push_back(host2);
+
+  // No connection pools should be drained.
+  EXPECT_CALL(*cp1, drainConnections()).Times(0);
+  EXPECT_CALL(*tcp1, drainConnections()).Times(0);
+
+  // Adding host2 should not drain any connection pools.
+  cluster.prioritySet().updateHosts(
+      0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr,
+      hosts_added, {}, 100);
+}
+
 } // namespace
 } // namespace Upstream
 } // namespace Envoy