diff --git a/source/common/tcp/async_tcp_client_impl.cc b/source/common/tcp/async_tcp_client_impl.cc index 4d6482cb5dc6..0d6ce288c8ed 100644 --- a/source/common/tcp/async_tcp_client_impl.cc +++ b/source/common/tcp/async_tcp_client_impl.cc @@ -24,6 +24,14 @@ AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher, connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })), enable_half_close_(enable_half_close) {} +AsyncTcpClientImpl::~AsyncTcpClientImpl() { + if (connection_) { + connection_->removeConnectionCallbacks(*this); + } + + close(Network::ConnectionCloseType::NoFlush); +} + bool AsyncTcpClientImpl::connect() { if (connection_) { return false; @@ -69,7 +77,8 @@ void AsyncTcpClientImpl::onConnectTimeout() { } void AsyncTcpClientImpl::close(Network::ConnectionCloseType type) { - if (connection_) { + if (connection_ && !closing_) { + closing_ = true; connection_->close(type); } } @@ -127,6 +136,7 @@ void AsyncTcpClientImpl::onEvent(Network::ConnectionEvent event) { detected_close_ = connection_->detectedCloseType(); } + closing_ = false; dispatcher_.deferredDelete(std::move(connection_)); if (callbacks_) { callbacks_->onEvent(event); diff --git a/source/common/tcp/async_tcp_client_impl.h b/source/common/tcp/async_tcp_client_impl.h index ef965ca68cc5..2f239b757028 100644 --- a/source/common/tcp/async_tcp_client_impl.h +++ b/source/common/tcp/async_tcp_client_impl.h @@ -28,6 +28,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient, AsyncTcpClientImpl(Event::Dispatcher& dispatcher, Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context, bool enable_half_close); + ~AsyncTcpClientImpl(); void close(Network::ConnectionCloseType type) override; @@ -106,6 +107,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient, Event::TimerPtr connect_timer_; AsyncTcpClientCallbacks* callbacks_{}; Network::DetectedCloseType detected_close_{Network::DetectedCloseType::Normal}; + bool closing_{false}; bool connected_{false}; bool enable_half_close_{false}; }; diff --git a/test/common/tcp/async_tcp_client_impl_test.cc b/test/common/tcp/async_tcp_client_impl_test.cc index f185545a1223..409808bdfde7 100644 --- a/test/common/tcp/async_tcp_client_impl_test.cc +++ b/test/common/tcp/async_tcp_client_impl_test.cc @@ -18,6 +18,15 @@ using testing::Return; namespace Envoy { namespace Tcp { +class CustomMockClientConnection : public Network::MockClientConnection { +public: + ~CustomMockClientConnection() { + if (state_ != Connection::State::Closed) { + raiseEvent(Network::ConnectionEvent::LocalClose); + } + }; +}; + class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public testing::Test { public: AsyncTcpClientImplTest() = default; @@ -32,7 +41,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test } void expectCreateConnection(bool trigger_connected = true) { - connection_ = new NiceMock(); + connection_ = new NiceMock(); Upstream::MockHost::MockCreateConnectionData conn_info; connection_->streamInfo().setAttemptCount(1); conn_info.connection_ = connection_; @@ -59,7 +68,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test NiceMock* connect_timer_; NiceMock dispatcher_; NiceMock cluster_manager_; - Network::MockClientConnection* connection_{}; + CustomMockClientConnection* connection_{}; NiceMock callbacks_; }; diff --git a/test/integration/filters/test_network_async_tcp_filter.cc b/test/integration/filters/test_network_async_tcp_filter.cc index 6116b2d01dc1..58c80d793153 100644 --- a/test/integration/filters/test_network_async_tcp_filter.cc +++ b/test/integration/filters/test_network_async_tcp_filter.cc @@ -41,7 +41,8 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter { const test::integration::filters::TestNetworkAsyncTcpFilterConfig& config, Stats::Scope& scope, Upstream::ClusterManager& cluster_manager) : stats_(generateStats("test_network_async_tcp_filter", scope)), - cluster_name_(config.cluster_name()), cluster_manager_(cluster_manager) { + cluster_name_(config.cluster_name()), kill_after_on_data_(config.kill_after_on_data()), + cluster_manager_(cluster_manager) { const auto thread_local_cluster = cluster_manager_.getThreadLocalCluster(cluster_name_); options_ = std::make_shared(true); if (thread_local_cluster != nullptr) { @@ -60,6 +61,11 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter { data.length()); client_->write(data, end_stream); + if (kill_after_on_data_) { + Tcp::AsyncTcpClient* c1 = client_.release(); + delete c1; + } + return Network::FilterStatus::StopIteration; } @@ -166,6 +172,7 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter { TestNetworkAsyncTcpFilterStats stats_; Tcp::AsyncTcpClientPtr client_; absl::string_view cluster_name_; + bool kill_after_on_data_; std::unique_ptr request_callbacks_; std::unique_ptr downstream_callbacks_; Upstream::ClusterManager& cluster_manager_; diff --git a/test/integration/filters/test_network_async_tcp_filter.proto b/test/integration/filters/test_network_async_tcp_filter.proto index bcb4d9beee34..fc84979375bb 100644 --- a/test/integration/filters/test_network_async_tcp_filter.proto +++ b/test/integration/filters/test_network_async_tcp_filter.proto @@ -4,4 +4,5 @@ package test.integration.filters; message TestNetworkAsyncTcpFilterConfig { string cluster_name = 1; + bool kill_after_on_data = 2; } diff --git a/test/integration/tcp_async_client_integration_test.cc b/test/integration/tcp_async_client_integration_test.cc index 89c4e29c1771..f0a9932bbc0a 100644 --- a/test/integration/tcp_async_client_integration_test.cc +++ b/test/integration/tcp_async_client_integration_test.cc @@ -1,3 +1,4 @@ +#include "test/integration/filters/test_network_async_tcp_filter.pb.h" #include "test/integration/integration.h" #include "gtest/gtest.h" @@ -16,15 +17,37 @@ class TcpAsyncClientIntegrationTest : public testing::TestWithParam void { + test::integration::filters::TestNetworkAsyncTcpFilterConfig proto_config; + TestUtility::loadFromYaml(yaml, proto_config); + + auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0); + auto* filter_chain = listener->mutable_filter_chains(0); + auto* filter = filter_chain->mutable_filters(0); + filter->mutable_typed_config()->PackFrom(proto_config); + }); + + BaseIntegrationTest::initialize(); + } }; INSTANTIATE_TEST_SUITE_P(IpVersions, TcpAsyncClientIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response"); @@ -51,8 +74,7 @@ TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) { } TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) { - enableHalfClose(true); - initialize(); + init(); std::string data_frame_1("data_frame_1"); std::string data_frame_2("data_frame_2"); @@ -85,8 +107,7 @@ TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) { } TEST_P(TcpAsyncClientIntegrationTest, MultipleResponseFrames) { - enableHalfClose(true); - initialize(); + init(); std::string data_frame_1("data_frame_1"); std::string response_1("response_1"); @@ -116,8 +137,7 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) { return; } - enableHalfClose(true); - initialize(); + init(); IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); ASSERT_TRUE(tcp_client->write("hello1", false)); @@ -143,11 +163,24 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) { test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 0); } +TEST_P(TcpAsyncClientIntegrationTest, ClientTearDown) { + init(true); + + std::string request("request"); + + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); + ASSERT_TRUE(tcp_client->write(request, true)); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + ASSERT_TRUE(fake_upstream_connection->waitForData(request.size())); + + tcp_client->close(); +} + #if ENVOY_PLATFORM_ENABLE_SEND_RST // Test if RST close can be detected from downstream and upstream is closed by RST. TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response"); @@ -178,8 +211,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) { // Test if RST close can be detected from upstream. TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response"); @@ -212,8 +244,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) { // the client. The behavior is different for windows, since RST support is literally supported for // unix like system, disabled the test for windows. TEST_P(TcpAsyncClientIntegrationTest, TestDownstremHalfClosedThenRST) { - enableHalfClose(true); - initialize(); + init(); std::string request("request"); std::string response("response");