Skip to content

Commit

Permalink
feat(spanner): support "data boost" on partitioned queries and reads (#…
Browse files Browse the repository at this point in the history
…10998)

Setting the `spanner::PartitionDataBoostOption` on a `PartitionRead()`
or `PartitionQuery()` will result in the respective partitioned `Read()`
and `ExecuteQuery()` requests being executed via Spanner independent
compute resources.
  • Loading branch information
devbww committed Mar 3, 2023
1 parent 2e04236 commit aea166e
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 52 deletions.
Binary file modified ci/abi-dumps/google_cloud_cpp_spanner.expected.abi.dump.gz
Binary file not shown.
2 changes: 2 additions & 0 deletions google/cloud/spanner/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class Connection {
std::vector<std::string> columns;
ReadOptions read_options;
absl::optional<std::string> partition_token;
bool partition_data_boost = false; // ignored unless partition_token is set
};

/// Wrap the arguments to `PartitionRead()`.
Expand All @@ -93,6 +94,7 @@ class Connection {
SqlStatement statement;
QueryOptions query_options;
absl::optional<std::string> partition_token;
bool partition_data_boost = false; // ignored unless partition_token is set
};

/// Wrap the arguments to `ExecutePartitionedDml()`.
Expand Down
24 changes: 17 additions & 7 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ spanner::RowStream ConnectionImpl::ReadImpl(
request->set_limit(params.read_options.limit);
if (params.partition_token) {
request->set_partition_token(*std::move(params.partition_token));
if (params.partition_data_boost) {
request->set_data_boost_enabled(true);
}
}
request->mutable_request_options()->set_priority(
ProtoRequestPriority(params.read_options.request_priority));
Expand Down Expand Up @@ -552,12 +555,17 @@ StatusOr<std::vector<spanner::ReadPartition>> ConnectionImpl::PartitionReadImpl(
return status;
}

// Note: This is not `ReadParams::partition_data_boost`. While that is the
// ultimate destination for the `PartitionOptions::data_boost` value, first
// it is encapsulated in a `ReadPartition`.
bool const data_boost = partition_options.data_boost;

std::vector<spanner::ReadPartition> read_partitions;
for (auto const& partition : response->partitions()) {
read_partitions.push_back(MakeReadPartition(
response->transaction().id(), ctx.route_to_leader, ctx.tag,
session->session_name(), partition.partition_token(), params.table,
params.keys, params.columns, params.read_options));
params.keys, params.columns, data_boost, params.read_options));
}

return read_partitions;
Expand Down Expand Up @@ -589,6 +597,9 @@ StatusOr<ResultType> ConnectionImpl::ExecuteSqlImpl(
request.set_query_mode(query_mode);
if (params.partition_token) {
request.set_partition_token(*std::move(params.partition_token));
if (params.partition_data_boost) {
request.set_data_boost_enabled(true);
}
}
if (params.query_options.optimizer_version()) {
request.mutable_query_options()->set_optimizer_version(
Expand Down Expand Up @@ -811,8 +822,7 @@ ConnectionImpl::PartitionQueryImpl(
*request.mutable_params() = std::move(*sql_statement.mutable_params());
*request.mutable_param_types() =
std::move(*sql_statement.mutable_param_types());
*request.mutable_partition_options() =
ToProto(std::move(params.partition_options));
*request.mutable_partition_options() = ToProto(params.partition_options);

auto stub = session_pool_->GetStub(*session);
for (;;) {
Expand Down Expand Up @@ -850,10 +860,10 @@ ConnectionImpl::PartitionQueryImpl(

std::vector<spanner::QueryPartition> query_partitions;
for (auto const& partition : response->partitions()) {
query_partitions.push_back(
MakeQueryPartition(response->transaction().id(), ctx.route_to_leader,
ctx.tag, session->session_name(),
partition.partition_token(), params.statement));
query_partitions.push_back(MakeQueryPartition(
response->transaction().id(), ctx.route_to_leader, ctx.tag,
session->session_name(), partition.partition_token(),
params.partition_options.data_boost, params.statement));
}
return query_partitions;
}
Expand Down
78 changes: 72 additions & 6 deletions google/cloud/spanner/internal/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2529,6 +2529,37 @@ TEST(ConnectionImplTest, RollbackInvalidatedTransaction) {
HasSubstr("constraint error")));
}

// Verifies that a `Read()` carrying a partition token with data boost
// requested forwards both the token and `data_boost_enabled` in the
// `ReadRequest`, and that no `BeginTransaction` call is made.
TEST(ConnectionImplTest, ReadPartition) {
  auto database = spanner::Database(
      "placeholder_project", "placeholder_instance", "placeholder_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  // Exactly one BatchCreateSessions call supplies the session used below.
  EXPECT_CALL(*stub, BatchCreateSessions(_, HasDatabase(database)))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, BeginTransaction).Times(0);
  EXPECT_CALL(*stub, StreamingRead)
      .WillOnce([](grpc::ClientContext&,
                   google::spanner::v1::ReadRequest const& read_request) {
        EXPECT_EQ("test-session-name", read_request.session());
        EXPECT_EQ("Table", read_request.table());
        EXPECT_EQ("DEADBEEF", read_request.partition_token());
        // The data-boost flag must reach the wire request.
        EXPECT_TRUE(read_request.data_boost_enabled());
        return MakeReader(
            {R"pb(metadata: { transaction: { id: " ABCDEF00 " } })pb"});
      });

  auto connection = MakeConnectionImpl(database, stub);
  internal::OptionsSpan options_span(MakeLimitedTimeOptions());
  bool const data_boost = true;
  auto row_stream = connection->Read({spanner::MakeReadOnlyTransaction(),
                                      "Table",
                                      spanner::KeySet::All(),
                                      {"Column"},
                                      {},
                                      "DEADBEEF",
                                      data_boost});
  EXPECT_TRUE(ContainsNoRows(row_stream));
}

TEST(ConnectionImplTest, PartitionReadSuccess) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = spanner::Database("placeholder_project", "placeholder_instance",
Expand Down Expand Up @@ -2572,23 +2603,27 @@ TEST(ConnectionImplTest, PartitionReadSuccess) {
read_options.index_name = "index";
read_options.limit = 21;
read_options.request_priority = spanner::RequestPriority::kLow;
bool data_boost = true;
StatusOr<std::vector<spanner::ReadPartition>> result =
conn->PartitionRead({{txn,
"table",
spanner::KeySet::All(),
{"UserId", "UserName"},
read_options}});
read_options},
{absl::nullopt, absl::nullopt, data_boost}});
ASSERT_STATUS_OK(result);
EXPECT_THAT(txn, HasSessionAndTransaction("test-session-name", "CAFEDEAD",
false, ""));

std::vector<spanner::ReadPartition> expected_read_partitions = {
spanner_internal::MakeReadPartition(
"CAFEDEAD", false, "", "test-session-name", "BADDECAF", "table",
spanner::KeySet::All(), {"UserId", "UserName"}, read_options),
spanner::KeySet::All(), {"UserId", "UserName"}, data_boost,
read_options),
spanner_internal::MakeReadPartition(
"CAFEDEAD", false, "", "test-session-name", "DEADBEEF", "table",
spanner::KeySet::All(), {"UserId", "UserName"}, read_options)};
spanner::KeySet::All(), {"UserId", "UserName"}, data_boost,
read_options)};

EXPECT_THAT(*result, UnorderedPointwise(Eq(), expected_read_partitions));
}
Expand Down Expand Up @@ -2649,6 +2684,35 @@ TEST(ConnectionImplTest, PartitionReadTooManyTransientFailures) {
StatusIs(StatusCode::kUnavailable, HasSubstr("try-again")));
}

// Verifies that an `ExecuteQuery()` carrying a partition token with data
// boost requested forwards both the token and `data_boost_enabled` in the
// `ExecuteSqlRequest`, and that no `BeginTransaction` call is made.
TEST(ConnectionImplTest, QueryPartition) {
  auto database = spanner::Database(
      "placeholder_project", "placeholder_instance", "placeholder_database_id");
  auto stub = std::make_shared<spanner_testing::MockSpannerStub>();

  // Exactly one BatchCreateSessions call supplies the session used below.
  EXPECT_CALL(*stub, BatchCreateSessions(_, HasDatabase(database)))
      .WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
  EXPECT_CALL(*stub, BeginTransaction).Times(0);
  EXPECT_CALL(*stub, ExecuteStreamingSql)
      .WillOnce([](grpc::ClientContext&,
                   google::spanner::v1::ExecuteSqlRequest const& sql_request) {
        EXPECT_EQ("test-session-name", sql_request.session());
        EXPECT_EQ("SELECT * FROM Table", sql_request.sql());
        EXPECT_EQ("DEADBEEF", sql_request.partition_token());
        // The data-boost flag must reach the wire request.
        EXPECT_TRUE(sql_request.data_boost_enabled());
        return MakeReader(
            {R"pb(metadata: { transaction: { id: " ABCDEF00 " } })pb"});
      });

  auto connection = MakeConnectionImpl(database, stub);
  internal::OptionsSpan options_span(MakeLimitedTimeOptions());
  bool const data_boost = true;
  auto row_stream =
      connection->ExecuteQuery({spanner::MakeReadOnlyTransaction(),
                                spanner::SqlStatement("SELECT * FROM Table"),
                                {},
                                "DEADBEEF",
                                data_boost});
  EXPECT_TRUE(ContainsNoRows(row_stream));
}

TEST(ConnectionImplTest, PartitionQuerySuccess) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = spanner::Database("placeholder_project", "placeholder_instance",
Expand Down Expand Up @@ -2683,18 +2747,20 @@ TEST(ConnectionImplTest, PartitionQuerySuccess) {
auto conn = MakeConnectionImpl(db, mock);
internal::OptionsSpan span(MakeLimitedTimeOptions());
spanner::SqlStatement sql_statement("SELECT * FROM Table");
bool data_boost = true;
StatusOr<std::vector<spanner::QueryPartition>> result = conn->PartitionQuery(
{MakeReadOnlyTransaction(spanner::Transaction::ReadOnlyOptions()),
sql_statement, spanner::PartitionOptions()});
sql_statement,
{absl::nullopt, absl::nullopt, data_boost}});
ASSERT_STATUS_OK(result);

std::vector<spanner::QueryPartition> expected_query_partitions = {
spanner_internal::MakeQueryPartition("CAFEDEAD", false, "",
"test-session-name", "BADDECAF",
sql_statement),
data_boost, sql_statement),
spanner_internal::MakeQueryPartition("CAFEDEAD", false, "",
"test-session-name", "DEADBEEF",
sql_statement)};
data_boost, sql_statement)};

EXPECT_THAT(*result, UnorderedPointwise(Eq(), expected_query_partitions));
}
Expand Down
14 changes: 14 additions & 0 deletions google/cloud/spanner/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,20 @@ struct PartitionsMaximumOption {
using Type = std::int64_t;
};

/**
* Option for `google::cloud::Options` to use "data boost" in the
* partitions returned from `Client::PartitionRead()` or
* `Client::PartitionQuery()`.
*
* If true, the subsequent partitioned `Client::Read()` and
* `Client::ExecuteQuery()` requests will be executed via Spanner
* independent compute resources.
*
* @ingroup spanner-options
*/
struct PartitionDataBoostOption {
using Type = bool;
};

/**
* Option for `google::cloud::Options` to set a per-transaction tag.
*
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/spanner/partition_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ Options ToOptions(PartitionOptions const& po) {
if (po.max_partitions) {
opts.set<PartitionsMaximumOption>(*po.max_partitions);
}
if (po.data_boost) {
opts.set<PartitionDataBoostOption>(true);
}
return opts;
}

Expand All @@ -39,6 +42,9 @@ PartitionOptions ToPartitionOptions(Options const& opts) {
if (opts.has<PartitionsMaximumOption>()) {
po.max_partitions = opts.get<PartitionsMaximumOption>();
}
if (opts.has<PartitionDataBoostOption>()) {
po.data_boost = opts.get<PartitionDataBoostOption>();
}
return po;
}

Expand All @@ -55,6 +61,10 @@ google::spanner::v1::PartitionOptions ToProto(
if (po.partition_size_bytes) {
proto.set_partition_size_bytes(*po.partition_size_bytes);
}
// Note: `PartitionOptions::data_boost` is not copied into the proto here.
// Its value instead ends up in the `data_boost_enabled` field of the
// ReadRequest and ExecuteSqlRequest messages, which is set in
// ConnectionImpl::ReadImpl() and ConnectionImpl::ExecuteSqlImpl()
// respectively.
return proto;
}

Expand Down
11 changes: 10 additions & 1 deletion google/cloud/spanner/partition_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,20 @@ struct PartitionOptions {
* returned may be smaller or larger than this maximum count request.
*/
absl::optional<std::int64_t> max_partitions;

/**
* Use "data boost" in the returned partitions.
*
* If true, the subsequent partitioned `Client::Read()` and
* `Client::ExecuteQuery()` requests will be executed via Spanner
* independent compute resources.
*/
bool data_boost = false;
};

inline bool operator==(PartitionOptions const& a, PartitionOptions const& b) {
return a.partition_size_bytes == b.partition_size_bytes &&
a.max_partitions == b.max_partitions;
a.max_partitions == b.max_partitions && a.data_boost == b.data_boost;
}

inline bool operator!=(PartitionOptions const& a, PartitionOptions const& b) {
Expand Down
16 changes: 11 additions & 5 deletions google/cloud/spanner/partition_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@ TEST(PartitionOptionsTest, Regular) {
}

TEST(PartitionOptionsTest, Proto) {
PartitionOptions po{1, 2};
PartitionOptions po{1, 2, true};
auto proto = spanner_internal::ToProto(po);
EXPECT_EQ(*po.partition_size_bytes, proto.partition_size_bytes());
EXPECT_EQ(*po.max_partitions, proto.max_partitions());
}

TEST(PartitionOptionsTest, OptionsRoundTrip) {
for (auto const& po :
{PartitionOptions{absl::nullopt, absl::nullopt},
PartitionOptions{42, absl::nullopt},
PartitionOptions{absl::nullopt, 42}, PartitionOptions{32, 64}}) {
for (auto const& po : {
PartitionOptions{absl::nullopt, absl::nullopt, false},
PartitionOptions{42, absl::nullopt, false},
PartitionOptions{absl::nullopt, 42, false},
PartitionOptions{32, 64, false},
PartitionOptions{absl::nullopt, absl::nullopt, true},
PartitionOptions{42, absl::nullopt, true},
PartitionOptions{absl::nullopt, 42, true},
PartitionOptions{32, 64, true},
}) {
EXPECT_EQ(po, ToPartitionOptions(ToOptions(po)));
}
}
Expand Down
9 changes: 7 additions & 2 deletions google/cloud/spanner/query_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ constexpr int kRouteToLeaderFieldNumber = 511037314;
QueryPartition::QueryPartition(std::string transaction_id, bool route_to_leader,
std::string transaction_tag,
std::string session_id,
std::string partition_token,
std::string partition_token, bool data_boost,
SqlStatement sql_statement)
: transaction_id_(std::move(transaction_id)),
route_to_leader_(route_to_leader),
transaction_tag_(std::move(transaction_tag)),
session_id_(std::move(session_id)),
partition_token_(std::move(partition_token)),
data_boost_(data_boost),
sql_statement_(std::move(sql_statement)) {}

bool operator==(QueryPartition const& a, QueryPartition const& b) {
Expand All @@ -42,7 +43,7 @@ bool operator==(QueryPartition const& a, QueryPartition const& b) {
a.transaction_tag_ == b.transaction_tag_ &&
a.session_id_ == b.session_id_ &&
a.partition_token_ == b.partition_token_ &&
a.sql_statement_ == b.sql_statement_;
a.data_boost_ == b.data_boost_ && a.sql_statement_ == b.sql_statement_;
}

StatusOr<std::string> SerializeQueryPartition(
Expand All @@ -59,6 +60,9 @@ StatusOr<std::string> SerializeQueryPartition(
(*proto.mutable_params()->mutable_fields())[param_name] = type_value.second;
(*proto.mutable_param_types())[param_name] = type_value.first;
}
if (query_partition.data_boost()) {
proto.set_data_boost_enabled(true);
}

// QueryOptions are not serialized, but are instead applied on the remote
// side during the Client::ExecuteQuery(QueryPartition, QueryOptions) call.
Expand Down Expand Up @@ -119,6 +123,7 @@ StatusOr<QueryPartition> DeserializeQueryPartition(
QueryPartition query_partition(proto.transaction().id(), route_to_leader,
proto.request_options().transaction_tag(),
proto.session(), proto.partition_token(),
proto.data_boost_enabled(),
SqlStatement(proto.sql(), sql_parameters));
return query_partition;
}
Expand Down
Loading

0 comments on commit aea166e

Please sign in to comment.