From 7d7ff41c68dd50ac844252dca5cea9d03afbee8e Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 23 May 2024 12:28:11 -0400 Subject: [PATCH] docs(pubsub): add optimistic subscribe example (#14272) * docs(pubsub): add optomistic subscribe example * address comments * address comments * update ci to include different execution paths --- google/cloud/pubsub/samples/samples.cc | 82 +++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub/samples/samples.cc b/google/cloud/pubsub/samples/samples.cc index cf5cd190052d1..6805f2812d34c 100644 --- a/google/cloud/pubsub/samples/samples.cc +++ b/google/cloud/pubsub/samples/samples.cc @@ -129,7 +129,8 @@ void ExampleStatusOr(google::cloud::pubsub_admin::TopicAdminClient client, // The actual type of `topic` is // google::cloud::StatusOr, but // we expect it'll most often be declared with auto like this. - for (auto& topic : client.ListTopics(project_id)) { + for (auto& topic : + client.ListTopics(google::cloud::Project(project_id).FullName())) { // Use `topic` like a smart pointer; check it before de-referencing if (!topic) { // `topic` doesn't contain a value, so `.status()` will contain error @@ -1024,6 +1025,65 @@ void SubscriberRetrySettings(std::vector const& argv) { std::move(p.second), __func__); } +void OptimisticSubscribe(std::vector const& argv) { + namespace examples = ::google::cloud::testing_util; + if (argv.size() != 3) { + throw examples::Usage{ + "optimistic-subscribe "}; + } + namespace pubsub_admin = ::google::cloud::pubsub_admin; + namespace pubsub = ::google::cloud::pubsub; + namespace gc = ::google::cloud; + [](std::string project_id, std::string topic_id, + std::string subscription_id) { + // Do not retry the attempts to consume messages. + auto subscriber = pubsub::Subscriber( + pubsub::MakeSubscriberConnection( + pubsub::Subscription(project_id, subscription_id)), + google::cloud::Options{}.set( + pubsub::LimitedErrorCountRetryPolicy( + /*maximum_failures=*/0) + .clone())); + + // [START pubsub_optimistic_subscribe] + auto process_response = [](gc::StatusOr response) { + if (response) { + std::cout << "Received message " << response->message << "\n"; + std::move(response->handler).ack(); + return gc::Status(); + } + if (response.status().code() == gc::StatusCode::kUnavailable && + response.status().message() == "no messages returned") { + std::cout << "No messages returned from Pull()\n"; + return gc::Status(); + } + return response.status(); + }; + + // Instead of checking if the subscription exists, optimistically try to + // consume from the subscription. + auto status = process_response(subscriber.Pull()); + if (status.ok()) return; + if (status.code() != gc::StatusCode::kNotFound) throw std::move(status); + + // Since the subscription does not exist, create the subscription. + pubsub_admin::SubscriptionAdminClient subscription_admin_client( + pubsub_admin::MakeSubscriptionAdminConnection()); + google::pubsub::v1::Subscription request; + request.set_name( + pubsub::Subscription(project_id, subscription_id).FullName()); + request.set_topic( + pubsub::Topic(project_id, std::move(topic_id)).FullName()); + auto sub = subscription_admin_client.CreateSubscription(request); + if (!sub) throw std::move(sub).status(); + + // Consume from the new subscription. + status = process_response(subscriber.Pull()); + if (!status.ok()) throw std::move(status); + // [END pubsub_optimistic_subscribe] + }(argv.at(0), argv.at(1), argv.at(2)); +} + void AutoRunAvro( std::string const& project_id, std::string const& testdata_directory, google::cloud::internal::DefaultPRNG& generator, @@ -1223,6 +1283,9 @@ void AutoRun(std::vector const& argv) { auto const dead_letter_topic = google::cloud::pubsub::Topic(project_id, dead_letter_topic_id); auto const snapshot_id = RandomSnapshotId(generator); + auto const nonexistent_subscription_id = RandomSubscriptionId(generator); + auto const nonexistent_subscription = google::cloud::pubsub::Subscription( + project_id, nonexistent_subscription_id); using ::google::cloud::StatusCode; auto ignore_emulator_failures = @@ -1308,7 +1371,7 @@ void AutoRun(std::vector const& argv) { (void)subscription_admin.CreateSubscription(dead_letter_request); cleanup.Defer([subscription_admin, subscription, filtered_subscription, exactly_once_subscription, ordering_subscription, - dead_letter_subscription]() mutable { + dead_letter_subscription, nonexistent_subscription]() mutable { std::cout << "\nDelete subscription (" << subscription.subscription_id() << ")" << std::endl; (void)subscription_admin.DeleteSubscription(subscription.FullName()); @@ -1329,6 +1392,10 @@ void AutoRun(std::vector const& argv) { << dead_letter_subscription.subscription_id() << ")" << std::endl; (void)subscription_admin.DeleteSubscription( dead_letter_subscription.FullName()); + std::cout << "\nDelete subscription (" + << nonexistent_subscription.subscription_id() << ")" << std::endl; + (void)subscription_admin.DeleteSubscription( + nonexistent_subscription.FullName()); }); ignore_emulator_failures([&] { @@ -1449,6 +1516,16 @@ void AutoRun(std::vector const& argv) { PublishHelper(publisher, "SubscriberRetrySettings", 1); SubscriberRetrySettings({project_id, subscription_id}); + std::cout << "\nRunning OptimisticSubscribe() sample [1]" << std::endl; + PublishHelper(publisher, "OptimisticSubscribe", 1); + OptimisticSubscribe({project_id, topic_id, subscription_id}); + + std::cout << "\nRunning OptimisticSubscribe() sample [2]" << std::endl; + OptimisticSubscribe({project_id, topic_id, subscription_id}); + + std::cout << "\nRunning OptimisticSubscribe() sample [3]" << std::endl; + OptimisticSubscribe({project_id, topic_id, nonexistent_subscription_id}); + std::cout << "\nAutoRun done" << std::endl; } @@ -1493,6 +1570,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape) {"subscriber-concurrency-control", SubscriberConcurrencyControl}, {"subscriber-flow-control-settings", SubscriberFlowControlSettings}, {"subscriber-retry-settings", SubscriberRetrySettings}, + {"optimistic-subscribe", OptimisticSubscribe}, {"auto", AutoRun}, }); return example.Run(argc, argv);