Skip to content

Commit

Permalink
[#9333][backup] Improve internal PB structure to store backup metadat…
Browse files Browse the repository at this point in the history
…a into SnapshotInfoPB file.

Summary:
The updated `SnapshotInfoPB` and the new PB wrapper `BackupRowEntryPB` are used for backups only, so they can be easily extended for new backup-related features.
- `BackupRowEntryPB` wraps `SysRowEntry` and replaces it in the backup metadata.
- Added new fields into `SnapshotInfoPB` (with `SysSnapshotEntryPB` inside).

The `YBSnapshotInfo` backup metadata file contains an instance of `SnapshotInfoPB`.

A new test flag was added: `-TEST_metadata_file_format_version` (0 - latest format, 1 - old format, 2 - new format), e.g. `-TEST_metadata_file_format_version=1`.

For backward compatibility, the new-format reader in `yb-admin` also supports the old file format.
To cover the backward-compatibility test case, `yb-admin export_snapshot <snapshot_id> <file> -TEST_metadata_file_format_version=1` writes the metadata file in the old format. This is used by the new test `AdminCliTest.TestImportSnapshotInOldFormat1`.

Test Plan:
ybd --java-test org.yb.cql.TestYbBackup
ybd --java-test org.yb.pgsql.TestYbBackup

ybd --cxx-test integration-tests_snapshot-test
ybd --cxx-test tools_yb-admin-test_ent
ybd --cxx-test tools_yb-admin-test_ent --gtest_filter AdminCliTest.TestImportSnapshot
ybd --cxx-test tools_yb-admin-test_ent --gtest_filter AdminCliTest.TestImportSnapshotInOldFormat1
ybd --cxx-test backup-txn-test --gtest_filter BackupTxnTest.ImportMeta

Reviewers: dsrinivasan, jhe, mihnea, bogdan

Reviewed By: mihnea, bogdan

Subscribers: yql, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D12284
  • Loading branch information
OlegLoginov committed Jul 30, 2021
1 parent 36a9ed7 commit 4c92727
Show file tree
Hide file tree
Showing 13 changed files with 171 additions and 37 deletions.
5 changes: 4 additions & 1 deletion ent/src/yb/integration-tests/snapshot-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ using client::YBTableName;
using master::MasterBackupServiceProxy;
using master::MasterServiceProxy;
using master::SysRowEntry;
using master::BackupRowEntryPB;
using master::TableInfo;
using master::TabletInfo;
using rpc::Messenger;
Expand Down Expand Up @@ -530,6 +531,7 @@ TEST_F(SnapshotTest, ImportSnapshotMeta) {
ListSnapshotsRequestPB list_req;
ListSnapshotsResponsePB list_resp;
list_req.set_snapshot_id(snapshot_id);
list_req.set_prepare_for_backup(true);
ASSERT_OK(proxy_backup_->ListSnapshots(list_req, &list_resp, ResetAndGetController()));
LOG(INFO) << "Requested available snapshots.";
SCOPED_TRACE(list_resp.DebugString());
Expand All @@ -543,7 +545,8 @@ TEST_F(SnapshotTest, ImportSnapshotMeta) {
const int old_table_num_tablets = snapshot_pb.tablet_snapshots_size();
string old_table_name, old_namespace_name;

for (const SysRowEntry& entry : snapshot_pb.entries()) {
for (const BackupRowEntryPB& backup_entry : snapshot.backup_entries()) {
const SysRowEntry& entry = backup_entry.entry();
switch (entry.type()) {
case SysRowEntry::NAMESPACE: { // Get NAMESPACE name.
SysNamespaceEntryPB meta;
Expand Down
8 changes: 4 additions & 4 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,19 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
typedef std::map<NamespaceId, NamespaceData> NamespaceMap;
typedef std::map<TableId, ExternalTableSnapshotData> ExternalTableSnapshotDataMap;

CHECKED_STATUS ImportSnapshotPreprocess(const SysSnapshotEntryPB& snapshot_pb,
CHECKED_STATUS ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
NamespaceMap* namespace_map,
ExternalTableSnapshotDataMap* tables_data);
CHECKED_STATUS ImportSnapshotCreateObject(const SysSnapshotEntryPB& snapshot_pb,
CHECKED_STATUS ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
NamespaceMap* namespace_map,
ExternalTableSnapshotDataMap* tables_data,
CreateObjects create_objects);
CHECKED_STATUS ImportSnapshotWaitForTables(const SysSnapshotEntryPB& snapshot_pb,
CHECKED_STATUS ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
ExternalTableSnapshotDataMap* tables_data);
CHECKED_STATUS ImportSnapshotProcessTablets(const SysSnapshotEntryPB& snapshot_pb,
CHECKED_STATUS ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
ExternalTableSnapshotDataMap* tables_data);
void DeleteNewSnapshotObjects(const NamespaceMap& namespace_map,
Expand Down
62 changes: 51 additions & 11 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,34 @@ Status CatalogManager::ListSnapshots(const ListSnapshotsRequestPB* req,
}
}

return snapshot_coordinator_.ListSnapshots(
txn_snapshot_id, req->list_deleted_snapshots(), resp);
if (req->prepare_for_backup() && (!req->has_snapshot_id() || !txn_snapshot_id)) {
return STATUS(
InvalidArgument, "Request must have correct snapshot_id", (req->has_snapshot_id() ?
req->snapshot_id() : "None"), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
}

RETURN_NOT_OK(snapshot_coordinator_.ListSnapshots(
txn_snapshot_id, req->list_deleted_snapshots(), resp));

if (req->prepare_for_backup()) {
// Repack & extend the backup row entries.
for (SnapshotInfoPB& snapshot : *resp->mutable_snapshots()) {
snapshot.set_format_version(2);
SysSnapshotEntryPB& sys_entry = *snapshot.mutable_entry();
snapshot.mutable_backup_entries()->Reserve(sys_entry.entries_size());

for (SysRowEntry& entry : *sys_entry.mutable_entries()) {
BackupRowEntryPB* const backup_entry = snapshot.add_backup_entries();
backup_entry->mutable_entry()->Swap(&entry);
// Setup other BackupRowEntryPB fields.
// E.g.: backup_entry->set_pg_schema_name(...);
}

sys_entry.clear_entries();
}
}

return Status::OK();
}

Status CatalogManager::ListSnapshotRestorations(const ListSnapshotRestorationsRequestPB* req,
Expand Down Expand Up @@ -716,11 +742,12 @@ Status CatalogManager::DeleteNonTransactionAwareSnapshot(const SnapshotId& snaps
return Status::OK();
}

Status CatalogManager::ImportSnapshotPreprocess(const SysSnapshotEntryPB& snapshot_pb,
Status CatalogManager::ImportSnapshotPreprocess(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
NamespaceMap* namespace_map,
ExternalTableSnapshotDataMap* tables_data) {
for (const SysRowEntry& entry : snapshot_pb.entries()) {
for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
const SysRowEntry& entry = backup_entry.entry();
switch (entry.type()) {
case SysRowEntry::NAMESPACE: // Recreate NAMESPACE.
RETURN_NOT_OK(ImportNamespaceEntry(entry, namespace_map));
Expand Down Expand Up @@ -762,13 +789,14 @@ Status CatalogManager::ImportSnapshotPreprocess(const SysSnapshotEntryPB& snapsh
return Status::OK();
}

Status CatalogManager::ImportSnapshotCreateObject(const SysSnapshotEntryPB& snapshot_pb,
Status CatalogManager::ImportSnapshotCreateObject(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
NamespaceMap* namespace_map,
ExternalTableSnapshotDataMap* tables_data,
CreateObjects create_objects) {
// Create ONLY TABLES or ONLY INDEXES in accordance to the argument.
for (const SysRowEntry& entry : snapshot_pb.entries()) {
for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
const SysRowEntry& entry = backup_entry.entry();
if (entry.type() == SysRowEntry::TABLE) {
ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
if ((create_objects == CreateObjects::kOnlyIndexes) == data.is_index()) {
Expand All @@ -780,10 +808,11 @@ Status CatalogManager::ImportSnapshotCreateObject(const SysSnapshotEntryPB& snap
return Status::OK();
}

Status CatalogManager::ImportSnapshotWaitForTables(const SysSnapshotEntryPB& snapshot_pb,
Status CatalogManager::ImportSnapshotWaitForTables(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
ExternalTableSnapshotDataMap* tables_data) {
for (const SysRowEntry& entry : snapshot_pb.entries()) {
for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
const SysRowEntry& entry = backup_entry.entry();
if (entry.type() == SysRowEntry::TABLE) {
ExternalTableSnapshotData& data = (*tables_data)[entry.id()];
if (!data.is_index()) {
Expand All @@ -795,10 +824,11 @@ Status CatalogManager::ImportSnapshotWaitForTables(const SysSnapshotEntryPB& sna
return Status::OK();
}

Status CatalogManager::ImportSnapshotProcessTablets(const SysSnapshotEntryPB& snapshot_pb,
Status CatalogManager::ImportSnapshotProcessTablets(const SnapshotInfoPB& snapshot_pb,
ImportSnapshotMetaResponsePB* resp,
ExternalTableSnapshotDataMap* tables_data) {
for (const SysRowEntry& entry : snapshot_pb.entries()) {
for (const BackupRowEntryPB& backup_entry : snapshot_pb.backup_entries()) {
const SysRowEntry& entry = backup_entry.entry();
if (entry.type() == SysRowEntry::TABLET) {
// Create tablets IDs map.
RETURN_NOT_OK(ImportTabletEntry(entry, tables_data));
Expand Down Expand Up @@ -878,7 +908,17 @@ Status CatalogManager::ImportSnapshotMeta(const ImportSnapshotMetaRequestPB* req
}
});

const SysSnapshotEntryPB& snapshot_pb = req->snapshot().entry();
const SnapshotInfoPB& snapshot_pb = req->snapshot();

if (!snapshot_pb.has_format_version() || snapshot_pb.format_version() != 2) {
return STATUS(InternalError, "Expected snapshot data in format 2",
snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
}

if (snapshot_pb.backup_entries_size() == 0) {
return STATUS(InternalError, "Expected snapshot data prepared for backup",
snapshot_pb.ShortDebugString(), MasterError(MasterErrorPB::SNAPSHOT_FAILED));
}

// PHASE 1: Recreate namespaces, create table's meta data.
RETURN_NOT_OK(ImportSnapshotPreprocess(snapshot_pb, resp, &namespace_map, &tables_data));
Expand Down
17 changes: 16 additions & 1 deletion ent/src/yb/master/master_backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ message SysSnapshotEntryPB {
optional fixed64 previous_snapshot_hybrid_time = 7;
}

// Backup-only wrapper around a single SysRowEntry.
// Instances are stored in the repeated 'backup_entries' field of SnapshotInfoPB.
message BackupRowEntryPB {
// The wrapped catalog row entry.
optional SysRowEntry entry = 1;
// Other per-object backup related properties.
// E.g.: optional bytes pg_schema_name = 2; // For 'entry.type' == TABLE only.
}

////////////////////////////////////////////////////////////
// RPCs
////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -85,6 +91,12 @@ message SnapshotInfoPB {
optional bytes id = 1;
// Detailed snapshot entries.
optional SysSnapshotEntryPB entry = 2;

optional int64 format_version = 3 [ default = 1 ];

// Stored extended info about snapshot (1) namespaces (2) tables (3) tablets
// instead of 'SysRowEntry entry.entries'.
repeated BackupRowEntryPB backup_entries = 4;
}

message TabletRestorationPB {
Expand All @@ -111,12 +123,15 @@ message RestorationInfoPB {
// Request message for the ListSnapshots RPC.
message ListSnapshotsRequestPB {
// Id of the snapshot to look up. The master rejects a request that sets
// 'prepare_for_backup' without providing this field.
optional bytes snapshot_id = 2;
// Passed through by the master to the snapshot coordinator's ListSnapshots call.
optional bool list_deleted_snapshots = 3;
// When true, the master repacks each returned snapshot for backup: the entries are moved
// from 'entry.entries' into 'backup_entries' and 'format_version' is set to 2.
optional bool prepare_for_backup = 4;
}

message ListSnapshotsResponsePB {
optional MasterErrorPB error = 1;

optional bytes current_snapshot_id = 2; // Snapshot currently being created/restored.
// Non-transactional snapshot currently being created/restored.
optional bytes current_snapshot_id = 2;

repeated SnapshotInfoPB snapshots = 3;
}

Expand Down
34 changes: 24 additions & 10 deletions ent/src/yb/tools/yb-admin-test_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class AdminCliTest : public client::KeyValueTableTest<MiniCluster> {
const string& keyspace, const string& table_name, const string& index_name,
bool same_ids = false);

void DoTestImportSnapshot(const string& format = "");
void DoTestExportImportIndexSnapshot(Transactional transactional);

private:
Expand Down Expand Up @@ -217,7 +218,7 @@ TEST_F(AdminCliTest, TestCreateSnapshot) {
ASSERT_OK(ASSERT_RESULT(BackupServiceProxy())->ListSnapshots(req, &resp, &rpc));
ASSERT_EQ(resp.snapshots_size(), 1);

LOG(INFO) << "Test TestCreateSnapshot finished.";
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

Result<size_t> AdminCliTest::NumTables(const string& table_name) const {
Expand Down Expand Up @@ -261,7 +262,7 @@ void AdminCliTest::ImportTableAs(const string& snapshot_file,
CheckAndDeleteImportedTable(keyspace, table_name);
}

TEST_F(AdminCliTest, TestImportSnapshot) {
void AdminCliTest::DoTestImportSnapshot(const string& format) {
CreateTable(Transactional::kFalse);
const string& table_name = table_.name().table_name();
const string& keyspace = table_.name().namespace_name();
Expand All @@ -273,7 +274,13 @@ TEST_F(AdminCliTest, TestImportSnapshot) {
string tmp_dir;
ASSERT_OK(Env::Default()->GetTestDirectory(&tmp_dir));
const auto snapshot_file = JoinPathSegments(tmp_dir, "exported_snapshot.dat");
ASSERT_OK(RunAdminToolCommand("export_snapshot", snapshot_id, snapshot_file));

if (format.empty()) {
ASSERT_OK(RunAdminToolCommand("export_snapshot", snapshot_id, snapshot_file));
} else {
ASSERT_OK(RunAdminToolCommand("export_snapshot", snapshot_id, snapshot_file,
"-TEST_metadata_file_format_version=" + format));
}

// Import snapshot into the existing table.
ASSERT_OK(RunAdminToolCommand("import_snapshot", snapshot_file));
Expand All @@ -290,8 +297,16 @@ TEST_F(AdminCliTest, TestImportSnapshot) {
ImportTableAs(snapshot_file, keyspace, table_name + "_new");
// Import snapshot into already existing namespace and table.
ImportTableAs(snapshot_file, keyspace, table_name);
}

LOG(INFO) << "Test TestImportSnapshot finished.";
// Runs the shared import-snapshot scenario with the default metadata file format:
// no format string is passed, so 'export_snapshot' is invoked without the
// -TEST_metadata_file_format_version flag.
TEST_F(AdminCliTest, TestImportSnapshot) {
DoTestImportSnapshot();
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

// Runs the shared import-snapshot scenario with format "1": 'export_snapshot' is invoked
// with -TEST_metadata_file_format_version=1, and the resulting file is then imported.
TEST_F(AdminCliTest, TestImportSnapshotInOldFormat1) {
DoTestImportSnapshot("1");
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

TEST_F(AdminCliTest, TestExportImportSnapshot) {
Expand All @@ -301,8 +316,7 @@ TEST_F(AdminCliTest, TestExportImportSnapshot) {

// Create snapshot of default table that gets created.
ASSERT_OK(RunAdminToolCommand("create_snapshot", keyspace, table_name));
const auto snapshot_id =
ASSERT_RESULT(GetCompletedSnapshot());
const auto snapshot_id = ASSERT_RESULT(GetCompletedSnapshot());

string tmp_dir;
ASSERT_OK(Env::Default()->GetTestDirectory(&tmp_dir));
Expand All @@ -315,7 +329,7 @@ TEST_F(AdminCliTest, TestExportImportSnapshot) {
CheckImportedTable(table_.get(), yb_table_name, /* same_ids */ true);
ASSERT_EQ(1, ASSERT_RESULT(NumTables(table_name)));

LOG(INFO) << "Test TestExportImportSnapshot finished.";
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

TEST_F(AdminCliTest, TestRestoreSnapshotBasic) {
Expand Down Expand Up @@ -556,13 +570,13 @@ void AdminCliTest::DoTestExportImportIndexSnapshot(Transactional transactional)
TEST_F(AdminCliTest, TestExportImportIndexSnapshot) {
// Test non-transactional table.
DoTestExportImportIndexSnapshot(Transactional::kFalse);
LOG(INFO) << "Test TestExportImportIndexSnapshot finished.";
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

TEST_F(AdminCliTest, TestExportImportIndexSnapshot_ForTransactional) {
// Test the recreated transactional table.
DoTestExportImportIndexSnapshot(Transactional::kTrue);
LOG(INFO) << "Test TestExportImportIndexSnapshot_ForTransactional finished.";
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

TEST_F(AdminCliTest, TestFailedRestoration) {
Expand Down Expand Up @@ -598,7 +612,7 @@ TEST_F(AdminCliTest, TestFailedRestoration) {
LOG(INFO) << "Restoration: " << SysSnapshotEntryPB::State_Name(state);
ASSERT_EQ(state, SysSnapshotEntryPB::FAILED);

LOG(INFO) << "Test TestFailedRestoration finished.";
LOG(INFO) << "Test finished: " << CURRENT_TEST_CASE_AND_TEST_NAME_STR();
}

TEST_F(AdminCliTest, TestSetupUniverseReplication) {
Expand Down
35 changes: 33 additions & 2 deletions ent/src/yb/tools/yb-admin_client_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "yb/tools/yb-admin_util.h"
#include "yb/util/cast.h"
#include "yb/util/env.h"
#include "yb/util/flag_tags.h"
#include "yb/util/jsonwriter.h"
#include "yb/util/monotime.h"
#include "yb/util/pb_util.h"
Expand All @@ -42,6 +43,9 @@
#include "yb/util/timestamp.h"
#include "yb/util/encryption_util.h"

DEFINE_test_flag(int32, metadata_file_format_version, 0,
"Used in 'export_snapshot' metadata file format (0 means using latest format).");

DECLARE_bool(use_client_to_server_encryption);
DECLARE_int32(yb_client_admin_operation_timeout_sec);

Expand Down Expand Up @@ -87,6 +91,7 @@ using master::RestoreSnapshotResponsePB;
using master::SnapshotInfoPB;
using master::SysNamespaceEntryPB;
using master::SysRowEntry;
using master::BackupRowEntryPB;
using master::SysTablesEntryPB;
using master::SysSnapshotEntryPB;

Expand Down Expand Up @@ -588,6 +593,13 @@ Status ClusterAdminClient::CreateSnapshotMetaFile(const string& snapshot_id,
RETURN_NOT_OK(RequestMasterLeader(&resp, [&](RpcController* rpc) {
ListSnapshotsRequestPB req;
req.set_snapshot_id(StringToSnapshotId(snapshot_id));

// Format 0 - latest format (== Format 2 at the moment).
// Format 1 - old format.
// Format 2 - new format.
if (FLAGS_TEST_metadata_file_format_version != 1) {
req.set_prepare_for_backup(true);
}
return master_backup_proxy_->ListSnapshots(req, &resp, rpc);
}));

Expand Down Expand Up @@ -615,7 +627,7 @@ Status ClusterAdminClient::CreateSnapshotMetaFile(const string& snapshot_id,
RETURN_NOT_OK(pb_util::WritePBContainerToPath(
Env::Default(), file_name, *snapshot, pb_util::OVERWRITE, pb_util::SYNC));

cout << "Snapshot meta data was saved into file: " << file_name << endl;
cout << "Snapshot metadata was saved into file: " << file_name << endl;
return Status::OK();
}

Expand All @@ -631,12 +643,31 @@ Status ClusterAdminClient::ImportSnapshotMetaFile(const string& file_name,
// Read snapshot protobuf from given path.
RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(Env::Default(), file_name, snapshot_info));

if (!snapshot_info->has_format_version()) {
SCHECK_EQ(
0, snapshot_info->backup_entries_size(), InvalidArgument,
Format("Metadata file in Format 1 has backup entries from Format 2: $0",
snapshot_info->backup_entries_size()));

// Repack PB data loaded in the old format.
// Init BackupSnapshotPB based on loaded SnapshotInfoPB.
SysSnapshotEntryPB& sys_entry = *snapshot_info->mutable_entry();
snapshot_info->mutable_backup_entries()->Reserve(sys_entry.entries_size());
for (SysRowEntry& entry : *sys_entry.mutable_entries()) {
snapshot_info->add_backup_entries()->mutable_entry()->Swap(&entry);
}

sys_entry.clear_entries();
snapshot_info->set_format_version(2);
}

cout << "Importing snapshot " << SnapshotIdToString(snapshot_info->id())
<< " (" << snapshot_info->entry().state() << ")" << endl;

int table_index = 0;
bool was_table_renamed = false;
for (SysRowEntry& entry : *snapshot_info->mutable_entry()->mutable_entries()) {
for (BackupRowEntryPB& backup_entry : *snapshot_info->mutable_backup_entries()) {
SysRowEntry& entry = *backup_entry.mutable_entry();
const YBTableName table_name = table_index < tables.size()
? tables[table_index] : YBTableName();

Expand Down
Loading

0 comments on commit 4c92727

Please sign in to comment.