Skip to content

Commit

Permalink
impl(bigtable): BulkMutator keeps pending mutations' last status (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc committed Aug 18, 2022
1 parent ab44cf3 commit f174c62
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 28 deletions.
42 changes: 21 additions & 21 deletions google/cloud/bigtable/internal/bulk_mutator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ BulkMutatorState::BulkMutatorState(std::string const& app_profile_id,
});
auto idempotency =
is_idempotent ? Idempotency::kIdempotent : Idempotency::kNonIdempotent;
pending_annotations_.push_back(Annotations{index++, idempotency, false});
pending_annotations_.push_back(
Annotations{index++, idempotency, false, Status()});
}
}

Expand All @@ -73,14 +74,13 @@ google::bigtable::v2::MutateRowsRequest const& BulkMutatorState::BeforeStart() {
pending_mutations_ = {};
pending_mutations_.set_app_profile_id(mutations_.app_profile_id());
pending_mutations_.set_table_name(mutations_.table_name());
pending_annotations_ = {};
pending_annotations_.clear();

return mutations_;
}

std::vector<int> BulkMutatorState::OnRead(
void BulkMutatorState::OnRead(
google::bigtable::v2::MutateRowsResponse response) {
std::vector<int> res;
for (auto& entry : *response.mutable_entries()) {
// The type of `entry.index()` is a 64-bit int. But we can never create more
// than std::numeric_limits<std::size_t>::max() entries in the request
Expand All @@ -98,14 +98,11 @@ std::vector<int> BulkMutatorState::OnRead(
auto const index = static_cast<std::size_t>(entry.index());
auto& annotation = annotations_[index];
annotation.has_mutation_result = true;
auto const status = MakeStatusFromRpcError(entry.status());
auto status = MakeStatusFromRpcError(entry.status());
// Successful responses are not even recorded, this class only reports
// the failures. The data for successful responses is discarded, because
// this class takes ownership in the constructor.
if (status.ok()) {
res.push_back(annotation.original_index);
continue;
}
if (status.ok()) continue;
auto& original = *mutations_.mutable_entries(static_cast<int>(index));
// Failed responses are handled according to the current policies.
if (SafeGrpcRetry::IsTransientFailure(status) &&
Expand All @@ -114,23 +111,23 @@ std::vector<int> BulkMutatorState::OnRead(
// mapping from their index in pending_mutations_ to the original
// vector and other miscellanea.
pending_mutations_.add_entries()->Swap(&original);
pending_annotations_.push_back(annotation);
pending_annotations_.push_back(
Annotations{annotation.original_index, annotation.idempotency,
annotation.has_mutation_result, std::move(status)});
} else {
// Failures are saved for reporting, notice that we avoid copying, and
// we use the original index in the first request, not the one where it
// failed.
failures_.emplace_back(std::move(*entry.mutable_status()),
annotation.original_index);
failures_.emplace_back(std::move(status), annotation.original_index);
}
}
return res;
}

void BulkMutatorState::OnFinish(google::cloud::Status finish_status) {
last_status_ = std::move(finish_status);

int index = 0;
for (auto const& annotation : annotations_) {
for (auto& annotation : annotations_) {
if (annotation.has_mutation_result) {
++index;
continue;
Expand All @@ -141,7 +138,7 @@ void BulkMutatorState::OnFinish(google::cloud::Status finish_status) {
// If the mutation was retryable, move it to the pending mutations to try
// again, along with their index.
pending_mutations_.add_entries()->Swap(&original);
pending_annotations_.push_back(annotation);
pending_annotations_.push_back(std::move(annotation));
} else {
if (last_status_.ok()) {
google::cloud::Status status(
Expand All @@ -151,7 +148,7 @@ void BulkMutatorState::OnFinish(google::cloud::Status finish_status) {
"report it at "
"https://github.com/googleapis/google-cloud-cpp/issues/new");
failures_.emplace_back(
FailedMutation(status, annotation.original_index));
FailedMutation(std::move(status), annotation.original_index));
} else {
failures_.emplace_back(
FailedMutation(last_status_, annotation.original_index));
Expand All @@ -166,17 +163,20 @@ std::vector<FailedMutation> BulkMutatorState::OnRetryDone() && {

auto size = pending_mutations_.mutable_entries()->size();
for (int idx = 0; idx != size; idx++) {
int original_index = pending_annotations_[idx].original_index;
if (last_status_.ok()) {
auto& annotation = pending_annotations_[idx];
if (annotation.has_mutation_result) {
result.emplace_back(std::move(annotation.status),
annotation.original_index);
} else if (!last_status_.ok()) {
result.emplace_back(last_status_, annotation.original_index);
} else {
google::cloud::Status status(
google::cloud::StatusCode::kInternal,
"The server never sent a confirmation for this mutation but the "
"stream didn't fail either. This is most likely a bug, please "
"report it at "
"https://github.com/googleapis/google-cloud-cpp/issues/new");
result.emplace_back(status, original_index);
} else {
result.emplace_back(last_status_, original_index);
result.emplace_back(std::move(status), annotation.original_index);
}
}

Expand Down
19 changes: 12 additions & 7 deletions google/cloud/bigtable/internal/bulk_mutator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "google/cloud/bigtable/version.h"
#include "google/cloud/internal/invoke_result.h"
#include "google/cloud/internal/retry_policy.h"
#include "google/cloud/status.h"
#include "absl/memory/memory.h"
#include <string>
#include <vector>
Expand All @@ -45,12 +46,8 @@ class BulkMutatorState {
/// Returns the Request parameter for the next MutateRows() RPC.
google::bigtable::v2::MutateRowsRequest const& BeforeStart();

/**
* Handle the result of a `Read()` operation on the MutateRows RPC.
*
* Returns the original index of any successful operations.
*/
std::vector<int> OnRead(google::bigtable::v2::MutateRowsResponse response);
/// Handle the result of a `Read()` operation on the MutateRows RPC.
void OnRead(google::bigtable::v2::MutateRowsResponse response);

/// Handle the result of a `Finish()` operation on the MutateRows() RPC.
void OnFinish(google::cloud::Status finish_status);
Expand Down Expand Up @@ -92,9 +89,17 @@ class BulkMutatorState {
* request provided by the application.
*/
int original_index;
google::cloud::Idempotency idempotency;
Idempotency idempotency;
/// Set to `false` if the result is unknown.
bool has_mutation_result;
/**
* The last known status for this annotation.
*
* If the final stream attempt has failing mutations, but ends with an OK
* status, we return a `FailedMutation` made from `original_index` and
* `status`. The value is meaningless if `has_mutation_result` is false.
*/
Status status;
};

/// The annotations about the current bulk request.
Expand Down
112 changes: 112 additions & 0 deletions google/cloud/bigtable/internal/bulk_mutator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ using ::google::cloud::testing_util::chrono_literals::operator"" _ms; // NOLINT
using ::google::cloud::bigtable::testing::MockBigtableStub;
using ::google::cloud::bigtable::testing::MockMutateRowsStream;
using ::google::cloud::testing_util::StatusIs;
using ::testing::AnyOf;
using ::testing::Eq;
using ::testing::Matcher;
using ::testing::MockFunction;
Expand Down Expand Up @@ -360,6 +361,117 @@ TEST(BulkMutatorTest, ConfiguresContext) {
(void)mutator.MakeOneRequest(*mock);
}

TEST(BulkMutatorTest, MutationStatusReportedOnOkStream) {
BulkMutation mut(IdempotentMutation("r0"), IdempotentMutation("r1"));

auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, MutateRows)
.WillOnce([](std::unique_ptr<grpc::ClientContext>,
google::bigtable::v2::MutateRowsRequest const& request) {
EXPECT_THAT(request, HasCorrectResourceNames());
auto stream = absl::make_unique<MockMutateRowsStream>();
EXPECT_CALL(*stream, Read)
.WillOnce(
Return(MakeResponse({{0, grpc::StatusCode::UNAVAILABLE}})))
.WillOnce(Return(Status()));
return stream;
});

auto policy = DefaultIdempotentMutationPolicy();
internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
std::move(mut));

auto status = mutator.MakeOneRequest(*mock);
EXPECT_STATUS_OK(status);

auto failures = std::move(mutator).OnRetryDone();
ASSERT_EQ(2UL, failures.size());
// This mutation failed, although the stream succeeded. We should report the
// mutation status.
EXPECT_EQ(0, failures[0].original_index());
EXPECT_THAT(failures[0].status(), StatusIs(StatusCode::kUnavailable));
// The stream was OK, but it did not contain this mutation. Something has gone
// wrong, so we should report an INTERNAL error.
EXPECT_EQ(1, failures[1].original_index());
EXPECT_THAT(failures[1].status(), StatusIs(StatusCode::kInternal));
}

TEST(BulkMutatorTest, ReportEitherRetryableMutationFailOrStreamFail) {
BulkMutation mut(IdempotentMutation("r0"));

auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, MutateRows)
.WillOnce([](std::unique_ptr<grpc::ClientContext>,
google::bigtable::v2::MutateRowsRequest const& request) {
EXPECT_THAT(request, HasCorrectResourceNames());
auto stream = absl::make_unique<MockMutateRowsStream>();
EXPECT_CALL(*stream, Read)
.WillOnce(
Return(MakeResponse({{0, grpc::StatusCode::UNAVAILABLE}})))
.WillOnce(Return(Status(StatusCode::kDataLoss, "stream fail")));
return stream;
});

auto policy = DefaultIdempotentMutationPolicy();
internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
std::move(mut));

auto status = mutator.MakeOneRequest(*mock);
EXPECT_THAT(status, StatusIs(StatusCode::kDataLoss));

auto failures = std::move(mutator).OnRetryDone();
ASSERT_EQ(1UL, failures.size());
EXPECT_EQ(0, failures[0].original_index());
// The mutation fails for one reason, and the stream fails for another. As far
// as I am concerned, both are valid errors to report. The contract of the
// code does not need to be stricter than this.
EXPECT_THAT(failures[0].status(),
StatusIs(AnyOf(StatusCode::kUnavailable, StatusCode::kDataLoss)));
}

TEST(BulkMutatorTest, ReportOnlyLatestMutationStatus) {
// In this test, the mutation fails with an ABORTED status in the first
// response. It is not included in the second response. We should report the
// final stream failure for this mutation, as it is the more informative
// error.
BulkMutation mut(IdempotentMutation("r0"));

auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, MutateRows)
.WillOnce([](std::unique_ptr<grpc::ClientContext>,
google::bigtable::v2::MutateRowsRequest const& request) {
EXPECT_THAT(request, HasCorrectResourceNames());
auto stream = absl::make_unique<MockMutateRowsStream>();
EXPECT_CALL(*stream, Read)
.WillOnce(Return(MakeResponse({{0, grpc::StatusCode::ABORTED}})))
.WillOnce(Return(Status(StatusCode::kUnavailable, "try again")));
return stream;
})
.WillOnce([](std::unique_ptr<grpc::ClientContext>,
google::bigtable::v2::MutateRowsRequest const& request) {
EXPECT_THAT(request, HasCorrectResourceNames());
auto stream = absl::make_unique<MockMutateRowsStream>();
EXPECT_CALL(*stream, Read)
.WillOnce(Return(Status(StatusCode::kDataLoss, "fail")));
return stream;
});

auto policy = DefaultIdempotentMutationPolicy();
internal::BulkMutator mutator(kAppProfile, kTableName, *policy,
std::move(mut));

auto status = mutator.MakeOneRequest(*mock);
EXPECT_THAT(status, StatusIs(StatusCode::kUnavailable));

status = mutator.MakeOneRequest(*mock);
EXPECT_THAT(status, StatusIs(StatusCode::kDataLoss));

auto failures = std::move(mutator).OnRetryDone();
ASSERT_EQ(1UL, failures.size());
EXPECT_EQ(0, failures[0].original_index());
EXPECT_THAT(failures[0].status(), StatusIs(StatusCode::kDataLoss));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable
Expand Down

0 comments on commit f174c62

Please sign in to comment.