Skip to content

Commit

Permalink
[#7126 #7135] PITR: Restore table schema
Browse files Browse the repository at this point in the history
Summary:
This diff adds logic to restore table schema. After this, we should be able to undo an ALTER TABLE operation!

There are two important changes as part of this diff.
1) Restoring master side sys_catalog metadata.
2) Sending the restored version of the schema from the master to the TS, as part of the explicit command to restore the TS.

As part of applying the restore operation on the master, we add new state tracking, which can do the diff between current sys_catalog state vs the state at the time at which we want to restore. This is done by restoring the corresponding sys_catalog snapshot into a temporary directory, with the HybridTime filter applied, for the restore_at time. We then load the relevant TABLE and TABLET data into memory and overwrite the existing rocksdb data directly in memory. This is safe to do because
- It is done as part of the apply step of a raft operation, so it is already persisted and will be replayed accordingly at bootstrap, in case of a restart.
- It is done on both leader and follower.

Once the master state is rolled back, we then run the TS side of the restore operation. The master now sends over the restored schema information, as part of the Restore request. On the TS side, we update our tablet schema information on disk accordingly.

Note: In between the master state being rolled back and all the TS processing their respective restores, there is a time window in which the master can receive heartbeats from a TS, with newer schema information than what the master has persisted. Currently, that seems to only lead to some log spew, but will be investigated later, as part of fault tolerance testing.

Test Plan: ybd --gtest_filter SnapshotScheduleTest.RestoreSchema

Reviewers: amitanand, bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D11013
  • Loading branch information
spolitov committed Apr 1, 2021
1 parent d4f88e4 commit 331c251
Show file tree
Hide file tree
Showing 34 changed files with 555 additions and 101 deletions.
1 change: 1 addition & 0 deletions ent/src/yb/master/CMakeLists-include.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ set(MASTER_SRCS_EXTENSIONS
${YB_ENT_CURRENT_SOURCE_DIR}/catalog_entity_info.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_rpc_tasks.cc
${YB_ENT_CURRENT_SOURCE_DIR}/cdc_consumer_registry_service.cc
${YB_ENT_CURRENT_SOURCE_DIR}/restore_sys_catalog_state.cc
PARENT_SCOPE)

set(MASTER_ADDITIONAL_TESTS
Expand Down
5 changes: 5 additions & 0 deletions ent/src/yb/master/async_snapshot_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ bool AsyncTabletSnapshotOp::SendRequest(int attempt) {
if (snapshot_hybrid_time_) {
req.set_snapshot_hybrid_time(snapshot_hybrid_time_.ToUint64());
}
if (has_metadata_) {
req.set_schema_version(schema_version_);
*req.mutable_schema() = schema_;
*req.mutable_indexes() = indexes_;
}
req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64());

ts_backup_proxy_->TabletSnapshotOpAsync(req, &resp_, &rpc_, BindRpcCallback());
Expand Down
12 changes: 12 additions & 0 deletions ent/src/yb/master/async_snapshot_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask {
snapshot_hybrid_time_ = value;
}

void SetMetadata(uint32_t schema_version, const SchemaPB& schema,
const google::protobuf::RepeatedPtrField<IndexInfoPB>& indexes) {
has_metadata_ = true;
schema_version_ = schema_version;
schema_ = schema;
indexes_ = indexes;
}

void SetCallback(TabletSnapshotOperationCallback callback) {
callback_ = std::move(callback);
}
Expand All @@ -67,6 +75,10 @@ class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask {
HybridTime snapshot_hybrid_time_;
tserver::TabletSnapshotOpResponsePB resp_;
TabletSnapshotOperationCallback callback_;
bool has_metadata_ = false;
uint32_t schema_version_;
SchemaPB schema_;
google::protobuf::RepeatedPtrField<IndexInfoPB> indexes_;
};

} // namespace master
Expand Down
7 changes: 7 additions & 0 deletions ent/src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
void SendRestoreTabletSnapshotRequest(const scoped_refptr<TabletInfo>& tablet,
const std::string& snapshot_id,
HybridTime restore_at,
SendMetadata send_metadata,
TabletSnapshotOperationCallback callback) override;

void SendDeleteTabletSnapshotRequest(const scoped_refptr<TabletInfo>& tablet,
Expand All @@ -268,6 +269,10 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

CHECKED_STATUS CreateSysCatalogSnapshot(const tablet::CreateSnapshotData& data) override;

CHECKED_STATUS RestoreSysCatalog(
const TxnSnapshotId& snapshot_id, HybridTime restore_at, const OpId& op_id,
HybridTime write_time, const SnapshotScheduleFilterPB& filter) override;

rpc::Scheduler& Scheduler() override;

bool IsLeader() override;
Expand Down Expand Up @@ -354,6 +359,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon

void Started() override;

void SysCatalogLoaded(int64_t term) override;

// Snapshot map: snapshot-id -> SnapshotInfo.
typedef std::unordered_map<SnapshotId, scoped_refptr<SnapshotInfo>> SnapshotInfoMap;
SnapshotInfoMap non_txn_snapshot_ids_map_;
Expand Down
86 changes: 85 additions & 1 deletion ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
#include "yb/common/ql_name.h"
#include "yb/common/wire_protocol.h"
#include "yb/consensus/consensus.h"

#include "yb/docdb/consensus_frontier.h"
#include "yb/docdb/cql_operation.h"
#include "yb/docdb/doc_rowwise_iterator.h"

#include "yb/gutil/bind.h"
#include "yb/gutil/strings/join.h"
#include "yb/gutil/strings/substitute.h"
Expand All @@ -44,6 +49,7 @@
#include "yb/master/async_snapshot_tasks.h"
#include "yb/master/async_rpc_tasks.h"
#include "yb/master/encryption_manager.h"
#include "yb/master/restore_sys_catalog_state.h"

#include "yb/rpc/messenger.h"

Expand Down Expand Up @@ -601,7 +607,8 @@ Status CatalogManager::RestoreEntry(const SysRowEntry& entry, const SnapshotId&
LOG(INFO) << "Sending RestoreTabletSnapshot to tablet: " << tablet->ToString();
// Send RestoreSnapshot requests to all TServers (one tablet - one request).
SendRestoreTabletSnapshotRequest(
tablet, snapshot_id, HybridTime(), TabletSnapshotOperationCallback());
tablet, snapshot_id, HybridTime(), SendMetadata::kFalse,
TabletSnapshotOperationCallback());
}
break;
}
Expand Down Expand Up @@ -1361,13 +1368,19 @@ void CatalogManager::SendRestoreTabletSnapshotRequest(
const scoped_refptr<TabletInfo>& tablet,
const string& snapshot_id,
HybridTime restore_at,
SendMetadata send_metadata,
TabletSnapshotOperationCallback callback) {
auto call = std::make_shared<AsyncTabletSnapshotOp>(
master_, AsyncTaskPool(), tablet, snapshot_id,
tserver::TabletSnapshotOpRequestPB::RESTORE_ON_TABLET);
if (restore_at) {
call->SetSnapshotHybridTime(restore_at);
}
if (send_metadata) {
auto lock = tablet->table()->LockForRead();
const auto& pb = lock->data().pb;
call->SetMetadata(pb.version(), pb.schema(), pb.indexes());
}
call->SetCallback(std::move(callback));
tablet->table()->AddTask(call);
WARN_NOT_OK(ScheduleTask(call), "Failed to send restore snapshot request");
Expand All @@ -1388,6 +1401,73 @@ Status CatalogManager::CreateSysCatalogSnapshot(const tablet::CreateSnapshotData
return tablet_peer()->tablet()->snapshots().Create(data);
}

Status CatalogManager::RestoreSysCatalog(
const TxnSnapshotId& snapshot_id, HybridTime restore_at, const OpId& op_id,
HybridTime write_time, const SnapshotScheduleFilterPB& filter) {
auto& tablet = *tablet_peer()->tablet();
auto dir = VERIFY_RESULT(tablet.snapshots().RestoreToTemporary(
snapshot_id, restore_at));
rocksdb::Options rocksdb_options;
std::string log_prefix = LogPrefix();
// Remove ": " to patch suffix.
log_prefix.erase(log_prefix.size() - 2);
tablet.InitRocksDBOptions(&rocksdb_options, log_prefix + " [TMP]: ");
auto db = VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, dir));

const auto& schema = this->schema();
auto doc_db = docdb::DocDB::FromRegularUnbounded(db.get());

RestoreSysCatalogState state;
{
auto iter = std::make_unique<docdb::DocRowwiseIterator>(
schema, schema, boost::none, doc_db, CoarseTimePoint::max(),
ReadHybridTime::SingleTime(restore_at), nullptr);
RETURN_NOT_OK(EnumerateSysCatalog(
iter.get(), schema, SysRowEntry::TABLE,
std::bind(&RestoreSysCatalogState::LoadTable, &state, _1, _2)));
}
{
auto iter = std::make_unique<docdb::DocRowwiseIterator>(
schema, schema, boost::none, doc_db, CoarseTimePoint::max(),
ReadHybridTime::SingleTime(restore_at), nullptr);
RETURN_NOT_OK(EnumerateSysCatalog(
iter.get(), schema, SysRowEntry::TABLET,
std::bind(&RestoreSysCatalogState::LoadTablet, &state, _1, _2)));
}
auto entries = VERIFY_RESULT(state.FilterEntries(filter));

docdb::DocWriteBatch write_batch(doc_db, docdb::InitMarkerBehavior::kOptional);
docdb::DocOperationApplyData apply_data{.doc_write_batch = &write_batch};
std::shared_ptr<const Schema> schema_ptr(&schema, [](const Schema* schema){});
for (const auto& entry : entries.entries()) {
QLWriteRequestPB write_request;
RETURN_NOT_OK(FillSysCatalogWriteRequest(
entry.type(), entry.id(), entry.data(), QLWriteRequestPB::QL_STMT_INSERT, schema,
&write_request));
docdb::QLWriteOperation operation(schema_ptr, IndexMap(), nullptr, boost::none);
QLResponsePB response;
RETURN_NOT_OK(operation.Init(&write_request, &response));
RETURN_NOT_OK(operation.Apply(apply_data));
}
docdb::KeyValueWriteBatchPB kv_write_batch;
write_batch.MoveToWriteBatchPB(&kv_write_batch);

rocksdb::WriteBatch rocksdb_write_batch;
PrepareNonTransactionWriteBatch(
kv_write_batch, write_time, nullptr, &rocksdb_write_batch, nullptr);
docdb::ConsensusFrontiers frontiers;
set_op_id(op_id, &frontiers);
set_hybrid_time(write_time, &frontiers);

tablet.WriteToRocksDB(
&frontiers, &rocksdb_write_batch, docdb::StorageDbType::kRegular);

// TODO(pitr) Handle master leader failover.
RETURN_NOT_OK(ElectedAsLeaderCb());

return Status::OK();
}

rpc::Scheduler& CatalogManager::Scheduler() {
return master_->messenger()->scheduler();
}
Expand Down Expand Up @@ -3344,6 +3424,10 @@ Result<SnapshotSchedulesToTabletsMap> CatalogManager::MakeSnapshotSchedulesToTab
return snapshot_coordinator_.MakeSnapshotSchedulesToTabletsMap();
}

void CatalogManager::SysCatalogLoaded(int64_t term) {
return snapshot_coordinator_.SysCatalogLoaded(term);
}

} // namespace enterprise
} // namespace master
} // namespace yb
103 changes: 103 additions & 0 deletions ent/src/yb/master/restore_sys_catalog_state.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/master/restore_sys_catalog_state.h"

#include "yb/master/master.pb.h"
#include "yb/master/master_backup.pb.h"

#include "yb/util/pb_util.h"

namespace yb {
namespace master {

// Utility class to restore sys catalog.
// Initially we load tables and tablets into it, then match schedule filter.
CHECKED_STATUS RestoreSysCatalogState::LoadTable(const Slice& id, const Slice& data) {
tables_.emplace(id.ToBuffer(), VERIFY_RESULT(pb_util::ParseFromSlice<SysTablesEntryPB>(data)));
return Status::OK();
}

CHECKED_STATUS RestoreSysCatalogState::LoadTablet(const Slice& id, const Slice& data) {
tablets_.emplace(
id.ToBuffer(), VERIFY_RESULT(pb_util::ParseFromSlice<SysTabletsEntryPB>(data)));
return Status::OK();
}

void AddEntry(
SysRowEntry::Type type, const std::string& id, const google::protobuf::MessageLite& pb,
SysRowEntries* out, faststring* buffer) {
auto& entry = *out->mutable_entries()->Add();
entry.set_type(type);
entry.set_id(id);
pb_util::SerializeToString(pb, buffer);
entry.set_data(buffer->data(), buffer->size());
}

Result<SysRowEntries> RestoreSysCatalogState::FilterEntries(
const SnapshotScheduleFilterPB& filter) {
SysRowEntries result;
std::unordered_set<TableId> restored_tables;
faststring buffer;
for (const auto& id_and_metadata : tables_) {
bool match;
if (id_and_metadata.second.has_index_info()) {
auto it = tables_.find(id_and_metadata.second.index_info().indexed_table_id());
if (it == tables_.end()) {
return STATUS_FORMAT(
NotFound, "Indexed table $0 not found for index $1 ($2)",
id_and_metadata.second.index_info().indexed_table_id(), id_and_metadata.first,
id_and_metadata.second.name());
}
match = VERIFY_RESULT(MatchTable(filter, it->first, it->second));
} else {
match = VERIFY_RESULT(MatchTable(filter, id_and_metadata.first, id_and_metadata.second));
}
if (!match) {
continue;
}
AddEntry(SysRowEntry::TABLE, id_and_metadata.first, id_and_metadata.second, &result, &buffer);
restored_tables.insert(id_and_metadata.first);
VLOG(2) << "Table to restore: " << id_and_metadata.first << ", "
<< id_and_metadata.second.ShortDebugString();
}
for (const auto& id_and_metadata : tablets_) {
if (restored_tables.count(id_and_metadata.second.table_id()) == 0) {
continue;
}
AddEntry(SysRowEntry::TABLET, id_and_metadata.first, id_and_metadata.second, &result, &buffer);
VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", "
<< id_and_metadata.second.ShortDebugString();
}
return result;
}

Result<bool> RestoreSysCatalogState::MatchTable(
const SnapshotScheduleFilterPB& filter, const TableId& id, const SysTablesEntryPB& table) {
VLOG(1) << __func__ << "(" << filter.ShortDebugString() << ", " << id << ", "
<< table.ShortDebugString() << ")";
for (const auto& table_identifier : filter.tables().tables()) {
if (table_identifier.has_table_id()) {
return id == table_identifier.table_id();
}
if (table_identifier.has_table_name()) {
return STATUS(NotSupported, "Table name filters are not implemented for PITR");
}
return STATUS_FORMAT(
InvalidArgument, "Wrong table identifier format: $0", table_identifier);
}
return false;
}

} // namespace master
} // namespace yb
48 changes: 48 additions & 0 deletions ent/src/yb/master/restore_sys_catalog_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef ENT_SRC_YB_MASTER_RESTORE_SYS_CATALOG_STATE_H
#define ENT_SRC_YB_MASTER_RESTORE_SYS_CATALOG_STATE_H

#include <unordered_map>

#include "yb/common/entity_ids.h"

#include "yb/master/master_fwd.h"
#include "yb/master/master.pb.h"

#include "yb/util/result.h"

namespace yb {
namespace master {

// Utility class to restore sys catalog.
// Initially we load tables and tablets into it, then match schedule filter.
class RestoreSysCatalogState {
public:
CHECKED_STATUS LoadTable(const Slice& id, const Slice& data);
CHECKED_STATUS LoadTablet(const Slice& id, const Slice& data);
Result<SysRowEntries> FilterEntries(const SnapshotScheduleFilterPB& filter);

private:
Result<bool> MatchTable(
const SnapshotScheduleFilterPB& filter, const TableId& id, const SysTablesEntryPB& table);

std::unordered_map<TableId, SysTablesEntryPB> tables_;
std::unordered_map<TabletId, SysTabletsEntryPB> tablets_;
};

} // namespace master
} // namespace yb

#endif // ENT_SRC_YB_MASTER_RESTORE_SYS_CATALOG_STATE_H
8 changes: 2 additions & 6 deletions ent/src/yb/master/universe_key_registry_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ CHECKED_STATUS RotateUniverseKey(const Slice& old_universe_key,
Slice registry_for_flush;
string encrypted;
if (!enable) {
if (!pb_util::SerializeToString(universe_key_registry, &encoded)) {
return STATUS(InvalidArgument, "Registry could not be encoded.");
}
pb_util::SerializeToString(universe_key_registry, &encoded);
registry_for_flush = Slice(encoded);
} else {
LOG_IF(DFATAL, new_universe_key.empty());
Expand All @@ -82,9 +80,7 @@ CHECKED_STATUS RotateUniverseKey(const Slice& old_universe_key,
params->ToEncryptionParamsPB(&params_pb);
(*universe_key_registry.mutable_universe_keys())[new_key_version_id] = params_pb;
universe_key_registry.set_latest_version_id(new_key_version_id);
if (!pb_util::SerializeToString(universe_key_registry, &encoded)) {
return STATUS(InvalidArgument, "Registry could not be encoded.");
}
pb_util::SerializeToString(universe_key_registry, &encoded);

encrypted = VERIFY_RESULT(EncryptUniverseKeyRegistry(Slice(encoded), new_universe_key));
registry_for_flush = Slice(encrypted);
Expand Down
Loading

0 comments on commit 331c251

Please sign in to comment.