Skip to content

Commit

Permalink
feat(pubsub): implement blocking publisher (#10055)
Browse files Browse the repository at this point in the history
Some applications need a simple way to publish a small number of event
to several topics.  Or they need to publish with a low rate to many
topics. The `pubsub::BlockingPublisher` client is more convenient for
such applications than the existing `pubsub::Publisher`. The latter is
optimized for high-throughput clients publishing to a single topic.
  • Loading branch information
coryan committed Oct 18, 2022
1 parent 9eee84e commit e50c330
Show file tree
Hide file tree
Showing 14 changed files with 450 additions and 8 deletions.
4 changes: 4 additions & 0 deletions google/cloud/pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ add_library(
ack_handler.h
application_callback.h
backoff_policy.h
blocking_publisher.cc
blocking_publisher.h
blocking_publisher_connection.cc
blocking_publisher_connection.h
connection_options.cc
Expand Down Expand Up @@ -195,6 +197,7 @@ add_library(google_cloud_cpp_pubsub_mocks INTERFACE)
set(google_cloud_cpp_pubsub_mocks_hdrs
# cmake-format: sort
mocks/mock_ack_handler.h
mocks/mock_blocking_publisher_connection.h
mocks/mock_exactly_once_ack_handler.h
mocks/mock_publisher_connection.h
mocks/mock_schema_admin_connection.h
Expand Down Expand Up @@ -256,6 +259,7 @@ function (google_cloud_cpp_pubsub_client_define_tests)
# cmake-format: sort
ack_handler_test.cc
blocking_publisher_connection_test.cc
blocking_publisher_test.cc
exactly_once_ack_handler_test.cc
internal/batching_publisher_connection_test.cc
internal/default_batch_sink_test.cc
Expand Down
37 changes: 37 additions & 0 deletions google/cloud/pubsub/blocking_publisher.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/blocking_publisher.h"

namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

BlockingPublisher::BlockingPublisher(
std::shared_ptr<BlockingPublisherConnection> connection, Options opts)
: connection_(std::move(connection)),
options_(
internal::MergeOptions(std::move(opts), connection_->options())) {}

StatusOr<std::string> BlockingPublisher::Publish(Topic topic, Message message,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
return connection_->Publish({std::move(topic), std::move(message)});
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
} // namespace google
122 changes: 122 additions & 0 deletions google/cloud/pubsub/blocking_publisher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_BLOCKING_PUBLISHER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_BLOCKING_PUBLISHER_H

#include "google/cloud/pubsub/blocking_publisher_connection.h"
#include "google/cloud/pubsub/publisher_options.h"
#include "google/cloud/pubsub/version.h"
#include <string>

namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/**
* Publish messages to the Cloud Pub/Sub service.
*
* This class is used to publish messages to any given topic. It is intended
* for low-volume publishers. Applications sending less than one message per
* second may find this class easier to use than `Publisher`, which can handle
* thousands of messages per second.
*
* @see https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub
* service.
*
* @par Example
* @snippet blocking_samples.cc blocking-publish
*
* @par Performance
* `BlockingPublisher` objects are relatively cheap to create, copy, and move.
* However, each `BlockingPublisher` object must be created with a
* `std::shared_ptr<BlockingPublisherConnection>`, which itself is relatively
* expensive to create. Therefore, connection instances should be shared when
* possible. See the `MakeBlockingPublisherConnection()` method and the
* `BlockingPublisherConnection` interface for more details.
*
* @par Thread Safety
* Instances of this class created via copy-construction or copy-assignment
* share the underlying pool of connections. Access to these copies via multiple
* threads is guaranteed to work. Two threads operating on the same instance of
* this class is not guaranteed to work.
*
* @par Background Threads
* This class uses the background threads configured via the `Options` from
* `GrpcOptionList`. Applications can create their own pool of background
* threads by (a) creating their own #google::cloud::CompletionQueue, (b)
* passing this completion queue as a `GrpcCompletionQueueOption`, and (c)
* attaching any number of threads to the completion queue.
*
* @par Error Handling
* This class uses `StatusOr<T>` to report errors. When an operation fails to
* perform its work the returned `StatusOr<T>` contains the error details. If
* the `ok()` member function in the `StatusOr<T>` returns `true` then it
* contains the expected result. Please consult the #google::cloud::StatusOr
* documentation for more details.
*/
class BlockingPublisher {
public:
explicit BlockingPublisher(
std::shared_ptr<BlockingPublisherConnection> connection,
Options opts = {});

BlockingPublisher(BlockingPublisher const&) = default;
BlockingPublisher& operator=(BlockingPublisher const&) = default;
BlockingPublisher(BlockingPublisher&&) = default;
BlockingPublisher& operator=(BlockingPublisher&&) = default;

friend bool operator==(BlockingPublisher const& a,
BlockingPublisher const& b) {
return a.connection_ == b.connection_;
}
friend bool operator!=(BlockingPublisher const& a,
BlockingPublisher const& b) {
return !(a == b);
}

/**
* Publishes the @p message on the topic @p topic.
*
* @par Idempotency
* This is a non-idempotent operation, but the client library will
* automatically retry RPCs that fail with transient errors. As Cloud Pub/Sub
* has "at least once" delivery semantics applications are expected to handle
* duplicate messages without problems. The application can disable retries
* by changing the retry policy, please see the example below.
*
* @par Example
* @snippet blocking_samples.cc blocking-publish
*
* @par Example
* @snippet blocking_samples.cc blocking-publish-no-retry
*
* @return On success, the server-assigned ID of the message. IDs are
* guaranteed to be unique within the topic.
*/
StatusOr<std::string> Publish(Topic topic, Message message,
Options opts = {});

private:
std::shared_ptr<BlockingPublisherConnection> connection_;
Options options_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_BLOCKING_PUBLISHER_H
5 changes: 4 additions & 1 deletion google/cloud/pubsub/blocking_publisher_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ TEST(BlockingPublisherConnectionTest, Basic) {
});

auto publisher = MakeTestPublisherConnection(mock);
google::cloud::internal::OptionsSpan span(publisher->options());
auto response = publisher->Publish(
{topic, MessageBuilder{}.SetData("test-data-0").Build()});
google::cloud::internal::OptionsSpan span(publisher->options());
ASSERT_STATUS_OK(response);
EXPECT_EQ("test-message-id-0", *response);
}
Expand Down Expand Up @@ -129,6 +129,7 @@ TEST(BlockingPublisherConnectionTest, HandlePermanentError) {
});

auto publisher = MakeTestPublisherConnection(mock);
google::cloud::internal::OptionsSpan span(publisher->options());
auto response = publisher->Publish(
{topic, MessageBuilder{}.SetData("test-message-0").Build()});
EXPECT_THAT(response,
Expand All @@ -146,6 +147,7 @@ TEST(BlockingPublisherConnectionTest, HandleTooManyTransients) {
});

auto publisher = MakeTestPublisherConnection(mock);
google::cloud::internal::OptionsSpan span(publisher->options());
auto response = publisher->Publish(
{topic, MessageBuilder{}.SetData("test-message-0").Build()});
EXPECT_THAT(response,
Expand All @@ -169,6 +171,7 @@ TEST(BlockingPublisherConnectionTest, HandleTransient) {
});

auto publisher = MakeTestPublisherConnection(mock);
google::cloud::internal::OptionsSpan span(publisher->options());
auto response = publisher->Publish(
{topic, MessageBuilder{}.SetData("test-message-0").Build()});
ASSERT_STATUS_OK(response);
Expand Down
110 changes: 110 additions & 0 deletions google/cloud/pubsub/blocking_publisher_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/blocking_publisher.h"
#include "google/cloud/pubsub/mocks/mock_blocking_publisher_connection.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>

namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ::testing::Return;

struct TestOptionA {
using Type = std::string;
};

struct TestOptionB {
using Type = std::string;
};

struct TestOptionC {
using Type = std::string;
};

TEST(BlockingPublisherTest, OptionsNoOverrides) {
Topic const topic("test-project", "test-topic");
auto mock = std::make_shared<pubsub_mocks::MockBlockingPublisherConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(Return(Options{}
.set<TestOptionA>("test-a")
.set<TestOptionB>("test-b")
.set<TestOptionC>("test-c")));
EXPECT_CALL(*mock, Publish).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "test-a");
EXPECT_EQ(current.get<TestOptionB>(), "test-b");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return std::string{"test-ack-id"};
});

BlockingPublisher publisher(mock);
ASSERT_STATUS_OK(
publisher.Publish(topic, MessageBuilder().SetData("test-only").Build()));
}

TEST(BlockingPublisherTest, OptionsClientOverrides) {
Topic const topic("test-project", "test-topic");
auto mock = std::make_shared<pubsub_mocks::MockBlockingPublisherConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(Return(Options{}
.set<TestOptionA>("test-a")
.set<TestOptionB>("test-b")
.set<TestOptionC>("test-c")));
EXPECT_CALL(*mock, Publish).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "override-a");
EXPECT_EQ(current.get<TestOptionB>(), "test-b");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return std::string{"test-ack-id"};
});

BlockingPublisher publisher(mock, Options{}.set<TestOptionA>("override-a"));
ASSERT_STATUS_OK(
publisher.Publish(topic, MessageBuilder().SetData("test-only").Build()));
}

TEST(BlockingPublisherTest, OptionsFunctionOverrides) {
Topic const topic("test-project", "test-topic");
auto mock = std::make_shared<pubsub_mocks::MockBlockingPublisherConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(Return(Options{}
.set<TestOptionA>("test-a")
.set<TestOptionB>("test-b")
.set<TestOptionC>("test-c")));
EXPECT_CALL(*mock, Publish).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "override-a1");
EXPECT_EQ(current.get<TestOptionB>(), "override-b1");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return std::string{"test-ack-id"};
});

BlockingPublisher publisher(mock, Options{}.set<TestOptionA>("override-a"));
ASSERT_STATUS_OK(
publisher.Publish(topic, MessageBuilder().SetData("test-only").Build(),
Options{}
.set<TestOptionA>("override-a1")
.set<TestOptionB>("override-b1")));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
} // namespace google
2 changes: 2 additions & 0 deletions google/cloud/pubsub/google_cloud_cpp_pubsub.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ google_cloud_cpp_pubsub_hdrs = [
"ack_handler.h",
"application_callback.h",
"backoff_policy.h",
"blocking_publisher.h",
"blocking_publisher_connection.h",
"connection_options.h",
"exactly_once_ack_handler.h",
Expand Down Expand Up @@ -89,6 +90,7 @@ google_cloud_cpp_pubsub_hdrs = [

google_cloud_cpp_pubsub_srcs = [
"ack_handler.cc",
"blocking_publisher.cc",
"blocking_publisher_connection.cc",
"connection_options.cc",
"exactly_once_ack_handler.cc",
Expand Down
1 change: 1 addition & 0 deletions google/cloud/pubsub/google_cloud_cpp_pubsub_mocks.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

google_cloud_cpp_pubsub_mocks_hdrs = [
"mocks/mock_ack_handler.h",
"mocks/mock_blocking_publisher_connection.h",
"mocks/mock_exactly_once_ack_handler.h",
"mocks/mock_publisher_connection.h",
"mocks/mock_schema_admin_connection.h",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/blocking_publisher_connection.h"
#include "google/cloud/pubsub/blocking_publisher.h"
#include "google/cloud/pubsub/testing/random_names.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/pubsub/topic_admin_client.h"
Expand Down Expand Up @@ -72,9 +72,9 @@ TEST_F(BlockingPublisherIntegrationTest, Basic) {
.set<UnifiedCredentialsOption>(MakeInsecureCredentials())
.set<internal::UseInsecureChannelOption>(true);
}
auto publisher = MakeBlockingPublisherConnection(options);
auto publish = publisher->Publish(
{topic_, MessageBuilder().SetData("test data").Build()});
auto publisher = BlockingPublisher(MakeBlockingPublisherConnection(options));
auto publish =
publisher.Publish(topic_, MessageBuilder().SetData("test data").Build());
ASSERT_STATUS_OK(publish);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ BlockingPublisherConnectionImpl::BlockingPublisherConnectionImpl(

StatusOr<std::string> BlockingPublisherConnectionImpl::Publish(
PublishParams p) {
auto const& current = internal::CurrentOptions();
google::pubsub::v1::PublishRequest request;
request.set_topic(p.topic.FullName());
*request.add_messages() = ToProto(std::move(p.message));
auto response = RetryLoop(
options_.get<pubsub::RetryPolicyOption>()->clone(),
options_.get<pubsub::BackoffPolicyOption>()->clone(),
current.get<pubsub::RetryPolicyOption>()->clone(),
current.get<pubsub::BackoffPolicyOption>()->clone(),
Idempotency::kIdempotent,
[this](grpc::ClientContext& context,
google::pubsub::v1::PublishRequest const& request) {
Expand Down
Loading

0 comments on commit e50c330

Please sign in to comment.