Skip to content

Commit

Permalink
fix(pubsub): limit ModifyAckDeadlineRequest size (#10032)
Browse files Browse the repository at this point in the history
Pub/Sub sets a quota of 512KB for `ModifyAckDeadlineRequest` messages.
Theoretically we could have gone over that limit, for example, if the
application had thousands of messages in memory, and we needed to
extend their leases.  With this change such requests will be broken
into multiple RPCs.

It also sets a quota for `AcknowledgeRequest` messages, but always send
a single `ack_id` in this case, there is no chance to go over the
limit.
  • Loading branch information
coryan committed Oct 12, 2022
1 parent 1bd280f commit d5ca4a7
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,38 @@
#include "google/cloud/pubsub/internal/extend_leases_with_retry.h"
#include "google/cloud/internal/async_retry_loop.h"
#include "google/cloud/log.h"
#include <iterator>
#include <ostream>

namespace google {
namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {
// NOLINTNEXTLINE(misc-no-recursion)
future<std::vector<Status>> WaitAll(std::vector<future<Status>> v) {
if (v.empty()) return make_ready_future(std::vector<Status>{});
auto back = std::move(v.back());
v.pop_back();
return WaitAll(std::move(v)).then([b = std::move(back)](auto f) mutable {
return b.then([list = f.get()](auto g) mutable {
list.push_back(g.get());
return list;
});
});
}

future<Status> Reduce(std::vector<future<Status>> v) {
return WaitAll(std::move(v)).then([](auto f) {
auto ready = f.get();
for (auto& s : ready) {
if (!s.ok()) return std::move(s);
}
return Status{};
});
}

} // namespace

StreamingSubscriptionBatchSource::StreamingSubscriptionBatchSource(
CompletionQueue cq,
Expand Down Expand Up @@ -119,8 +145,21 @@ future<Status> StreamingSubscriptionBatchSource::BulkNack(
request.set_subscription(subscription_full_name_);
for (auto& a : ack_ids) *request.add_ack_ids() = std::move(a);
request.set_ack_deadline_seconds(0);
return stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), request);

auto requests =
SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage);
if (requests.size() == 1) {
return stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), requests.front());
}

std::vector<future<Status>> pending(requests.size());
std::transform(requests.begin(), requests.end(), pending.begin(),
[this](auto const& request) {
return stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), request);
});
return Reduce(std::move(pending));
}

void StreamingSubscriptionBatchSource::ExtendLeases(
Expand All @@ -133,14 +172,17 @@ void StreamingSubscriptionBatchSource::ExtendLeases(
request.add_ack_ids(std::move(a));
}
std::unique_lock<std::mutex> lk(mu_);
auto split = SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage);
if (exactly_once_delivery_enabled_.value_or(false)) {
lk.unlock();
(void)ExtendLeasesWithRetry(stub_, cq_, std::move(request));
for (auto& r : split) (void)ExtendLeasesWithRetry(stub_, cq_, std::move(r));
return;
}
lk.unlock();
(void)stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), request);
for (auto& r : split) {
(void)stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), r);
}
}

void StreamingSubscriptionBatchSource::StartStream(
Expand Down Expand Up @@ -370,7 +412,7 @@ void StreamingSubscriptionBatchSource::ShutdownStream(
lk.unlock();
auto weak = WeakFromThis();
// There are no pending reads or writes, and something (probable a read or
// write error) recommends we shutdown the stream
// write error) recommends we shut down the stream
stream->Finish().then([weak, stream](future<Status> f) {
if (auto self = weak.lock()) self->OnFinish(f.get());
});
Expand Down Expand Up @@ -454,6 +496,30 @@ void StreamingSubscriptionBatchSource::ChangeState(
stream_state_ = s;
}

std::vector<google::pubsub::v1::ModifyAckDeadlineRequest>
SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request,
int max_ack_ids) {
// We expect this to be the common case.
if (request.ack_ids_size() <= max_ack_ids) return {std::move(request)};

std::vector<google::pubsub::v1::ModifyAckDeadlineRequest> result;
auto& source = *request.mutable_ack_ids();
while (request.ack_ids_size() > max_ack_ids) {
google::pubsub::v1::ModifyAckDeadlineRequest r;
r.set_subscription(request.subscription());
r.set_ack_deadline_seconds(request.ack_deadline_seconds());

auto begin = source.begin();
auto end = std::next(source.begin(), max_ack_ids);
r.mutable_ack_ids()->Reserve(max_ack_ids);
for (auto i = begin; i != end; ++i) r.add_ack_ids(std::move(*i));
source.erase(begin, end);
result.push_back(std::move(r));
}
if (!request.ack_ids().empty()) result.push_back(std::move(request));
return result;
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
} // namespace cloud
Expand Down
15 changes: 15 additions & 0 deletions google/cloud/pubsub/internal/streaming_subscription_batch_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ class StreamingSubscriptionBatchSource
kFinishing,
};

// The maximum size for `ModifyAckDeadlineRequest` is 512 KB:
// https://cloud.google.com/pubsub/quotas#resource_limits
// Typical ack ids are less than 200 bytes. This value is safe, but there is
// no need to over optimize it:
// - Google does not charge for these messages
// - The value is reached rarely
// - The CPU costs saved between 2,048 ids per message vs. the theoretical
// maximum are minimal
static int constexpr kMaxAckIdsPerMessage = 2048;

private:
// C++17 adds weak_from_this(), we cannot use the same name as (1) some
// versions of the standard library include `weak_from_this()` even with
Expand Down Expand Up @@ -136,6 +146,11 @@ class StreamingSubscriptionBatchSource
std::ostream& operator<<(std::ostream& os,
StreamingSubscriptionBatchSource::StreamState s);

/// Split @p request such that each request has at most @p max_ack_ids.
std::vector<google::pubsub::v1::ModifyAckDeadlineRequest>
SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request,
int max_ack_ids);

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
} // namespace cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
#include "google/cloud/pubsub/subscription.h"
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/credentials.h"
#include "google/cloud/internal/background_threads_impl.h"
#include "google/cloud/log.h"
#include "google/cloud/testing_util/async_sequencer.h"
#include "google/cloud/testing_util/is_proto_equal.h"
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>
Expand All @@ -36,6 +38,7 @@ using ::google::cloud::internal::AutomaticallyCreatedBackgroundThreads;
using ::google::cloud::internal::RunAsyncBase;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::IsOk;
using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::cloud::testing_util::StatusIs;
using ::testing::_;
Expand All @@ -44,6 +47,7 @@ using ::testing::AtLeast;
using ::testing::AtMost;
using ::testing::ByMove;
using ::testing::ElementsAre;
using ::testing::ElementsAreArray;
using ::testing::HasSubstr;
using ::testing::Property;
using ::testing::Return;
Expand Down Expand Up @@ -115,6 +119,7 @@ std::shared_ptr<StreamingSubscriptionBatchSource> MakeTestBatchSource(
auto subscription = pubsub::Subscription("test-project", "test-subscription");
auto opts = DefaultSubscriberOptions(pubsub_testing::MakeTestOptions(
Options{}
.set<UnifiedCredentialsOption>(MakeInsecureCredentials())
.set<pubsub::MaxOutstandingMessagesOption>(100)
.set<pubsub::MaxOutstandingBytesOption>(100 * 1024 * 1024L)
.set<pubsub::MaxHoldTimeOption>(std::chrono::seconds(300))));
Expand Down Expand Up @@ -1120,6 +1125,208 @@ TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesWithRetry) {
EXPECT_THAT(done.get(), IsOk());
}

TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadlineSmall) {
auto constexpr kMaxIds = 3;

std::vector<std::string> bulk_nacks{"fake-001", "fake-002", "fake-003"};
ModifyRequest request;
request.set_subscription(
"projects/test-project/subscriptions/test-subscription");
request.set_ack_deadline_seconds(12345);
for (auto id : bulk_nacks) request.add_ack_ids(std::move(id));

auto const actual = SplitModifyAckDeadline(request, kMaxIds);
EXPECT_THAT(actual, ElementsAre(IsProtoEqual(request)));
}

TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadline) {
auto constexpr kMaxIds = 3;

std::vector<std::string> bulk_nacks{
"fake-001", "fake-002", "fake-003", "fake-004",
"fake-005", "fake-006", "fake-007",
};
ModifyRequest request;
request.set_subscription(
"projects/test-project/subscriptions/test-subscription");
request.set_ack_deadline_seconds(12345);
for (auto id : bulk_nacks) request.add_ack_ids(std::move(id));

std::vector<ModifyRequest> expected(3);
for (auto& e : expected) {
e.set_subscription(request.subscription());
e.set_ack_deadline_seconds(request.ack_deadline_seconds());
}
expected[0].add_ack_ids("fake-001");
expected[0].add_ack_ids("fake-002");
expected[0].add_ack_ids("fake-003");

expected[1].add_ack_ids("fake-004");
expected[1].add_ack_ids("fake-005");
expected[1].add_ack_ids("fake-006");

expected[2].add_ack_ids("fake-007");

auto const actual = SplitModifyAckDeadline(std::move(request), kMaxIds);
EXPECT_THAT(actual,
ElementsAre(IsProtoEqual(expected[0]), IsProtoEqual(expected[1]),
IsProtoEqual(expected[2])));
}

std::unique_ptr<pubsub_testing::MockAsyncPullStream> MakeUnusedStream(
bool enable_exactly_once) {
auto start_response = []() { return make_ready_future(true); };
auto write_response = [](google::pubsub::v1::StreamingPullRequest const&,
grpc::WriteOptions const&) {
return make_ready_future(true);
};
auto read_response = [enable_exactly_once]() {
using Response = ::google::pubsub::v1::StreamingPullResponse;
Response response;
if (enable_exactly_once) {
response.mutable_subscription_properties()
->set_exactly_once_delivery_enabled(true);
}
return make_ready_future(absl::make_optional(std::move(response)));
};
auto finish_response = []() { return make_ready_future(Status{}); };

auto stream = absl::make_unique<pubsub_testing::MockAsyncPullStream>();
EXPECT_CALL(*stream, Start).WillOnce(start_response);
EXPECT_CALL(*stream, Write).WillRepeatedly(write_response);
EXPECT_CALL(*stream, Read).WillRepeatedly(read_response);
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
EXPECT_CALL(*stream, Finish).Times(AtMost(1)).WillRepeatedly(finish_response);
return stream;
}

TEST(StreamingSubscriptionBatchSourceTest, BulkNackMultipleRequests) {
auto constexpr kMaxIds =
StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage;

std::vector<std::vector<std::string>> groups;
auto make_ids = [](std::string const& prefix, int count) {
std::vector<std::string> ids(count);
std::generate(ids.begin(), ids.end(), [&prefix, count = 0]() mutable {
return prefix + std::to_string(++count);
});
return ids;
};
groups.push_back(make_ids("group-1-", kMaxIds));
groups.push_back(make_ids("group-2-", kMaxIds));
groups.push_back(make_ids("group-3-", 2));

auto make_on_modify = [](std::vector<std::string> e) {
return [expected_ids = std::move(e)](auto, auto, auto const& request) {
EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids));
return make_ready_future(Status{});
};
};

AutomaticallyCreatedBackgroundThreads background;
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

EXPECT_CALL(*mock, AsyncStreamingPull)
.WillOnce([&](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>,
google::pubsub::v1::StreamingPullRequest const&) {
return MakeUnusedStream(false);
});

EXPECT_CALL(
*mock,
AsyncModifyAckDeadline(
_, _,
Property(&ModifyRequest::subscription,
"projects/test-project/subscriptions/test-subscription")))
.WillOnce(make_on_modify(groups[0]))
.WillOnce(make_on_modify(groups[1]))
.WillOnce(make_on_modify(groups[2]));

auto shutdown = std::make_shared<SessionShutdownManager>();
auto uut = MakeTestBatchSource(background.cq(), shutdown, mock);

auto done = shutdown->Start({});
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});

std::vector<std::string> nacks;
for (auto& ids : groups) {
nacks.insert(nacks.end(), ids.begin(), ids.end());
}

uut->BulkNack(nacks);

shutdown->MarkAsShutdown("test", {});
}

void CheckExtendLeasesMultipleRequests(bool enable_exactly_once) {
auto constexpr kMaxIds =
StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage;

std::vector<std::vector<std::string>> groups;
auto make_ids = [](std::string const& prefix, int count) {
std::vector<std::string> ids(count);
std::generate(ids.begin(), ids.end(), [&prefix, count = 0]() mutable {
return prefix + std::to_string(++count);
});
return ids;
};
groups.push_back(make_ids("group-1-", kMaxIds));
groups.push_back(make_ids("group-2-", kMaxIds));
groups.push_back(make_ids("group-3-", 2));

auto make_on_modify = [](std::vector<std::string> e) {
return [expected_ids = std::move(e)](auto, auto, auto const& request) {
EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids));
return make_ready_future(Status{});
};
};

AutomaticallyCreatedBackgroundThreads background;
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

EXPECT_CALL(*mock, AsyncStreamingPull)
.WillOnce([&](google::cloud::CompletionQueue&,
std::unique_ptr<grpc::ClientContext>,
google::pubsub::v1::StreamingPullRequest const&) {
return MakeUnusedStream(enable_exactly_once);
});

EXPECT_CALL(
*mock,
AsyncModifyAckDeadline(
_, _,
Property(&ModifyRequest::subscription,
"projects/test-project/subscriptions/test-subscription")))
.WillOnce(make_on_modify(groups[0]))
.WillOnce(make_on_modify(groups[1]))
.WillOnce(make_on_modify(groups[2]));

auto shutdown = std::make_shared<SessionShutdownManager>();
auto uut = MakeTestBatchSource(background.cq(), shutdown, mock);

auto done = shutdown->Start({});
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});

std::vector<std::string> acks;
for (auto& ids : groups) {
acks.insert(acks.end(), ids.begin(), ids.end());
}

uut->ExtendLeases(acks, std::chrono::seconds(60));

shutdown->MarkAsShutdown("test", {});
}

TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesMultipleRequests) {
CheckExtendLeasesMultipleRequests(false);
}

TEST(StreamingSubscriptionBatchSourceTest,
ExtendLeasesMultipleRequestsWithExactlyOnce) {
CheckExtendLeasesMultipleRequests(true);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
Expand Down

0 comments on commit d5ca4a7

Please sign in to comment.