Skip to content

Commit

Permalink
buffer: Force copy when appending small slices to OwnedImpl buffer to…
Browse files Browse the repository at this point in the history
… avoid fragmentation (#117)

Change OwnedImpl::move to force a copy instead of taking ownership of slices in cases where the offered slices are below kCopyThreshold

Risk Level: medium, changes to buffer behavior
Testing: Unit Tests
Docs Changes: N/A
Release Notes: N/A

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
yanavlasov authored and lizan committed Mar 2, 2020
1 parent 1b0f2a1 commit 95f66f7
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 31 deletions.
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Version history
1.13.1 (Pending)
================
* rbac: added :ref:`url_path <envoy_api_field_config.rbac.v2.Permission.url_path>` for matching URL path without the query and fragment string.
* buffer: force copy when appending small slices to OwnedImpl buffer to avoid fragmentation.
* http: added HTTP/1.1 flood protection. Can be temporarily disabled using the runtime feature `envoy.reloadable_features.http1_flood_protection`.
* listeners: fixed issue where :ref:`TLS inspector listener filter <config_listener_filters_tls_inspector>` could have been bypassed by a client using only TLS 1.3.
* sds: fix the SDS vulnerability that TLS validation context (e.g., subject alt name or hash) cannot be effectively validated in some cases.
Expand Down
41 changes: 34 additions & 7 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@

namespace Envoy {
namespace Buffer {

void OwnedImpl::add(const void* data, uint64_t size) {
namespace {
// This size has been determined to be optimal from running the
// //test/integration:http_benchmark benchmark tests.
// TODO(yanavlasov): This may not be optimal for all hardware configurations or traffic patterns and
// may need to be configurable in the future.
constexpr uint64_t CopyThreshold = 512;
} // namespace

void OwnedImpl::addImpl(const void* data, uint64_t size) {
const char* src = static_cast<const char*>(data);
bool new_slice_needed = slices_.empty();
while (size != 0) {
Expand All @@ -26,6 +33,8 @@ void OwnedImpl::add(const void* data, uint64_t size) {
}
}

void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); }

void OwnedImpl::addBufferFragment(BufferFragment& fragment) {
length_ += fragment.size();
slices_.emplace_back(std::make_unique<UnownedSlice>(fragment));
Expand Down Expand Up @@ -228,6 +237,26 @@ void* OwnedImpl::linearize(uint32_t size) {
return slices_.front()->data();
}

void OwnedImpl::coalesceOrAddSlice(SlicePtr&& other_slice) {
const uint64_t slice_size = other_slice->dataSize();
// The `other_slice` content can be coalesced into the existing slice IFF:
// 1. The `other_slice` can be coalesced. Objects of type UnownedSlice can not be coalesced. See
// comment in the UnownedSlice class definition;
// 2. There are existing slices;
// 3. The `other_slice` content length is under the CopyThreshold;
// 4. There is enough unused space in the existing slice to accommodate the `other_slice` content.
if (other_slice->canCoalesce() && !slices_.empty() && slice_size < CopyThreshold &&
slices_.back()->reservableSize() >= slice_size) {
// Copy content of the `other_slice`. The `move` methods which call this method effectively
// drain the source buffer.
addImpl(other_slice->data(), slice_size);
} else {
// Take ownership of the slice.
slices_.emplace_back(std::move(other_slice));
length_ += slice_size;
}
}

void OwnedImpl::move(Instance& rhs) {
ASSERT(&rhs != this);
// We do the static cast here because in practice we only have one buffer implementation right
Expand All @@ -236,10 +265,9 @@ void OwnedImpl::move(Instance& rhs) {
OwnedImpl& other = static_cast<OwnedImpl&>(rhs);
while (!other.slices_.empty()) {
const uint64_t slice_size = other.slices_.front()->dataSize();
slices_.emplace_back(std::move(other.slices_.front()));
other.slices_.pop_front();
length_ += slice_size;
coalesceOrAddSlice(std::move(other.slices_.front()));
other.length_ -= slice_size;
other.slices_.pop_front();
}
other.postProcess();
}
Expand All @@ -260,9 +288,8 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) {
other.slices_.front()->drain(copy_size);
other.length_ -= copy_size;
} else {
slices_.emplace_back(std::move(other.slices_.front()));
coalesceOrAddSlice(std::move(other.slices_.front()));
other.slices_.pop_front();
length_ += slice_size;
other.length_ -= slice_size;
}
length -= copy_size;
Expand Down
21 changes: 21 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ class Slice {
return SliceRepresentation{dataSize(), reservableSize(), capacity_};
}

/**
* @return true if content in this Slice can be coalesced into another Slice.
*/
virtual bool canCoalesce() const { return true; }

protected:
Slice(uint64_t data, uint64_t reservable, uint64_t capacity)
: data_(data), reservable_(reservable), capacity_(capacity) {}
Expand Down Expand Up @@ -415,6 +420,13 @@ class UnownedSlice : public Slice {

~UnownedSlice() override { fragment_.done(); }

/**
* BufferFragment objects encapsulated by UnownedSlice are used to track when response content
* is written into transport connection. As a result these slices can not be coalesced when moved
* between buffers.
*/
bool canCoalesce() const override { return false; }

private:
BufferFragment& fragment_;
};
Expand Down Expand Up @@ -550,6 +562,15 @@ class OwnedImpl : public LibEventInstance {
*/
bool isSameBufferImpl(const Instance& rhs) const;

void addImpl(const void* data, uint64_t size);

/**
* Moves contents of the `other_slice` by either taking its ownership or coalescing it
* into an existing slice.
* NOTE: the caller is responsible for draining the buffer that contains the `other_slice`.
*/
void coalesceOrAddSlice(SlicePtr&& other_slice);

/** Ring buffer of slices. */
SliceDeque slices_;

Expand Down
88 changes: 68 additions & 20 deletions test/common/buffer/owned_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ TEST_F(OwnedImplTest, AddBufferFragmentNoCleanup) {
}

TEST_F(OwnedImplTest, AddBufferFragmentWithCleanup) {
char input[] = "hello world";
BufferFragmentImpl frag(input, 11, [this](const void*, size_t, const BufferFragmentImpl*) {
release_callback_called_ = true;
});
std::string input(2048, 'a');
BufferFragmentImpl frag(
input.c_str(), input.size(),
[this](const void*, size_t, const BufferFragmentImpl*) { release_callback_called_ = true; });
Buffer::OwnedImpl buffer;
buffer.addBufferFragment(frag);
EXPECT_EQ(11, buffer.length());
EXPECT_EQ(2048, buffer.length());

buffer.drain(5);
EXPECT_EQ(6, buffer.length());
buffer.drain(2000);
EXPECT_EQ(48, buffer.length());
EXPECT_FALSE(release_callback_called_);

buffer.drain(6);
buffer.drain(48);
EXPECT_EQ(0, buffer.length());
EXPECT_TRUE(release_callback_called_);
}
Expand All @@ -94,22 +94,22 @@ TEST_F(OwnedImplTest, AddEmptyFragment) {
}

TEST_F(OwnedImplTest, AddBufferFragmentDynamicAllocation) {
char input_stack[] = "hello world";
char* input = new char[11];
std::copy(input_stack, input_stack + 11, input);
std::string input_str(2048, 'a');
char* input = new char[2048];
std::copy(input_str.c_str(), input_str.c_str() + 11, input);

BufferFragmentImpl* frag = new BufferFragmentImpl(
input, 11, [this](const void* data, size_t, const BufferFragmentImpl* frag) {
input, 2048, [this](const void* data, size_t, const BufferFragmentImpl* frag) {
release_callback_called_ = true;
delete[] static_cast<const char*>(data);
delete frag;
});

Buffer::OwnedImpl buffer;
buffer.addBufferFragment(*frag);
EXPECT_EQ(11, buffer.length());
EXPECT_EQ(2048, buffer.length());

buffer.drain(5);
buffer.drain(2042);
EXPECT_EQ(6, buffer.length());
EXPECT_FALSE(release_callback_called_);

Expand All @@ -119,10 +119,10 @@ TEST_F(OwnedImplTest, AddBufferFragmentDynamicAllocation) {
}

TEST_F(OwnedImplTest, AddOwnedBufferFragmentWithCleanup) {
char input[] = "hello world";
const size_t expected_length = sizeof(input) - 1;
std::string input(2048, 'a');
const size_t expected_length = input.size();
auto frag = OwnedBufferFragmentImpl::create(
{input, expected_length},
{input.c_str(), expected_length},
[this](const OwnedBufferFragmentImpl*) { release_callback_called_ = true; });
Buffer::OwnedImpl buffer;
buffer.addBufferFragment(*frag);
Expand All @@ -140,10 +140,10 @@ TEST_F(OwnedImplTest, AddOwnedBufferFragmentWithCleanup) {

// Verify that OwnedBufferFragment work correctly when input buffer is allocated on the heap.
TEST_F(OwnedImplTest, AddOwnedBufferFragmentDynamicAllocation) {
char input_stack[] = "hello world";
const size_t expected_length = sizeof(input_stack) - 1;
std::string input_str(2048, 'a');
const size_t expected_length = input_str.size();
char* input = new char[expected_length];
std::copy(input_stack, input_stack + expected_length, input);
std::copy(input_str.c_str(), input_str.c_str() + expected_length, input);

auto* frag = OwnedBufferFragmentImpl::create({input, expected_length},
[this, input](const OwnedBufferFragmentImpl* frag) {
Expand Down Expand Up @@ -690,6 +690,54 @@ TEST(OverflowDetectingUInt64, Arithmetic) {
EXPECT_DEATH(length += 1, "overflow");
}

void TestBufferMove(uint64_t buffer1_length, uint64_t buffer2_length,
uint64_t expected_slice_count) {
Buffer::OwnedImpl buffer1;
buffer1.add(std::string(buffer1_length, 'a'));
EXPECT_EQ(1, buffer1.getRawSlices(nullptr, 0));

Buffer::OwnedImpl buffer2;
buffer2.add(std::string(buffer2_length, 'b'));
EXPECT_EQ(1, buffer2.getRawSlices(nullptr, 0));

buffer1.move(buffer2);
EXPECT_EQ(expected_slice_count, buffer1.getRawSlices(nullptr, 0));
EXPECT_EQ(buffer1_length + buffer2_length, buffer1.length());
// Make sure `buffer2` was drained.
EXPECT_EQ(0, buffer2.length());
}

// Slice size large enough to prevent slice content from being coalesced into an existing slice
constexpr uint64_t kLargeSliceSize = 2048;

TEST_F(OwnedImplTest, MoveBuffersWithLargeSlices) {
// Large slices should not be coalesced together
TestBufferMove(kLargeSliceSize, kLargeSliceSize, 2);
}

TEST_F(OwnedImplTest, MoveBuffersWithSmallSlices) {
// Small slices should be coalesced together
TestBufferMove(1, 1, 1);
}

TEST_F(OwnedImplTest, MoveSmallSliceIntoLargeSlice) {
// Small slices should be coalesced with a large one
TestBufferMove(kLargeSliceSize, 1, 1);
}

TEST_F(OwnedImplTest, MoveLargeSliceIntoSmallSlice) {
// Large slice should NOT be coalesced into the small one
TestBufferMove(1, kLargeSliceSize, 2);
}

TEST_F(OwnedImplTest, MoveSmallSliceIntoNotEnoughFreeSpace) {
// Small slice will not be coalesced if a previous slice does not have enough free space
// Slice buffer sizes are allocated in 4Kb increments
// Make first slice have 127 of free space (it is actually less as there is small overhead of the
// OwnedSlice object) And second slice 128 bytes
TestBufferMove(4096 - 127, 128, 2);
}

} // namespace
} // namespace Buffer
} // namespace Envoy
10 changes: 7 additions & 3 deletions test/common/buffer/zero_copy_input_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "test/common/buffer/utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
Expand Down Expand Up @@ -37,15 +38,18 @@ TEST_F(ZeroCopyInputStreamTest, Next) {
}

TEST_F(ZeroCopyInputStreamTest, TwoSlices) {
Buffer::OwnedImpl buffer("efgh");
// Make content larger than 512 bytes so it would not be coalesced when
// moved into the stream_ buffer.
Buffer::OwnedImpl buffer(std::string(1024, 'A'));
stream_.move(buffer);

EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, size_);
EXPECT_EQ(0, memcmp(slice_data_.data(), data_, size_));
EXPECT_TRUE(stream_.Next(&data_, &size_));
EXPECT_EQ(4, size_);
EXPECT_EQ(0, memcmp("efgh", data_, size_));
EXPECT_EQ(1024, size_);
EXPECT_THAT(absl::string_view(static_cast<const char*>(data_), size_),
testing::Each(testing::AllOf('A')));
}

TEST_F(ZeroCopyInputStreamTest, BackUp) {
Expand Down
9 changes: 8 additions & 1 deletion test/common/http/http1/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using testing::_;
using testing::AtLeast;
using testing::InSequence;
using testing::Invoke;
using testing::InvokeWithoutArgs;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;
Expand Down Expand Up @@ -1043,7 +1044,13 @@ TEST_F(Http1ServerConnectionImplTest, ChunkedResponse) {
EXPECT_EQ(0U, buffer.length());

std::string output;
ON_CALL(connection_, write(_, _)).WillByDefault(AddBufferToString(&output));
ON_CALL(connection_, write(_, _)).WillByDefault(Invoke([&output](Buffer::Instance& data, bool) {
// Verify that individual writes into the codec's output buffer were coalesced into a single
// slice
ASSERT_EQ(1, data.getRawSlices(nullptr, 0));
output.append(data.toString());
data.drain(data.length());
}));

TestHeaderMapImpl headers{{":status", "200"}};
response_encoder->encodeHeaders(headers, false);
Expand Down

0 comments on commit 95f66f7

Please sign in to comment.