Skip to content

Commit

Permalink
[#24121] xCluster: Fix xcluster_outbound_replication_group-itest Test…
Browse files Browse the repository at this point in the history
…GetStreamByTableId

Summary:
Test should wait for replication group creation before running DDLs.
Refactored the test and added helper functions that will create a replication group and wait for it to be created.

Fixed a bug in `GetNamespaceCheckpointInfo` where a variable was incorrectly used after move.

Fixes #24121
Jira: DB-13013

Test Plan: xcluster_outbound_replication_group-itest

Reviewers: jhe, xCluster

Reviewed By: jhe

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D38399
  • Loading branch information
hari90 committed Sep 25, 2024
1 parent 44ae377 commit 9f90819
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,42 @@ class XClusterOutboundReplicationGroupTest : public XClusterYsqlTestBase {
return table_info->id();
}

// Create a new OutboundReplicationGroup. Use GetXClusterStreams, or IsBootstrapRequired to wait
// for the new group to be ready.
Status CreateOutboundReplicationGroupAsync(
const xcluster::ReplicationGroupId& replication_group_id,
const std::vector<NamespaceId>& namespace_ids) {
return XClusterClient().CreateOutboundReplicationGroup(
replication_group_id, namespace_ids, UseAutomaticMode());
}

Status CreateOutboundReplicationGroupSync(
const xcluster::ReplicationGroupId& replication_group_id = kReplicationGroupId,
const std::vector<NamespaceId>& namespace_ids = {}) {
auto namespace_ids_copy = namespace_ids;
if (namespace_ids_copy.empty()) {
namespace_ids_copy.push_back(namespace_id_);
}

RETURN_NOT_OK(CreateOutboundReplicationGroupAsync(replication_group_id, namespace_ids_copy));
for (const auto& namespace_id : namespace_ids_copy) {
RETURN_NOT_OK(GetXClusterStreams(replication_group_id, namespace_id));
}
return Status::OK();
}

Result<bool> IsBootstrapRequired(
const xcluster::ReplicationGroupId& replication_group_id = kReplicationGroupId,
const NamespaceId& namespace_id = {}) {
std::promise<Result<bool>> promise;
RETURN_NOT_OK(XClusterClient().IsBootstrapRequired(
CoarseMonoClock::Now() + kDeadline, replication_group_id,
namespace_id.empty() ? namespace_id_ : namespace_id,
[&promise](Result<bool> result) { promise.set_value(std::move(result)); }));

return promise.get_future().get();
}

// Cleanup streams marked for deletion and get the list of xcluster streams.
std::unordered_set<xrepl::StreamId> CleanupAndGetAllXClusterStreams() {
catalog_manager_->RunXReplBgTasks(epoch_);
Expand Down Expand Up @@ -132,7 +168,12 @@ class XClusterOutboundReplicationGroupTest : public XClusterYsqlTestBase {
CoarseMonoClock::Now() + kDeadline, replication_group_id, namespace_id, table_names,
pg_schema_names, [&promise](const auto& resp) { promise.set_value(resp); }));

return promise.get_future().get();
auto resp = VERIFY_RESULT(promise.get_future().get());
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}

return resp;
}

Result<master::GetXClusterStreamsResponsePB> GetXClusterStreamsByTableId(
Expand Down Expand Up @@ -210,8 +251,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestMultipleTable) {

ASSERT_NOK(GetXClusterStreams(kReplicationGroupId, namespace_id_));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupSync());

auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));

Expand Down Expand Up @@ -260,8 +300,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, AddDeleteNamespaces) {
auto ns2_table_id_1 = ASSERT_RESULT(CreateYsqlTable(namespace_name_2, kTableName1));
auto ns2_table_id_2 = ASSERT_RESULT(CreateYsqlTable(namespace_name_2, kTableName2));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupSync());

// Wait for the new streams to be ready.
auto ns1_info = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
Expand Down Expand Up @@ -324,11 +363,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, AddTable) {
auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
ASSERT_OK(VerifyWalRetentionOfTable(table_id_1, 900));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));

// Wait for the new streams to be ready.
ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_));
ASSERT_OK(CreateOutboundReplicationGroupSync());

auto all_xcluster_streams_initial = CleanupAndGetAllXClusterStreams();
ASSERT_EQ(all_xcluster_streams_initial.size(), 1 + OverheadStreamsCount());
Expand All @@ -349,17 +384,8 @@ TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredEmptyTable) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_max_xcluster_streams_to_checkpoint_in_parallel) = 1;

auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));

std::promise<Result<bool>> promise;

ASSERT_OK(XClusterClient().IsBootstrapRequired(
CoarseMonoClock::Now() + kDeadline, kReplicationGroupId, namespace_id_,
[&promise](Result<bool> result) { promise.set_value(std::move(result)); }));

auto is_bootstrap_required = ASSERT_RESULT(promise.get_future().get());
ASSERT_FALSE(is_bootstrap_required);
ASSERT_OK(CreateOutboundReplicationGroupSync());
ASSERT_FALSE(ASSERT_RESULT(IsBootstrapRequired()));
}

TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithData) {
Expand All @@ -371,17 +397,8 @@ TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithData) {
ASSERT_OK(producer_client()->OpenTable(table_id_2, &table_2));
ASSERT_OK(InsertRowsInProducer(0, 10, table_2));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));

std::promise<Result<bool>> promise;

ASSERT_OK(XClusterClient().IsBootstrapRequired(
CoarseMonoClock::Now() + kDeadline, kReplicationGroupId, namespace_id_,
[&promise](Result<bool> result) { promise.set_value(std::move(result)); }));

auto is_bootstrap_required = ASSERT_RESULT(promise.get_future().get());
ASSERT_TRUE(is_bootstrap_required);
ASSERT_OK(CreateOutboundReplicationGroupSync());
ASSERT_TRUE(ASSERT_RESULT(IsBootstrapRequired()));
}

TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithDeletedData) {
Expand All @@ -394,17 +411,9 @@ TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithDeleted
ASSERT_OK(InsertRowsInProducer(0, 10, table_2));
ASSERT_OK(DeleteRowsInProducer(0, 10, table_2));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupSync());

std::promise<Result<bool>> promise;

ASSERT_OK(XClusterClient().IsBootstrapRequired(
CoarseMonoClock::Now() + kDeadline, kReplicationGroupId, namespace_id_,
[&promise](Result<bool> result) { promise.set_value(std::move(result)); }));

auto is_bootstrap_required = ASSERT_RESULT(promise.get_future().get());
ASSERT_FALSE(is_bootstrap_required);
ASSERT_FALSE(ASSERT_RESULT(IsBootstrapRequired()));
}

TEST_P(XClusterOutboundReplicationGroupParameterized, MasterRestartDuringCheckpoint) {
Expand All @@ -421,8 +430,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, MasterRestartDuringCheckpo

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_block_xcluster_checkpoint_namespace_task) = true;

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupAsync(kReplicationGroupId, {namespace_id_}));

std::promise<Result<master::GetXClusterStreamsResponsePB>> promise;
auto future = promise.get_future();
Expand All @@ -449,8 +457,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, Repair) {
auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
auto table_id_2 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupSync());

auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
ASSERT_EQ(resp.table_infos_size(), 2 + OverheadStreamsCount());
Expand Down Expand Up @@ -546,8 +553,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, RepairWithYbAdmin) {
auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
auto table_id_2 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupSync());

auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
ASSERT_EQ(resp.table_infos_size(), 2 + OverheadStreamsCount());
Expand Down Expand Up @@ -589,11 +595,8 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestListAPIs) {
ASSERT_OK(CreateYsqlTable(namespace_name_2, "table_2"));

// Replication group 1 with two namespaces.
ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_, namespace_id_2}, UseAutomaticMode()));
// Wait for checkpointing to complete.
ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_));
ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_2));
ASSERT_OK(
CreateOutboundReplicationGroupSync(kReplicationGroupId, {namespace_id_, namespace_id_2}));
{
auto group_info = ASSERT_RESULT(
XClusterClient().GetXClusterOutboundReplicationGroupInfo(kReplicationGroupId));
Expand All @@ -606,10 +609,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestListAPIs) {

// Replication group 2 with one namespace.
const xcluster::ReplicationGroupId replication_group2("rg2");
ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
replication_group2, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(GetXClusterStreams(replication_group2, namespace_id_));
// Wait for checkpointing to complete.
ASSERT_OK(CreateOutboundReplicationGroupSync(replication_group2, {namespace_id_}));
{
auto group_info =
ASSERT_RESULT(XClusterClient().GetXClusterOutboundReplicationGroupInfo(replication_group2));
Expand Down Expand Up @@ -679,8 +679,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestListAPIs) {
// Make sure we cleanup the streams of the failed table create.
TEST_P(XClusterOutboundReplicationGroupParameterized, CleanupStreamsOfFailedTableCreate) {
auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));
ASSERT_OK(CreateOutboundReplicationGroupSync());
int expected_stream_count = 1 + OverheadStreamsCount();

auto check_streams = [&]() -> Status {
Expand Down Expand Up @@ -719,11 +718,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, CleanupStreamsOfFailedTabl
TEST_P(XClusterOutboundReplicationGroupParameterized, TestGetStreamByTableId) {
auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));

ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup(
kReplicationGroupId, {namespace_id_}, UseAutomaticMode()));

// Wait for the namespace to be ready.
ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_));
ASSERT_OK(CreateOutboundReplicationGroupSync());

// Delete the table to put it into HIDDEN state.
ASSERT_OK(DropYsqlTable(&producer_cluster_, kNamespaceName, kPgSchemaName, kTableName1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ XClusterOutboundReplicationGroup::GetNamespaceCheckpointInfo(
auto it = InsertOrReturnExisting(
&table_names_map,
{TableSchemaNamePair(table_descriptor.name(), table_descriptor.pgschema_name()),
std::move(table_descriptor)});
table_descriptor});
SCHECK(
!it, AlreadyPresent,
Format(
Expand Down

0 comments on commit 9f90819

Please sign in to comment.