From d5ca4a7ad9f9e4d44850cfafc54500c000013aba Mon Sep 17 00:00:00 2001 From: Carlos O'Ryan Date: Wed, 12 Oct 2022 16:19:01 +0000 Subject: [PATCH] fix(pubsub): limit `ModifyAckDeadlineRequest` size (#10032) Pub/Sub sets a quota of 512KB for `ModifyAckDeadlineRequest` messages. Theoretically we could have gone over that limit, for example, if the application had thousands of messages in memory, and we needed to extend their leases. With this change such requests will be broken into multiple RPCs. Pub/Sub also sets a quota for `AcknowledgeRequest` messages, but we always send a single `ack_id` in that case, so there is no chance of going over the limit. --- .../streaming_subscription_batch_source.cc | 78 ++++++- .../streaming_subscription_batch_source.h | 15 ++ ...treaming_subscription_batch_source_test.cc | 207 ++++++++++++++++++ 3 files changed, 294 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc b/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc index 733092b0b8c96..2b61697068675 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source.cc @@ -17,12 +17,38 @@ #include "google/cloud/pubsub/internal/extend_leases_with_retry.h" #include "google/cloud/internal/async_retry_loop.h" #include "google/cloud/log.h" +#include #include namespace google { namespace cloud { namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { +// NOLINTNEXTLINE(misc-no-recursion) +future> WaitAll(std::vector> v) { + if (v.empty()) return make_ready_future(std::vector{}); + auto back = std::move(v.back()); + v.pop_back(); + return WaitAll(std::move(v)).then([b = std::move(back)](auto f) mutable { + return b.then([list = f.get()](auto g) mutable { + list.push_back(g.get()); + return list; + }); + }); +} + +future Reduce(std::vector> v) { + return WaitAll(std::move(v)).then([](auto f) { + auto ready = f.get(); 
for (auto& s : ready) { + if (!s.ok()) return std::move(s); + } + return Status{}; + }); +} + +} // namespace StreamingSubscriptionBatchSource::StreamingSubscriptionBatchSource( CompletionQueue cq, @@ -119,8 +145,21 @@ future StreamingSubscriptionBatchSource::BulkNack( request.set_subscription(subscription_full_name_); for (auto& a : ack_ids) *request.add_ack_ids() = std::move(a); request.set_ack_deadline_seconds(0); - return stub_->AsyncModifyAckDeadline( - cq_, absl::make_unique(), request); + + auto requests = + SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage); + if (requests.size() == 1) { + return stub_->AsyncModifyAckDeadline( + cq_, absl::make_unique(), requests.front()); + } + + std::vector> pending(requests.size()); + std::transform(requests.begin(), requests.end(), pending.begin(), + [this](auto const& request) { + return stub_->AsyncModifyAckDeadline( + cq_, absl::make_unique(), request); + }); + return Reduce(std::move(pending)); } void StreamingSubscriptionBatchSource::ExtendLeases( @@ -133,14 +172,17 @@ void StreamingSubscriptionBatchSource::ExtendLeases( request.add_ack_ids(std::move(a)); } std::unique_lock lk(mu_); + auto split = SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage); if (exactly_once_delivery_enabled_.value_or(false)) { lk.unlock(); - (void)ExtendLeasesWithRetry(stub_, cq_, std::move(request)); + for (auto& r : split) (void)ExtendLeasesWithRetry(stub_, cq_, std::move(r)); return; } lk.unlock(); - (void)stub_->AsyncModifyAckDeadline( - cq_, absl::make_unique(), request); + for (auto& r : split) { + (void)stub_->AsyncModifyAckDeadline( + cq_, absl::make_unique(), r); + } } void StreamingSubscriptionBatchSource::StartStream( @@ -370,7 +412,7 @@ void StreamingSubscriptionBatchSource::ShutdownStream( lk.unlock(); auto weak = WeakFromThis(); // There are no pending reads or writes, and something (probable a read or - // write error) recommends we shutdown the stream + // write error) recommends we shut down the 
stream stream->Finish().then([weak, stream](future f) { if (auto self = weak.lock()) self->OnFinish(f.get()); }); @@ -454,6 +496,30 @@ void StreamingSubscriptionBatchSource::ChangeState( stream_state_ = s; } +std::vector +SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request, + int max_ack_ids) { + // We expect this to be the common case. + if (request.ack_ids_size() <= max_ack_ids) return {std::move(request)}; + + std::vector result; + auto& source = *request.mutable_ack_ids(); + while (request.ack_ids_size() > max_ack_ids) { + google::pubsub::v1::ModifyAckDeadlineRequest r; + r.set_subscription(request.subscription()); + r.set_ack_deadline_seconds(request.ack_deadline_seconds()); + + auto begin = source.begin(); + auto end = std::next(source.begin(), max_ack_ids); + r.mutable_ack_ids()->Reserve(max_ack_ids); + for (auto i = begin; i != end; ++i) r.add_ack_ids(std::move(*i)); + source.erase(begin, end); + result.push_back(std::move(r)); + } + if (!request.ack_ids().empty()) result.push_back(std::move(request)); + return result; +} + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal } // namespace cloud diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source.h b/google/cloud/pubsub/internal/streaming_subscription_batch_source.h index c571167b4b557..491653c973eba 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source.h +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source.h @@ -68,6 +68,16 @@ class StreamingSubscriptionBatchSource kFinishing, }; + // The maximum size for `ModifyAckDeadlineRequest` is 512 KB: + // https://cloud.google.com/pubsub/quotas#resource_limits + // Typical ack ids are less than 200 bytes. This value is safe, but there is + // no need to over optimize it: + // - Google does not charge for these messages + // - The value is reached rarely + // - The CPU costs saved between 2,048 ids per message vs. 
the theoretical + // maximum are minimal + static int constexpr kMaxAckIdsPerMessage = 2048; + private: // C++17 adds weak_from_this(), we cannot use the same name as (1) some // versions of the standard library include `weak_from_this()` even with @@ -136,6 +146,11 @@ class StreamingSubscriptionBatchSource std::ostream& operator<<(std::ostream& os, StreamingSubscriptionBatchSource::StreamState s); +/// Split @p request such that each request has at most @p max_ack_ids. +std::vector +SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request, + int max_ack_ids); + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal } // namespace cloud diff --git a/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc b/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc index 1c9122eec8501..bff92c9496da8 100644 --- a/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc +++ b/google/cloud/pubsub/internal/streaming_subscription_batch_source_test.cc @@ -17,9 +17,11 @@ #include "google/cloud/pubsub/subscription.h" #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" #include "google/cloud/pubsub/testing/test_retry_policies.h" +#include "google/cloud/credentials.h" #include "google/cloud/internal/background_threads_impl.h" #include "google/cloud/log.h" #include "google/cloud/testing_util/async_sequencer.h" +#include "google/cloud/testing_util/is_proto_equal.h" #include "google/cloud/testing_util/mock_completion_queue_impl.h" #include "google/cloud/testing_util/status_matchers.h" #include @@ -36,6 +38,7 @@ using ::google::cloud::internal::AutomaticallyCreatedBackgroundThreads; using ::google::cloud::internal::RunAsyncBase; using ::google::cloud::testing_util::AsyncSequencer; using ::google::cloud::testing_util::IsOk; +using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::MockCompletionQueueImpl; using ::google::cloud::testing_util::StatusIs; using 
::testing::_; @@ -44,6 +47,7 @@ using ::testing::AtLeast; using ::testing::AtMost; using ::testing::ByMove; using ::testing::ElementsAre; +using ::testing::ElementsAreArray; using ::testing::HasSubstr; using ::testing::Property; using ::testing::Return; @@ -115,6 +119,7 @@ std::shared_ptr MakeTestBatchSource( auto subscription = pubsub::Subscription("test-project", "test-subscription"); auto opts = DefaultSubscriberOptions(pubsub_testing::MakeTestOptions( Options{} + .set(MakeInsecureCredentials()) .set(100) .set(100 * 1024 * 1024L) .set(std::chrono::seconds(300)))); @@ -1120,6 +1125,208 @@ TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesWithRetry) { EXPECT_THAT(done.get(), IsOk()); } +TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadlineSmall) { + auto constexpr kMaxIds = 3; + + std::vector bulk_nacks{"fake-001", "fake-002", "fake-003"}; + ModifyRequest request; + request.set_subscription( + "projects/test-project/subscriptions/test-subscription"); + request.set_ack_deadline_seconds(12345); + for (auto id : bulk_nacks) request.add_ack_ids(std::move(id)); + + auto const actual = SplitModifyAckDeadline(request, kMaxIds); + EXPECT_THAT(actual, ElementsAre(IsProtoEqual(request))); +} + +TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadline) { + auto constexpr kMaxIds = 3; + + std::vector bulk_nacks{ + "fake-001", "fake-002", "fake-003", "fake-004", + "fake-005", "fake-006", "fake-007", + }; + ModifyRequest request; + request.set_subscription( + "projects/test-project/subscriptions/test-subscription"); + request.set_ack_deadline_seconds(12345); + for (auto id : bulk_nacks) request.add_ack_ids(std::move(id)); + + std::vector expected(3); + for (auto& e : expected) { + e.set_subscription(request.subscription()); + e.set_ack_deadline_seconds(request.ack_deadline_seconds()); + } + expected[0].add_ack_ids("fake-001"); + expected[0].add_ack_ids("fake-002"); + expected[0].add_ack_ids("fake-003"); + + expected[1].add_ack_ids("fake-004"); + 
expected[1].add_ack_ids("fake-005"); + expected[1].add_ack_ids("fake-006"); + + expected[2].add_ack_ids("fake-007"); + + auto const actual = SplitModifyAckDeadline(std::move(request), kMaxIds); + EXPECT_THAT(actual, + ElementsAre(IsProtoEqual(expected[0]), IsProtoEqual(expected[1]), + IsProtoEqual(expected[2]))); +} + +std::unique_ptr MakeUnusedStream( + bool enable_exactly_once) { + auto start_response = []() { return make_ready_future(true); }; + auto write_response = [](google::pubsub::v1::StreamingPullRequest const&, + grpc::WriteOptions const&) { + return make_ready_future(true); + }; + auto read_response = [enable_exactly_once]() { + using Response = ::google::pubsub::v1::StreamingPullResponse; + Response response; + if (enable_exactly_once) { + response.mutable_subscription_properties() + ->set_exactly_once_delivery_enabled(true); + } + return make_ready_future(absl::make_optional(std::move(response))); + }; + auto finish_response = []() { return make_ready_future(Status{}); }; + + auto stream = absl::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce(start_response); + EXPECT_CALL(*stream, Write).WillRepeatedly(write_response); + EXPECT_CALL(*stream, Read).WillRepeatedly(read_response); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); + EXPECT_CALL(*stream, Finish).Times(AtMost(1)).WillRepeatedly(finish_response); + return stream; +} + +TEST(StreamingSubscriptionBatchSourceTest, BulkNackMultipleRequests) { + auto constexpr kMaxIds = + StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage; + + std::vector> groups; + auto make_ids = [](std::string const& prefix, int count) { + std::vector ids(count); + std::generate(ids.begin(), ids.end(), [&prefix, count = 0]() mutable { + return prefix + std::to_string(++count); + }); + return ids; + }; + groups.push_back(make_ids("group-1-", kMaxIds)); + groups.push_back(make_ids("group-2-", kMaxIds)); + groups.push_back(make_ids("group-3-", 2)); + + auto make_on_modify = [](std::vector e) { + return [expected_ids = 
std::move(e)](auto, auto, auto const& request) { + EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids)); + return make_ready_future(Status{}); + }; + }; + + AutomaticallyCreatedBackgroundThreads background; + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, AsyncStreamingPull) + .WillOnce([&](google::cloud::CompletionQueue&, + std::unique_ptr, + google::pubsub::v1::StreamingPullRequest const&) { + return MakeUnusedStream(false); + }); + + EXPECT_CALL( + *mock, + AsyncModifyAckDeadline( + _, _, + Property(&ModifyRequest::subscription, + "projects/test-project/subscriptions/test-subscription"))) + .WillOnce(make_on_modify(groups[0])) + .WillOnce(make_on_modify(groups[1])) + .WillOnce(make_on_modify(groups[2])); + + auto shutdown = std::make_shared(); + auto uut = MakeTestBatchSource(background.cq(), shutdown, mock); + + auto done = shutdown->Start({}); + uut->Start([](StatusOr const&) {}); + + std::vector nacks; + for (auto& ids : groups) { + nacks.insert(nacks.end(), ids.begin(), ids.end()); + } + + uut->BulkNack(nacks); + + shutdown->MarkAsShutdown("test", {}); +} + +void CheckExtendLeasesMultipleRequests(bool enable_exactly_once) { + auto constexpr kMaxIds = + StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage; + + std::vector> groups; + auto make_ids = [](std::string const& prefix, int count) { + std::vector ids(count); + std::generate(ids.begin(), ids.end(), [&prefix, count = 0]() mutable { + return prefix + std::to_string(++count); + }); + return ids; + }; + groups.push_back(make_ids("group-1-", kMaxIds)); + groups.push_back(make_ids("group-2-", kMaxIds)); + groups.push_back(make_ids("group-3-", 2)); + + auto make_on_modify = [](std::vector e) { + return [expected_ids = std::move(e)](auto, auto, auto const& request) { + EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids)); + return make_ready_future(Status{}); + }; + }; + + AutomaticallyCreatedBackgroundThreads background; + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, 
AsyncStreamingPull) + .WillOnce([&](google::cloud::CompletionQueue&, + std::unique_ptr, + google::pubsub::v1::StreamingPullRequest const&) { + return MakeUnusedStream(enable_exactly_once); + }); + + EXPECT_CALL( + *mock, + AsyncModifyAckDeadline( + _, _, + Property(&ModifyRequest::subscription, + "projects/test-project/subscriptions/test-subscription"))) + .WillOnce(make_on_modify(groups[0])) + .WillOnce(make_on_modify(groups[1])) + .WillOnce(make_on_modify(groups[2])); + + auto shutdown = std::make_shared(); + auto uut = MakeTestBatchSource(background.cq(), shutdown, mock); + + auto done = shutdown->Start({}); + uut->Start([](StatusOr const&) {}); + + std::vector acks; + for (auto& ids : groups) { + acks.insert(acks.end(), ids.begin(), ids.end()); + } + + uut->ExtendLeases(acks, std::chrono::seconds(60)); + + shutdown->MarkAsShutdown("test", {}); +} + +TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesMultipleRequests) { + CheckExtendLeasesMultipleRequests(false); +} + +TEST(StreamingSubscriptionBatchSourceTest, + ExtendLeasesMultipleRequestsWithExactlyOnce) { + CheckExtendLeasesMultipleRequests(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal