From 9f9081921e2d79b1a0d9571a2e2821b8b2ac9d83 Mon Sep 17 00:00:00 2001 From: Hari Krishna Sunder Date: Tue, 24 Sep 2024 21:31:04 -0700 Subject: [PATCH] [#24121] xCluster: Fix xcluster_outbound_replication_group-itest TestGetStreamByTableId Summary: Test should wait for replication group creation before running DDLs. Refactored the test and added helper functions that will create a replication group and wait for it to be created. Fixed a bug in `GetNamespaceCheckpointInfo` where a variable was incorrectly used after move. Fixes #24121 Jira: DB-13013 Test Plan: xcluster_outbound_replication_group-itest Reviewers: jhe, xCluster Reviewed By: jhe Subscribers: ybase Differential Revision: https://phorge.dev.yugabyte.com/D38399 --- ...luster_outbound_replication_group-itest.cc | 123 +++++++++--------- .../xcluster_outbound_replication_group.cc | 2 +- 2 files changed, 60 insertions(+), 65 deletions(-) diff --git a/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc b/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc index e17ba63b815..ed0887175da 100644 --- a/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc +++ b/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc @@ -76,6 +76,42 @@ class XClusterOutboundReplicationGroupTest : public XClusterYsqlTestBase { return table_info->id(); } + // Create a new OutboundReplicationGroup. Use GetXClusterStreams, or IsBootstrapRequired to wait + // for the new group to be ready. 
+ Status CreateOutboundReplicationGroupAsync( + const xcluster::ReplicationGroupId& replication_group_id, + const std::vector<NamespaceId>& namespace_ids) { + return XClusterClient().CreateOutboundReplicationGroup( + replication_group_id, namespace_ids, UseAutomaticMode()); + } + + Status CreateOutboundReplicationGroupSync( + const xcluster::ReplicationGroupId& replication_group_id = kReplicationGroupId, + const std::vector<NamespaceId>& namespace_ids = {}) { + auto namespace_ids_copy = namespace_ids; + if (namespace_ids_copy.empty()) { + namespace_ids_copy.push_back(namespace_id_); + } + + RETURN_NOT_OK(CreateOutboundReplicationGroupAsync(replication_group_id, namespace_ids_copy)); + for (const auto& namespace_id : namespace_ids_copy) { + RETURN_NOT_OK(GetXClusterStreams(replication_group_id, namespace_id)); + } + return Status::OK(); + } + + Result<bool> IsBootstrapRequired( + const xcluster::ReplicationGroupId& replication_group_id = kReplicationGroupId, + const NamespaceId& namespace_id = {}) { + std::promise<Result<bool>> promise; + RETURN_NOT_OK(XClusterClient().IsBootstrapRequired( + CoarseMonoClock::Now() + kDeadline, replication_group_id, + namespace_id.empty() ? namespace_id_ : namespace_id, + [&promise](Result<bool> result) { promise.set_value(std::move(result)); })); + + return promise.get_future().get(); + } + // Cleanup streams marked for deletion and get the list of xcluster streams. 
std::unordered_set CleanupAndGetAllXClusterStreams() { catalog_manager_->RunXReplBgTasks(epoch_); @@ -132,7 +168,12 @@ class XClusterOutboundReplicationGroupTest : public XClusterYsqlTestBase { CoarseMonoClock::Now() + kDeadline, replication_group_id, namespace_id, table_names, pg_schema_names, [&promise](const auto& resp) { promise.set_value(resp); })); - return promise.get_future().get(); + auto resp = VERIFY_RESULT(promise.get_future().get()); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + + return resp; } Result GetXClusterStreamsByTableId( @@ -210,8 +251,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestMultipleTable) { ASSERT_NOK(GetXClusterStreams(kReplicationGroupId, namespace_id_)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupSync()); auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); @@ -260,8 +300,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, AddDeleteNamespaces) { auto ns2_table_id_1 = ASSERT_RESULT(CreateYsqlTable(namespace_name_2, kTableName1)); auto ns2_table_id_2 = ASSERT_RESULT(CreateYsqlTable(namespace_name_2, kTableName2)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupSync()); // Wait for the new streams to be ready. auto ns1_info = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); @@ -324,11 +363,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, AddTable) { auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); ASSERT_OK(VerifyWalRetentionOfTable(table_id_1, 900)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); - - // Wait for the new streams to be ready. 
- ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_OK(CreateOutboundReplicationGroupSync()); auto all_xcluster_streams_initial = CleanupAndGetAllXClusterStreams(); ASSERT_EQ(all_xcluster_streams_initial.size(), 1 + OverheadStreamsCount()); @@ -349,17 +384,8 @@ TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredEmptyTable) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_max_xcluster_streams_to_checkpoint_in_parallel) = 1; auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); - - std::promise<Result<bool>> promise; - - ASSERT_OK(XClusterClient().IsBootstrapRequired( - CoarseMonoClock::Now() + kDeadline, kReplicationGroupId, namespace_id_, - [&promise](Result<bool> result) { promise.set_value(std::move(result)); })); - - auto is_bootstrap_required = ASSERT_RESULT(promise.get_future().get()); - ASSERT_FALSE(is_bootstrap_required); + ASSERT_OK(CreateOutboundReplicationGroupSync()); + ASSERT_FALSE(ASSERT_RESULT(IsBootstrapRequired())); } TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithData) { @@ -371,17 +397,8 @@ TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithData) { ASSERT_OK(producer_client()->OpenTable(table_id_2, &table_2)); ASSERT_OK(InsertRowsInProducer(0, 10, table_2)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); - - std::promise<Result<bool>> promise; - - ASSERT_OK(XClusterClient().IsBootstrapRequired( - CoarseMonoClock::Now() + kDeadline, kReplicationGroupId, namespace_id_, - [&promise](Result<bool> result) { promise.set_value(std::move(result)); })); - - auto is_bootstrap_required = ASSERT_RESULT(promise.get_future().get()); - ASSERT_TRUE(is_bootstrap_required); + ASSERT_OK(CreateOutboundReplicationGroupSync()); + ASSERT_TRUE(ASSERT_RESULT(IsBootstrapRequired())); } 
TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithDeletedData) { @@ -394,17 +411,9 @@ TEST_F(XClusterOutboundReplicationGroupTest, IsBootstrapRequiredTableWithDeleted ASSERT_OK(InsertRowsInProducer(0, 10, table_2)); ASSERT_OK(DeleteRowsInProducer(0, 10, table_2)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupSync()); - std::promise> promise; - - ASSERT_OK(XClusterClient().IsBootstrapRequired( - CoarseMonoClock::Now() + kDeadline, kReplicationGroupId, namespace_id_, - [&promise](Result result) { promise.set_value(std::move(result)); })); - - auto is_bootstrap_required = ASSERT_RESULT(promise.get_future().get()); - ASSERT_FALSE(is_bootstrap_required); + ASSERT_FALSE(ASSERT_RESULT(IsBootstrapRequired())); } TEST_P(XClusterOutboundReplicationGroupParameterized, MasterRestartDuringCheckpoint) { @@ -421,8 +430,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, MasterRestartDuringCheckpo ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_block_xcluster_checkpoint_namespace_task) = true; - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupAsync(kReplicationGroupId, {namespace_id_})); std::promise> promise; auto future = promise.get_future(); @@ -449,8 +457,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, Repair) { auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); auto table_id_2 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupSync()); auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); ASSERT_EQ(resp.table_infos_size(), 2 + OverheadStreamsCount()); @@ -546,8 +553,7 @@ 
TEST_P(XClusterOutboundReplicationGroupParameterized, RepairWithYbAdmin) { auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); auto table_id_2 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupSync()); auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); ASSERT_EQ(resp.table_infos_size(), 2 + OverheadStreamsCount()); @@ -589,11 +595,8 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestListAPIs) { ASSERT_OK(CreateYsqlTable(namespace_name_2, "table_2")); // Replication group 1 with two namespaces. - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_, namespace_id_2}, UseAutomaticMode())); - // Wait for checkpointing to complete. - ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_)); - ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_2)); + ASSERT_OK( + CreateOutboundReplicationGroupSync(kReplicationGroupId, {namespace_id_, namespace_id_2})); { auto group_info = ASSERT_RESULT( XClusterClient().GetXClusterOutboundReplicationGroupInfo(kReplicationGroupId)); @@ -606,10 +609,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestListAPIs) { // Replication group 2 with one namespace. const xcluster::ReplicationGroupId replication_group2("rg2"); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - replication_group2, {namespace_id_}, UseAutomaticMode())); - ASSERT_OK(GetXClusterStreams(replication_group2, namespace_id_)); - // Wait for checkpointing to complete. 
+ ASSERT_OK(CreateOutboundReplicationGroupSync(replication_group2, {namespace_id_})); { auto group_info = ASSERT_RESULT(XClusterClient().GetXClusterOutboundReplicationGroupInfo(replication_group2)); @@ -679,8 +679,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, TestListAPIs) { // Make sure we cleanup the streams of the failed table create. TEST_P(XClusterOutboundReplicationGroupParameterized, CleanupStreamsOfFailedTableCreate) { auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); + ASSERT_OK(CreateOutboundReplicationGroupSync()); int expected_stream_count = 1 + OverheadStreamsCount(); auto check_streams = [&]() -> Status { @@ -719,11 +718,7 @@ TEST_P(XClusterOutboundReplicationGroupParameterized, CleanupStreamsOfFailedTabl TEST_P(XClusterOutboundReplicationGroupParameterized, TestGetStreamByTableId) { auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); - ASSERT_OK(XClusterClient().CreateOutboundReplicationGroup( - kReplicationGroupId, {namespace_id_}, UseAutomaticMode())); - - // Wait for the namespace to be ready. - ASSERT_OK(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_OK(CreateOutboundReplicationGroupSync()); // Delete the table to put it into HIDDEN state. 
ASSERT_OK(DropYsqlTable(&producer_cluster_, kNamespaceName, kPgSchemaName, kTableName1)); diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc index cc091b9a613..8ec53e2caac 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc @@ -645,7 +645,7 @@ XClusterOutboundReplicationGroup::GetNamespaceCheckpointInfo( auto it = InsertOrReturnExisting( &table_names_map, {TableSchemaNamePair(table_descriptor.name(), table_descriptor.pgschema_name()), - std::move(table_descriptor)}); + table_descriptor}); SCHECK( !it, AlreadyPresent, Format(