Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): blocking pulls #10317

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google/cloud/pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ add_library(
publisher_options.h
pull_ack_handler.cc
pull_ack_handler.h
pull_response.h
retry_policy.h
schema.cc
schema.h
Expand Down
1 change: 1 addition & 0 deletions google/cloud/pubsub/google_cloud_cpp_pubsub.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ google_cloud_cpp_pubsub_hdrs = [
"publisher_connection.h",
"publisher_options.h",
"pull_ack_handler.h",
"pull_response.h",
"retry_policy.h",
"schema.h",
"schema_admin_client.h",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,32 @@ TEST_F(SubscriberIntegrationTest, ExactlyOnce) {
EXPECT_STATUS_OK(result.get());
}

TEST_F(SubscriberIntegrationTest, BlockingPull) {
auto publisher = Publisher(MakePublisherConnection(topic_));
auto subscriber =
Subscriber(MakeSubscriberConnection(exactly_once_subscription_));

std::set<std::string> ids;
for (auto const* data : {"message-0", "message-1", "message-2"}) {
auto response =
publisher.Publish(MessageBuilder{}.SetData(data).Build()).get();
EXPECT_STATUS_OK(response);
if (response) ids.insert(*std::move(response));
}
EXPECT_THAT(ids, Not(IsEmpty()));

auto const count = 2 * ids.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we do 2*N iterations for an exactly-once subscription?

Is it to make sure everything gets acked, in the failure case where we continue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. For low values of "make sure".

for (std::size_t i = 0; i != count && !ids.empty(); ++i) {
auto response = subscriber.Pull();
EXPECT_STATUS_OK(response);
if (!response) continue;
auto ack = std::move(response->handler).ack().get();
EXPECT_STATUS_OK(ack);
ids.erase(response->message.message_id());
}
EXPECT_THAT(ids, IsEmpty());
}

/// @test Verify the backwards compatibility `v1` namespace still exists.
TEST_F(SubscriberIntegrationTest, BackwardsCompatibility) {
auto connection = ::google::cloud::pubsub::v1::MakeSubscriberConnection(
Expand Down
7 changes: 3 additions & 4 deletions google/cloud/pubsub/internal/subscriber_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ future<Status> SubscriberConnectionImpl::ExactlyOnceSubscribe(
background_->cq(), MakeClientId(), std::move(p.callback));
}

StatusOr<SubscriberConnectionImpl::PullResponse>
SubscriberConnectionImpl::Pull() {
StatusOr<pubsub::PullResponse> SubscriberConnectionImpl::Pull() {
google::pubsub::v1::PullRequest request;
request.set_subscription(subscription_.FullName());
request.set_max_messages(1);
Expand All @@ -76,8 +75,8 @@ SubscriberConnectionImpl::Pull() {
received_message.delivery_attempt());
auto message = pubsub_internal::FromProto(
std::move(*received_message.mutable_message()));
return PullResponse{pubsub::PullAckHandler(std::move(impl)),
std::move(message)};
return pubsub::PullResponse{pubsub::PullAckHandler(std::move(impl)),
std::move(message)};
}

Options SubscriberConnectionImpl::options() { return opts_; }
Expand Down
9 changes: 1 addition & 8 deletions google/cloud/pubsub/internal/subscriber_connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "google/cloud/pubsub/ack_handler.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/pull_ack_handler.h"
#include "google/cloud/pubsub/subscriber_connection.h"
#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
Expand All @@ -39,13 +38,7 @@ class SubscriberConnectionImpl : public pubsub::SubscriberConnection {

future<Status> ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p) override;

// TODO(#7187) - move to pubsub::SubscriberConnection
struct PullResponse {
pubsub::PullAckHandler handler;
pubsub::Message message;
};

StatusOr<PullResponse> Pull();
StatusOr<pubsub::PullResponse> Pull() override;

Options options() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
// limitations under the License.

#include "google/cloud/pubsub/internal/subscription_session.h"
#include "google/cloud/pubsub/ack_handler.h"
#include "google/cloud/pubsub/application_callback.h"
#include "google/cloud/pubsub/exactly_once_ack_handler.h"
#include "google/cloud/pubsub/internal/defaults.h"
#include "google/cloud/pubsub/subscriber_connection.h"
#include "google/cloud/pubsub/testing/fake_streaming_pull.h"
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/log.h"
#include "google/cloud/testing_util/async_sequencer.h"
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
#include "google/cloud/testing_util/scoped_log.h"
Expand Down
55 changes: 55 additions & 0 deletions google/cloud/pubsub/pull_response.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PULL_RESPONSE_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PULL_RESPONSE_H

#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/pull_ack_handler.h"
#include "google/cloud/pubsub/version.h"

namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/**
* The response for a blocking pull.
*
* If the application invokes `handler.nack()` or allows `handler` to go out
* of scope, then the service will redeliver the message.
*
* With exactly-once delivery subscriptions, the service will stop
* redelivering the message once the application invokes `handler.ack()` and
* the invocation succeeds. With best-efforts subscriptions, the service *may*
* redeliver the message, even after a successful `handler.ack()` invocation.
*
* If `handler` is not an rvalue, you may need to use `std::move(handler).ack()`
* or `std::move(handler).nack()`.
*
* @see https://cloud.google.com/pubsub/docs/exactly-once-delivery
*/
struct PullResponse {
/// The ack/nack handler associated with this message.
pubsub::PullAckHandler handler;
/// The message attributes and payload.
pubsub::Message message;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PULL_RESPONSE_H
18 changes: 18 additions & 0 deletions google/cloud/pubsub/samples/samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,19 @@ void ExactlyOnceSubscribe(google::cloud::pubsub::Subscriber subscriber,
sample(std::move(subscriber)), __func__);
}

void Pull(google::cloud::pubsub::Subscriber subscriber,
std::vector<std::string> const&) {
//! [pull]
[](google::cloud::pubsub::Subscriber subscriber) {
auto response = subscriber.Pull();
if (!response) throw std::move(response).status();
std::cout << "Received message " << response->message << "\n";
std::move(response->handler).ack();
}
//! [pull]
(std::move(subscriber));
}

void SubscribeErrorListener(google::cloud::pubsub::Subscriber subscriber,
std::vector<std::string> const&) {
auto current = EventCounter::Instance().Current();
Expand Down Expand Up @@ -2248,6 +2261,10 @@ void AutoRun(std::vector<std::string> const& argv) {
std::cout << "\nRunning ExactlyOnceSubscribe() sample" << std::endl;
ExactlyOnceSubscribe(exactly_once_subscriber, {});

std::cout << "\nRunning Pull() sample" << std::endl;
PublishHelper(publisher, "Pull()", 1);
Pull(subscriber, {});

std::cout << "\nRunning Subscribe(filtered) sample" << std::endl;
PublishHelper(publisher, "Subscribe(filtered)", 8);
Subscribe(filtered_subscriber, {});
Expand Down Expand Up @@ -2499,6 +2516,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
CreateSubscriberCommand("subscribe", {}, Subscribe),
CreateSubscriberCommand("exactly-once-subscribe", {},
ExactlyOnceSubscribe),
CreateSubscriberCommand("pull", {}, Pull),
CreateSubscriberCommand("subscribe-error-listener", {},
SubscribeErrorListener),
CreateSubscriberCommand("subscribe-custom-attributes", {},
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub/subscriber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ future<Status> Subscriber::Subscribe(ExactlyOnceApplicationCallback cb,
return connection_->ExactlyOnceSubscribe({std::move(cb)});
}

StatusOr<PullResponse> Subscriber::Pull(Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
return connection_->Pull();
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
Expand Down
24 changes: 24 additions & 0 deletions google/cloud/pubsub/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,30 @@ class Subscriber {
future<Status> Subscribe(ExactlyOnceApplicationCallback cb,
Options opts = {});

/**
* Pulls one message from @p subscription.
*
* @par Idempotency
* @parblock
* This is an idempotent operation; it only reads messages from the service.
* It will make multiple attempts to pull a message from the service, subject
* to the retry policies configured in the `SubscriberConnection`.
*
* Note that calling `PullAckHandler::ack()` and/or `PullAckHandler::nack()`
* have their own rules with respect to retrying.
* @endparblock
*
* @par Example
* @snippet samples.cc pull
*
* @param opts any option overrides to use in this call. These options take
* precedence over the options passed in the constructor, and over any
* options provided in the `PublisherConnection` initialization.
* @return a response including the message and a `PullAckHandler` to notify
* the library when the message has been successfully handled.
*/
StatusOr<PullResponse> Pull(Options opts = {});

private:
std::shared_ptr<SubscriberConnection> connection_;
google::cloud::Options options_;
Expand Down
9 changes: 7 additions & 2 deletions google/cloud/pubsub/subscriber_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "google/cloud/pubsub/options.h"
#include "google/cloud/pubsub/retry_policy.h"
#include "google/cloud/credentials.h"
#include "google/cloud/internal/make_status.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/log.h"
#include <algorithm>
Expand Down Expand Up @@ -63,14 +64,18 @@ SubscriberConnection::~SubscriberConnection() = default;
// NOLINTNEXTLINE(performance-unnecessary-value-param)
future<Status> SubscriberConnection::Subscribe(SubscribeParams) {
return make_ready_future(
Status{StatusCode::kUnimplemented, "needs-override"});
internal::UnimplementedError("needs-override", GCP_ERROR_INFO()));
}

future<Status> SubscriberConnection::ExactlyOnceSubscribe(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
ExactlyOnceSubscribeParams) {
return make_ready_future(
Status{StatusCode::kUnimplemented, "needs-override"});
internal::UnimplementedError("needs-override", GCP_ERROR_INFO()));
}

StatusOr<pubsub::PullResponse> SubscriberConnection::Pull() {
return internal::UnimplementedError("needs-override", GCP_ERROR_INFO());
}

std::shared_ptr<SubscriberConnection> MakeSubscriberConnection(
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/pubsub/subscriber_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_CONNECTION_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_CONNECTION_H

#include "google/cloud/pubsub/ack_handler.h"
#include "google/cloud/pubsub/application_callback.h"
#include "google/cloud/pubsub/backoff_policy.h"
#include "google/cloud/pubsub/connection_options.h"
#include "google/cloud/pubsub/internal/subscriber_stub.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/pull_response.h"
#include "google/cloud/pubsub/retry_policy.h"
#include "google/cloud/pubsub/subscriber_options.h"
#include "google/cloud/pubsub/subscription.h"
Expand Down Expand Up @@ -79,6 +79,8 @@ class SubscriberConnection {
*/
virtual future<Status> ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p);

virtual StatusOr<PullResponse> Pull();

/// Returns the configuration parameters for this object
virtual Options options() { return Options{}; }
};
Expand Down
1 change: 1 addition & 0 deletions google/cloud/pubsub/subscriber_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/pubsub/subscriber_connection.h"
#include "google/cloud/pubsub/ack_handler.h"
#include "google/cloud/pubsub/exactly_once_ack_handler.h"
#include "google/cloud/pubsub/internal/defaults.h"
#include "google/cloud/pubsub/testing/fake_streaming_pull.h"
Expand Down