Skip to content

Commit

Permalink
Add a WriteCord() method to ZeroCopyInputStream
Browse files Browse the repository at this point in the history
This change introduces the `WriteCord()` method in `ZeroCopyInputStream` with a default implementation efficiently writing the cord data onto the stream. Subsequent changes will introduce explicit Cord based input and output streams that will optimize the default implementation by providing zero-copy optimized implementations when reading and writing cord data from/to underlying encoded cord data.

PiperOrigin-RevId: 493030763
  • Loading branch information
martijnvels authored and copybara-github committed Dec 5, 2022
1 parent 0783c82 commit 192cd09
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/google/protobuf/io/zero_copy_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "google/protobuf/io/zero_copy_stream.h"

#include <cstring>
#include <utility>

#include "google/protobuf/stubs/logging.h"
Expand Down Expand Up @@ -105,6 +106,32 @@ bool ZeroCopyInputStream::ReadCord(absl::Cord* cord, int count) {
return true;
}

bool ZeroCopyOutputStream::WriteCord(const absl::Cord& cord) {
if (cord.empty()) return true;

void* buffer;
int buffer_size = 0;
if (!Next(&buffer, &buffer_size)) return false;

for (absl::string_view fragment : cord.Chunks()) {
while (fragment.size() > static_cast<size_t>(buffer_size)) {
std::memcpy(buffer, fragment.data(), buffer_size);

fragment.remove_prefix(buffer_size);

if (!Next(&buffer, &buffer_size)) return false;
}
std::memcpy(buffer, fragment.data(), fragment.size());

// Advance the buffer.
buffer = static_cast<char*>(buffer) + fragment.size();
buffer_size -= static_cast<int>(fragment.size());
}
BackUp(buffer_size);
return true;
}


bool ZeroCopyOutputStream::WriteAliasedRaw(const void* /* data */,
int /* size */) {
GOOGLE_LOG(FATAL) << "This ZeroCopyOutputStream doesn't support aliasing. "
Expand Down
9 changes: 9 additions & 0 deletions src/google/protobuf/io/zero_copy_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,15 @@ class PROTOBUF_EXPORT ZeroCopyOutputStream {
virtual bool WriteAliasedRaw(const void* data, int size);
virtual bool AllowsAliasing() const { return false; }

// Writes the given Cord to the output.
//
// The default implementation iterates over all Cord chunks copying all cord
// data into the buffer(s) returned by the stream's `Next()` method.
//
// Some streams may implement this in a way that avoids copying the cord
// data by copying and managing a copy of the provided cord instead.
virtual bool WriteCord(const absl::Cord& cord);

};

} // namespace io
Expand Down
8 changes: 8 additions & 0 deletions src/google/protobuf/io/zero_copy_stream_impl_lite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ bool CopyingOutputStreamAdaptor::WriteAliasedRaw(const void* data, int size) {
return true;
}

bool CopyingOutputStreamAdaptor::WriteCord(const absl::Cord& cord) {
for (absl::string_view chunk : cord.Chunks()) {
if (!WriteAliasedRaw(chunk.data(), chunk.size())) {
return false;
}
}
return true;
}

bool CopyingOutputStreamAdaptor::WriteBuffer() {
if (failed_) {
Expand Down
4 changes: 4 additions & 0 deletions src/google/protobuf/io/zero_copy_stream_impl_lite.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@

#include "google/protobuf/stubs/callback.h"
#include "google/protobuf/stubs/common.h"
#include "absl/base/attributes.h"
#include "absl/strings/cord.h"
#include "absl/strings/cord_buffer.h"
#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/port.h"

Expand Down Expand Up @@ -325,6 +328,7 @@ class PROTOBUF_EXPORT CopyingOutputStreamAdaptor : public ZeroCopyOutputStream {
int64_t ByteCount() const override;
bool WriteAliasedRaw(const void* data, int size) override;
bool AllowsAliasing() const override { return true; }
bool WriteCord(const absl::Cord& cord) override;

private:
// Write the current buffer, if it is present.
Expand Down
49 changes: 49 additions & 0 deletions src/google/protobuf/io/zero_copy_stream_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
#include <sys/stat.h>
#include <sys/types.h>

#include <iterator>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

#include "google/protobuf/testing/file.h"
#include "absl/strings/cord.h"
#include "google/protobuf/io/coded_stream.h"
#include "google/protobuf/io/io_win32.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"
Expand Down Expand Up @@ -825,6 +827,53 @@ TEST(DefaultReadCordTest, ReadCordEof) {
EXPECT_EQ(expected, dest);
}

TEST(DefaultWriteCordTest, WriteEmptyCordToArray) {
absl::Cord source;
std::string buffer = "abc";
ArrayOutputStream output(&buffer[0], static_cast<int>(buffer.length()));
EXPECT_TRUE(output.WriteCord(source));
EXPECT_EQ(output.ByteCount(), source.size());
EXPECT_EQ(buffer, "abc");
}

TEST(DefaultWriteCordTest, WriteSmallCord) {
absl::Cord source("foo bar");

std::string buffer(source.size(), 'z');
ArrayOutputStream output(&buffer[0], static_cast<int>(buffer.length()));
EXPECT_TRUE(output.WriteCord(source));
EXPECT_EQ(output.ByteCount(), source.size());
EXPECT_EQ(buffer, source);
}

TEST(DefaultWriteCordTest, WriteLargeCord) {
absl::Cord source;
for (int i = 0; i < 1024; i++) {
source.Append("foo bar");
}
// Verify that we created a fragmented cord.
ASSERT_GT(std::distance(source.chunk_begin(), source.chunk_end()), 1);

std::string buffer(source.size(), 'z');
ArrayOutputStream output(&buffer[0], static_cast<int>(buffer.length()));
EXPECT_TRUE(output.WriteCord(source));
EXPECT_EQ(output.ByteCount(), source.size());
EXPECT_EQ(buffer, source);
}

TEST(DefaultWriteCordTest, WriteTooLargeCord) {
absl::Cord source;
for (int i = 0; i < 1024; i++) {
source.Append("foo bar");
}

std::string buffer(source.size() - 1, 'z');
ArrayOutputStream output(&buffer[0], static_cast<int>(buffer.length()));
EXPECT_FALSE(output.WriteCord(source));
EXPECT_EQ(output.ByteCount(), buffer.size());
EXPECT_EQ(buffer, source.Subcord(0, output.ByteCount()));
}


// To test files, we create a temporary file, write, read, truncate, repeat.
TEST_F(IoTest, FileIo) {
Expand Down

0 comments on commit 192cd09

Please sign in to comment.