Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upstream and downstream info in parent read callbacks in tcp too #9949

Merged
merged 5 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
TransportSocketPtr&& transport_socket, bool connected)
: ConnectionImplBase(dispatcher, next_global_id_++),
transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
filter_manager_(*this), stream_info_(dispatcher.timeSource()),
stream_info_(dispatcher.timeSource()), filter_manager_(*this),
write_buffer_(
dispatcher.getWatermarkFactory().create([this]() -> void { this->onLowWatermark(); },
[this]() -> void { this->onHighWatermark(); })),
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback

TransportSocketPtr transport_socket_;
ConnectionSocketPtr socket_;
FilterManagerImpl filter_manager_;
StreamInfo::StreamInfoImpl stream_info_;
FilterManagerImpl filter_manager_;

Buffer::OwnedImpl read_buffer_;
// This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
Expand Down
9 changes: 6 additions & 3 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}

Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source)
Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager)
: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
upstream_callbacks_(new UpstreamCallbacks(this)), stream_info_(time_source) {
upstream_callbacks_(new UpstreamCallbacks(this)) {
ASSERT(config != nullptr);
}

Expand Down Expand Up @@ -292,6 +291,10 @@ void Filter::readDisableDownstream(bool disable) {
}
}

StreamInfo::StreamInfo& Filter::getStreamInfo() {
return read_callbacks_->connection().streamInfo();
}

void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
Expand Down
6 changes: 2 additions & 4 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,7 @@ class Filter : public Network::ReadFilter,
Tcp::ConnectionPool::Callbacks,
protected Logger::Loggable<Logger::Id::filter> {
public:
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source);
Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager);
~Filter() override;

// Network::ReadFilter
Expand Down Expand Up @@ -302,7 +301,7 @@ class Filter : public Network::ReadFilter,
bool on_high_watermark_called_{false};
};

virtual StreamInfo::StreamInfo& getStreamInfo() { return stream_info_; }
virtual StreamInfo::StreamInfo& getStreamInfo();

protected:
struct DownstreamCallbacks : public Network::ConnectionCallbacks {
Expand Down Expand Up @@ -354,7 +353,6 @@ class Filter : public Network::ReadFilter,
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
std::unique_ptr<GenericUpstream> upstream_;
StreamInfo::StreamInfoImpl stream_info_;
RouteConstSharedPtr route_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
uint32_t connect_attempts_{};
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/network/tcp_proxy/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ Network::FilterFactoryCb ConfigFactory::createFilterFactoryFromProtoTyped(
Envoy::TcpProxy::ConfigSharedPtr filter_config(
std::make_shared<Envoy::TcpProxy::Config>(proto_config, context));
return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(std::make_shared<Envoy::TcpProxy::Filter>(
filter_config, context.clusterManager(), context.dispatcher().timeSource()));
filter_manager.addReadFilter(
std::make_shared<Envoy::TcpProxy::Filter>(filter_config, context.clusterManager()));
};
}

Expand Down
2 changes: 1 addition & 1 deletion test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class HttpConnectionManagerImplTest : public testing::Test, public ConnectionMan
bool shouldNormalizePath() const override { return normalize_path_; }
bool shouldMergeSlashes() const override { return merge_slashes_; }

DangerousDeprecatedTestTime test_time_;
Envoy::Event::SimulatedTimeSystem test_time_;
NiceMock<Router::MockRouteConfigProvider> route_config_provider_;
std::shared_ptr<Router::MockConfig> route_config_{new NiceMock<Router::MockConfig>()};
NiceMock<Router::MockScopedRouteConfigProvider> scoped_route_config_provider_;
Expand Down
3 changes: 1 addition & 2 deletions test/common/network/filter_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,7 @@ stat_prefix: name
tcp_proxy.set_cluster("fake_cluster");
TcpProxy::ConfigSharedPtr tcp_proxy_config(new TcpProxy::Config(tcp_proxy, factory_context));
manager.addReadFilter(
std::make_shared<TcpProxy::Filter>(tcp_proxy_config, factory_context.cluster_manager_,
factory_context.dispatcher().timeSource()));
std::make_shared<TcpProxy::Filter>(tcp_proxy_config, factory_context.cluster_manager_));

Extensions::Filters::Common::RateLimit::RequestCallbacks* request_callbacks{};
EXPECT_CALL(*rl_client, limit(_, "foo",
Expand Down
2 changes: 1 addition & 1 deletion test/common/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ envoy_cc_test_library(
"//include/envoy/stream_info:stream_info_interface",
"//source/common/common:assert_lib",
"//source/common/stream_info:filter_state_lib",
"//test/test_common:test_time_lib",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Expand Down
4 changes: 2 additions & 2 deletions test/common/stream_info/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "common/common/assert.h"
#include "common/stream_info/filter_state_impl.h"

#include "test/test_common/test_time.h"
#include "test/test_common/simulated_time_system.h"

namespace Envoy {

Expand Down Expand Up @@ -242,7 +242,7 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
std::string requested_server_name_;
std::string upstream_transport_failure_reason_;
const Http::HeaderMap* request_headers_{};
DangerousDeprecatedTestTime test_time_;
Envoy::Event::SimulatedTimeSystem test_time_;
};

} // namespace Envoy
68 changes: 40 additions & 28 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,11 @@ class TcpProxyTest : public testing::Test {
TcpProxyTest() {
ON_CALL(*factory_context_.access_log_manager_.file_, write(_))
.WillByDefault(SaveArg<0>(&access_log_data_));
ON_CALL(filter_callbacks_.connection_.stream_info_, onUpstreamHostSelected(_))
.WillByDefault(Invoke(
[this](Upstream::HostDescriptionConstSharedPtr host) { upstream_host_ = host; }));
ON_CALL(filter_callbacks_.connection_.stream_info_, upstreamHost())
.WillByDefault(ReturnPointee(&upstream_host_));
}

~TcpProxyTest() override {
Expand Down Expand Up @@ -905,7 +910,7 @@ class TcpProxyTest : public testing::Test {
}

{
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true));
EXPECT_CALL(filter_callbacks_.connection_, readDisable(true));
filter_->initializeReadFilterCallbacks(filter_callbacks_);
Expand Down Expand Up @@ -953,8 +958,8 @@ class TcpProxyTest : public testing::Test {

NiceMock<Server::Configuration::MockFactoryContext> factory_context_;
ConfigSharedPtr config_;
std::unique_ptr<Filter> filter_;
NiceMock<Network::MockReadFilterCallbacks> filter_callbacks_;
std::unique_ptr<Filter> filter_;
std::vector<std::shared_ptr<NiceMock<Upstream::MockHost>>> upstream_hosts_{};
std::vector<std::unique_ptr<NiceMock<Network::MockClientConnection>>> upstream_connections_{};
std::vector<std::unique_ptr<NiceMock<Tcp::ConnectionPool::MockConnectionData>>>
Expand All @@ -968,6 +973,7 @@ class TcpProxyTest : public testing::Test {
Network::Address::InstanceConstSharedPtr upstream_remote_address_;
std::list<std::function<Tcp::ConnectionPool::Cancellable*(Tcp::ConnectionPool::Cancellable*)>>
new_connection_functions_;
Upstream::HostDescriptionConstSharedPtr upstream_host_{};
};

TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DefaultRoutes)) {
Expand Down Expand Up @@ -1253,7 +1259,7 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(RouteWithMetadataMatch)) {
{Envoy::Config::MetadataFilters::get().ENVOY_LB, metadata_struct});

configure(config);
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());

Expand Down Expand Up @@ -1301,7 +1307,7 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) {
v2.set_string_value("v2");
HashedValue hv0(v0), hv1(v1), hv2(v2);

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);

// Expect filter to try to open a connection to cluster1.
Expand Down Expand Up @@ -1355,7 +1361,7 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) {

TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DisconnectBeforeData)) {
configure(defaultConfig());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);

filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
Expand Down Expand Up @@ -1394,7 +1400,7 @@ TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(UpstreamConnectionLimit)) {
0, 0, 0, 0, 0);

// setup sets up expectation for tcpConnForCluster but this test is expected to NOT call that
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
// The downstream connection closes if the proxy can't make an upstream connection.
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush));
filter_->initializeReadFilterCallbacks(filter_callbacks_);
Expand Down Expand Up @@ -1746,6 +1752,23 @@ TEST_F(TcpProxyTest, ShareFilterState) {
.value());
}

// Tests that filter callback can access downstream and upstream address and ssl properties.
TEST_F(TcpProxyTest, AccessDownstreamAndUpstreamProperties) {
setup(1);

raiseEventUpstreamConnected(0);
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamLocalAddress(),
filter_callbacks_.connection().localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamRemoteAddress(),
filter_callbacks_.connection().remoteAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamSslConnection(),
filter_callbacks_.connection().ssl());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamLocalAddress(),
upstream_connections_.at(0)->localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamSslConnection(),
upstream_connections_.at(0)->streamInfo().downstreamSslConnection());
}

class TcpProxyRoutingTest : public testing::Test {
public:
TcpProxyRoutingTest() = default;
Expand All @@ -1766,7 +1789,7 @@ class TcpProxyRoutingTest : public testing::Test {
void initializeFilter() {
EXPECT_CALL(filter_callbacks_, connection()).WillRepeatedly(ReturnRef(connection_));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down Expand Up @@ -1826,13 +1849,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UseClusterFromPerConnectionC
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.tcp_proxy.cluster",
std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);
ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.tcp_proxy.cluster", std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to specified cluster.
EXPECT_CALL(factory_context_.cluster_manager_,
Expand All @@ -1847,14 +1867,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UpstreamServerName)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.network.upstream_server_name",
std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.network.upstream_server_name", std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-server-name
Expand Down Expand Up @@ -1882,16 +1898,12 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(ApplicationProtocols)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData(
connection_.streamInfo().filterState()->setData(
Network::ApplicationProtocols::key(),
std::make_unique<Network::ApplicationProtocols>(std::vector<std::string>{"foo", "bar"}),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-application-protocol
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
Expand Down Expand Up @@ -1933,7 +1945,7 @@ class TcpProxyHashingTest : public testing::Test {
void initializeFilter() {
EXPECT_CALL(filter_callbacks_, connection()).WillRepeatedly(ReturnRef(connection_));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_, timeSystem());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down
7 changes: 4 additions & 3 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,6 @@ TEST_F(HttpHealthCheckerImplTest, SuccessServiceCheckWithAdditionalHeaders) {
key: value
)EOF");

std::string current_start_time;
cluster_->prioritySet().getMockHostSet(0)->hosts_ = {
makeTestHost(cluster_->info_, "tcp://127.0.0.1:80", metadata)};
cluster_->info_->stats().upstream_cx_total_.inc();
Expand All @@ -1348,8 +1347,10 @@ TEST_F(HttpHealthCheckerImplTest, SuccessServiceCheckWithAdditionalHeaders) {
EXPECT_EQ(headers.get(downstream_local_address_without_port)->value().getStringView(),
value_downstream_local_address_without_port);

EXPECT_NE(headers.get(start_time)->value().getStringView(), current_start_time);
current_start_time = std::string(headers.get(start_time)->value().getStringView());
Envoy::DateFormatter date_formatter("%s.%9f");
std::string current_start_time =
date_formatter.fromTime(dispatcher_.timeSource().systemTime());
EXPECT_EQ(headers.get(start_time)->value().getStringView(), current_start_time);
}));
health_checker_->start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class HttpGrpcAccessLogTest : public testing::Test {
void expectLogRequestMethod(const std::string& request_method) {
NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.host_ = nullptr;
stream_info.start_time_ = SystemTime(1h);

Http::TestHeaderMapImpl request_headers{
{":method", request_method},
Expand All @@ -104,7 +105,8 @@ class HttpGrpcAccessLogTest : public testing::Test {
socket_address:
address: "127.0.0.2"
port_value: 0
start_time: {{}}
start_time:
seconds: 3600
request:
request_method: {}
request_headers_bytes: {}
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/lua/lua_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ TEST_F(LuaHttpFilterTest, SetGetDynamicMetadata) {
setup(SCRIPT);

Http::TestHeaderMapImpl request_headers{{":path", "/"}};
DangerousDeprecatedTestTime test_time;
Event::SimulatedTimeSystem test_time;
StreamInfo::StreamInfoImpl stream_info(Http::Protocol::Http2, test_time.timeSystem());
EXPECT_EQ(0, stream_info.dynamicMetadata().filter_metadata_size());
EXPECT_CALL(decoder_callbacks_, streamInfo()).WillOnce(ReturnRef(stream_info));
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/lua/wrappers_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class LuaStreamInfoWrapperTest
return metadata;
}

DangerousDeprecatedTestTime test_time_;
Event::SimulatedTimeSystem test_time_;
};

// Return the current request protocol.
Expand Down
13 changes: 11 additions & 2 deletions test/extensions/filters/network/tcp_proxy/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ TEST_P(RouteIpListConfigTest, DEPRECATED_FEATURE_TEST(TcpProxy)) {
ConfigFactory factory;
Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, context);
Network::MockConnection connection;
EXPECT_CALL(connection, addReadFilter(_));
NiceMock<Network::MockReadFilterCallbacks> readFilterCallback;
EXPECT_CALL(connection, addReadFilter(_))
.WillRepeatedly(Invoke([&readFilterCallback](Network::ReadFilterSharedPtr filter) {
filter->initializeReadFilterCallbacks(readFilterCallback);
}));
cb(connection);
}

Expand All @@ -119,9 +123,14 @@ TEST(ConfigTest, ConfigTest) {
config.set_cluster("cluster");

EXPECT_TRUE(factory.isTerminalFilter());

Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(config, context);
Network::MockConnection connection;
EXPECT_CALL(connection, addReadFilter(_));
NiceMock<Network::MockReadFilterCallbacks> readFilterCallback;
EXPECT_CALL(connection, addReadFilter(_))
.WillRepeatedly(Invoke([&readFilterCallback](Network::ReadFilterSharedPtr filter) {
filter->initializeReadFilterCallbacks(readFilterCallback);
}));
cb(connection);
}

Expand Down
2 changes: 1 addition & 1 deletion test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class ZipkinDriverTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;

NiceMock<Tracing::MockConfig> config_;
DangerousDeprecatedTestTime test_time_;
Event::SimulatedTimeSystem test_time_;
TimeSource& time_source_;
};

Expand Down
Loading