Skip to content

Commit

Permalink
[BACKPORT 2024.1][#23367] CDCSDK: Cleanup expired and not of interest…
Browse files Browse the repository at this point in the history
… tables from CDC stream

Summary:
####Backport Description
Major merge conflicts were encountered in cdc_service.cc in PopulateTabletCheckPointInfo() method because the refactoring present in master is not present in 2024.1
####Original Description
Currently, whenever a stream expires or a table becomes not of interest due to lack of polling on it, the entries corresponding to such tables are neither removed from the stream metadata nor are they removed from the state table. Even though the resources retained on tablets of such tables are released, the presence of state table entry can potentially stop the physical deletion of a split tablet.

This diff adds cleanup mechanism for removing the entries corresponding to the expired / not of interest tables from stream metadata as well as the cdc_state table. Update Peers and Metrics reads the state table periodically and checks for the entries that have either reached stream expiry or have become not of interest. Currently this check is done for releasing the retained resources. With the changes introduced in this diff, we will find out the `{table_id, stream_id}` pairs which have expired or have become not of interest. A new rpc `RemoveTablesFromCDCSDKStream` has been introduced. This rpc takes a list of tables to be removed and the stream_id from which these tables need to be removed in its request. For each pair it then calls the `RemoveUserTablesFromCDCSDKStream` which does the cleanup from stream metadata and cdc_state table. Generally, `RemoveTablesFromCDCSDKStream` will be called for a single `{table_id, stream_id}` pair, however in case of colocated tables, all the colocated tables which have become not of interest / expired for a stream will be processed in a single call.

The determination of a table being not of interest is done on the basis of active_time. In a scenario where lot of tables are present in database, but the user is interested in capturing only a small subset of these using CDC, it can happen that a large number of tables become not of interest at the same time. In such a case, there is a possibility of Update Peers and Metrics (which runs on each node) storming the master with cleanup requests. To prevent this, two throttling mechanisms have been put in place:

- For each expired table, the node hosting the leader of tablet with lexicographically smallest tablet_id will send the cleanup request to master. This will ensure that only one request will be sent to the master per expired table.

- If colocated tables become not of interest (or get expired for a stream), then all the colocated tables on the tablet will be cleaned up in a single call.

- The flag `cdcsdk_max_expired_tables_to_clean_per_run` determines the maximum number of cleanup requests sent from each node per iteration of Update Peers and Metrics. The default value of this flag is 1.

- With default settings the maximum number of cleanup requests sent to the master = **min(num of nodes, num of expired non-colocated tables + num of expired colocated tablets).**

A new kLocalPersisted tserver auto flag `cdcsdk_enable_cleanup_of_expired_table_entries` has been introduced with default value false. To enable this cleanup logic, this flag must be set to true.

**Upgrade/Rollback safety:**
This diff introduces a new rpc:

  - RemoveUserTablesFromCDCSDKStream : RemoveUserTablesFromCDCSDKStreamRequestPB, RemoveUserTablesFromCDCSDKStreamResponsePB
All the fields required to populate the request of this rpc are already present at the caller.

The flags `cdcsdk_enable_cleanup_of_expired_table_entries` and `cdcsdk_enable_dynamic_table_addition_with_table_cleanup` protect this new rpc.
Jira: DB-12291

Original commit: 0ea4f54 / D37450

Test Plan:
Jenkins: urgent

./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCleanupOfTableNotOfInterest
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCleanupOfExpiredTable
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCleanupOfUnpolledTableWithTabletSplit
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestSplitOfTabletNotOfInterestDuringCleanup
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestCleanupOfNotOfInterestColocatedTabletWithMultipleStreams

Reviewers: skumar, siddharth.shah, asrinivasan, xCluster, hsunder

Reviewed By: siddharth.shah

Subscribers: ycdcxcluster, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37745
  • Loading branch information
Sumukh-Phalgaonkar committed Sep 4, 2024
1 parent 4bcdb6c commit 9f8c943
Show file tree
Hide file tree
Showing 14 changed files with 525 additions and 13 deletions.
132 changes: 129 additions & 3 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ DEFINE_test_flag(bool, cdcsdk_skip_stream_active_check, false,
"When enabled, GetChanges will skip checking if stream is active as well as skip "
"updating the active time.");

DEFINE_RUNTIME_int32(
cdcsdk_max_expired_tables_to_clean_per_run, 1,
"This flag determines the maximum number of tables to be cleaned up per run of "
"UpdatePeersAndMetrics. Since a lot of tables can become not of interest at the same time, "
"this flag is used to prevent storming of cleanup requests to master. When the flag value is "
"1, the number of cleanup requests sent will be min(num_tables_to_cleanup, num_of_nodes)");

DEFINE_RUNTIME_AUTO_bool(
cdcsdk_enable_cleanup_of_expired_table_entries, kLocalPersisted, false, true,
"When enabled, Update Peers and Metrics will look for entries in the state table that have "
"either become not of interest or have expired for a stream. The cleanup logic will then "
"update these entries in cdc_state table and also move the corresponing table's entry to "
"unqualified tables list in stream metadata.");

DECLARE_bool(enable_log_retention_by_op_idx);

DECLARE_int32(cdc_checkpoint_opid_interval_ms);
Expand All @@ -207,6 +221,8 @@ DEFINE_RUNTIME_bool(enable_cdcsdk_lag_collection, false,
"When enabled, vlog containing the lag for the getchanges call as well as last commit record "
"in response will be printed.");

DECLARE_bool(cdcsdk_enable_dynamic_table_addition_with_table_cleanup);

METRIC_DEFINE_entity(xcluster);

METRIC_DEFINE_entity(cdcsdk);
Expand Down Expand Up @@ -2564,9 +2580,41 @@ CDCServiceImpl::GetNamespaceMinRecordIdCommitTimeMap(
return namespace_to_min_record_id_commit_time;
}

void CDCServiceImpl::AddTableToExpiredTablesMap(
const TabletId& tablet_id, const xrepl::StreamId& stream_id,
TableIdToStreamIdMap* expired_tables_map) {
auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id
<< ". Will not remove its expired entry in this round";
return;
}

auto table_ids = tablet_peer->tablet_metadata()->colocated()
? tablet_peer->tablet_metadata()->GetAllColocatedTables()
: std::vector<TableId>{tablet_peer->tablet_metadata()->table_id()};

for (const auto& table_id : table_ids) {
auto it = expired_tables_map->find(table_id);
if (it != expired_tables_map->end()) {
// The cleanup request to master should be sent exactly once per {table, stream} pair. To
// ensure this, the node hosting the leader of the tablet with lexicographically smallest
// tablet_id will send the request.
if ((tablet_peer->tablet_id() <= it->second.first)) {
it->second.first = tablet_peer->tablet_id();
it->second.second.insert(stream_id);
}
} else {
expired_tables_map->emplace(
table_id,
std::make_pair(tablet_peer->tablet_id(), std::unordered_set<xrepl::StreamId>{stream_id}));
}
}
}

Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
const TabletId& input_tablet_id, TabletIdStreamIdSet* tablet_stream_to_be_deleted,
StreamIdSet* slot_entries_to_be_deleted) {
StreamIdSet* slot_entries_to_be_deleted, TableIdToStreamIdMap* expired_tables_map) {
TabletIdCDCCheckpointMap tablet_min_checkpoint_map;
std::unordered_set<xrepl::StreamId> refreshed_metadata_set;

Expand Down Expand Up @@ -2738,6 +2786,12 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
auto status = CheckTabletNotOfInterest(
producer_tablet, last_active_time_cdc_state_table, true);
if (!status.ok()) {
// If checkpoint is max, it indicates that cleanup is already in progress. No need to add
// such entries to the expired_tables_map.
if (expired_tables_map && checkpoint != OpId::Max()) {
AddTableToExpiredTablesMap(tablet_id, stream_id, expired_tables_map);
}

if (!tablet_min_checkpoint_map.contains(tablet_id)) {
VLOG(2) << "Stream: " << stream_id << ", is not of interest for tablet: " << tablet_id
<< ", hence we are adding default entries to tablet_min_checkpoint_map";
Expand All @@ -2749,6 +2803,12 @@ Result<TabletIdCDCCheckpointMap> CDCServiceImpl::PopulateTabletCheckPointInfo(
}
status = CheckStreamActive(producer_tablet, last_active_time_cdc_state_table);
if (!status.ok()) {
// If checkpoint is max, it indicates that cleanup is already in progress. No need to add
// such entries to the expired_tables_map.
if (expired_tables_map && checkpoint != OpId::Max()) {
AddTableToExpiredTablesMap(tablet_id, stream_id, expired_tables_map);
}

// It is possible that all streams associated with a tablet have expired, in which case we
// have to create a default entry in 'tablet_min_checkpoint_map' corresponding to the
// tablet. This way the fact that all the streams have expired will be communicated to the
Expand Down Expand Up @@ -3084,8 +3144,9 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
// if we fail to read cdc_state table, lets wait for the next retry after 60 secs.
TabletIdStreamIdSet cdc_state_entries_to_delete;
StreamIdSet slot_entries_to_be_deleted;
auto result =
PopulateTabletCheckPointInfo("", &cdc_state_entries_to_delete, &slot_entries_to_be_deleted);
TableIdToStreamIdMap expired_tables_map;
auto result = PopulateTabletCheckPointInfo(
"", &cdc_state_entries_to_delete, &slot_entries_to_be_deleted, &expired_tables_map);
if (!result.ok()) {
LOG(WARNING) << "Failed to populate tablets checkpoint info: " << result.status();
continue;
Expand Down Expand Up @@ -3116,6 +3177,13 @@ void CDCServiceImpl::UpdatePeersAndMetrics() {
cdc_state_entries_to_delete, failed_tablet_ids, slot_entries_to_be_deleted),
"Unable to cleanup CDC State table metadata");

if (GetAtomicFlag(&FLAGS_cdcsdk_enable_cleanup_of_expired_table_entries) &&
GetAtomicFlag(&FLAGS_cdcsdk_enable_dynamic_table_addition_with_table_cleanup)) {
WARN_NOT_OK(
CleanupExpiredTables(expired_tables_map),
"Failed to remove an expired table entry from stream");
}

rate_limiter_->SetBytesPerSecond(
GetAtomicFlag(&FLAGS_xcluster_get_changes_max_send_rate_mbps) * 1_MB);

Expand Down Expand Up @@ -3165,6 +3233,64 @@ Status CDCServiceImpl::DeleteCDCStateTableMetadata(
return Status::OK();
}

Status CDCServiceImpl::CleanupExpiredTables(const TableIdToStreamIdMap& expired_tables_map) {
if (expired_tables_map.empty()) {
return Status::OK();
}

int num_cleanup_requests = 0;
for (const auto& entry : expired_tables_map) {
const auto& table_id = entry.first;
const auto& tablet_id = entry.second.first;
const auto& streams = entry.second.second;

auto tablet_peer = context_->LookupTablet(tablet_id);
if (!tablet_peer) {
LOG(WARNING) << "Could not find tablet peer for tablet_id: " << tablet_id
<< ", for table: " << table_id
<< ". Will not remove its entries from state table in this round.";
continue;
}

if (tablet_peer->IsNotLeader()) {
continue;
}

for (const auto& stream_id : streams) {
if (num_cleanup_requests >=
GetAtomicFlag(&FLAGS_cdcsdk_max_expired_tables_to_clean_per_run)) {
return Status::OK();
}

auto colocated = tablet_peer->tablet_metadata()->colocated();

auto table_ids = colocated
? tablet_peer->tablet_metadata()->GetAllColocatedTables()
: std::vector<TableId>{table_id};

if (colocated) {
for (auto it = table_ids.begin(); it != table_ids.end();) {
if (boost::ends_with(*it, kColocatedDbParentTableIdSuffix) ||
boost::ends_with(*it, kTablegroupParentTableIdSuffix) ||
boost::ends_with(*it, kColocationParentTableIdSuffix)) {
it = table_ids.erase(it);
} else {
++it;
}
}
}

auto status = client()->RemoveTablesFromCDCSDKStream(table_ids, stream_id);
if (!status.ok()) {
LOG(WARNING) << "Failed to remove table: " << table_id << " from stream: " << stream_id
<< " : " << status;
}
num_cleanup_requests++;
}
}
return Status::OK();
}

Result<client::internal::RemoteTabletPtr> CDCServiceImpl::GetRemoteTablet(
const TabletId& tablet_id, const bool use_cache) {
std::promise<Result<client::internal::RemoteTabletPtr>> tablet_lookup_promise;
Expand Down
14 changes: 13 additions & 1 deletion src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ struct TabletCDCCheckpointInfo {
using TabletIdCDCCheckpointMap = std::unordered_map<TabletId, TabletCDCCheckpointInfo>;
using TabletIdStreamIdSet = std::set<std::pair<TabletId, xrepl::StreamId>>;
using StreamIdSet = std::set<xrepl::StreamId>;
using TableIdToStreamIdMap =
std::unordered_map<TableId, std::pair<TabletId, std::unordered_set<xrepl::StreamId>>>;
using RollBackTabletIdCheckpointMap =
std::unordered_map<const std::string*, std::pair<int64_t, OpId>>;
class CDCServiceImpl : public CDCServiceIf {
Expand Down Expand Up @@ -422,6 +424,10 @@ class CDCServiceImpl : public CDCServiceIf {
const std::unordered_set<TabletId>& failed_tablet_ids,
const StreamIdSet& slot_entries_to_be_deleted);

// This method sends an rpc to the master to remove the expired / not of interest tables from the
// stream metadata and update the checkpoint of cdc_state entries to max.
Status CleanupExpiredTables(const TableIdToStreamIdMap& expired_tables_map);

MicrosTime GetLastReplicatedTime(const std::shared_ptr<tablet::TabletPeer>& tablet_peer);

bool ShouldUpdateMetrics(MonoTime time_since_update_metrics);
Expand Down Expand Up @@ -454,7 +460,13 @@ class CDCServiceImpl : public CDCServiceIf {
Result<TabletIdCDCCheckpointMap> PopulateTabletCheckPointInfo(
const TabletId& input_tablet_id = "",
TabletIdStreamIdSet* tablet_stream_to_be_deleted = nullptr,
StreamIdSet* slot_entries_to_be_deleted = nullptr);
StreamIdSet* slot_entries_to_be_deleted = nullptr,
TableIdToStreamIdMap* expired_tables_map = nullptr);

void AddTableToExpiredTablesMap(
const TabletId& tablet_id,
const xrepl::StreamId& stream_id,
TableIdToStreamIdMap* expired_tables_map);

Status SetInitialCheckPoint(
const OpId& checkpoint, const std::string& tablet_id,
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, GetCDCDBStreamInfo);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, GetCDCStream);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, ListCDCStreams);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, UpdateCDCStream);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, RemoveTablesFromCDCSDKStream);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, IsObjectPartOfXRepl);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, IsBootstrapRequired);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, GetUDTypeMetadata);
Expand Down
22 changes: 22 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,28 @@ Status YBClient::UpdateCDCStream(
return Status::OK();
}

Status YBClient::RemoveTablesFromCDCSDKStream(
const std::vector<TableId>& table_ids,
const xrepl::StreamId stream_id) {
if (table_ids.empty()) {
return STATUS(InvalidArgument, "Table ID should not be empty");
}
if (!stream_id) {
return STATUS(InvalidArgument, "Stream ID should not be empty");
}

master::RemoveTablesFromCDCSDKStreamRequestPB req;
master::RemoveTablesFromCDCSDKStreamResponsePB resp;
req.set_stream_id(stream_id.ToString());
req.mutable_table_ids()->Reserve(narrow_cast<int>(table_ids.size()));
for (const auto& table_id : table_ids) {
req.add_table_ids(table_id);
}

CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, RemoveTablesFromCDCSDKStream);
return Status::OK();
}

Result<bool> YBClient::IsObjectPartOfXRepl(const TableId& table_id) {
IsObjectPartOfXReplRequestPB req;
IsObjectPartOfXReplResponsePB resp;
Expand Down
4 changes: 4 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,10 @@ class YBClient {
const std::vector<xrepl::StreamId>& stream_ids,
const std::vector<master::SysCDCStreamEntryPB>& new_entries);

Status RemoveTablesFromCDCSDKStream(
const std::vector<TableId>& table_id,
const xrepl::StreamId stream_id);

Result<bool> IsObjectPartOfXRepl(const TableId& table_id);

Result<bool> IsBootstrapRequired(
Expand Down
9 changes: 9 additions & 0 deletions src/yb/common/common_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ DEFINE_RUNTIME_AUTO_bool(enable_xcluster_auto_flag_validation, kLocalPersisted,
DEFINE_RUNTIME_AUTO_PG_FLAG(bool, yb_enable_ddl_atomicity_infra, kLocalPersisted, false, true,
"Enables YSQL DDL atomicity");

DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_dynamic_table_addition_with_table_cleanup,
kLocalPersisted,
false,
true,
"This flag needs to be true in order to support addition of dynamic tables "
"along with removal of not of interest/expired tables from a CDCSDK "
"stream.");
TAG_FLAG(cdcsdk_enable_dynamic_table_addition_with_table_cleanup, advanced);

namespace yb {

void InitCommonFlags() {
Expand Down
3 changes: 3 additions & 0 deletions src/yb/integration-tests/cdcsdk_consistent_snapshot-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,9 @@ TEST_F(CDCSDKConsistentSnapshotTest, TestConsistentSnapshotAcrossMultipleTables)
TEST_F(CDCSDKConsistentSnapshotTest, TestReleaseResourcesOnUnpolledTablets) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_tablet_not_of_interest_timeout_secs) = 3;
// Since the test requires the state table entries to verify the release of resources, we disable
// the cleanup of not of interest tables for this test.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_expired_table_entries) = false;
ASSERT_OK(SetUpWithParams(1, 1, false));

auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
Expand Down
Loading

0 comments on commit 9f8c943

Please sign in to comment.