-
Notifications
You must be signed in to change notification settings - Fork 366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
docs(pubsub): add optimistic subscribe example #14272
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,7 +129,8 @@ void ExampleStatusOr(google::cloud::pubsub_admin::TopicAdminClient client, | |
// The actual type of `topic` is | ||
// google::cloud::StatusOr<google::pubsub::v1::Topic>, but | ||
// we expect it'll most often be declared with auto like this. | ||
for (auto& topic : client.ListTopics(project_id)) { | ||
for (auto& topic : | ||
client.ListTopics(google::cloud::Project(project_id).FullName())) { | ||
// Use `topic` like a smart pointer; check it before de-referencing | ||
if (!topic) { | ||
// `topic` doesn't contain a value, so `.status()` will contain error | ||
|
@@ -1024,6 +1025,65 @@ void SubscriberRetrySettings(std::vector<std::string> const& argv) { | |
std::move(p.second), __func__); | ||
} | ||
|
||
void OptimisticSubscribe(std::vector<std::string> const& argv) { | ||
namespace examples = ::google::cloud::testing_util; | ||
if (argv.size() != 3) { | ||
throw examples::Usage{ | ||
"optimistic-subscribe <project-id> <topic-id> <subscription-id>"}; | ||
} | ||
namespace pubsub_admin = ::google::cloud::pubsub_admin; | ||
namespace pubsub = ::google::cloud::pubsub; | ||
namespace gc = ::google::cloud; | ||
[](std::string project_id, std::string topic_id, | ||
std::string subscription_id) { | ||
// Do not retry the attempts to consume messages. | ||
auto subscriber = pubsub::Subscriber( | ||
pubsub::MakeSubscriberConnection( | ||
pubsub::Subscription(project_id, subscription_id)), | ||
google::cloud::Options{}.set<pubsub::RetryPolicyOption>( | ||
pubsub::LimitedErrorCountRetryPolicy( | ||
/*maximum_failures=*/0) | ||
.clone())); | ||
|
||
// [START pubsub_optimistic_subscribe] | ||
auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) { | ||
if (response) { | ||
std::cout << "Received message " << response->message << "\n"; | ||
std::move(response->handler).ack(); | ||
return gc::Status(); | ||
} | ||
if (response.status().code() == gc::StatusCode::kUnavailable && | ||
response.status().message() == "no messages returned") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hm, I don't love that we are recommending code that parses an error message. We make no guarantees about the contents of the error message. It's ok, though. optional, and not for this PR: This could be an opportunity for that client origin stuff. Something that feels more official? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah I agree... I think using IsClientOrigin would be better. I can refactor in a later PR. |
||
std::cout << "No messages returned from Pull()\n"; | ||
return gc::Status(); | ||
} | ||
return response.status(); | ||
}; | ||
|
||
// Instead of checking if the subscription exists, optimistically try to | ||
// consume from the subscription. | ||
auto status = process_response(subscriber.Pull()); | ||
if (status.ok()) return; | ||
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status); | ||
|
||
// Since the subscription does not exist, create the subscription. | ||
pubsub_admin::SubscriptionAdminClient subscription_admin_client( | ||
pubsub_admin::MakeSubscriptionAdminConnection()); | ||
google::pubsub::v1::Subscription request; | ||
request.set_name( | ||
pubsub::Subscription(project_id, subscription_id).FullName()); | ||
request.set_topic( | ||
pubsub::Topic(project_id, std::move(topic_id)).FullName()); | ||
auto sub = subscription_admin_client.CreateSubscription(request); | ||
if (!sub) throw std::move(sub).status(); | ||
|
||
// Consume from the new subscription. | ||
status = process_response(subscriber.Pull()); | ||
if (!status.ok()) throw std::move(status); | ||
// [END pubsub_optimistic_subscribe] | ||
}(argv.at(0), argv.at(1), argv.at(2)); | ||
} | ||
|
||
void AutoRunAvro( | ||
std::string const& project_id, std::string const& testdata_directory, | ||
google::cloud::internal::DefaultPRNG& generator, | ||
|
@@ -1223,6 +1283,9 @@ void AutoRun(std::vector<std::string> const& argv) { | |
auto const dead_letter_topic = | ||
google::cloud::pubsub::Topic(project_id, dead_letter_topic_id); | ||
auto const snapshot_id = RandomSnapshotId(generator); | ||
auto const nonexistent_subscription_id = RandomSubscriptionId(generator); | ||
auto const nonexistent_subscription = google::cloud::pubsub::Subscription( | ||
project_id, nonexistent_subscription_id); | ||
|
||
using ::google::cloud::StatusCode; | ||
auto ignore_emulator_failures = | ||
|
@@ -1308,7 +1371,7 @@ void AutoRun(std::vector<std::string> const& argv) { | |
(void)subscription_admin.CreateSubscription(dead_letter_request); | ||
cleanup.Defer([subscription_admin, subscription, filtered_subscription, | ||
exactly_once_subscription, ordering_subscription, | ||
dead_letter_subscription]() mutable { | ||
dead_letter_subscription, nonexistent_subscription]() mutable { | ||
std::cout << "\nDelete subscription (" << subscription.subscription_id() | ||
<< ")" << std::endl; | ||
(void)subscription_admin.DeleteSubscription(subscription.FullName()); | ||
|
@@ -1329,6 +1392,10 @@ void AutoRun(std::vector<std::string> const& argv) { | |
<< dead_letter_subscription.subscription_id() << ")" << std::endl; | ||
(void)subscription_admin.DeleteSubscription( | ||
dead_letter_subscription.FullName()); | ||
std::cout << "\nDelete subscription (" | ||
<< nonexistent_subscription.subscription_id() << ")" << std::endl; | ||
(void)subscription_admin.DeleteSubscription( | ||
nonexistent_subscription.FullName()); | ||
}); | ||
|
||
ignore_emulator_failures([&] { | ||
|
@@ -1449,6 +1516,16 @@ void AutoRun(std::vector<std::string> const& argv) { | |
PublishHelper(publisher, "SubscriberRetrySettings", 1); | ||
SubscriberRetrySettings({project_id, subscription_id}); | ||
|
||
std::cout << "\nRunning OptimisticSubscribe() sample [1]" << std::endl; | ||
PublishHelper(publisher, "OptimisticSubscribe", 1); | ||
OptimisticSubscribe({project_id, topic_id, subscription_id}); | ||
|
||
std::cout << "\nRunning OptimisticSubscribe() sample [2]" << std::endl; | ||
OptimisticSubscribe({project_id, topic_id, subscription_id}); | ||
|
||
std::cout << "\nRunning OptimisticSubscribe() sample [3]" << std::endl; | ||
OptimisticSubscribe({project_id, topic_id, nonexistent_subscription_id}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good idea |
||
|
||
std::cout << "\nAutoRun done" << std::endl; | ||
} | ||
|
||
|
@@ -1493,6 +1570,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape) | |
{"subscriber-concurrency-control", SubscriberConcurrencyControl}, | ||
{"subscriber-flow-control-settings", SubscriberFlowControlSettings}, | ||
{"subscriber-retry-settings", SubscriberRetrySettings}, | ||
{"optimistic-subscribe", OptimisticSubscribe}, | ||
{"auto", AutoRun}, | ||
}); | ||
return example.Run(argc, argv); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an unrelated change, but I noticed it when I ran the auto version of samples