Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common): add GrpcCompressionAlgorithmOption #13108

Merged
merged 1 commit into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google/cloud/grpc_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ void ConfigureContext(grpc::ClientContext& context, Options const& opts) {
if (opts.has<GrpcSetupOption>()) {
opts.get<GrpcSetupOption>()(context);
}
if (opts.has<GrpcCompressionAlgorithmOption>()) {
// Overwrites anything set by the GrpcSetupOption.
context.set_compression_algorithm(
opts.get<GrpcCompressionAlgorithmOption>());
}
}

void ConfigurePollContext(grpc::ClientContext& context, Options const& opts) {
Expand Down
25 changes: 21 additions & 4 deletions google/cloud/grpc_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ struct GrpcCredentialOption {
using Type = std::shared_ptr<grpc::ChannelCredentials>;
};

/**
* The gRPC compression algorithm used by clients/operations configured
* with this object.
*
* @ingroup options
*/
struct GrpcCompressionAlgorithmOption {
using Type = grpc_compression_algorithm;
};

/**
* The number of transport channels to create.
*
Expand Down Expand Up @@ -175,10 +185,11 @@ struct GrpcBackgroundThreadsFactoryOption {
* A list of all the gRPC options.
*/
using GrpcOptionList =
OptionList<GrpcCredentialOption, GrpcNumChannelsOption,
GrpcChannelArgumentsOption, GrpcChannelArgumentsNativeOption,
GrpcTracingOptionsOption, GrpcBackgroundThreadPoolSizeOption,
GrpcCompletionQueueOption, GrpcBackgroundThreadsFactoryOption>;
OptionList<GrpcCredentialOption, GrpcCompressionAlgorithmOption,
GrpcNumChannelsOption, GrpcChannelArgumentsOption,
GrpcChannelArgumentsNativeOption, GrpcTracingOptionsOption,
GrpcBackgroundThreadPoolSizeOption, GrpcCompletionQueueOption,
GrpcBackgroundThreadsFactoryOption>;

namespace internal {

Expand All @@ -191,6 +202,9 @@ namespace internal {
* `set_credentials()` directly on the context. Instead, use the Google
* Unified Auth Credentials library, via
* #google::cloud::UnifiedCredentialsOption.
*
* @warning `MergeOptions()` will simply select the preferred function, rather
* than merging the behavior of the preferred and alternate functions.
*/
struct GrpcSetupOption {
using Type = std::function<void(grpc::ClientContext&)>;
Expand All @@ -207,6 +221,9 @@ struct GrpcSetupOption {
* `set_credentials()` directly on the context. Instead, use the Google
* Unified Auth Credentials library, via
* #google::cloud::UnifiedCredentialsOption.
*
* @warning `MergeOptions()` will simply select the preferred function, rather
* than merging the behavior of the preferred and alternate functions.
*/
struct GrpcSetupPollOption {
using Type = std::function<void(grpc::ClientContext&)>;
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub/internal/default_batch_sink_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/pubsub/testing/mock_publisher_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/pubsub/topic.h"
#include "google/cloud/grpc_options.h"
#include "google/cloud/internal/background_threads_impl.h"
#include "google/cloud/testing_util/is_proto_equal.h"
#include "google/cloud/testing_util/status_matchers.h"
Expand Down Expand Up @@ -125,6 +126,9 @@ TEST(DefaultBatchSinkTest, BasicWithCompression) {
EXPECT_CALL(*mock, AsyncPublish)
.WillOnce([](Unused, auto context,
google::pubsub::v1::PublishRequest const& request) {
// The pubsub::CompressionAlgorithmOption takes precedence over
// GrpcCompressionAlgorithmOption when the former's threshold is
// met.
EXPECT_EQ(context->compression_algorithm(), GRPC_COMPRESS_GZIP);
EXPECT_THAT(request, IsProtoEqual(MakeRequest(3)));
return make_ready_future(make_status_or(MakeResponse(request)));
Expand All @@ -135,6 +139,7 @@ TEST(DefaultBatchSinkTest, BasicWithCompression) {
std::move(mock), background.cq(),
DefaultPublisherOptions(
pubsub_testing::MakeTestOptions()
.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_NONE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alevenberg maybe we should consider using pubsub::CompressionAlgorithmOption = google::cloud::GrpcCompressionAlgorithmOption;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might be problematic, given that pubsub::CompressionAlgorithmOption is given a default value (of GRPC_COMPRESS_DEFLATE).

.set<pubsub::CompressionThresholdOption>(0)
.set<pubsub::CompressionAlgorithmOption>(GRPC_COMPRESS_GZIP)));

Expand Down
42 changes: 27 additions & 15 deletions google/cloud/spanner/integration_tests/client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,36 @@ class ClientIntegrationTest : public spanner_testing::DatabaseIntegrationTest {
protected:
static void SetUpTestSuite() {
spanner_testing::DatabaseIntegrationTest::SetUpTestSuite();
client_ = std::make_unique<Client>(MakeConnection(GetDatabase()));
client_ = std::make_unique<Client>(MakeConnection(
GetDatabase(),
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_GZIP)));
}

void SetUp() override {
auto commit_result = client_->Commit(
Mutations{MakeDeleteMutation("Singers", KeySet::All())});
Mutations{MakeDeleteMutation("Singers", KeySet::All())},
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_DEFLATE));
ASSERT_STATUS_OK(commit_result);
}

static StatusOr<Timestamp> DatabaseNow() {
auto statement = SqlStatement("SELECT CURRENT_TIMESTAMP()");
auto rows = client_->ExecuteQuery(std::move(statement));
auto rows = client_->ExecuteQuery(
std::move(statement),
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_NONE));
auto row = GetSingularRow(StreamOf<std::tuple<Timestamp>>(rows));
if (!row) return std::move(row).status();
return std::get<0>(*row);
}

static void InsertTwoSingers() {
auto commit_result = client_->Commit(Mutations{
InsertMutationBuilder("Singers", {"SingerId", "FirstName", "LastName"})
.EmplaceRow(1, "test-fname-1", "test-lname-1")
.EmplaceRow(2, "test-fname-2", "test-lname-2")
.Build()});
auto commit_result = client_->Commit(
Mutations{InsertMutationBuilder("Singers",
{"SingerId", "FirstName", "LastName"})
.EmplaceRow(1, "test-fname-1", "test-lname-1")
.EmplaceRow(2, "test-fname-2", "test-lname-2")
.Build()},
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_DEFLATE));
ASSERT_STATUS_OK(commit_result);
}

Expand All @@ -94,22 +101,27 @@ class PgClientIntegrationTest
protected:
static void SetUpTestSuite() {
spanner_testing::PgDatabaseIntegrationTest::SetUpTestSuite();
client_ = std::make_unique<Client>(MakeConnection(GetDatabase()));
client_ = std::make_unique<Client>(MakeConnection(
GetDatabase(),
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_DEFLATE)));
}

void SetUp() override {
if (UsingEmulator()) return;
auto commit_result = client_->Commit(
Mutations{MakeDeleteMutation("Singers", KeySet::All())});
Mutations{MakeDeleteMutation("Singers", KeySet::All())},
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_DEFLATE));
ASSERT_STATUS_OK(commit_result);
}

static void InsertTwoSingers() {
auto commit_result = client_->Commit(Mutations{
InsertMutationBuilder("Singers", {"SingerId", "FirstName", "LastName"})
.EmplaceRow(1, "test-fname-1", "test-lname-1")
.EmplaceRow(2, "test-fname-2", "test-lname-2")
.Build()});
auto commit_result = client_->Commit(
Mutations{InsertMutationBuilder("Singers",
{"SingerId", "FirstName", "LastName"})
.EmplaceRow(1, "test-fname-1", "test-lname-1")
.EmplaceRow(2, "test-fname-2", "test-lname-2")
.Build()},
Options{}.set<GrpcCompressionAlgorithmOption>(GRPC_COMPRESS_DEFLATE));
ASSERT_STATUS_OK(commit_result);
}

Expand Down
57 changes: 44 additions & 13 deletions google/cloud/spanner/internal/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,37 @@ class ProtoBuilder {
};

// Matchers for mock calls.
MATCHER_P(HasSession, session, "has expected session name") {
MATCHER_P(HasCompressionAlgorithm, algorithm, "has compression algorithm") {
return arg.compression_algorithm() == algorithm;
}

MATCHER_P(HasSession, session, "has session name") {
return arg.session() == session;
}

MATCHER_P(HasTransactionId, transaction_id, "has expected transaction id") {
MATCHER_P(HasTransactionId, transaction_id, "has transaction id") {
return arg.transaction().id() == transaction_id;
}

// As above, but for Commit and Rollback requests, which don't have a
// `TransactionSelector` but just store the "naked" ID directly in the proto.
MATCHER_P(HasNakedTransactionId, transaction_id,
"has expected transaction id") {
MATCHER_P(HasNakedTransactionId, transaction_id, "has transaction id") {
return arg.transaction_id() == transaction_id;
}

MATCHER_P(HasReturnStats, return_commit_stats,
"has expected return-stats value") {
MATCHER_P(HasReturnStats, return_commit_stats, "has return-stats value") {
return arg.return_commit_stats() == return_commit_stats;
}

MATCHER(HasBeginTransaction, "has begin TransactionSelector set") {
return arg.transaction().has_begin();
}

MATCHER_P(HasDatabase, database, "has expected database") {
MATCHER_P(HasDatabase, database, "has database") {
return arg.database() == database.FullName();
}

MATCHER_P(HasCreatorRole, role, "has expected creator role") {
MATCHER_P(HasCreatorRole, role, "has creator role") {
return arg.session_template().creator_role() == role;
}

Expand All @@ -176,23 +178,23 @@ MATCHER(HasBadSession, "is bound to a session that's marked bad") {
});
}

MATCHER_P(HasPriority, priority, "has expected priority") {
MATCHER_P(HasPriority, priority, "has priority") {
return arg.request_options().priority() == priority;
}

MATCHER_P(HasRequestTag, tag, "has expected request tag") {
MATCHER_P(HasRequestTag, tag, "has request tag") {
return arg.request_options().request_tag() == tag;
}

MATCHER_P(HasTransactionTag, tag, "has expected transaction tag") {
MATCHER_P(HasTransactionTag, tag, "has transaction tag") {
return arg.request_options().transaction_tag() == tag;
}

MATCHER_P(HasReplicaLocation, location, "has expected replica location") {
MATCHER_P(HasReplicaLocation, location, "has replica location") {
return arg.location() == location;
}

MATCHER_P(HasReplicaType, type, "has expected replica type") {
MATCHER_P(HasReplicaType, type, "has replica type") {
return arg.type() == type;
}

Expand Down Expand Up @@ -2460,6 +2462,35 @@ TEST(ConnectionImplTest, CommitSuccessWithStats) {
EXPECT_EQ(42, commit->commit_stats->mutation_count);
}

TEST(ConnectionImplTest, CommitSuccessWithCompression) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = spanner::Database("placeholder_project", "placeholder_instance",
"placeholder_database_id");
EXPECT_CALL(*mock, BatchCreateSessions(_, HasDatabase(db)))
.WillOnce(Return(MakeSessionsResponse({"test-session-name"})));
google::spanner::v1::Transaction txn = MakeTestTransaction();
EXPECT_CALL(*mock, BeginTransaction).WillOnce(Return(txn));
EXPECT_CALL(*mock, Commit(HasCompressionAlgorithm(GRPC_COMPRESS_GZIP),
HasSession("test-session-name")))
.WillOnce([](grpc::ClientContext&,
google::spanner::v1::CommitRequest const&) {
return MakeCommitResponse(
spanner::MakeTimestamp(std::chrono::system_clock::from_time_t(123))
.value());
});

auto conn = MakeConnectionImpl(db, mock);
internal::OptionsSpan span(
MakeLimitedTimeOptions().set<GrpcCompressionAlgorithmOption>(
GRPC_COMPRESS_GZIP));
auto commit = conn->Commit({spanner::MakeReadWriteTransaction(), {}});
EXPECT_THAT(
commit,
IsOkAndHolds(Field(
&spanner::CommitResult::commit_timestamp,
Eq(spanner::MakeTimestamp(absl::FromUnixSeconds(123)).value()))));
}

TEST(ConnectionImplTest, CommitAtLeastOnce) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = spanner::Database("placeholder_project", "placeholder_instance",
Expand Down