Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigtable): Use retry policy on streams with failing mutations #9706

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions google/cloud/bigtable/internal/async_bulk_apply.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,15 @@ void AsyncBulkApplier::OnRead(
}

void AsyncBulkApplier::OnFinish(Status const& status) {
auto const is_retryable = status.ok() || retry_policy_->OnFailure(status);
state_.OnFinish(status);
if (!state_.HasPendingMutations() || !is_retryable) {
if (!state_.HasPendingMutations() || !retry_policy_->OnFailure(status)) {
SetPromise();
return;
}

using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;

auto self = this->shared_from_this();
cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
.then([self](TimerFuture result) {
.then([self](auto result) {
if (result.get()) {
self->StartIteration();
} else {
Expand Down
61 changes: 61 additions & 0 deletions google/cloud/bigtable/internal/async_bulk_apply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,67 @@ TEST(AsyncBulkApplyTest, CurrentOptionsContinuedOnRetries) {
timer_promise.set_value(make_status_or(std::chrono::system_clock::now()));
}

// Verify that the retry and backoff policies are consulted when the stream
// itself finishes with an OK status but reports a failed mutation.
TEST(AsyncBulkApplyTest, RetriesOkStreamWithFailedMutations) {
  // Every attempt looks the same: the request is validated, the stream starts,
  // one `Read()` reports `UNAVAILABLE` for the mutation at index 0, the next
  // `Read()` yields an empty optional, and `Finish()` returns an OK status.
  auto make_stream = [](CompletionQueue const&,
                        std::unique_ptr<grpc::ClientContext>,
                        v2::MutateRowsRequest const& request) {
    EXPECT_EQ(kAppProfile, request.app_profile_id());
    EXPECT_EQ(kTableName, request.table_name());
    EXPECT_THAT(request.entries(), ElementsAre(MatchEntry("r1")));

    auto stream = absl::make_unique<MockAsyncMutateRowsStream>();
    EXPECT_CALL(*stream, Start).WillOnce([] {
      return make_ready_future(true);
    });
    EXPECT_CALL(*stream, Read)
        .WillOnce([] {
          return make_ready_future(
              MakeResponse({{0, grpc::StatusCode::UNAVAILABLE}}));
        })
        .WillOnce([] {
          return make_ready_future(absl::optional<v2::MutateRowsResponse>{});
        });
    EXPECT_CALL(*stream, Finish).WillOnce([] {
      return make_ready_future(Status());
    });
    return stream;
  };

  std::vector<bigtable::FailedMutation> expected = {
      {Status(StatusCode::kUnavailable, "try again"), 0}};
  bigtable::BulkMutation mutation(IdempotentMutation("r1"));

  // One initial attempt plus `kNumRetries` retries.
  auto stub = std::make_shared<MockBigtableStub>();
  EXPECT_CALL(*stub, AsyncMutateRows)
      .Times(kNumRetries + 1)
      .WillRepeatedly(make_stream);

  // A backoff timer is expected before each retry; it is satisfied
  // immediately so the test never waits.
  auto cq_impl = std::make_shared<MockCompletionQueueImpl>();
  EXPECT_CALL(*cq_impl, MakeRelativeTimer)
      .Times(kNumRetries)
      .WillRepeatedly([] {
        auto const now = std::chrono::system_clock::now();
        return make_ready_future(make_status_or(now));
      });
  CompletionQueue cq(cq_impl);

  auto retry_policy = DataLimitedErrorCountRetryPolicy(kNumRetries).clone();
  auto backoff = absl::make_unique<MockBackoffPolicy>();
  EXPECT_CALL(*backoff, OnCompletion).Times(kNumRetries);
  auto idempotency = bigtable::DefaultIdempotentMutationPolicy();

  // The setup callback installed through the options is expected once per
  // attempt.
  MockFunction<void(grpc::ClientContext&)> setup;
  EXPECT_CALL(setup, Call).Times(kNumRetries + 1);
  internal::OptionsSpan span(
      Options{}.set<internal::GrpcSetupOption>(setup.AsStdFunction()));

  auto pending = AsyncBulkApplier::Create(
      cq, stub, std::move(retry_policy), std::move(backoff), *idempotency,
      kAppProfile, kTableName, std::move(mutation));

  CheckFailedMutations(pending.get(), expected);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
Expand Down
6 changes: 3 additions & 3 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
// micro-optimization.
std::unique_ptr<bigtable::DataRetryPolicy> retry;
std::unique_ptr<BackoffPolicy> backoff;
do {
while (true) {
auto status = mutator.MakeOneRequest(*stub_);
if (status.ok()) continue;
if (!mutator.HasPendingMutations()) break;
if (!retry) retry = retry_policy();
if (!retry->OnFailure(status)) break;
if (!backoff) backoff = backoff_policy();
auto delay = backoff->OnCompletion();
std::this_thread::sleep_for(delay);
} while (mutator.HasPendingMutations());
}
return std::move(mutator).OnRetryDone();
}

Expand Down
37 changes: 37 additions & 0 deletions google/cloud/bigtable/internal/data_connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,43 @@ TEST(DataConnectionTest, BulkApplyNoSleepIfNoPendingMutations) {
(void)conn->BulkApply(kTableName, std::move(mut));
}

// Verify that `BulkApply()` consults the retry and backoff policies when the
// stream itself succeeds but contains a failed mutation.
TEST(DataConnectionTest, BulkApplyRetriesOkStreamWithFailedMutations) {
  // Each attempt validates the request, then returns a stream whose first
  // `Read()` reports `UNAVAILABLE` for the mutation at index 0 and whose second
  // `Read()` returns an OK `Status`.
  auto make_stream =
      [](std::unique_ptr<grpc::ClientContext>,
         google::bigtable::v2::MutateRowsRequest const& request) {
        EXPECT_EQ(kAppProfile, request.app_profile_id());
        EXPECT_EQ(kTableName, request.table_name());

        auto stream = absl::make_unique<MockMutateRowsStream>();
        EXPECT_CALL(*stream, Read)
            .WillOnce(Return(
                MakeBulkApplyResponse({{0, grpc::StatusCode::UNAVAILABLE}})))
            .WillOnce(Return(Status()));
        return stream;
      };

  std::vector<bigtable::FailedMutation> expected = {
      {Status(StatusCode::kUnavailable, "try again"), 0}};
  bigtable::BulkMutation mutation(IdempotentMutation("r1"));

  // One initial attempt plus `kNumRetries` retries.
  auto stub = std::make_shared<MockBigtableStub>();
  EXPECT_CALL(*stub, MutateRows)
      .Times(kNumRetries + 1)
      .WillRepeatedly(make_stream);

  // The prototype policy is cloned exactly once; the clone is the object whose
  // `OnCompletion()` is expected before each retry.
  auto backoff_prototype = absl::make_unique<MockBackoffPolicy>();
  EXPECT_CALL(*backoff_prototype, clone).WillOnce([]() {
    auto cloned = absl::make_unique<MockBackoffPolicy>();
    EXPECT_CALL(*cloned, OnCompletion).Times(kNumRetries);
    return cloned;
  });

  auto conn = TestConnection(std::move(stub));
  internal::OptionsSpan span(
      CallOptions().set<DataBackoffPolicyOption>(std::move(backoff_prototype)));
  auto actual = conn->BulkApply(kTableName, std::move(mutation));
  CheckFailedMutations(actual, expected);
}

// The `AsyncBulkApplier` is tested extensively in `async_bulk_apply_test.cc`.
// In this test, we just verify that the configuration is passed along.
TEST(DataConnectionTest, AsyncBulkApply) {
Expand Down
7 changes: 2 additions & 5 deletions google/cloud/bigtable/internal/legacy_async_bulk_apply.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,15 @@ void AsyncRetryBulkApply::OnRead(
}

void AsyncRetryBulkApply::OnFinish(CompletionQueue cq, Status const& status) {
auto const is_retryable = status.ok() || rpc_retry_policy_->OnFailure(status);
state_.OnFinish(status);
if (!state_.HasPendingMutations() || !is_retryable) {
if (!state_.HasPendingMutations() || !rpc_retry_policy_->OnFailure(status)) {
SetPromise();
return;
}

using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;

auto self = this->shared_from_this();
cq.MakeRelativeTimer(rpc_backoff_policy_->OnCompletion(status))
.then([self, cq](TimerFuture result) {
.then([self, cq](auto result) {
if (result.get()) {
self->StartIteration(std::move(cq));
} else {
Expand Down
50 changes: 50 additions & 0 deletions google/cloud/bigtable/internal/legacy_async_bulk_apply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,56 @@ TEST_F(AsyncBulkApplyTest, TooManyFailures) {
EXPECT_TRUE(cq_impl_->empty());
}

// Verify that the retry policy is applied when the stream finishes with an OK
// status but the response reports a failed mutation. Every attempt below
// reports UNAVAILABLE for the single mutation, and the test expects exactly
// `kErrorCount + 1` attempts before that failure is returned to the caller.
TEST_F(AsyncBulkApplyTest, RetryPolicyUsedForOkStreamsWithFailedMutations) {
bigtable::BulkMutation mut{bigtable::SingleRowMutation(
"row", {bigtable::SetCell("f", "c", 0_ms, "v2")})};

// The retry policy is created with this limit; the expectations below require
// `kErrorCount + 1` requests, i.e. we give up on the 3rd error.
auto constexpr kErrorCount = 2;

// Each request gets a fresh mock reader. Its first `Read()` fills the response
// with an UNAVAILABLE status for the entry at index 0, its second `Read()`
// leaves the response untouched, and `Finish()` sets an OK stream status.
EXPECT_CALL(*client_, PrepareAsyncMutateRows)
.Times(kErrorCount + 1)
.WillRepeatedly([](grpc::ClientContext*,
btproto::MutateRowsRequest const&,
grpc::CompletionQueue*) {
auto reader = absl::make_unique<
MockClientAsyncReaderInterface<btproto::MutateRowsResponse>>();
EXPECT_CALL(*reader, Read)
.WillOnce([](btproto::MutateRowsResponse* r, void*) {
auto& r1 = *r->add_entries();
r1.set_index(0);
r1.mutable_status()->set_code(grpc::StatusCode::UNAVAILABLE);
})
.WillOnce([](btproto::MutateRowsResponse*, void*) {});
EXPECT_CALL(*reader, Finish).WillOnce([](grpc::Status* status, void*) {
*status = grpc::Status::OK;
});
EXPECT_CALL(*reader, StartCall);
return reader;
});

auto limited_retry_policy = LimitedErrorCountRetryPolicy(kErrorCount);
auto bulk_apply_future = internal::AsyncRetryBulkApply::Create(
cq_, limited_retry_policy.clone(), rpc_backoff_policy_->clone(),
*idempotent_mutation_policy_, metadata_update_policy_, client_,
"my-app-profile", "my-table", std::move(mut));

// Drive the first `kErrorCount` attempts. After each one, complete the pending
// backoff timer; exactly one operation must then be queued.
// NOTE(review): `SimulateIteration()` is a fixture helper defined outside this
// hunk; presumably it completes the operations of one attempt - confirm.
for (int retry = 0; retry != kErrorCount; ++retry) {
SimulateIteration();
// simulate the backoff timer
cq_impl_->SimulateCompletion(true);
ASSERT_EQ(1U, cq_impl_->size());
}

// The final attempt: no backoff timer is completed after this one.
SimulateIteration();

// The only mutation is reported as failed with the per-entry status.
auto failures = StatusOnly(bulk_apply_future.get());
EXPECT_THAT(failures, ElementsAre(StatusIs(StatusCode::kUnavailable)));

// Nothing may remain queued once the retry loop has given up.
ASSERT_EQ(0U, cq_impl_->size());
EXPECT_TRUE(cq_impl_->empty());
}

TEST_F(AsyncBulkApplyTest, UsesBackoffPolicy) {
bigtable::BulkMutation mut{
bigtable::SingleRowMutation("foo2",
Expand Down
4 changes: 1 addition & 3 deletions google/cloud/bigtable/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,8 @@ std::vector<FailedMutation> Table::BulkApply(BulkMutation mut, Options opts) {
retry_policy->Setup(client_context);
metadata_update_policy_.Setup(client_context);
status = mutator.MakeOneRequest(*client_, client_context);
if (!status.ok() && !retry_policy->OnFailure(status)) {
break;
}
if (!mutator.HasPendingMutations()) break;
if (!retry_policy->OnFailure(status)) break;
auto delay = backoff_policy->OnCompletion(status);
std::this_thread::sleep_for(delay);
}
Expand Down
12 changes: 0 additions & 12 deletions google/cloud/bigtable/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,6 @@ class Table {
* It is possible that some mutations may not be attempted at all. These
* mutations are considered failing and will be returned.
*
* @note The retry policy is only impacted by the result of the gRPC stream.
* Let's say you have a `LimitedErrorCountRetryPolicy` of 2. If an
* idempotent mutation fails with a retryable error and the stream itself
* succeeds, it may be retried more than 2 times. Only when the stream
* fails twice will we give up and consider the mutation to be failed.
*
* @note This function takes ownership (and then discards) the data in the
* mutation. In general, a `BulkMutation` can modify multiple rows, and
* the modifications for each row can change (or create) multiple cells,
Expand Down Expand Up @@ -517,12 +511,6 @@ class Table {
* It is possible that some mutations may not be attempted at all. These
* mutations are considered failing and will be returned.
*
* @note The retry policy is only impacted by the result of the gRPC stream.
* Let's say you have a `LimitedErrorCountRetryPolicy` of 2. If an
* idempotent mutation fails with a retryable error and the stream itself
* succeeds, it may be retried more than 2 times. Only when the stream
* fails twice will we give up and consider the mutation to be failed.
*
* @note This function takes ownership (and then discards) the data in the
* mutation. In general, a `BulkMutation` can modify multiple rows, and
* the modifications for each row can change (or create) multiple cells,
Expand Down
35 changes: 35 additions & 0 deletions google/cloud/bigtable/table_bulk_apply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,41 @@ TEST_F(TableBulkApplyTest, TooManyFailures) {
failures.front().status().code());
}

/// @test Verify that Table::BulkApply() applies the retry policy when the
/// stream finishes with OK but reports a failed mutation.
TEST_F(TableBulkApplyTest, RetryPolicyUsedForOkStreamWithFailedMutations) {
  // Create a table with specific policies so we can test the behavior
  // without having to depend on timers expiring.
  Table custom_table(
      client_, "foo_table",
      // The policy is created with a limit of 2; the expectation below requires
      // exactly 3 requests, i.e. the Table stops at the 3rd failure.
      LimitedErrorCountRetryPolicy(2),
      // Use much shorter backoff than the default to test faster.
      ExponentialBackoffPolicy(10_us, 40_us));

  // Each request gets a fresh stream: the first `Read()` reports UNAVAILABLE
  // for the entry at index 0, the second `Read()` returns false, and `Finish()`
  // returns an OK status. The lambda needs no captures.
  auto create_stream = [](grpc::ClientContext*,
                          btproto::MutateRowsRequest const&) {
    auto stream = absl::make_unique<MockMutateRowsReader>(
        "google.bigtable.v2.Bigtable.MutateRows");
    EXPECT_CALL(*stream, Read)
        .WillOnce([](btproto::MutateRowsResponse* r) {
          auto& e0 = *r->add_entries();
          e0.set_index(0);
          e0.mutable_status()->set_code(grpc::StatusCode::UNAVAILABLE);
          return true;
        })
        .WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
    return stream;
  };

  EXPECT_CALL(*client_, MutateRows).Times(3).WillRepeatedly(create_stream);

  auto failures = custom_table.BulkApply(BulkMutation(
      SingleRowMutation("bar", {SetCell("fam", "col", 0_ms, "qux")})));
  // Stop the test here if nothing failed: calling `front()` on an empty vector
  // is undefined behavior, so this must be a fatal assertion.
  ASSERT_FALSE(failures.empty());
  EXPECT_EQ(StatusCode::kUnavailable, failures.front().status().code());
}

/// @test Verify that Table::BulkApply() retries only idempotent mutations.
TEST_F(TableBulkApplyTest, RetryOnlyIdempotent) {
// We will send both idempotent and non-idempotent mutations. We prepare the
Expand Down