Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(pubsub): add optimistic subscribe example #14272

Merged
merged 4 commits into from
May 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions google/cloud/pubsub/samples/samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void ExampleStatusOr(google::cloud::pubsub_admin::TopicAdminClient client,
// The actual type of `topic` is
// google::cloud::StatusOr<google::pubsub::v1::Topic>, but
// we expect it'll most often be declared with auto like this.
for (auto& topic : client.ListTopics(project_id)) {
for (auto& topic :
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unrelated change, but I noticed it when I ran the auto version of samples

client.ListTopics(google::cloud::Project(project_id).FullName())) {
// Use `topic` like a smart pointer; check it before de-referencing
if (!topic) {
// `topic` doesn't contain a value, so `.status()` will contain error
Expand Down Expand Up @@ -1024,6 +1025,65 @@ void SubscriberRetrySettings(std::vector<std::string> const& argv) {
std::move(p.second), __func__);
}

void OptimisticSubscribe(std::vector<std::string> const& argv) {
namespace examples = ::google::cloud::testing_util;
if (argv.size() != 3) {
throw examples::Usage{
"optimistic-subscribe <project-id> <topic-id> <subscription-id>"};
}
namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
namespace gc = ::google::cloud;
[](std::string project_id, std::string topic_id,
std::string subscription_id) {
// Do not retry the attempts to consume messages.
auto subscriber = pubsub::Subscriber(
pubsub::MakeSubscriberConnection(
pubsub::Subscription(project_id, subscription_id)),
google::cloud::Options{}.set<pubsub::RetryPolicyOption>(
pubsub::LimitedErrorCountRetryPolicy(
/*maximum_failures=*/0)
.clone()));

// [START pubsub_optimistic_subscribe]
auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
if (response) {
std::cout << "Received message " << response->message << "\n";
std::move(response->handler).ack();
return gc::Status();
}
if (response.status().code() == gc::StatusCode::kUnavailable &&
response.status().message() == "no messages returned") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, I don't love that we are recommending code that parses an error message. We make no guarantees about the contents of the error message. It's ok, though.

optional, and not for this PR: This could be an opportunity for that client origin stuff. Something that feels more official?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I agree... I think using IsClientOrigin would be better. I can refactor in a later PR.

std::cout << "No messages returned from Pull()\n";
return gc::Status();
}
return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);
// [END pubsub_optimistic_subscribe]
}(argv.at(0), argv.at(1), argv.at(2));
}

void AutoRunAvro(
std::string const& project_id, std::string const& testdata_directory,
google::cloud::internal::DefaultPRNG& generator,
Expand Down Expand Up @@ -1223,6 +1283,9 @@ void AutoRun(std::vector<std::string> const& argv) {
auto const dead_letter_topic =
google::cloud::pubsub::Topic(project_id, dead_letter_topic_id);
auto const snapshot_id = RandomSnapshotId(generator);
auto const nonexistent_subscription_id = RandomSubscriptionId(generator);
auto const nonexistent_subscription = google::cloud::pubsub::Subscription(
project_id, nonexistent_subscription_id);

using ::google::cloud::StatusCode;
auto ignore_emulator_failures =
Expand Down Expand Up @@ -1308,7 +1371,7 @@ void AutoRun(std::vector<std::string> const& argv) {
(void)subscription_admin.CreateSubscription(dead_letter_request);
cleanup.Defer([subscription_admin, subscription, filtered_subscription,
exactly_once_subscription, ordering_subscription,
dead_letter_subscription]() mutable {
dead_letter_subscription, nonexistent_subscription]() mutable {
std::cout << "\nDelete subscription (" << subscription.subscription_id()
<< ")" << std::endl;
(void)subscription_admin.DeleteSubscription(subscription.FullName());
Expand All @@ -1329,6 +1392,10 @@ void AutoRun(std::vector<std::string> const& argv) {
<< dead_letter_subscription.subscription_id() << ")" << std::endl;
(void)subscription_admin.DeleteSubscription(
dead_letter_subscription.FullName());
std::cout << "\nDelete subscription ("
<< nonexistent_subscription.subscription_id() << ")" << std::endl;
(void)subscription_admin.DeleteSubscription(
nonexistent_subscription.FullName());
});

ignore_emulator_failures([&] {
Expand Down Expand Up @@ -1449,6 +1516,16 @@ void AutoRun(std::vector<std::string> const& argv) {
PublishHelper(publisher, "SubscriberRetrySettings", 1);
SubscriberRetrySettings({project_id, subscription_id});

std::cout << "\nRunning OptimisticSubscribe() sample [1]" << std::endl;
PublishHelper(publisher, "OptimisticSubscribe", 1);
OptimisticSubscribe({project_id, topic_id, subscription_id});

std::cout << "\nRunning OptimisticSubscribe() sample [2]" << std::endl;
OptimisticSubscribe({project_id, topic_id, subscription_id});

std::cout << "\nRunning OptimisticSubscribe() sample [3]" << std::endl;
OptimisticSubscribe({project_id, topic_id, nonexistent_subscription_id});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea


std::cout << "\nAutoRun done" << std::endl;
}

Expand Down Expand Up @@ -1493,6 +1570,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
{"subscriber-concurrency-control", SubscriberConcurrencyControl},
{"subscriber-flow-control-settings", SubscriberFlowControlSettings},
{"subscriber-retry-settings", SubscriberRetrySettings},
{"optimistic-subscribe", OptimisticSubscribe},
{"auto", AutoRun},
});
return example.Run(argc, argv);
Expand Down
Loading