Skip to content

Commit

Permalink
[#20422] xCluster: Checkpoint table for xCluster on create
Browse files Browse the repository at this point in the history
Summary:
As part of creating a new table, also create an xCluster stream if the table belongs to a namespace that is part of an xClusterOutboundReplicationGroup.
As part of table creation we create `AddTableToXClusterSourceTask` for each ReplicationGroup that we want to add the table to.
The task invokes `XClusterOutboundReplicationGroup::AddTable` to create an xCluster stream checkpointed to `OpId().Min()` (0,0) so that replication will start from the beginning of the WAL.
We currently skip extending WAL retention of the table since this requires an ALTER Table operation which cannot be performed unless the table creation has completed. #20769 tracks the task which will remove the need for performing ALTER Table in the first place.
#20938 tracks the race conditions when tables are created concurrently with xcluster replication groups
#16971 tracks deletion of tables that are under replication.

**Upgrade/Rollback safety:**
All the changes are guarded under flag `TEST_enable_xcluster_api_v2`

Fixes #20422
Jira: DB-9415

Test Plan:
TEST_F(XClusterOutboundReplicationGroupMockedTest, AddTable)
TEST_F(XClusterOutboundReplicationGroupTest, AddTable)

Reviewers: mlillibridge, jhe, slingam, xCluster

Reviewed By: mlillibridge

Subscribers: yql, xCluster, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D31935
  • Loading branch information
hari90 committed Feb 6, 2024
1 parent 38844f2 commit dfb067f
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,5 +237,23 @@ TEST_F(XClusterOutboundReplicationGroupTest, AddDeleteNamespaces) {
ASSERT_TRUE(final_xcluster_streams.empty());
}

// Creates one table, puts its namespace into an outbound replication group, then creates a second
// table in the same namespace and checks that the stream count goes from 1 to 2 and that the
// namespace checkpoint info covers both tables.
TEST_F(XClusterOutboundReplicationGroupTest, AddTable) {
  // Table that already exists when the replication group is created.
  auto first_table_id = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));

  ASSERT_OK(client_->XClusterCreateOutboundReplicationGroup(kReplicationGroupId, {kNamespaceName}));

  auto all_streams = CleanupAndGetAllXClusterStreams();
  ASSERT_EQ(all_streams.size(), 1);

  // Table created after the namespace is already part of the replication group.
  auto second_table_id = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2));

  all_streams = CleanupAndGetAllXClusterStreams();
  ASSERT_EQ(all_streams.size(), 2);

  auto namespace_streams = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_NO_FATALS(VerifyNamespaceCheckpointInfo(
      first_table_id, second_table_id, all_streams, namespace_streams));
}

} // namespace master
} // namespace yb
1 change: 1 addition & 0 deletions src/yb/master/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ set(MASTER_SRCS
ts_manager.cc
universe_key_registry_service.cc
util/yql_vtable_helpers.cc
xcluster/add_table_to_xcluster_source_task.cc
xcluster/add_table_to_xcluster_target_task.cc
xcluster/xcluster_catalog_entity.cc
xcluster/xcluster_config.cc
Expand Down
13 changes: 7 additions & 6 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,11 @@ class CatalogManager : public tserver::TabletPeerLookupIf,

std::shared_ptr<ClusterConfigInfo> ClusterConfig() const;

// Takes the stream creation request, the creation mode, the table ids, an optional namespace id,
// the response to fill, the leader epoch and the RPC context; returns a Status.
// NOTE(review): this declaration sits ahead of the `protected:` label, presumably so that code
// outside CatalogManager can call it - confirm the intended access level.
Status CreateNewXReplStream(
const CreateCDCStreamRequestPB& req, CreateNewCDCStreamMode mode,
const std::vector<TableId>& table_ids, const std::optional<const NamespaceId>& namespace_id,
CreateCDCStreamResponsePB* resp, const LeaderEpoch& epoch, rpc::RpcContext* rpc);

protected:
// TODO Get rid of these friend classes and introduce formal interface.
friend class TableLoader;
Expand Down Expand Up @@ -2728,12 +2733,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
SysSnapshotEntryPB::State state, SysSnapshotEntryPB* snapshot_pb);

Status CreateNewCDCStreamForNamespace(
const CreateCDCStreamRequestPB& req,
CreateCDCStreamResponsePB* resp, rpc::RpcContext* rpc, const LeaderEpoch& epoch);
Status CreateNewXReplStream(
const CreateCDCStreamRequestPB& req, CreateNewCDCStreamMode mode,
const std::vector<TableId>& table_ids, const std::optional<const NamespaceId>& namespace_id,
CreateCDCStreamResponsePB* resp, const LeaderEpoch& epoch, rpc::RpcContext* rpc);
const CreateCDCStreamRequestPB& req, CreateCDCStreamResponsePB* resp, rpc::RpcContext* rpc,
const LeaderEpoch& epoch);

Status PopulateCDCStateTable(const xrepl::StreamId& stream_id,
const std::vector<TableId>& table_ids,
Expand Down
52 changes: 52 additions & 0 deletions src/yb/master/xcluster/add_table_to_xcluster_source_task.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/master/xcluster/add_table_to_xcluster_source_task.h"

#include "yb/master/catalog_manager.h"
#include "yb/master/xcluster/xcluster_outbound_replication_group.h"

namespace yb::master {

using namespace std::placeholders;

// Builds the task for `table_info`. The catalog manager, its async task pool, the messenger, the
// table and the epoch are forwarded to PostTabletCreateTaskBase; the outbound replication group
// is stored in the task (as a shared_ptr) for later use.
AddTableToXClusterSourceTask::AddTableToXClusterSourceTask(
std::shared_ptr<XClusterOutboundReplicationGroup> outbound_replication_group,
CatalogManager& catalog_manager, rpc::Messenger& messenger, TableInfoPtr table_info,
const LeaderEpoch& epoch)
: PostTabletCreateTaskBase(
catalog_manager, *catalog_manager.AsyncTaskPool(), messenger, std::move(table_info),
epoch),
outbound_replication_group_(std::move(outbound_replication_group)) {}

std::string AddTableToXClusterSourceTask::description() const {
return Format("AddTableToXClusterSourceTask [$0]", table_info_->id());
}

// Asks the outbound replication group to add this task's table and returns OK immediately; the
// outcome is reported through CompletionCallback.
// NOTE(review): the callback holds a raw `this`; presumably the task outlives the AddTable call -
// confirm against the task lifetime rules.
Status AddTableToXClusterSourceTask::FirstStep() {
  outbound_replication_group_->AddTable(
      table_info_, epoch_, [this](const Status& status) { CompletionCallback(status); });

  return Status::OK();
}

// Finishes the task based on the result of AddTable: a failure status aborts the task via
// AbortAndReturnPrevState, success marks it complete.
void AddTableToXClusterSourceTask::CompletionCallback(const Status& status) {
  if (!status.ok()) {
    AbortAndReturnPrevState(status);
    return;
  }

  Complete();
}

} // namespace yb::master
47 changes: 47 additions & 0 deletions src/yb/master/xcluster/add_table_to_xcluster_source_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#pragma once

#include "yb/client/client_fwd.h"

#include "yb/master/post_tablet_create_task_base.h"

namespace yb::master {

class XClusterOutboundReplicationGroup;

// Task, derived from PostTabletCreateTaskBase, that adds a table to an
// XClusterOutboundReplicationGroup.
class AddTableToXClusterSourceTask : public PostTabletCreateTaskBase {
public:
// `outbound_replication_group` is the group the table is added to; the remaining arguments are
// forwarded to the base task.
explicit AddTableToXClusterSourceTask(
std::shared_ptr<XClusterOutboundReplicationGroup> outbound_replication_group,
CatalogManager& catalog_manager, rpc::Messenger& messenger, TableInfoPtr table_info,
const LeaderEpoch& epoch);

// Identifies this task as kAddTableToXClusterSource.
server::MonitoredTaskType type() const override {
return server::MonitoredTaskType::kAddTableToXClusterSource;
}

// Fixed display name for this task type.
std::string type_name() const override { return "Add table to xCluster source replication"; }

// Description string for this task instance (defined in the .cc file).
std::string description() const override;

private:
// Entry point of the task's work (overrides the base class hook).
Status FirstStep() override;

// Receives the final status of the add-table operation.
void CompletionCallback(const Status& status);

// Replication group this task adds the table to; shared ownership, never reseated.
const std::shared_ptr<XClusterOutboundReplicationGroup> outbound_replication_group_;
};

} // namespace yb::master
2 changes: 2 additions & 0 deletions src/yb/master/xcluster/master_xcluster_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ struct IsOperationDoneResult {
Status status; // If the operation completed and it failed, this will contain the error.
};

// Where a newly created xCluster stream is checkpointed.
// kOpId0: at OpId (0,0), so replication starts from the beginning of the WAL.
// kCurrentEndOfWAL: presumably at the current end of the WAL - confirm against the bootstrap code.
YB_DEFINE_ENUM(StreamCheckpointLocation, (kOpId0)(kCurrentEndOfWAL));

} // namespace yb::master
45 changes: 43 additions & 2 deletions src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest {
.get_tables_func =
[this](const NamespaceId& namespace_id) { return namespace_tables[namespace_id]; },
.bootstrap_tables_func =
[this](const std::vector<TableInfoPtr>& table_infos, CoarseTimePoint deadline)
-> Result<std::vector<xrepl::StreamId>> {
[this](
const std::vector<TableInfoPtr>& table_infos, CoarseTimePoint deadline,
StreamCheckpointLocation checkpoint_location,
const LeaderEpoch& epoch) -> Result<std::vector<xrepl::StreamId>> {
std::vector<xrepl::StreamId> stream_ids;
for (const auto& table_info : table_infos) {
stream_ids.emplace_back(CreateXClusterStream(table_info->id()));
Expand Down Expand Up @@ -228,8 +230,10 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, TestMultipleTable) {
auto outbound_rg_ptr = CreateReplicationGroup();
auto& outbound_rg = *outbound_rg_ptr;

ASSERT_FALSE(outbound_rg.HasNamespace(kNamespaceId));
auto namespace_id = ASSERT_RESULT(outbound_rg.AddNamespace(kEpoch, kNamespaceName, kDeadline));
ASSERT_EQ(namespace_id, kNamespaceId);
ASSERT_TRUE(outbound_rg.HasNamespace(kNamespaceId));

auto ns_info_opt = ASSERT_RESULT(outbound_rg.GetNamespaceCheckpointInfo(kNamespaceId));
ASSERT_TRUE(ns_info_opt.has_value());
Expand Down Expand Up @@ -322,6 +326,7 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, AddDeleteNamespaces) {
ASSERT_NO_FATALS(VerifyNamespaceCheckpointInfo(ns2_table_id_1, ns2_table_id_2, *ns2_info_opt));

ASSERT_OK(outbound_rg.RemoveNamespace(kEpoch, kNamespaceId));
ASSERT_FALSE(outbound_rg.HasNamespace(kNamespaceId));
ASSERT_NOK(outbound_rg.GetNamespaceCheckpointInfo(kNamespaceId));

// We should only have only the streams from second namespace.
Expand All @@ -333,6 +338,7 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, AddDeleteNamespaces) {
}

ASSERT_OK(outbound_rg.Delete(kEpoch));
ASSERT_FALSE(outbound_rg.HasNamespace(namespace_id_2));
ASSERT_NOK(outbound_rg.GetNamespaceCheckpointInfo(namespace_id_2));
ASSERT_TRUE(xcluster_streams.empty());
}
Expand Down Expand Up @@ -399,4 +405,39 @@ TEST_F(XClusterOutboundReplicationGroupMockedTest, CreateTargetReplicationGroup)
SysXClusterOutboundReplicationGroupEntryPB::TargetUniverseInfo::REPLICATING);
}

// Exercises XClusterOutboundReplicationGroup::AddTable against the mocked helpers:
//  1. Adding a table that is already part of the namespace checkpoint must succeed without
//     creating another stream or table entry.
//  2. Adding a new table in the namespace must create exactly one more stream and table entry.
TEST_F(XClusterOutboundReplicationGroupMockedTest, AddTable) {
  auto table_info1 = CreateTable(kNamespaceId, kTableId1, kTableName1, kPgSchemaName);
  CreateTable(kNamespaceId, kTableId2, kTableName2, kPgSchemaName2);

  auto outbound_rg = CreateReplicationGroup();
  auto namespace_id = ASSERT_RESULT(outbound_rg->AddNamespace(kEpoch, kNamespaceName, kDeadline));
  ASSERT_EQ(namespace_id, kNamespaceId);
  ASSERT_TRUE(outbound_rg->HasNamespace(kNamespaceId));
  ASSERT_EQ(xcluster_streams.size(), 2);

  auto ns_info = ASSERT_RESULT(outbound_rg->GetNamespaceCheckpointInfo(kNamespaceId));
  // The result is an optional; make sure it is populated before dereferencing it.
  ASSERT_TRUE(ns_info.has_value());
  ASSERT_EQ(ns_info->table_infos.size(), 2);

  std::promise<Status> promise;
  auto completion_cb = [&promise](const Status& status) { promise.set_value(status); };

  // Same table should not get added twice.
  outbound_rg->AddTable(table_info1, kEpoch, completion_cb);
  ASSERT_OK(promise.get_future().get());

  // Re-read the state after the call: the snapshot taken above is a copy and would still report
  // two tables even if the duplicate add had created a third entry.
  ASSERT_EQ(xcluster_streams.size(), 2);
  ns_info = ASSERT_RESULT(outbound_rg->GetNamespaceCheckpointInfo(kNamespaceId));
  ASSERT_TRUE(ns_info.has_value());
  ASSERT_EQ(ns_info->table_infos.size(), 2);

  const TableName table_3 = "table3";
  const TableId table_id_3 = "table_id_3";
  auto table_info3 = CreateTable(kNamespaceId, table_id_3, table_3, kPgSchemaName);

  // A promise can only be fulfilled once, so start from a fresh one for the second call.
  promise = {};
  outbound_rg->AddTable(table_info3, kEpoch, completion_cb);
  ASSERT_OK(promise.get_future().get());

  ASSERT_EQ(xcluster_streams.size(), 3);
  ns_info = ASSERT_RESULT(outbound_rg->GetNamespaceCheckpointInfo(kNamespaceId));
  ASSERT_TRUE(ns_info.has_value());
  ASSERT_EQ(ns_info->table_infos.size(), 3);
}

} // namespace yb::master
Loading

0 comments on commit dfb067f

Please sign in to comment.