Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GCS+gRPC): more efficient WriteObject() stall timeouts #9576

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 40 additions & 48 deletions google/cloud/storage/internal/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,6 @@ StatusOr<ObjectAccessControl> FindDefaultObjectAccessControl(
"> in object id " + response->id());
}

std::chrono::milliseconds DefaultTransferStallTimeout(
std::chrono::milliseconds value) {
if (value != std::chrono::milliseconds(0)) return value;
// We need a large value for `wait_for()`, but not so large that it can easily
// overflow. Fortunately, uploads automatically cancel (server side) after
// 7 days, so waiting for 14 days will not create spurious timeouts.
return std::chrono::milliseconds(std::chrono::hours(24) * 14);
}

// If this is the last `Write()` call of the last `UploadChunk()` set the flags
// to finalize the request
void MaybeFinalize(google::storage::v2::WriteObjectRequest& write_request,
Expand Down Expand Up @@ -129,39 +120,34 @@ Status TimeoutError(std::chrono::milliseconds timeout, std::string const& op) {
"] while waiting for " + op);
}

struct WaitForFinish {
std::unique_ptr<GrpcClient::WriteObjectStream> stream;
void operator()(future<StatusOr<google::storage::v2::WriteObjectResponse>>) {}
};

struct WaitForIdle {
std::unique_ptr<GrpcClient::WriteObjectStream> stream;
void operator()(future<bool>) {
auto finish = stream->Finish();
(void)finish.then(WaitForFinish{std::move(stream)});
}
};

StatusOr<QueryResumableUploadResponse> CloseWriteObjectStream(
std::chrono::milliseconds timeout,
std::function<future<bool>()> const& create_watchdog,
std::unique_ptr<GrpcClient::WriteObjectStream> writer,
bool sent_last_message, google::cloud::Options const& options) {
if (!writer) return TimeoutError(timeout, "Write()");
if (!sent_last_message) {
auto pending = writer->WritesDone();
if (pending.wait_for(timeout) == std::future_status::timeout) {
auto watchdog = create_watchdog().then([&writer](auto f) {
if (!f.get()) return false;
writer->Cancel();
pending.then(WaitForIdle{std::move(writer)});
return true;
});
(void)writer->Write(google::storage::v2::WriteObjectRequest{},
grpc::WriteOptions().set_last_message());
watchdog.cancel();
if (watchdog.get()) {
writer->Close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need another watchdog that might cancel this Close() operation? we use one a few lines below here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. If we get to this point the streaming RPC is already canceled. Setting up another watchdog, just to cancel the request again seems unlikely to speed up things.

return TimeoutError(timeout, "WritesDone()");
}
}
auto pending = writer->Finish();
if (pending.wait_for(timeout) == std::future_status::timeout) {
auto watchdog = create_watchdog().then([&writer](auto f) {
if (!f.get()) return false;
writer->Cancel();
pending.then(WaitForFinish{std::move(writer)});
return TimeoutError(timeout, "Finish()");
}
auto response = pending.get();
return true;
});
auto response = writer->Close();
watchdog.cancel();
if (watchdog.get()) return TimeoutError(timeout, "Close()");
if (!response) return std::move(response).status();
return GrpcObjectRequestParser::FromProto(*std::move(response), options);
}
Expand Down Expand Up @@ -619,20 +605,19 @@ StatusOr<EmptyResponse> GrpcClient::DeleteResumableUpload(

StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
UploadChunkRequest const& request) {
auto const timeout =
DefaultTransferStallTimeout(google::cloud::internal::CurrentOptions()
.get<TransferStallTimeoutOption>());
auto const timeout = google::cloud::internal::CurrentOptions()
.get<TransferStallTimeoutOption>();
auto create_watchdog = [cq = background_->cq(), timeout]() mutable {
if (timeout == std::chrono::seconds(0)) {
return make_ready_future(false);
}
return cq.MakeRelativeTimer(timeout).then(
[](auto f) { return f.get().ok(); });
};

auto context = absl::make_unique<grpc::ClientContext>();
ApplyQueryParameters(*context, request, "resource");
auto writer = stub_->AsyncWriteObject(background_->cq(), std::move(context));

auto pending_start = writer->Start();
if (pending_start.wait_for(timeout) == std::future_status::timeout) {
writer->Cancel();
pending_start.then(WaitForIdle{std::move(writer)});
return TimeoutError(timeout, "Start()");
}
auto writer = stub_->WriteObject(std::move(context));

std::size_t const maximum_chunk_size =
google::storage::v2::ServiceConstants::MAX_WRITE_CHUNK_BYTES;
Expand Down Expand Up @@ -660,15 +645,22 @@ StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
auto options = grpc::WriteOptions();
MaybeFinalize(write_request, options, request, has_more);

auto pending = writer->Write(write_request, options);
if (pending.wait_for(timeout) == std::future_status::timeout) {
auto watchdog = create_watchdog().then([&writer](auto f) {
if (!f.get()) return false;
writer->Cancel();
pending.then(WaitForIdle{std::move(writer)});
return true;
});
auto success = writer->Write(write_request, options);
watchdog.cancel();
if (watchdog.get()) {
// The stream is cancelled, but we need to close it explicitly.
writer->Close();
writer.reset();
return false;
}

sent_last_message = options.is_last_message();
if (!pending.get()) return false;
if (!success) return false;
// After the first message, clear the object specification and checksums,
// there is no need to resend it.
write_request.clear_write_object_spec();
Expand All @@ -679,8 +671,8 @@ StatusOr<QueryResumableUploadResponse> GrpcClient::UploadChunk(
};

auto close_writer = [&]() -> StatusOr<QueryResumableUploadResponse> {
return CloseWriteObjectStream(timeout, std::move(writer), sent_last_message,
options());
return CloseWriteObjectStream(timeout, create_watchdog, std::move(writer),
sent_last_message, options());
};

auto buffers = request.payload();
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/storage/internal/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "google/cloud/storage/internal/raw_client.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/background_threads.h"
#include "google/cloud/internal/async_streaming_write_rpc.h"
#include "google/cloud/internal/streaming_write_rpc.h"
#include <google/storage/v2/storage.pb.h>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -56,7 +56,7 @@ class GrpcClient : public RawClient,

~GrpcClient() override = default;

using WriteObjectStream = ::google::cloud::internal::AsyncStreamingWriteRpc<
using WriteObjectStream = ::google::cloud::internal::StreamingWriteRpc<
google::storage::v2::WriteObjectRequest,
google::storage::v2::WriteObjectResponse>;

Expand Down
178 changes: 74 additions & 104 deletions google/cloud/storage/internal/grpc_client_upload_chunk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "google/cloud/grpc_options.h"
#include "google/cloud/options.h"
#include "google/cloud/testing_util/is_proto_equal.h"
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
#include "google/cloud/testing_util/status_matchers.h"
#include "absl/memory/memory.h"
#include <gmock/gmock.h>
Expand All @@ -28,76 +29,42 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace internal {
namespace {

using ::google::cloud::storage::testing::MockAsyncInsertStream;
using ::google::cloud::storage::testing::MockInsertStream;
using ::google::cloud::storage::testing::MockStorageStub;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::cloud::testing_util::StatusIs;
using ::testing::ByMove;
using ::testing::HasSubstr;
using ::testing::Return;

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutStart) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return hold_response.get_future().then(
[](future<void>) { return false; });
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
EXPECT_CALL(*stream, Finish)
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(google::storage::v2::WriteObjectResponse{})))));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
EXPECT_THAT(response,
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Start()")));
}

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutWrite) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
EXPECT_CALL(*mock, WriteObject)
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Write).WillOnce([&] {
return hold_response.get_future().then(
[](future<void>) { return false; });
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
EXPECT_CALL(*stream, Finish)
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(google::storage::v2::WriteObjectResponse{})))));
auto stream = absl::make_unique<MockInsertStream>();
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Write).WillOnce(Return(false));
EXPECT_CALL(*stream, Close)
.WillOnce(Return(
make_status_or(google::storage::v2::WriteObjectResponse{})));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
auto const expected = std::chrono::seconds(42);
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(std::chrono::system_clock::now())))));
auto cq = CompletionQueue(mock_cq);

auto client = GrpcClient::CreateMock(
mock, Options{}
.set<TransferStallTimeoutOption>(expected)
.set<GrpcCompletionQueueOption>(cq));
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
Options{}.set<TransferStallTimeoutOption>(expected));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
Expand All @@ -107,35 +74,35 @@ TEST(GrpcClientUploadChunkTest, StallTimeoutWrite) {

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutWritesDone) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
EXPECT_CALL(*mock, WriteObject)
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Write)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, WritesDone).WillOnce([&] {
return hold_response.get_future().then(
[](future<void>) { return false; });
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
EXPECT_CALL(*stream, Finish)
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(google::storage::v2::WriteObjectResponse{})))));
auto stream = absl::make_unique<MockInsertStream>();
EXPECT_CALL(*stream, Write).WillOnce(Return(true));
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Write).WillOnce(Return(false));
EXPECT_CALL(*stream, Close)
.WillOnce(Return(google::storage::v2::WriteObjectResponse{}));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
auto const expected = std::chrono::seconds(42);
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
.WillOnce(Return(ByMove(
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
Status{StatusCode::kCancelled, "test-only"})))))
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(std::chrono::system_clock::now())))));
auto cq = CompletionQueue(mock_cq);

auto client = GrpcClient::CreateMock(
mock, Options{}
.set<TransferStallTimeoutOption>(expected)
.set<GrpcCompletionQueueOption>(cq));
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
Options{}.set<TransferStallTimeoutOption>(expected));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
Expand All @@ -144,41 +111,44 @@ TEST(GrpcClientUploadChunkTest, StallTimeoutWritesDone) {
}

/// @verify that stall timeouts are reported correctly.
TEST(GrpcClientUploadChunkTest, StallTimeoutFinish) {
// The mock will satisfy this promise when `Cancel()` is called.
promise<void> hold_response;

TEST(GrpcClientUploadChunkTest, StallTimeoutClose) {
auto mock = std::make_shared<MockStorageStub>();
EXPECT_CALL(*mock, AsyncWriteObject)
.WillOnce([&](google::cloud::CompletionQueue const&,
std::unique_ptr<grpc::ClientContext>) {
EXPECT_CALL(*mock, WriteObject)
.WillOnce([&](std::unique_ptr<grpc::ClientContext>) {
::testing::InSequence sequence;
auto stream = absl::make_unique<MockAsyncInsertStream>();
EXPECT_CALL(*stream, Start)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Write)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, WritesDone)
.WillOnce(Return(ByMove(make_ready_future(true))));
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return hold_response.get_future().then([](future<void>) {
return make_status_or(google::storage::v2::WriteObjectResponse{});
});
});
EXPECT_CALL(*stream, Cancel).WillOnce([&] {
hold_response.set_value();
});
auto stream = absl::make_unique<MockInsertStream>();
EXPECT_CALL(*stream, Write).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Close)
.WillOnce(Return(
make_status_or(google::storage::v2::WriteObjectResponse{})));
return stream;
});

auto client = GrpcClient::CreateMock(mock);
auto const expected = std::chrono::seconds(42);
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer(std::chrono::nanoseconds(expected)))
.WillOnce(Return(ByMove(
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
Status{StatusCode::kCancelled, "test-only"})))))
.WillOnce(Return(ByMove(
make_ready_future(StatusOr<std::chrono::system_clock::time_point>(
Status{StatusCode::kCancelled, "test-only"})))))
.WillOnce(Return(ByMove(make_ready_future(
make_status_or(std::chrono::system_clock::now())))));
auto cq = CompletionQueue(mock_cq);

auto client = GrpcClient::CreateMock(
mock, Options{}
.set<TransferStallTimeoutOption>(expected)
.set<GrpcCompletionQueueOption>(cq));
google::cloud::internal::OptionsSpan const span(
Options{}.set<TransferStallTimeoutOption>(std::chrono::seconds(1)));
Options{}.set<TransferStallTimeoutOption>(expected));
auto const payload = std::string(UploadChunkRequest::kChunkSizeQuantum, 'A');
auto response = client->UploadChunk(UploadChunkRequest(
"test-only-upload-id", /*offset=*/0, {ConstBuffer{payload}}));
EXPECT_THAT(response,
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Finish()")));
StatusIs(StatusCode::kDeadlineExceeded, HasSubstr("Close()")));
}

} // namespace
Expand Down