Skip to content

Commit

Permalink
tls_inspector: inline the recv in the onAccept (#7951)
Browse files Browse the repository at this point in the history
Description:
As discussed in #7864 this PR is the attempt to peek the socket at the invoke of onAccept.
Usually client_hello packet should be in the buffer when tls_inspector is peeking, we could save a poll cycle for this connection.

Once we agree on the solution I can apply to http_inspector as well.

The expecting latency improvement especially when poll cycle is large.

Benchmark:
Env:
hardware Intel(R) Xeon(R) CPU @ 2.20GHz
envoy: concurrency = 1, tls_inspector as listener filter. One tls filter chain, and one plain text filter chain.
load background: a [sniper](https://github.com/lubia/sniper) client with concurrency = 5 hitting the server with tls handshake, aiming to hit using the tls_filter chain. The qps is about 170/s
Another load client hitting the plain text filter chain but would go through tls_inspector with concurrency = 1

This PR: 
TransactionTime:              10.3 - 11.0 ms(mean)
Master                
TransactionTime:              12.3 - 12.8 ms(mean)

Risk Level: Med (ActiveSocket code is affected to adopt the side effect of onAccept)
Testing: 
Docs Changes:
Release Notes:
Fixes #7864

Signed-off-by: Yuchen Dai <silentdai@gmail.com>
  • Loading branch information
lambdai authored and lizan committed Aug 29, 2019
1 parent 5e45d48 commit 7f060b6
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 56 deletions.
6 changes: 4 additions & 2 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ IoSocketHandleImpl::~IoSocketHandleImpl() {

Api::IoCallUint64Result IoSocketHandleImpl::close() {
ASSERT(fd_ != -1);
const int rc = ::close(fd_);
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const auto& result = os_syscalls.close(fd_);
fd_ = -1;
return Api::IoCallUint64Result(rc, Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError));
return Api::IoCallUint64Result(result.rc_,
Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError));
}

bool IoSocketHandleImpl::isOpen() const { return fd_ != -1; }
Expand Down
78 changes: 50 additions & 28 deletions source/extensions/filters/listener/tls_inspector/tls_inspector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,47 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
ENVOY_LOG(debug, "tls inspector: new connection accepted");
Network::ConnectionSocket& socket = cb.socket();
ASSERT(file_event_ == nullptr);

file_event_ = cb.dispatcher().createFileEvent(
socket.ioHandle().fd(),
[this](uint32_t events) {
if (events & Event::FileReadyType::Closed) {
config_->stats().connection_closed_.inc();
done(false);
return;
}

ASSERT(events == Event::FileReadyType::Read);
onRead();
},
Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Closed);

cb_ = &cb;
return Network::FilterStatus::StopIteration;

ParseState parse_state = onRead();
switch (parse_state) {
case ParseState::Error:
// As per discussion in https://github.com/envoyproxy/envoy/issues/7864
// we don't add new enum in FilterStatus so we have to signal the caller
// the new condition.
cb.socket().close();
return Network::FilterStatus::StopIteration;
case ParseState::Done:
return Network::FilterStatus::Continue;
case ParseState::Continue:
// do nothing but create the event
file_event_ = cb.dispatcher().createFileEvent(
socket.ioHandle().fd(),
[this](uint32_t events) {
if (events & Event::FileReadyType::Closed) {
config_->stats().connection_closed_.inc();
done(false);
return;
}

ASSERT(events == Event::FileReadyType::Read);
ParseState parse_state = onRead();
switch (parse_state) {
case ParseState::Error:
done(false);
break;
case ParseState::Done:
done(true);
break;
case ParseState::Continue:
// do nothing but wait for the next event
break;
}
},
Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Closed);
return Network::FilterStatus::StopIteration;
}
NOT_REACHED_GCOVR_EXCL_LINE
}

void Filter::onALPN(const unsigned char* data, unsigned int len) {
Expand Down Expand Up @@ -122,7 +146,7 @@ void Filter::onServername(absl::string_view name) {
clienthello_success_ = true;
}

void Filter::onRead() {
ParseState Filter::onRead() {
// This receive code is somewhat complicated, because it must be done as a MSG_PEEK because
// there is no way for a listener-filter to pass payload data to the ConnectionImpl and filters
// that get created later.
Expand All @@ -141,11 +165,10 @@ void Filter::onRead() {
ENVOY_LOG(trace, "tls inspector: recv: {}", result.rc_);

if (result.rc_ == -1 && result.errno_ == EAGAIN) {
return;
return ParseState::Continue;
} else if (result.rc_ < 0) {
config_->stats().read_error_.inc();
done(false);
return;
return ParseState::Error;
}

// Because we're doing a MSG_PEEK, data we've seen before gets returned every time, so
Expand All @@ -154,8 +177,9 @@ void Filter::onRead() {
const uint8_t* data = buf_ + read_;
const size_t len = result.rc_ - read_;
read_ = result.rc_;
parseClientHello(data, len);
return parseClientHello(data, len);
}
return ParseState::Continue;
}

void Filter::done(bool success) {
Expand All @@ -164,7 +188,7 @@ void Filter::done(bool success) {
cb_->continueFilterChain(success);
}

void Filter::parseClientHello(const void* data, size_t len) {
ParseState Filter::parseClientHello(const void* data, size_t len) {
// Ownership is passed to ssl_ in SSL_set_bio()
bssl::UniquePtr<BIO> bio(BIO_new_mem_buf(data, len));

Expand All @@ -185,9 +209,9 @@ void Filter::parseClientHello(const void* data, size_t len) {
// We've hit the specified size limit. This is an unreasonably large ClientHello;
// indicate failure.
config_->stats().client_hello_too_large_.inc();
done(false);
return ParseState::Error;
}
break;
return ParseState::Continue;
case SSL_ERROR_SSL:
if (clienthello_success_) {
config_->stats().tls_found_.inc();
Expand All @@ -200,11 +224,9 @@ void Filter::parseClientHello(const void* data, size_t len) {
} else {
config_->stats().tls_not_found_.inc();
}
done(true);
break;
return ParseState::Done;
default:
done(false);
break;
return ParseState::Error;
}
}

Expand Down
12 changes: 10 additions & 2 deletions source/extensions/filters/listener/tls_inspector/tls_inspector.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ struct TlsInspectorStats {
ALL_TLS_INSPECTOR_STATS(GENERATE_COUNTER_STRUCT)
};

enum class ParseState {
// Parse result is out. It could be tls or not.
Done,
// Parser expects more data.
Continue,
// Parser reports unrecoverable error.
Error
};
/**
* Global configuration for TLS inspector.
*/
Expand Down Expand Up @@ -68,8 +76,8 @@ class Filter : public Network::ListenerFilter, Logger::Loggable<Logger::Id::filt
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override;

private:
void parseClientHello(const void* data, size_t len);
void onRead();
ParseState parseClientHello(const void* data, size_t len);
ParseState onRead();
void done(bool success);
void onALPN(const unsigned char* data, unsigned int len);
void onServername(absl::string_view name);
Expand Down
17 changes: 15 additions & 2 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ void ConnectionHandlerImpl::ActiveSocket::unlink() {

void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
if (success) {
bool no_error = true;
if (iter_ == accept_filters_.end()) {
iter_ = accept_filters_.begin();
} else {
Expand All @@ -192,11 +193,23 @@ void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
if (status == Network::FilterStatus::StopIteration) {
// The filter is responsible for calling us again at a later time to continue the filter
// chain from the next filter.
return;
if (!socket().ioHandle().isOpen()) {
// break the loop but should not create new connection
no_error = false;
break;
} else {
// Blocking at the filter but no error
return;
}
}
}
// Successfully ran all the accept filters.
newConnection();
if (no_error) {
newConnection();
} else {
// Signal the caller that no extra filter chain iteration is needed.
iter_ = accept_filters_.end();
}
}

// Filter execution concluded, unlink and delete this ActiveSocket if it was linked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class AddrFamilyAwareSocketOptionImplTest : public SocketOptionTest {
.WillRepeatedly(Invoke([](int domain, int type, int protocol) {
return Api::SysCallIntResult{::socket(domain, type, protocol), 0};
}));
EXPECT_CALL(os_sys_calls_, close(_)).Times(testing::AnyNumber());
}
};

Expand Down
1 change: 1 addition & 0 deletions test/config_test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class ConfigTest {
.WillByDefault(Invoke([&](const std::string& file) -> std::string {
return api_->fileSystem().fileReadToEnd(file);
}));
ON_CALL(os_sys_calls_, close(_)).WillByDefault(Return(Api::SysCallIntResult{0, 0}));

// Here we setup runtime to mimic the actual deprecated feature list used in the
// production code. Note that this test is actually more strict than production because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,10 @@ TEST_P(ProxyProtocolTest, errorRecv_2) {
const ssize_t rc = ::readv(fd, iov, iovcnt);
return Api::SysCallSizeResult{rc, errno};
}));

EXPECT_CALL(os_sys_calls, close(_)).Times(AnyNumber()).WillRepeatedly(Invoke([](int fd) {
const int rc = ::close(fd);
return Api::SysCallIntResult{rc, errno};
}));
connect(false);
write(buffer, sizeof(buffer));

Expand Down Expand Up @@ -316,7 +319,10 @@ TEST_P(ProxyProtocolTest, errorFIONREAD_1) {
const ssize_t rc = ::readv(fd, iov, iovcnt);
return Api::SysCallSizeResult{rc, errno};
}));

EXPECT_CALL(os_sys_calls, close(_)).Times(AnyNumber()).WillRepeatedly(Invoke([](int fd) {
const int rc = ::close(fd);
return Api::SysCallIntResult{rc, errno};
}));
connect(false);
write(buffer, sizeof(buffer));

Expand Down Expand Up @@ -527,7 +533,10 @@ TEST_P(ProxyProtocolTest, v2ParseExtensionsIoctlError) {
const ssize_t rc = ::readv(fd, iov, iovcnt);
return Api::SysCallSizeResult{rc, errno};
}));

EXPECT_CALL(os_sys_calls, close(_)).Times(AnyNumber()).WillRepeatedly(Invoke([](int fd) {
const int rc = ::close(fd);
return Api::SysCallIntResult{rc, errno};
}));
connect(false);
write(buffer, sizeof(buffer));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
Expand Down Expand Up @@ -656,7 +665,10 @@ TEST_P(ProxyProtocolTest, v2Fragmented3Error) {
const ssize_t rc = ::readv(fd, iov, iovcnt);
return Api::SysCallSizeResult{rc, errno};
}));

EXPECT_CALL(os_sys_calls, close(_)).Times(AnyNumber()).WillRepeatedly(Invoke([](int fd) {
const int rc = ::close(fd);
return Api::SysCallIntResult{rc, errno};
}));
connect(false);
write(buffer, 17);

Expand Down Expand Up @@ -702,7 +714,10 @@ TEST_P(ProxyProtocolTest, v2Fragmented4Error) {
const ssize_t rc = ::readv(fd, iov, iovcnt);
return Api::SysCallSizeResult{rc, errno};
}));

EXPECT_CALL(os_sys_calls, close(_)).Times(AnyNumber()).WillRepeatedly(Invoke([](int fd) {
const int rc = ::close(fd);
return Api::SysCallIntResult{rc, errno};
}));
connect(false);
write(buffer, 10);
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ class TlsInspectorTest : public testing::Test {
EXPECT_CALL(cb_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));

// Prepare the first recv attempt during
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(
Invoke([](int fd, void* buffer, size_t length, int flag) -> Api::SysCallSizeResult {
ENVOY_LOG_MISC(error, "In mock syscall recv {} {} {} {}", fd, buffer, length, flag);
return Api::SysCallSizeResult{static_cast<ssize_t>(0), 0};
}));
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
Expand Down Expand Up @@ -231,6 +238,36 @@ TEST_F(TlsInspectorTest, NotSsl) {
EXPECT_EQ(1, cfg_->stats().tls_not_found_.value());
}

TEST_F(TlsInspectorTest, InlineReadSucceed) {
filter_ = std::make_unique<Filter>(cfg_);

EXPECT_CALL(cb_, socket()).WillRepeatedly(ReturnRef(socket_));
EXPECT_CALL(cb_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(socket_, ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));
const std::vector<absl::string_view> alpn_protos = {absl::string_view("h2")};
const std::string servername("example.com");
std::vector<uint8_t> client_hello = Tls::Test::generateClientHello(servername, "\x02h2");

EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Invoke(
[&client_hello](int fd, void* buffer, size_t length, int flag) -> Api::SysCallSizeResult {
ENVOY_LOG_MISC(trace, "In mock syscall recv {} {} {} {}", fd, buffer, length, flag);
ASSERT(length >= client_hello.size());
memcpy(buffer, client_hello.data(), client_hello.size());
return Api::SysCallSizeResult{ssize_t(client_hello.size()), 0};
}));

// No event is created if the inline recv parse the hello.
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.Times(0);

EXPECT_CALL(socket_, setRequestedServerName(Eq(servername)));
EXPECT_CALL(socket_, setRequestedApplicationProtocols(alpn_protos));
EXPECT_CALL(socket_, setDetectedTransportProtocol(absl::string_view("tls")));
EXPECT_EQ(Network::FilterStatus::Continue, filter_->onAccept(cb_));
}
} // namespace
} // namespace TlsInspector
} // namespace ListenerFilters
Expand Down
8 changes: 7 additions & 1 deletion test/mocks/api/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "gtest/gtest.h"

using testing::_;
using testing::Invoke;
using testing::Return;

namespace Envoy {
Expand All @@ -26,7 +27,12 @@ Event::DispatcherPtr MockApi::allocateDispatcher(Buffer::WatermarkFactoryPtr&& w
return Event::DispatcherPtr{allocateDispatcher_(std::move(watermark_factory), time_system_)};
}

MockOsSysCalls::MockOsSysCalls() = default;
MockOsSysCalls::MockOsSysCalls() {
ON_CALL(*this, close(_)).WillByDefault(Invoke([](int fd) {
const int rc = ::close(fd);
return SysCallIntResult{rc, errno};
}));
}

MockOsSysCalls::~MockOsSysCalls() = default;

Expand Down
1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ envoy_cc_test(
"//test/mocks/network:network_mocks",
"//test/mocks/server:server_mocks",
"//test/test_common:network_utility_lib",
"//test/test_common:threadsafe_singleton_injector_lib",
],
)

Expand Down
Loading

0 comments on commit 7f060b6

Please sign in to comment.