Skip to content

Commit

Permalink
[#20938] xCluster: Fail Db Scoped checkpoint if tables are created or…
Browse files Browse the repository at this point in the history
… dropped

Summary:
If tables are created or dropped during a db scoped checkpoint, the checkpoint will fail. This is to prevent the checkpoint from missing tables that were created or dropped during the checkpoint.
Jira: DB-9917

Test Plan:
XClusterOutboundReplicationGroupMockedTest.AddTableDuringCheckpoint
XClusterOutboundReplicationGroupMockedTest.DropTableDuringCheckpoint

Reviewers: slingam, xCluster

Reviewed By: slingam

Subscribers: xCluster, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D33487
  • Loading branch information
hari90 committed Mar 23, 2024
1 parent 724ac0a commit 987ffc3
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 5 deletions.
83 changes: 78 additions & 5 deletions src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
#include "yb/util/async_util.h"
#include "yb/util/backoff_waiter.h"
#include "yb/util/is_operation_done_result.h"
#include "yb/util/sync_point.h"
#include "yb/util/test_util.h"
#include "yb/util/tsan_util.h"

DECLARE_bool(TEST_enable_sync_points);

using namespace std::placeholders;
using testing::_;
using testing::AtLeast;
Expand Down Expand Up @@ -110,14 +113,18 @@ class XClusterOutboundReplicationGroupMocked : public XClusterOutboundReplicatio
return MarkNewTablesAsCheckpointed(table_info->namespace_id(), table_info->id(), epoch);
}

Result<NamespaceId> AddNamespaceSync(
const LeaderEpoch& epoch, const NamespaceName& namespace_name, MonoDelta delta) {
auto namespace_id = VERIFY_RESULT(AddNamespace(epoch, namespace_name));
RETURN_NOT_OK(LoggedWaitFor(
Status WaitForCheckpoint(const NamespaceId& namespace_id, MonoDelta delta) {
return LoggedWaitFor(
[this, namespace_id]() -> Result<bool> {
return VERIFY_RESULT(GetNamespaceCheckpointInfo(namespace_id)).has_value();
},
delta, "Waiting for namespace checkpoint"));
delta, "Waiting for namespace checkpoint");
}

Result<NamespaceId> AddNamespaceSync(
const LeaderEpoch& epoch, const NamespaceName& namespace_name, MonoDelta delta) {
auto namespace_id = VERIFY_RESULT(AddNamespace(epoch, namespace_name));
RETURN_NOT_OK(WaitForCheckpoint(namespace_id, delta));

return namespace_id;
}
Expand Down Expand Up @@ -200,6 +207,16 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest {
return table_info;
}

void DropTable(const NamespaceId& namespace_id, const TableId& table_id) {
auto it = std::find_if(
namespace_tables[namespace_id].begin(), namespace_tables[namespace_id].end(),
[&table_id](const auto& table_info) { return table_info->id() == table_id; });

if (it != namespace_tables[namespace_id].end()) {
namespace_tables[namespace_id].erase(it);
}
}

std::shared_ptr<XClusterOutboundReplicationGroupMocked> CreateReplicationGroup() {
return std::make_shared<XClusterOutboundReplicationGroupMocked>(
kReplicationGroupId, helper_functions, *task_factory);
Expand Down Expand Up @@ -513,4 +530,60 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, AddTable) {
ASSERT_EQ(ns_info->table_infos.size(), 3);
}

// If we create a table during checkpoint, it should fail.
TEST_F(XClusterOutboundReplicationGroupMockedTest, AddTableDuringCheckpoint) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_enable_sync_points) = true;
auto* sync_point_instance = yb::SyncPoint::GetInstance();

SyncPoint::GetInstance()->LoadDependency(
{{"TESTAddTableDuringCheckpoint::TableCreated",
"XClusterOutboundReplicationGroup::CreateStreamsForInitialBootstrap"}});
sync_point_instance->EnableProcessing();

CreateTable(kNamespaceId, kTableId1, kTableName1, kPgSchemaName);

auto outbound_rg = CreateReplicationGroup();
auto namespace_id = ASSERT_RESULT(outbound_rg->AddNamespace(kEpoch, kNamespaceName));

CreateTable(kNamespaceId, kTableId2, kTableName2, kPgSchemaName2);
TEST_SYNC_POINT("TESTAddTableDuringCheckpoint::TableCreated");

auto status = outbound_rg->WaitForCheckpoint(namespace_id, kTimeout);
ASSERT_NOK(status);
ASSERT_STR_CONTAINS(
status.ToString(),
"List of tables changed during xCluster checkpoint of replication group "
"xClusterOutboundReplicationGroup rg1: [table_id_2]");

sync_point_instance->DisableProcessing();
}

// If we drop a table during checkpoint, it should fail.
TEST_F(XClusterOutboundReplicationGroupMockedTest, DropTableDuringCheckpoint) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_enable_sync_points) = true;
auto* sync_point_instance = yb::SyncPoint::GetInstance();

SyncPoint::GetInstance()->LoadDependency(
{{"TESTAddTableDuringCheckpoint::TableCreated",
"XClusterOutboundReplicationGroup::CreateStreamsForInitialBootstrap"}});
sync_point_instance->EnableProcessing();

CreateTable(kNamespaceId, kTableId1, kTableName1, kPgSchemaName);
CreateTable(kNamespaceId, kTableId2, kTableName2, kPgSchemaName2);

auto outbound_rg = CreateReplicationGroup();
auto namespace_id = ASSERT_RESULT(outbound_rg->AddNamespace(kEpoch, kNamespaceName));

DropTable(kNamespaceId, kTableId1);
TEST_SYNC_POINT("TESTAddTableDuringCheckpoint::TableCreated");

auto status = outbound_rg->WaitForCheckpoint(namespace_id, kTimeout);
ASSERT_NOK(status);
ASSERT_STR_CONTAINS(
status.ToString(),
"List of tables changed during xCluster checkpoint of replication group "
"xClusterOutboundReplicationGroup rg1: [table_id_1]");

sync_point_instance->DisableProcessing();
}
} // namespace yb::master
21 changes: 21 additions & 0 deletions src/yb/master/xcluster/xcluster_outbound_replication_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "yb/master/xcluster/xcluster_outbound_replication_group_tasks.h"
#include "yb/util/is_operation_done_result.h"
#include "yb/util/status_log.h"
#include "yb/util/sync_point.h"

DEFINE_RUNTIME_uint32(max_xcluster_streams_to_checkpoint_in_parallel, 200,
"Maximum number of xCluster streams to checkpoint in parallel");
Expand Down Expand Up @@ -129,6 +130,8 @@ void XClusterOutboundReplicationGroup::StartNamespaceCheckpointTasks(

Status XClusterOutboundReplicationGroup::CreateStreamsForInitialBootstrap(
const NamespaceId& namespace_id, const LeaderEpoch& epoch) {
TEST_SYNC_POINT("XClusterOutboundReplicationGroup::CreateStreamsForInitialBootstrap");

std::lock_guard mutex_l(mutex_);
auto l = VERIFY_RESULT(LockForWrite());

Expand Down Expand Up @@ -261,6 +264,24 @@ Result<bool> XClusterOutboundReplicationGroup::MarkBootstrapTablesAsCheckpointed
"Namespace in unexpected state");

if (table_ids.empty()) {
auto table_infos = VERIFY_RESULT(helper_functions_.get_tables_func(namespace_id));
std::set<TableId> tables;
std::transform(
table_infos.begin(), table_infos.end(), std::inserter(tables, tables.begin()),
[](const auto& table_info) { return table_info->id(); });

std::set<TableId> checkpointed_tables;
std::transform(
ns_info->table_infos().begin(), ns_info->table_infos().end(),
std::inserter(checkpointed_tables, checkpointed_tables.begin()),
[](const auto& table_info) { return table_info.first; });

auto diff = STLSetSymmetricDifference(tables, checkpointed_tables);
SCHECK_FORMAT(
diff.empty(), IllegalState,
"List of tables changed during xCluster checkpoint of replication group $0: $1", ToString(),
yb::ToString(diff));

LOG_WITH_PREFIX(INFO) << "Marking namespace " << namespace_id << " as READY";
ns_info->set_state(NamespaceInfoPB::READY);
done = true;
Expand Down

0 comments on commit 987ffc3

Please sign in to comment.