Skip to content

Commit

Permalink
[BACKPORT 2024.1][#23047] docdb: Fix cotable ids in flushed frontier …
Browse files Browse the repository at this point in the history
…at restore

Summary:
Original commit: 550458d / D36041
When restoring a snapshot of a colocated tablet to a new database/table group, all tables are re-created in the new database, so their cotable ids differ from those in the snapshot.
At restore, the cotable ids in the flushed frontiers should be updated to the ids of the newly created tables; otherwise we will probably hit the following issue after restore:
```
1. we have 3 sst files after restore
   1.sst (smallest:old_id=0, largest:old_id=0)
   2.sst (smallest:old_id=0, largest:old_id=0)
   3.sst (smallest:old_id=0, largest:old_id=0)
2. compact 1.sst and 2.sst and generate 4.sst
   3.sst (smallest:old_id=0, largest:old_id=0)
   4.sst (smallest:new_id=1, largest:new_id=1)
   After compaction, schema packing with version 0 for new_id can be dropped because from frontier we can only find new_id=1
3. When compacting 3.sst and 4.sst
   there are still rows with version 0 for old_id but schema version 0 has been GCed in step 2
```
Jira: DB-11979

Test Plan:
PackedRows/YBBackupTestWithPackedRowsAndColocation.*/1
CrossColocationTests/YBBackupCrossColocation.TestYSQLRestoreWithInvalidIndex/1
TableRewriteTests/YBBackupTestWithTableRewrite.TestYSQLBackupAndRestoreAfterRewrite/1
TableRewriteTests/YBBackupTestWithTableRewrite.TestYSQLBackupAndRestoreAfterFailedRewrite/1

Reviewers: sergei, zdrudi, mhaddad

Reviewed By: zdrudi

Subscribers: qhu, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D36930
  • Loading branch information
Huqicheng committed Aug 1, 2024
1 parent fcb488c commit 1e0f747
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 21 deletions.
14 changes: 14 additions & 0 deletions src/yb/docdb/consensus_frontier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,20 @@ void ConsensusFrontier::ResetSchemaVersion() {
cotable_schema_versions_.clear();
}

// Moves the schema version recorded under cotable_id so that it is recorded under
// new_cotable_id instead.
// Returns true if cotable_schema_versions_ was changed. Returns false, leaving the map untouched,
// when both ids are equal or when cotable_id has no entry.
bool ConsensusFrontier::UpdateCoTableId(const Uuid& cotable_id, const Uuid& new_cotable_id) {
  if (!(cotable_id == new_cotable_id)) {
    const auto old_entry = cotable_schema_versions_.find(cotable_id);
    if (old_entry != cotable_schema_versions_.end()) {
      // Copy the version out before erasing: erase invalidates the iterator.
      const auto version = old_entry->second;
      cotable_schema_versions_.erase(old_entry);
      // Any version already stored under new_cotable_id is overwritten.
      cotable_schema_versions_[new_cotable_id] = version;
      return true;
    }
  }
  return false;
}

void ConsensusFrontier::MakeExternalSchemaVersionsAtMost(
std::unordered_map<Uuid, SchemaVersion, UuidHash>* min_schema_versions) const {
if (primary_schema_version_) {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/docdb/consensus_frontier.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ class ConsensusFrontier : public rocksdb::UserFrontier {
void AddSchemaVersion(const Uuid& table_id, SchemaVersion version);
void ResetSchemaVersion();

// Update cotable_id to new_cotable_id in current frontier's cotable_schema_versions_ map:
// the schema version stored for cotable_id is re-inserted under new_cotable_id (replacing any
// version already stored there) and the cotable_id entry is removed.
// Return true if the map is modified, otherwise, return false. The map is left unmodified when
// cotable_id equals new_cotable_id or when cotable_id is not present in the map.
bool UpdateCoTableId(const Uuid& cotable_id, const Uuid& new_cotable_id);

// Merge current frontier with provided map, preferring min values.
void MakeExternalSchemaVersionsAtMost(
std::unordered_map<Uuid, SchemaVersion, UuidHash>* min_schema_versions) const;
Expand Down
16 changes: 12 additions & 4 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,8 @@ class RocksDBPatcher::Impl {
return helper.Apply(options_, imm_cf_options_);
}

Status ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
Status ModifyFlushedFrontier(
const ConsensusFrontier& frontier, const CotableIdsMap& cotable_ids_map) {
RocksDBPatcherHelper helper(&version_set_);

docdb::ConsensusFrontier final_frontier = frontier;
Expand All @@ -940,7 +941,8 @@ class RocksDBPatcher::Impl {
helper.Edit().ModifyFlushedFrontier(
final_frontier.Clone(), rocksdb::FrontierModificationMode::kForce);

helper.IterateFiles([&helper, &frontier](int level, rocksdb::FileMetaData fmd) {
helper.IterateFiles([&helper, &frontier, &cotable_ids_map](
int level, rocksdb::FileMetaData fmd) {
bool modified = false;
for (auto* user_frontier : {&fmd.smallest.user_frontier, &fmd.largest.user_frontier}) {
if (!*user_frontier) {
Expand All @@ -955,6 +957,11 @@ class RocksDBPatcher::Impl {
consensus_frontier.set_history_cutoff_information(frontier.history_cutoff());
modified = true;
}
for (const auto& [table_id, new_table_id] : cotable_ids_map) {
if (consensus_frontier.UpdateCoTableId(table_id, new_table_id)) {
modified = true;
}
}
}
if (modified) {
helper.ModifyFile(level, fmd);
Expand Down Expand Up @@ -1031,8 +1038,9 @@ Status RocksDBPatcher::SetHybridTimeFilter(std::optional<uint32_t> db_oid, Hybri
return impl_->SetHybridTimeFilter(db_oid, value);
}

Status RocksDBPatcher::ModifyFlushedFrontier(const ConsensusFrontier& frontier) {
return impl_->ModifyFlushedFrontier(frontier);
Status RocksDBPatcher::ModifyFlushedFrontier(
const ConsensusFrontier& frontier, const CotableIdsMap& cotable_ids_map) {
return impl_->ModifyFlushedFrontier(frontier, cotable_ids_map);
}

Status RocksDBPatcher::UpdateFileSizes() {
Expand Down
7 changes: 6 additions & 1 deletion src/yb/docdb/docdb_rocksdb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
namespace yb {
namespace docdb {

// Map from old cotable id to new cotable id.
// Used to restore snapshot to a new database/tablegroup and update cotable ids in the frontiers.
using CotableIdsMap = std::unordered_map<Uuid, Uuid, UuidHash>;

const int kDefaultGroupNo = 0;

dockv::KeyBytes AppendDocHt(Slice key, const DocHybridTime& doc_ht);
Expand Down Expand Up @@ -133,7 +137,8 @@ class RocksDBPatcher {
Status SetHybridTimeFilter(std::optional<uint32_t> db_oid, HybridTime value);

// Modify flushed frontier and clean up smallest/largest op id in per-SST file metadata.
Status ModifyFlushedFrontier(const ConsensusFrontier& frontier);
Status ModifyFlushedFrontier(
const ConsensusFrontier& frontier, const CotableIdsMap& cotable_ids_map);

// Update file sizes in manifest if actual file size was changed because of direct manipulation
// with .sst files.
Expand Down
21 changes: 20 additions & 1 deletion src/yb/tablet/tablet_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,13 @@ Result<RaftGroupMetadataPtr> RaftGroupMetadata::Load(
return ret;
}

// Creates a RaftGroupMetadata instance and populates it from the metadata file at `path`.
// Returns the loaded metadata, or the error reported by LoadFromDisk.
Result<RaftGroupMetadataPtr> RaftGroupMetadata::LoadFromPath(
    FsManager* fs_manager, const std::string& path) {
  // The second constructor argument is deliberately an empty string.
  // NOTE(review): presumably this is the Raft group id, left empty because it is not known
  // before the file is read - confirm against the constructor.
  auto loaded_metadata = RaftGroupMetadataPtr(new RaftGroupMetadata(fs_manager, ""));
  RETURN_NOT_OK(loaded_metadata->LoadFromDisk(path));
  return loaded_metadata;
}

Result<RaftGroupMetadataPtr> RaftGroupMetadata::TEST_LoadOrCreate(
const RaftGroupMetadataData& data) {
if (data.fs_manager->LookupTablet(data.raft_group_id)) {
Expand Down Expand Up @@ -772,6 +779,18 @@ Result<TableInfoPtr> RaftGroupMetadata::GetTableInfoUnlocked(const TableId& tabl
return iter->second;
}

std::vector<TableInfoPtr> RaftGroupMetadata::GetColocatedTableInfos() const {
std::vector<TableInfoPtr> table_infos;
{
std::lock_guard lock(data_mutex_);
for (const auto& [_, table_info] : kv_store_.colocation_to_table) {
DCHECK(table_info->schema().has_colocation_id());
table_infos.push_back(table_info);
}
}
return table_infos;
}

Result<TableInfoPtr> RaftGroupMetadata::GetTableInfoUnlocked(ColocationId colocation_id) const {
if (colocation_id == kColocationIdNotSet) {
return GetTableInfoUnlocked(primary_table_id_);
Expand Down Expand Up @@ -965,7 +984,7 @@ Status RaftGroupMetadata::LoadFromSuperBlock(const RaftGroupReplicaSuperBlockPB&
std::lock_guard lock(data_mutex_);

// Verify that the Raft group id matches with the one in the protobuf.
if (superblock.raft_group_id() != raft_group_id_) {
if (!raft_group_id_.empty() && superblock.raft_group_id() != raft_group_id_) {
return STATUS(Corruption, "Expected id=" + raft_group_id_ +
" found " + superblock.raft_group_id(),
superblock.DebugString());
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tablet/tablet_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ class RaftGroupMetadata : public RefCountedThreadSafe<RaftGroupMetadata>,
Result<TableInfoPtr> GetTableInfo(ColocationId colocation_id) const;
Result<TableInfoPtr> GetTableInfoUnlocked(ColocationId colocation_id) const REQUIRES(data_mutex_);

std::vector<TableInfoPtr> GetColocatedTableInfos() const;

const RaftGroupId& raft_group_id() const {
DCHECK_NE(state_, kNotLoadedYet);
return raft_group_id_;
Expand Down
61 changes: 50 additions & 11 deletions src/yb/tablet/tablet_snapshots.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ Env& TabletSnapshots::env() {
return *metadata().fs_manager()->env();
}

// Convenience accessor: returns the FsManager held by this tablet's metadata.
FsManager* TabletSnapshots::fs_manager() {
  FsManager* const manager = metadata().fs_manager();
  return manager;
}

Status TabletSnapshots::CleanupSnapshotDir(const std::string& dir) {
auto& env = this->env();
if (!env.FileExists(dir)) {
Expand Down Expand Up @@ -361,21 +365,55 @@ Result<TabletRestorePatch> TabletSnapshots::GenerateRestoreWriteBatch(
}
}

// Builds the map from cotable ids recorded in the snapshot's tablet metadata to the cotable ids
// of the corresponding tables in the current tablet metadata. Tables are matched by colocation id.
// The restored flushed frontiers can reference the snapshot's cotable ids, so this map is used to
// rewrite them to the current ids.
//
// Returns an empty map when snapshot_dir is empty, the tablet is not colocated, or the snapshot
// has no tablet metadata file. Only tables whose cotable id actually differs are included.
// A snapshot table that is not found in the current metadata is logged and skipped; any other
// lookup failure is returned as an error.
Result<docdb::CotableIdsMap> TabletSnapshots::GetCotableIdsMap(const std::string& snapshot_dir) {
  docdb::CotableIdsMap old_to_new_ids;

  const bool nothing_to_map = snapshot_dir.empty() || !metadata().colocated();
  if (nothing_to_map) {
    return old_to_new_ids;
  }

  const auto metadata_path = TabletMetadataFile(snapshot_dir);
  if (!env().FileExists(metadata_path)) {
    return old_to_new_ids;
  }

  auto snapshot_metadata =
      VERIFY_RESULT(RaftGroupMetadata::LoadFromPath(fs_manager(), metadata_path));
  for (const auto& snapshot_table : snapshot_metadata->GetColocatedTableInfos()) {
    auto current_table = metadata().GetTableInfo(snapshot_table->schema().colocation_id());
    if (current_table.ok()) {
      // Record a mapping only when the id really changed.
      const auto& current_cotable_id = (*current_table)->cotable_id;
      if (current_cotable_id != snapshot_table->cotable_id) {
        old_to_new_ids[snapshot_table->cotable_id] = current_cotable_id;
      }
      continue;
    }
    // Anything other than "not found" is a real failure and aborts the mapping.
    if (!current_table.status().IsNotFound()) {
      return current_table.status();
    }
    LOG_WITH_PREFIX(WARNING) << "Table " << snapshot_table->table_id
                             << " not found: " << current_table.status();
  }

  if (!old_to_new_ids.empty()) {
    LOG_WITH_PREFIX(INFO) << "Cotable ids map: " << yb::ToString(old_to_new_ids);
  }
  return old_to_new_ids;
}

Status TabletSnapshots::RestoreCheckpoint(
const std::string& dir, HybridTime restore_at, const RestoreMetadata& restore_metadata,
const std::string& snapshot_dir, HybridTime restore_at, const RestoreMetadata& restore_metadata,
const docdb::ConsensusFrontier& frontier, bool is_pitr_restore, const OpId& op_id) {
LongOperationTracker long_operation_tracker("Restore checkpoint", 5s);

// The following two lines can't just be changed to RETURN_NOT_OK(PauseReadWriteOperations()):
// op_pause has to stay in scope until the end of the function.
auto op_pauses = StartShutdownRocksDBs(DisableFlushOnShutdown(!dir.empty()), AbortOps::kTrue);
auto op_pauses = StartShutdownRocksDBs(
DisableFlushOnShutdown(!snapshot_dir.empty()), AbortOps::kTrue);

std::lock_guard lock(create_checkpoint_lock());

const string db_dir = regular_db().GetName();
const std::string intents_db_dir = has_intents_db() ? intents_db().GetName() : std::string();

if (dir.empty()) {
if (snapshot_dir.empty()) {
// Just change rocksdb hybrid time limit, because it should be in retention interval.
// TODO(pitr) apply transactions and reset intents.
CompleteShutdownRocksDBs(op_pauses);
Expand All @@ -385,7 +423,7 @@ Status TabletSnapshots::RestoreCheckpoint(
RETURN_NOT_OK(DeleteRocksDBs(CompleteShutdownRocksDBs(op_pauses)));

auto s = CopyDirectory(
&rocksdb_env(), dir, db_dir, UseHardLinks::kTrue, CreateIfMissing::kTrue);
&rocksdb_env(), snapshot_dir, db_dir, UseHardLinks::kTrue, CreateIfMissing::kTrue);
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(WARNING) << "Copy checkpoint files status: " << s;
return STATUS(IllegalState, "Unable to copy checkpoint files", s.ToString());
Expand All @@ -402,7 +440,8 @@ Status TabletSnapshots::RestoreCheckpoint(
docdb::RocksDBPatcher patcher(db_dir, rocksdb_options);

RETURN_NOT_OK(patcher.Load());
RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
RETURN_NOT_OK(patcher.ModifyFlushedFrontier(
frontier, VERIFY_RESULT(GetCotableIdsMap(snapshot_dir))));
if (restore_at) {
RETURN_NOT_OK(patcher.SetHybridTimeFilter(std::nullopt, restore_at));
}
Expand Down Expand Up @@ -434,14 +473,14 @@ Status TabletSnapshots::RestoreCheckpoint(
need_flush = true;
}

if (!dir.empty()) {
auto tablet_metadata_file = TabletMetadataFile(dir);
if (!snapshot_dir.empty()) {
auto snapshot_metadata_file = TabletMetadataFile(snapshot_dir);
// Old snapshots could lack tablet metadata, so just do nothing in this case.
if (env().FileExists(tablet_metadata_file)) {
LOG_WITH_PREFIX(INFO) << "Merging metadata with restored: " << tablet_metadata_file
if (env().FileExists(snapshot_metadata_file)) {
LOG_WITH_PREFIX(INFO) << "Merging metadata with restored: " << snapshot_metadata_file
<< " , force overwrite of schema packing " << !is_pitr_restore;
RETURN_NOT_OK(tablet().metadata()->MergeWithRestored(
tablet_metadata_file,
snapshot_metadata_file,
is_pitr_restore ? dockv::OverwriteSchemaPacking::kFalse
: dockv::OverwriteSchemaPacking::kTrue));
need_flush = true;
Expand All @@ -461,7 +500,7 @@ Status TabletSnapshots::RestoreCheckpoint(
return s;
}

LOG_WITH_PREFIX(INFO) << "Checkpoint restored from " << dir;
LOG_WITH_PREFIX(INFO) << "Checkpoint restored from " << snapshot_dir;
LOG_WITH_PREFIX(INFO) << "Re-enabling compactions";
s = tablet().EnableCompactions(&op_pauses.blocking_rocksdb_shutdown_start);
if (!s.ok()) {
Expand Down
5 changes: 4 additions & 1 deletion src/yb/tablet/tablet_snapshots.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,23 @@ class TabletSnapshots : public TabletComponent {
// Restore the RocksDB checkpoint from the provided directory.
// Only used when table_type_ == YQL_TABLE_TYPE.
Status RestoreCheckpoint(
const std::string& dir, HybridTime restore_at, const RestoreMetadata& metadata,
const std::string& snapshot_dir, HybridTime restore_at, const RestoreMetadata& metadata,
const docdb::ConsensusFrontier& frontier, bool is_pitr_restore, const OpId& op_id);

// Applies specified snapshot operation.
Status Apply(SnapshotOperation* operation);

Status CleanupSnapshotDir(const std::string& dir);
Env& env();
FsManager* fs_manager();

Status RestorePartialRows(SnapshotOperation* operation);

Result<TabletRestorePatch> GenerateRestoreWriteBatch(
const tserver::TabletSnapshotOpRequestPB& request, docdb::DocWriteBatch* write_batch);

Result<docdb::CotableIdsMap> GetCotableIdsMap(const std::string& snapshot_dir);

std::string TEST_last_rocksdb_checkpoint_dir_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/yb/tools/data-patcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ class ApplyPatch {
frontier.set_history_cutoff_information(
{ HybridTime::FromMicros(kYugaByteMicrosecondEpoch),
HybridTime::FromMicros(kYugaByteMicrosecondEpoch) });
RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier));
RETURN_NOT_OK(patcher.ModifyFlushedFrontier(frontier, docdb::CotableIdsMap()));
} else {
LOG(INFO) << "We did not see RocksDB CURRENT or MANIFEST-... files in "
<< dir << ", skipping applying " << patched_path;
Expand Down
4 changes: 2 additions & 2 deletions src/yb/tools/yb-backup/yb-backup-cross-feature-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1671,10 +1671,10 @@ TEST_P(
tserver::FlushTabletsRequestPB::COMPACT));

ASSERT_OK(RunBackupCommand(
{"--backup_location", backup_dir, "--keyspace", Format("ysql.$0", backup_db_name),
{"--backup_location", backup_dir, "--keyspace", Format("ysql.$0", restore_db_name),
"restore"}));

SetDbName(backup_db_name);
SetDbName(restore_db_name);

ASSERT_NO_FATALS(
InsertRows(Format("INSERT INTO $0 VALUES (9,9,9), (10,10,10), (11,11,11)", table_name), 3));
Expand Down

0 comments on commit 1e0f747

Please sign in to comment.