Skip to content

Commit

Permalink
docs(pubsub): add optimistic subscribe example (#14272)
Browse files Browse the repository at this point in the history
* docs(pubsub): add optomistic subscribe example

* address comments

* address comments

* update ci to include different execution paths
  • Loading branch information
alevenberg committed May 23, 2024
1 parent 4399a9c commit 7d7ff41
Showing 1 changed file with 80 additions and 2 deletions.
82 changes: 80 additions & 2 deletions google/cloud/pubsub/samples/samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void ExampleStatusOr(google::cloud::pubsub_admin::TopicAdminClient client,
// The actual type of `topic` is
// google::cloud::StatusOr<google::pubsub::v1::Topic>, but
// we expect it'll most often be declared with auto like this.
for (auto& topic : client.ListTopics(project_id)) {
for (auto& topic :
client.ListTopics(google::cloud::Project(project_id).FullName())) {
// Use `topic` like a smart pointer; check it before de-referencing
if (!topic) {
// `topic` doesn't contain a value, so `.status()` will contain error
Expand Down Expand Up @@ -1024,6 +1025,65 @@ void SubscriberRetrySettings(std::vector<std::string> const& argv) {
std::move(p.second), __func__);
}

void OptimisticSubscribe(std::vector<std::string> const& argv) {
namespace examples = ::google::cloud::testing_util;
if (argv.size() != 3) {
throw examples::Usage{
"optimistic-subscribe <project-id> <topic-id> <subscription-id>"};
}
namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
namespace gc = ::google::cloud;
[](std::string project_id, std::string topic_id,
std::string subscription_id) {
// Do not retry the attempts to consume messages.
auto subscriber = pubsub::Subscriber(
pubsub::MakeSubscriberConnection(
pubsub::Subscription(project_id, subscription_id)),
google::cloud::Options{}.set<pubsub::RetryPolicyOption>(
pubsub::LimitedErrorCountRetryPolicy(
/*maximum_failures=*/0)
.clone()));

// [START pubsub_optimistic_subscribe]
auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
if (response) {
std::cout << "Received message " << response->message << "\n";
std::move(response->handler).ack();
return gc::Status();
}
if (response.status().code() == gc::StatusCode::kUnavailable &&
response.status().message() == "no messages returned") {
std::cout << "No messages returned from Pull()\n";
return gc::Status();
}
return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);
// [END pubsub_optimistic_subscribe]
}(argv.at(0), argv.at(1), argv.at(2));
}

void AutoRunAvro(
std::string const& project_id, std::string const& testdata_directory,
google::cloud::internal::DefaultPRNG& generator,
Expand Down Expand Up @@ -1223,6 +1283,9 @@ void AutoRun(std::vector<std::string> const& argv) {
auto const dead_letter_topic =
google::cloud::pubsub::Topic(project_id, dead_letter_topic_id);
auto const snapshot_id = RandomSnapshotId(generator);
auto const nonexistent_subscription_id = RandomSubscriptionId(generator);
auto const nonexistent_subscription = google::cloud::pubsub::Subscription(
project_id, nonexistent_subscription_id);

using ::google::cloud::StatusCode;
auto ignore_emulator_failures =
Expand Down Expand Up @@ -1308,7 +1371,7 @@ void AutoRun(std::vector<std::string> const& argv) {
(void)subscription_admin.CreateSubscription(dead_letter_request);
cleanup.Defer([subscription_admin, subscription, filtered_subscription,
exactly_once_subscription, ordering_subscription,
dead_letter_subscription]() mutable {
dead_letter_subscription, nonexistent_subscription]() mutable {
std::cout << "\nDelete subscription (" << subscription.subscription_id()
<< ")" << std::endl;
(void)subscription_admin.DeleteSubscription(subscription.FullName());
Expand All @@ -1329,6 +1392,10 @@ void AutoRun(std::vector<std::string> const& argv) {
<< dead_letter_subscription.subscription_id() << ")" << std::endl;
(void)subscription_admin.DeleteSubscription(
dead_letter_subscription.FullName());
std::cout << "\nDelete subscription ("
<< nonexistent_subscription.subscription_id() << ")" << std::endl;
(void)subscription_admin.DeleteSubscription(
nonexistent_subscription.FullName());
});

ignore_emulator_failures([&] {
Expand Down Expand Up @@ -1449,6 +1516,16 @@ void AutoRun(std::vector<std::string> const& argv) {
PublishHelper(publisher, "SubscriberRetrySettings", 1);
SubscriberRetrySettings({project_id, subscription_id});

std::cout << "\nRunning OptimisticSubscribe() sample [1]" << std::endl;
PublishHelper(publisher, "OptimisticSubscribe", 1);
OptimisticSubscribe({project_id, topic_id, subscription_id});

std::cout << "\nRunning OptimisticSubscribe() sample [2]" << std::endl;
OptimisticSubscribe({project_id, topic_id, subscription_id});

std::cout << "\nRunning OptimisticSubscribe() sample [3]" << std::endl;
OptimisticSubscribe({project_id, topic_id, nonexistent_subscription_id});

std::cout << "\nAutoRun done" << std::endl;
}

Expand Down Expand Up @@ -1493,6 +1570,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
{"subscriber-concurrency-control", SubscriberConcurrencyControl},
{"subscriber-flow-control-settings", SubscriberFlowControlSettings},
{"subscriber-retry-settings", SubscriberRetrySettings},
{"optimistic-subscribe", OptimisticSubscribe},
{"auto", AutoRun},
});
return example.Run(argc, argv);
Expand Down

0 comments on commit 7d7ff41

Please sign in to comment.