Skip to content

Commit

Permalink
Add upstream and downstream info in parent read callbacks in tcp too
Browse files Browse the repository at this point in the history
Ref: istio/istio#20802
Signed-off-by: gargnupur <gargnupur@google.com>

Add test

Signed-off-by: gargnupur <gargnupur@google.com>

Use streaminfo from connection callback as compared to maintaining an extra one just for access logging

Signed-off-by: gargnupur <gargnupur@google.com>
  • Loading branch information
gargnupur committed Feb 11, 2020
1 parent 74df8b3 commit 276def2
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 28 deletions.
10 changes: 7 additions & 3 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}

Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source)
Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager, TimeSource&)
: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
upstream_callbacks_(new UpstreamCallbacks(this)), stream_info_(time_source) {
upstream_callbacks_(new UpstreamCallbacks(this)) {
ASSERT(config != nullptr);
}

Expand Down Expand Up @@ -292,6 +291,11 @@ void Filter::readDisableDownstream(bool disable) {
}
}

StreamInfo::StreamInfo& Filter::getStreamInfo() {
ASSERT(read_callbacks_ != nullptr);
return read_callbacks_->connection().streamInfo();
}

void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
Expand Down
3 changes: 1 addition & 2 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class Filter : public Network::ReadFilter,
bool on_high_watermark_called_{false};
};

virtual StreamInfo::StreamInfo& getStreamInfo() { return stream_info_; }
virtual StreamInfo::StreamInfo& getStreamInfo();

protected:
struct DownstreamCallbacks : public Network::ConnectionCallbacks {
Expand Down Expand Up @@ -354,7 +354,6 @@ class Filter : public Network::ReadFilter,
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
std::unique_ptr<GenericUpstream> upstream_;
StreamInfo::StreamInfoImpl stream_info_;
RouteConstSharedPtr route_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
uint32_t connect_attempts_{};
Expand Down
48 changes: 27 additions & 21 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,8 @@ class TcpProxyTest : public testing::Test {

NiceMock<Server::Configuration::MockFactoryContext> factory_context_;
ConfigSharedPtr config_;
std::unique_ptr<Filter> filter_;
NiceMock<Network::MockReadFilterCallbacks> filter_callbacks_;
std::unique_ptr<Filter> filter_;
std::vector<std::shared_ptr<NiceMock<Upstream::MockHost>>> upstream_hosts_{};
std::vector<std::unique_ptr<NiceMock<Network::MockClientConnection>>> upstream_connections_{};
std::vector<std::unique_ptr<NiceMock<Tcp::ConnectionPool::MockConnectionData>>>
Expand Down Expand Up @@ -1746,6 +1746,23 @@ TEST_F(TcpProxyTest, ShareFilterState) {
.value());
}

// Tests that filter callback can access downstream and upstream address and ssl properties.
TEST_F(TcpProxyTest, AccessDownstreamAndUpstreamProperties) {
setup(1);

raiseEventUpstreamConnected(0);
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamLocalAddress(),
filter_callbacks_.connection().localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamRemoteAddress(),
filter_callbacks_.connection().remoteAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamSslConnection(),
filter_callbacks_.connection().ssl());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamLocalAddress(),
upstream_connections_.at(0)->localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamSslConnection(),
upstream_connections_.at(0)->streamInfo().downstreamSslConnection());
}

class TcpProxyRoutingTest : public testing::Test {
public:
TcpProxyRoutingTest() = default;
Expand Down Expand Up @@ -1826,13 +1843,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UseClusterFromPerConnectionC
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.tcp_proxy.cluster",
std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);
ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.tcp_proxy.cluster", std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to specified cluster.
EXPECT_CALL(factory_context_.cluster_manager_,
Expand All @@ -1847,14 +1861,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UpstreamServerName)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.network.upstream_server_name",
std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.network.upstream_server_name", std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-server-name
Expand Down Expand Up @@ -1882,16 +1892,12 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(ApplicationProtocols)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData(
connection_.streamInfo().filterState()->setData(
Network::ApplicationProtocols::key(),
std::make_unique<Network::ApplicationProtocols>(std::vector<std::string>{"foo", "bar"}),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-application-protocol
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
Expand Down
1 change: 1 addition & 0 deletions test/mocks/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ envoy_cc_mock(
"//include/envoy/stream_info:stream_info_interface",
"//include/envoy/upstream:upstream_interface",
"//test/mocks/upstream:host_mocks",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
26 changes: 24 additions & 2 deletions test/mocks/stream_info/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ namespace Envoy {
namespace StreamInfo {

MockStreamInfo::MockStreamInfo()
: filter_state_(std::make_shared<FilterStateImpl>(FilterState::LifeSpan::FilterChain)),
: start_time_(ts_.systemTime()),
filter_state_(std::make_shared<FilterStateImpl>(FilterState::LifeSpan::FilterChain)),
downstream_local_address_(new Network::Address::Ipv4Instance("127.0.0.2")),
downstream_direct_remote_address_(new Network::Address::Ipv4Instance("127.0.0.1")),
downstream_remote_address_(new Network::Address::Ipv4Instance("127.0.0.1")) {
ON_CALL(*this, upstreamHost()).WillByDefault(ReturnPointee(&host_));
ON_CALL(*this, setResponseFlag(_)).WillByDefault(Invoke([this](ResponseFlag response_flag) {
response_flags_ |= response_flag;
}));
ON_CALL(*this, onUpstreamHostSelected(_))
.WillByDefault(
Invoke([this](Upstream::HostDescriptionConstSharedPtr host) { upstream_host_ = host; }));
ON_CALL(*this, startTime()).WillByDefault(ReturnPointee(&start_time_));
ON_CALL(*this, startTimeMonotonic()).WillByDefault(ReturnPointee(&start_time_monotonic_));
ON_CALL(*this, lastDownstreamRxByteReceived())
Expand All @@ -37,6 +43,11 @@ MockStreamInfo::MockStreamInfo()
ON_CALL(*this, lastDownstreamTxByteSent())
.WillByDefault(ReturnPointee(&last_downstream_tx_byte_sent_));
ON_CALL(*this, requestComplete()).WillByDefault(ReturnPointee(&end_time_));
ON_CALL(*this, onRequestComplete()).WillByDefault(Invoke([this]() {
end_time_ = absl::make_optional<std::chrono::nanoseconds>(
std::chrono::duration_cast<std::chrono::nanoseconds>(ts_.systemTime() - start_time_)
.count());
}));
ON_CALL(*this, setUpstreamLocalAddress(_))
.WillByDefault(
Invoke([this](const Network::Address::InstanceConstSharedPtr& upstream_local_address) {
Expand Down Expand Up @@ -85,6 +96,17 @@ MockStreamInfo::MockStreamInfo()
bytes_sent_ += bytes_sent;
}));
ON_CALL(*this, bytesSent()).WillByDefault(ReturnPointee(&bytes_sent_));
ON_CALL(*this, hasResponseFlag(_)).WillByDefault(Invoke([this](ResponseFlag flag) {
return response_flags_ & flag;
}));
ON_CALL(*this, upstreamHost()).WillByDefault(Invoke([this]() {
if (upstream_host_) {
return upstream_host_;
}
ReturnPointee(&host_);
// Call should not reach here and is just to make compiler happy.
return upstream_host_;
}));
ON_CALL(*this, dynamicMetadata()).WillByDefault(ReturnRef(metadata_));
ON_CALL(Const(*this), dynamicMetadata()).WillByDefault(ReturnRef(metadata_));
ON_CALL(*this, filterState()).WillByDefault(ReturnRef(filter_state_));
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/stream_info/filter_state_impl.h"

#include "test/mocks/upstream/host.h"
#include "test/test_common/simulated_time_system.h"

#include "gmock/gmock.h"

Expand Down Expand Up @@ -91,6 +92,8 @@ class MockStreamInfo : public StreamInfo {

std::shared_ptr<testing::NiceMock<Upstream::MockHostDescription>> host_{
new testing::NiceMock<Upstream::MockHostDescription>()};
Upstream::HostDescriptionConstSharedPtr upstream_host_{};
Envoy::Event::SimulatedTimeSystem ts_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
absl::optional<std::chrono::nanoseconds> last_downstream_rx_byte_received_;
Expand All @@ -104,6 +107,7 @@ class MockStreamInfo : public StreamInfo {
absl::optional<Http::Protocol> protocol_;
absl::optional<uint32_t> response_code_;
absl::optional<std::string> response_code_details_;
uint64_t response_flags_{};
envoy::config::core::v3::Metadata metadata_;
FilterStateSharedPtr upstream_filter_state_;
FilterStateSharedPtr filter_state_;
Expand Down

0 comments on commit 276def2

Please sign in to comment.