Skip to content

Commit

Permalink
feat(GCS+gRPC): more efficient WriteObject() stall timeouts
Browse files Browse the repository at this point in the history
This changes the implementation of `WriteObject()` to use the blocking
API and a watchdog timer to implement stall timeouts. If the timer
expires before the `Write()` operation completes a background thread
cancels the RPC.  If the operation completes, we cancel the timer.
  • Loading branch information
coryan committed Jul 27, 2022
1 parent 9c813e1 commit 71b28cc
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 154 deletions.
88 changes: 40 additions & 48 deletions google/cloud/storage/internal/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,6 @@ StatusOr<ObjectAccessControl> FindDefaultObjectAccessControl(
"> in object id " + response->id());
}

std::chrono::milliseconds DefaultTransferStallTimeout(
std::chrono::milliseconds value) {
if (value != std::chrono::milliseconds(0)) return value;
// We need a large value for `wait_for()`, but not so large that it can easily
// overflow. Fortunately, uploads automatically cancel (server side) after
// 7 days, so waiting for 14 days will not create spurious timeouts.
return std::chrono::milliseconds(std::chrono::hours(24) * 14);
}

// If this is the last `Write()` call of the last `UploadChunk()` set the flags
// to finalize the request
void MaybeFinalize(google::storage::v2::WriteObjectRequest& write_request,
Expand Down Expand Up @@ -129,39 +120,34 @@ Status TimeoutError(std::chrono::milliseconds timeout, std::string const& op) {
"] while waiting for " + op);
}

struct WaitForFinish {
std::unique_ptr<GrpcClient::WriteObjectStream> stream;
void operator()(future<StatusOr<google::storage::v2::WriteObjectResponse>>) {}
};

struct WaitForIdle {
std::unique_ptr<GrpcClient::WriteObjectStream> stream;
void operator()(future<bool>) {
auto finish = stream->Finish();
(void)finish.then(WaitForFinish{std::move(stream)});
}
};

StatusOr<QueryResumableUploadResponse> CloseWriteObjectStream(
std::chrono::milliseconds timeout,
std::function<future<bool>()> const& create_watchdog,
std::unique_ptr<GrpcClient::WriteObjectStream> writer,
bool sent_last_message, google::cloud::Options const& options) {
if (!writer) return TimeoutError(timeout, "Write()");
if (!sent_last_message) {
auto pending = writer->WritesDone();
if (pending.wait_for(timeout) == std::future_status::timeout) {
auto watchdog = create_watchdog().then([&writer](auto f) {
if (!f.get()) return false;
writer->Cancel();
pending.then(WaitForIdle{std::move(writer)});
return true;
});
(void)writer->Write(google::storage::v2::WriteObjectRequest{},
grpc::WriteOptions().set_last_message());
watchdog.cancel();
if (watchdog.get()) {
writer->Close();
return TimeoutError(timeout, "WritesDone()");
}
}
auto pending = writer->Finish();
if (pending.wait_for(timeout) == std::future_status::timeout) {
auto watchdog = create_watchdog().then([&writer](auto f) {
if (!f.get()) return false;
writer->Cancel();
pending.then(WaitForFinish{std::move(writer)});
return TimeoutError(timeout, "Finish()");
}
auto response = pending.get();
return true;
});
auto response = writer->Close();
watchdog.cancel();
if (watchdog.get()) return TimeoutError(timeout, "Close()");
if (!response) return std::move(response).status();
return GrpcObjectRequestParser::FromProto(*std::move(response), options);
}
Expand Down Expand Up @@ -619,20 +605,19 @@ StatusOr<EmptyResponse> GrpcClient::DeleteResumableUpload(

StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
UploadChunkRequest const& request) {
auto const timeout =
DefaultTransferStallTimeout(google::cloud::internal::CurrentOptions()
.get<TransferStallTimeoutOption>());
auto const timeout = google::cloud::internal::CurrentOptions()
.get<TransferStallTimeoutOption>();
auto create_watchdog = [cq = background_->cq(), timeout]() mutable {
if (timeout == std::chrono::seconds(0)) {
return make_ready_future(false);
}
return cq.MakeRelativeTimer(timeout).then(
[](auto f) { return f.get().ok(); });
};

auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request, "resource");
auto writer = stub_->AsyncWriteObject(background_->cq(), std::move(context));

auto pending_start = writer->Start();
if (pending_start.wait_for(timeout) == std::future_status::timeout) {
writer->Cancel();
pending_start.then(WaitForIdle{std::move(writer)});
return TimeoutError(timeout, "Start()");
}
auto writer = stub_->WriteObject(std::move(context));

std::size_t const maximum_chunk_size =
google::storage::v2::ServiceConstants::MAX_WRITE_CHUNK_BYTES;
Expand Down Expand Up @@ -660,15 +645,22 @@ StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
auto options = grpc::WriteOptions();
MaybeFinalize(write_request, options, request, has_more);

auto pending = writer->Write(write_request, options);
if (pending.wait_for(timeout) == std::future_status::timeout) {
auto watchdog = create_watchdog().then([&writer](auto f) {
if (!f.get()) return false;
writer->Cancel();
pending.then(WaitForIdle{std::move(writer)});
return true;
});
auto success = writer->Write(write_request, options);
watchdog.cancel();
if (watchdog.get()) {
// The stream is cancelled, but we need to close it explicitly.
writer->Close();
writer.reset();
return false;
}

sent_last_message = options.is_last_message();
if (!pending.get()) return false;
if (!success) return false;
// After the first message, clear the object specification and checksums,
// there is no need to resend it.
write_request.clear_write_object_spec();
Expand All @@ -679,8 +671,8 @@ StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
};

auto close_writer = [&]() -> StatusOr<QueryResumableUploadResponse> {
return CloseWriteObjectStream(timeout, std::move(writer), sent_last_message,
options());
return CloseWriteObjectStream(timeout, create_watchdog, std::move(writer),
sent_last_message, options());
};

auto buffers = request.payload();
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/storage/internal/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "google/cloud/storage/internal/raw_client.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/background_threads.h"
#include "google/cloud/internal/async_streaming_write_rpc.h"
#include "google/cloud/internal/streaming_write_rpc.h"
#include <google/storage/v2/storage.pb.h>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -56,7 +56,7 @@ class GrpcClient : public RawClient,

~GrpcClient() override = default;

using WriteObjectStream = ::google::cloud::internal::AsyncStreamingWriteRpc<
using WriteObjectStream = ::google::cloud::internal::StreamingWriteRpc<
google::storage::v2::WriteObjectRequest,
google::storage::v2::WriteObjectResponse>;

Expand Down
178 changes: 74 additions & 104 deletions google/cloud/storage/internal/grpc_client_upload_chunk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "google/cloud/grpc_options.h"
#include "google/cloud/options.h"
#include "google/cloud/testing_util/is_proto_equal.h"
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
#include "google/cloud/testing_util/status_matchers.h"
#include "absl/memory/memory.h"
#include <gmock/gmock.h>
Expand All @@ -28,76 +29,42 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace internal {
namespace {

using ::google::cloud::storage::testing::MockAsyncInsertStream;
using ::google::cloud::storage::testing::MockInsertStream;
using ::google::cloud::storage::testing::MockStorageStub;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::cloud::testing_util::StatusIs;
using ::testing::ByMove;
using ::testing::HasSubstr;
using ::testing::Return;

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutStart) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return hold_response.get_future().then(
[](future<void>) { return false; });
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
EXPECT_CALL(*stream, Finish)
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(google::storage::v2::WriteObjectResponse{})))));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
EXPECT_THAT(response,
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Start()")));
}

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutWrite) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
EXPECT_CALL(*mock, WriteObject)
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Write).WillOnce([&] {
return hold_response.get_future().then(
[](future<void>) { return false; });
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
EXPECT_CALL(*stream, Finish)
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(google::storage::v2::WriteObjectResponse{})))));
auto stream = absl::make_unique<MockInsertStream>();
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Write).WillOnce(Return(false));
EXPECT_CALL(*stream, Close)
.WillOnce(Return(
make_status_or(google::storage::v2::WriteObjectResponse{})));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
auto const expected = std::chrono::seconds(42);
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(std::chrono::system_clock::now())))));
auto cq = CompletionQueue(mock_cq);

auto client = GrpcClient::CreateMock(
mock, Options{}
.set<TransferStallTimeoutOption>(expected)
.set<GrpcCompletionQueueOption>(cq));
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
Options{}.set<TransferStallTimeoutOption>(expected));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
Expand All @@ -107,35 +74,35 @@ TEST(GrpcClientUploadChunkTest, StallTimeoutWrite) {

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutWritesDone) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
EXPECT_CALL(*mock, WriteObject)
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Write)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, WritesDone).WillOnce([&] {
return hold_response.get_future().then(
[](future<void>) { return false; });
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
EXPECT_CALL(*stream, Finish)
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(google::storage::v2::WriteObjectResponse{})))));
auto stream = absl::make_unique<MockInsertStream>();
EXPECT_CALL(*stream, Write).WillOnce(Return(true));
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Write).WillOnce(Return(false));
EXPECT_CALL(*stream, Close)
.WillOnce(Return(google::storage::v2::WriteObjectResponse{}));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
auto const expected = std::chrono::seconds(42);
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
.WillOnce(Return(ByMove(
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
Status{StatusCode::kCancelled, "test-only"})))))
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(std::chrono::system_clock::now())))));
auto cq = CompletionQueue(mock_cq);

auto client = GrpcClient::CreateMock(
mock, Options{}
.set<TransferStallTimeoutOption>(expected)
.set<GrpcCompletionQueueOption>(cq));
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
Options{}.set<TransferStallTimeoutOption>(expected));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
Expand All @@ -144,41 +111,44 @@ TEST(GrpcClientUploadChunkTest, StallTimeoutWritesDone) {
}

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutFinish) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

TEST(GrpcClientUploadChunkTest, StallTimeoutClose) {
auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
EXPECT_CALL(*mock, WriteObject)
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Write)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, WritesDone)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return hold_response.get_future().then([](future<void>) {
return make_status_or(google::storage::v2::WriteObjectResponse{});
});
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
auto stream = absl::make_unique<MockInsertStream>();
EXPECT_CALL(*stream, Write).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Close)
.WillOnce(Return(
make_status_or(google::storage::v2::WriteObjectResponse{})));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
auto const expected = std::chrono::seconds(42);
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
.WillOnce(Return(ByMove(
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
Status{StatusCode::kCancelled, "test-only"})))))
.WillOnce(Return(ByMove(
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
Status{StatusCode::kCancelled, "test-only"})))))
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(std::chrono::system_clock::now())))));
auto cq = CompletionQueue(mock_cq);

auto client = GrpcClient::CreateMock(
mock, Options{}
.set<TransferStallTimeoutOption>(expected)
.set<GrpcCompletionQueueOption>(cq));
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
Options{}.set<TransferStallTimeoutOption>(expected));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
EXPECT_THAT(response,
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Finish()")));
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Close()")));
}

} // namespace
Expand Down

0 comments on commit 71b28cc

Please sign in to comment.