diff --git a/ent/src/yb/master/CMakeLists-include.txt b/ent/src/yb/master/CMakeLists-include.txt index d521913305e3..c4a2ccaa3c7f 100644 --- a/ent/src/yb/master/CMakeLists-include.txt +++ b/ent/src/yb/master/CMakeLists-include.txt @@ -55,6 +55,7 @@ set(MASTER_SRCS_EXTENSIONS ${YB_ENT_CURRENT_SOURCE_DIR}/catalog_entity_info.cc ${YB_ENT_CURRENT_SOURCE_DIR}/cdc_rpc_tasks.cc ${YB_ENT_CURRENT_SOURCE_DIR}/cdc_consumer_registry_service.cc + ${YB_ENT_CURRENT_SOURCE_DIR}/restore_sys_catalog_state.cc PARENT_SCOPE) set(MASTER_ADDITIONAL_TESTS diff --git a/ent/src/yb/master/async_snapshot_tasks.cc b/ent/src/yb/master/async_snapshot_tasks.cc index 58beeba95dff..104beebae028 100644 --- a/ent/src/yb/master/async_snapshot_tasks.cc +++ b/ent/src/yb/master/async_snapshot_tasks.cc @@ -153,6 +153,11 @@ bool AsyncTabletSnapshotOp::SendRequest(int attempt) { if (snapshot_hybrid_time_) { req.set_snapshot_hybrid_time(snapshot_hybrid_time_.ToUint64()); } + if (has_metadata_) { + req.set_schema_version(schema_version_); + *req.mutable_schema() = schema_; + *req.mutable_indexes() = indexes_; + } req.set_propagated_hybrid_time(master_->clock()->Now().ToUint64()); ts_backup_proxy_->TabletSnapshotOpAsync(req, &resp_, &rpc_, BindRpcCallback()); diff --git a/ent/src/yb/master/async_snapshot_tasks.h b/ent/src/yb/master/async_snapshot_tasks.h index 1baae1823c8b..e1cc75987b58 100644 --- a/ent/src/yb/master/async_snapshot_tasks.h +++ b/ent/src/yb/master/async_snapshot_tasks.h @@ -48,6 +48,14 @@ class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask { snapshot_hybrid_time_ = value; } + void SetMetadata(uint32_t schema_version, const SchemaPB& schema, + const google::protobuf::RepeatedPtrField& indexes) { + has_metadata_ = true; + schema_version_ = schema_version; + schema_ = schema; + indexes_ = indexes; + } + void SetCallback(TabletSnapshotOperationCallback callback) { callback_ = std::move(callback); } @@ -67,6 +75,10 @@ class AsyncTabletSnapshotOp : public enterprise::RetryingTSRpcTask { HybridTime snapshot_hybrid_time_; tserver::TabletSnapshotOpResponsePB resp_; TabletSnapshotOperationCallback callback_; + bool has_metadata_ = false; + uint32_t schema_version_; + SchemaPB schema_; + google::protobuf::RepeatedPtrField indexes_; }; } // namespace master diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index 76d3bc82f33f..74c7bd840c27 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -260,6 +260,7 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon void SendRestoreTabletSnapshotRequest(const scoped_refptr& tablet, const std::string& snapshot_id, HybridTime restore_at, + SendMetadata send_metadata, TabletSnapshotOperationCallback callback) override; void SendDeleteTabletSnapshotRequest(const scoped_refptr& tablet, @@ -268,6 +269,10 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon CHECKED_STATUS CreateSysCatalogSnapshot(const tablet::CreateSnapshotData& data) override; + CHECKED_STATUS RestoreSysCatalog( + const TxnSnapshotId& snapshot_id, HybridTime restore_at, const OpId& op_id, + HybridTime write_time, const SnapshotScheduleFilterPB& filter) override; + rpc::Scheduler& Scheduler() override; bool IsLeader() override; @@ -354,6 +359,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon void Started() override; + void SysCatalogLoaded(int64_t term) override; + // Snapshot map: snapshot-id -> SnapshotInfo. typedef std::unordered_map> SnapshotInfoMap; SnapshotInfoMap non_txn_snapshot_ids_map_; diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index 0506a070af70..effdf10a61f9 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -34,6 +34,11 @@ #include "yb/common/ql_name.h" #include "yb/common/wire_protocol.h" #include "yb/consensus/consensus.h" + +#include "yb/docdb/consensus_frontier.h" +#include "yb/docdb/cql_operation.h" +#include "yb/docdb/doc_rowwise_iterator.h" + #include "yb/gutil/bind.h" #include "yb/gutil/strings/join.h" #include "yb/gutil/strings/substitute.h" @@ -44,6 +49,7 @@ #include "yb/master/async_snapshot_tasks.h" #include "yb/master/async_rpc_tasks.h" #include "yb/master/encryption_manager.h" +#include "yb/master/restore_sys_catalog_state.h" #include "yb/rpc/messenger.h" @@ -601,7 +607,8 @@ Status CatalogManager::RestoreEntry(const SysRowEntry& entry, const SnapshotId& LOG(INFO) << "Sending RestoreTabletSnapshot to tablet: " << tablet->ToString(); // Send RestoreSnapshot requests to all TServers (one tablet - one request). SendRestoreTabletSnapshotRequest( - tablet, snapshot_id, HybridTime(), TabletSnapshotOperationCallback()); + tablet, snapshot_id, HybridTime(), SendMetadata::kFalse, + TabletSnapshotOperationCallback()); } break; } @@ -1361,6 +1368,7 @@ void CatalogManager::SendRestoreTabletSnapshotRequest( const scoped_refptr& tablet, const string& snapshot_id, HybridTime restore_at, + SendMetadata send_metadata, TabletSnapshotOperationCallback callback) { auto call = std::make_shared( master_, AsyncTaskPool(), tablet, snapshot_id, @@ -1368,6 +1376,11 @@ void CatalogManager::SendRestoreTabletSnapshotRequest( if (restore_at) { call->SetSnapshotHybridTime(restore_at); } + if (send_metadata) { + auto lock = tablet->table()->LockForRead(); + const auto& pb = lock->data().pb; + call->SetMetadata(pb.version(), pb.schema(), pb.indexes()); + } call->SetCallback(std::move(callback)); tablet->table()->AddTask(call); WARN_NOT_OK(ScheduleTask(call), "Failed to send restore snapshot request"); @@ -1388,6 +1401,73 @@ Status CatalogManager::CreateSysCatalogSnapshot(const tablet::CreateSnapshotData return tablet_peer()->tablet()->snapshots().Create(data); } +Status CatalogManager::RestoreSysCatalog( + const TxnSnapshotId& snapshot_id, HybridTime restore_at, const OpId& op_id, + HybridTime write_time, const SnapshotScheduleFilterPB& filter) { + auto& tablet = *tablet_peer()->tablet(); + auto dir = VERIFY_RESULT(tablet.snapshots().RestoreToTemporary( + snapshot_id, restore_at)); + rocksdb::Options rocksdb_options; + std::string log_prefix = LogPrefix(); + // Remove ": " to patch suffix. + log_prefix.erase(log_prefix.size() - 2); + tablet.InitRocksDBOptions(&rocksdb_options, log_prefix + " [TMP]: "); + auto db = VERIFY_RESULT(rocksdb::DB::Open(rocksdb_options, dir)); + + const auto& schema = this->schema(); + auto doc_db = docdb::DocDB::FromRegularUnbounded(db.get()); + + RestoreSysCatalogState state; + { + auto iter = std::make_unique( + schema, schema, boost::none, doc_db, CoarseTimePoint::max(), + ReadHybridTime::SingleTime(restore_at), nullptr); + RETURN_NOT_OK(EnumerateSysCatalog( + iter.get(), schema, SysRowEntry::TABLE, + std::bind(&RestoreSysCatalogState::LoadTable, &state, _1, _2))); + } + { + auto iter = std::make_unique( + schema, schema, boost::none, doc_db, CoarseTimePoint::max(), + ReadHybridTime::SingleTime(restore_at), nullptr); + RETURN_NOT_OK(EnumerateSysCatalog( + iter.get(), schema, SysRowEntry::TABLET, + std::bind(&RestoreSysCatalogState::LoadTablet, &state, _1, _2))); + } + auto entries = VERIFY_RESULT(state.FilterEntries(filter)); + + docdb::DocWriteBatch write_batch(doc_db, docdb::InitMarkerBehavior::kOptional); + docdb::DocOperationApplyData apply_data{.doc_write_batch = &write_batch}; + std::shared_ptr schema_ptr(&schema, [](const Schema* schema){}); + for (const auto& entry : entries.entries()) { + QLWriteRequestPB write_request; + RETURN_NOT_OK(FillSysCatalogWriteRequest( + entry.type(), entry.id(), entry.data(), QLWriteRequestPB::QL_STMT_INSERT, schema, + &write_request)); + docdb::QLWriteOperation operation(schema_ptr, IndexMap(), nullptr, boost::none); + QLResponsePB response; + RETURN_NOT_OK(operation.Init(&write_request, &response)); + RETURN_NOT_OK(operation.Apply(apply_data)); + } + docdb::KeyValueWriteBatchPB kv_write_batch; + write_batch.MoveToWriteBatchPB(&kv_write_batch); + + rocksdb::WriteBatch rocksdb_write_batch; + PrepareNonTransactionWriteBatch( + kv_write_batch, write_time, nullptr, &rocksdb_write_batch, nullptr); + docdb::ConsensusFrontiers frontiers; + set_op_id(op_id, &frontiers); + set_hybrid_time(write_time, &frontiers); + + tablet.WriteToRocksDB( + &frontiers, &rocksdb_write_batch, docdb::StorageDbType::kRegular); + + // TODO(pitr) Handle master leader failover. + RETURN_NOT_OK(ElectedAsLeaderCb()); + + return Status::OK(); +} + rpc::Scheduler& CatalogManager::Scheduler() { return master_->messenger()->scheduler(); } @@ -3344,6 +3424,10 @@ Result CatalogManager::MakeSnapshotSchedulesToTab return snapshot_coordinator_.MakeSnapshotSchedulesToTabletsMap(); } +void CatalogManager::SysCatalogLoaded(int64_t term) { + return snapshot_coordinator_.SysCatalogLoaded(term); +} + } // namespace enterprise } // namespace master } // namespace yb diff --git a/ent/src/yb/master/restore_sys_catalog_state.cc b/ent/src/yb/master/restore_sys_catalog_state.cc new file mode 100644 index 000000000000..7c241fa6d56c --- /dev/null +++ b/ent/src/yb/master/restore_sys_catalog_state.cc @@ -0,0 +1,103 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/master/restore_sys_catalog_state.h" + +#include "yb/master/master.pb.h" +#include "yb/master/master_backup.pb.h" + +#include "yb/util/pb_util.h" + +namespace yb { +namespace master { + +// Utility class to restore sys catalog. +// Initially we load tables and tablets into it, then match schedule filter. +CHECKED_STATUS RestoreSysCatalogState::LoadTable(const Slice& id, const Slice& data) { + tables_.emplace(id.ToBuffer(), VERIFY_RESULT(pb_util::ParseFromSlice(data))); + return Status::OK(); +} + +CHECKED_STATUS RestoreSysCatalogState::LoadTablet(const Slice& id, const Slice& data) { + tablets_.emplace( + id.ToBuffer(), VERIFY_RESULT(pb_util::ParseFromSlice(data))); + return Status::OK(); +} + +void AddEntry( + SysRowEntry::Type type, const std::string& id, const google::protobuf::MessageLite& pb, + SysRowEntries* out, faststring* buffer) { + auto& entry = *out->mutable_entries()->Add(); + entry.set_type(type); + entry.set_id(id); + pb_util::SerializeToString(pb, buffer); + entry.set_data(buffer->data(), buffer->size()); +} + +Result RestoreSysCatalogState::FilterEntries( + const SnapshotScheduleFilterPB& filter) { + SysRowEntries result; + std::unordered_set restored_tables; + faststring buffer; + for (const auto& id_and_metadata : tables_) { + bool match; + if (id_and_metadata.second.has_index_info()) { + auto it = tables_.find(id_and_metadata.second.index_info().indexed_table_id()); + if (it == tables_.end()) { + return STATUS_FORMAT( + NotFound, "Indexed table $0 not found for index $1 ($2)", + id_and_metadata.second.index_info().indexed_table_id(), id_and_metadata.first, + id_and_metadata.second.name()); + } + match = VERIFY_RESULT(MatchTable(filter, it->first, it->second)); + } else { + match = VERIFY_RESULT(MatchTable(filter, id_and_metadata.first, id_and_metadata.second)); + } + if (!match) { + continue; + } + AddEntry(SysRowEntry::TABLE, id_and_metadata.first, id_and_metadata.second, &result, &buffer); + restored_tables.insert(id_and_metadata.first); + VLOG(2) << "Table to restore: " << id_and_metadata.first << ", " + << id_and_metadata.second.ShortDebugString(); + } + for (const auto& id_and_metadata : tablets_) { + if (restored_tables.count(id_and_metadata.second.table_id()) == 0) { + continue; + } + AddEntry(SysRowEntry::TABLET, id_and_metadata.first, id_and_metadata.second, &result, &buffer); + VLOG(2) << "Tablet to restore: " << id_and_metadata.first << ", " + << id_and_metadata.second.ShortDebugString(); + } + return result; +} + +Result RestoreSysCatalogState::MatchTable( + const SnapshotScheduleFilterPB& filter, const TableId& id, const SysTablesEntryPB& table) { + VLOG(1) << __func__ << "(" << filter.ShortDebugString() << ", " << id << ", " + << table.ShortDebugString() << ")"; + for (const auto& table_identifier : filter.tables().tables()) { + if (table_identifier.has_table_id()) { + return id == table_identifier.table_id(); + } + if (table_identifier.has_table_name()) { + return STATUS(NotSupported, "Table name filters are not implemented for PITR"); + } + return STATUS_FORMAT( + InvalidArgument, "Wrong table identifier format: $0", table_identifier); + } + return false; +} + +} // namespace master +} // namespace yb diff --git a/ent/src/yb/master/restore_sys_catalog_state.h b/ent/src/yb/master/restore_sys_catalog_state.h new file mode 100644 index 000000000000..82822ae2b27d --- /dev/null +++ b/ent/src/yb/master/restore_sys_catalog_state.h @@ -0,0 +1,48 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef ENT_SRC_YB_MASTER_RESTORE_SYS_CATALOG_STATE_H +#define ENT_SRC_YB_MASTER_RESTORE_SYS_CATALOG_STATE_H + +#include + +#include "yb/common/entity_ids.h" + +#include "yb/master/master_fwd.h" +#include "yb/master/master.pb.h" + +#include "yb/util/result.h" + +namespace yb { +namespace master { + +// Utility class to restore sys catalog. +// Initially we load tables and tablets into it, then match schedule filter. +class RestoreSysCatalogState { + public: + CHECKED_STATUS LoadTable(const Slice& id, const Slice& data); + CHECKED_STATUS LoadTablet(const Slice& id, const Slice& data); + Result FilterEntries(const SnapshotScheduleFilterPB& filter); + + private: + Result MatchTable( + const SnapshotScheduleFilterPB& filter, const TableId& id, const SysTablesEntryPB& table); + + std::unordered_map tables_; + std::unordered_map tablets_; +}; + +} // namespace master +} // namespace yb + +#endif // ENT_SRC_YB_MASTER_RESTORE_SYS_CATALOG_STATE_H diff --git a/ent/src/yb/master/universe_key_registry_service.cc b/ent/src/yb/master/universe_key_registry_service.cc index a910cc4a8206..46e9820acbc5 100644 --- a/ent/src/yb/master/universe_key_registry_service.cc +++ b/ent/src/yb/master/universe_key_registry_service.cc @@ -71,9 +71,7 @@ CHECKED_STATUS RotateUniverseKey(const Slice& old_universe_key, Slice registry_for_flush; string encrypted; if (!enable) { - if (!pb_util::SerializeToString(universe_key_registry, &encoded)) { - return STATUS(InvalidArgument, "Registry could not be encoded."); - } + pb_util::SerializeToString(universe_key_registry, &encoded); registry_for_flush = Slice(encoded); } else { LOG_IF(DFATAL, new_universe_key.empty()); @@ -82,9 +80,7 @@ CHECKED_STATUS RotateUniverseKey(const Slice& old_universe_key, params->ToEncryptionParamsPB(¶ms_pb); (*universe_key_registry.mutable_universe_keys())[new_key_version_id] = params_pb; universe_key_registry.set_latest_version_id(new_key_version_id); - if (!pb_util::SerializeToString(universe_key_registry, &encoded)) { - return STATUS(InvalidArgument, "Registry could not be encoded."); - } + pb_util::SerializeToString(universe_key_registry, &encoded); encrypted = VERIFY_RESULT(EncryptUniverseKeyRegistry(Slice(encoded), new_universe_key)); registry_for_flush = Slice(encrypted); diff --git a/src/yb/client/snapshot-schedule-test.cc b/src/yb/client/snapshot-schedule-test.cc index e3384c97aa9a..95c759a78aa4 100644 --- a/src/yb/client/snapshot-schedule-test.cc +++ b/src/yb/client/snapshot-schedule-test.cc @@ -23,6 +23,7 @@ #include "yb/tablet/tablet_peer.h" #include "yb/tablet/tablet_retention_policy.h" +#include "yb/yql/cql/ql/util/errcodes.h" #include "yb/yql/cql/ql/util/statement_result.h" using namespace std::literals; @@ -273,8 +274,12 @@ TEST_F(SnapshotScheduleTest, RestoreSchema) { ASSERT_OK(RestoreSnapshot(TryFullyDecodeTxnSnapshotId(snapshots[0].id()), hybrid_time)); - auto new_schema = ASSERT_RESULT(client_->GetYBTableInfo(table_.name())).schema; - // TODO ASSERT_EQ(old_schema, new_schema); + auto select_result = SelectRow(CreateSession(), 1, kValueColumn); + ASSERT_NOK(select_result); + ASSERT_TRUE(select_result.status().IsQLError()); + ASSERT_EQ(ql::QLError(select_result.status()), ql::ErrorCode::WRONG_METADATA_VERSION); + ASSERT_OK(table_.Reopen()); + ASSERT_EQ(old_schema, table_.schema()); ASSERT_NO_FATALS(VerifyData()); } diff --git a/src/yb/client/snapshot_test_base.cc b/src/yb/client/snapshot_test_base.cc index 791345bfe95d..211187872da0 100644 --- a/src/yb/client/snapshot_test_base.cc +++ b/src/yb/client/snapshot_test_base.cc @@ -145,14 +145,21 @@ Result SnapshotTestBase::StartRestoration( Result SnapshotTestBase::IsRestorationDone(const TxnSnapshotRestorationId& restoration_id) { master::ListSnapshotRestorationsRequestPB req; master::ListSnapshotRestorationsResponsePB resp; - - rpc::RpcController controller; - controller.set_timeout(60s); req.set_restoration_id(restoration_id.data(), restoration_id.size()); - RETURN_NOT_OK(MakeBackupServiceProxy().ListSnapshotRestorations(req, &resp, &controller)); - LOG(INFO) << "Restoration: " << resp.ShortDebugString(); - if (resp.has_status()) { - return StatusFromPB(resp.status()); + + auto deadline = CoarseMonoClock::now() + 60s; + for (;;) { + rpc::RpcController controller; + controller.set_deadline(deadline); + RETURN_NOT_OK(MakeBackupServiceProxy().ListSnapshotRestorations(req, &resp, &controller)); + LOG(INFO) << "Restoration: " << resp.ShortDebugString(); + if (!resp.has_status()) { + break; + } + auto status = StatusFromPB(resp.status()); + if (!status.IsServiceUnavailable()) { + return status; + } } if (resp.restorations().size() != 1) { return STATUS_FORMAT(RuntimeError, "Wrong number of restorations, one expected but $0 found", diff --git a/src/yb/consensus/log.cc b/src/yb/consensus/log.cc index d24d49867017..b7b34e9a0e3a 100644 --- a/src/yb/consensus/log.cc +++ b/src/yb/consensus/log.cc @@ -1550,10 +1550,7 @@ Status LogEntryBatch::Serialize() { total_size_bytes_ = entry_batch_pb_.ByteSize(); buffer_.reserve(total_size_bytes_); - if (!pb_util::AppendToString(entry_batch_pb_, &buffer_)) { - return STATUS(IOError, Substitute("unable to serialize the entry batch, contents: $1", - entry_batch_pb_.DebugString())); - } + pb_util::AppendToString(entry_batch_pb_, &buffer_); state_ = kEntrySerialized; return Status::OK(); diff --git a/src/yb/consensus/log_util.cc b/src/yb/consensus/log_util.cc index 9bd6288f1050..6b7b702dd2ad 100644 --- a/src/yb/consensus/log_util.cc +++ b/src/yb/consensus/log_util.cc @@ -807,9 +807,7 @@ Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_head // Then Length-prefixed header. PutFixed32(&buf, new_header.ByteSize()); // Then Serialize the PB. - if (!pb_util::AppendToString(new_header, &buf)) { - return STATUS(Corruption, "unable to encode header"); - } + pb_util::AppendToString(new_header, &buf); RETURN_NOT_OK(writable_file()->Append(Slice(buf))); header_.CopyFrom(new_header); @@ -829,9 +827,7 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer) faststring buf; - if (!pb_util::AppendToString(footer, &buf)) { - return STATUS(Corruption, "unable to encode header"); - } + pb_util::AppendToString(footer, &buf); buf.append(kLogSegmentFooterMagicString); PutFixed32(&buf, footer.ByteSize()); diff --git a/src/yb/docdb/docdb_fwd.h b/src/yb/docdb/docdb_fwd.h index f35ab2597458..76dda4413be9 100644 --- a/src/yb/docdb/docdb_fwd.h +++ b/src/yb/docdb/docdb_fwd.h @@ -23,6 +23,7 @@ class ConsensusFrontier; class DeadlineInfo; class DocKey; class DocPath; +class DocRowwiseIterator; class DocWriteBatch; class IntentAwareIterator; class KeyBytes; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index d8ad0d88b1f1..8fc8b14b7e91 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -778,9 +778,12 @@ void CatalogManager::LoadSysCatalogDataTask() { } } - std::lock_guard l(state_lock_); - leader_ready_term_ = term; - LOG_WITH_PREFIX(INFO) << "Completed load of sys catalog in term " << term; + { + std::lock_guard l(state_lock_); + leader_ready_term_ = term; + LOG_WITH_PREFIX(INFO) << "Completed load of sys catalog in term " << term; + } + SysCatalogLoaded(term); } CHECKED_STATUS CatalogManager::WaitForWorkerPoolTests(const MonoDelta& timeout) const { diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 16cfe5d5b95c..2164e04ef56f 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -1228,6 +1228,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf { virtual void Started() {} + virtual void SysCatalogLoaded(int64_t term) {} + // Respect leader affinity with master sys catalog tablet by stepping down if we don't match // the cluster config affinity specification. CHECKED_STATUS SysCatalogRespectLeaderAffinity(); diff --git a/src/yb/master/master_fwd.h b/src/yb/master/master_fwd.h index 95af8b07d6a0..5a0f7d840735 100644 --- a/src/yb/master/master_fwd.h +++ b/src/yb/master/master_fwd.h @@ -46,9 +46,12 @@ class ListSnapshotSchedulesResponsePB; class ListSnapshotsResponsePB; class ReportedTabletPB; class SnapshotCoordinatorContext; +class SnapshotScheduleFilterPB; class SnapshotState; class SysRowEntries; class SysSnapshotEntryPB; +class SysTablesEntryPB; +class SysTabletsEntryPB; class TSHeartbeatRequestPB; class TSHeartbeatResponsePB; class TSRegistrationPB; diff --git a/src/yb/master/master_snapshot_coordinator.cc b/src/yb/master/master_snapshot_coordinator.cc index ac6a77497a0c..61f80e5219ac 100644 --- a/src/yb/master/master_snapshot_coordinator.cc +++ b/src/yb/master/master_snapshot_coordinator.cc @@ -56,6 +56,7 @@ namespace master { namespace { YB_DEFINE_ENUM(Bound, (kFirst)(kLast)); +YB_DEFINE_ENUM(RestorePhase, (kInitial)(kPostSysCatalogLoad)); void SubmitWrite( docdb::KeyValueWriteBatchPB&& write_batch, SnapshotCoordinatorContext* context, @@ -309,13 +310,28 @@ class MasterSnapshotCoordinator::Impl { int64_t leader_term, const tablet::SnapshotOperationState& state) { auto snapshot_id = VERIFY_RESULT(FullyDecodeTxnSnapshotId(state.request()->snapshot_id())); auto restore_at = HybridTime::FromPB(state.request()->snapshot_hybrid_time()); + SnapshotScheduleFilterPB filter; { std::lock_guard lock(mutex_); SnapshotState& snapshot = VERIFY_RESULT(FindSnapshot(snapshot_id)); - LOG(INFO) << "Restore sys catalog from " << snapshot.ToString() << " at " << restore_at; - // TODO implement restore + SnapshotScheduleState& schedule_state = VERIFY_RESULT( + FindSnapshotSchedule(snapshot.schedule_id())); + LOG(INFO) << "Restore sys catalog from snapshot: " << snapshot.ToString() << ", schedule: " + << schedule_state.ToString() << " at " << restore_at; + filter = schedule_state.options().filter(); + // Postpone restore on the leader. + if (leader_term != OpId::kUnknownTerm) { + postponed_restores_.push_back(PostponedRestore { + .term = leader_term, + .snapshot_id = snapshot_id, + .restore_at = restore_at, + .restoration_id = VERIFY_RESULT(FullyDecodeTxnSnapshotRestorationId( + state.request()->restoration_id())) + }); + } } - return Status::OK(); + return context_.RestoreSysCatalog( + snapshot_id, restore_at, OpId::FromPB(state.op_id()), state.hybrid_time(), filter); } CHECKED_STATUS ListRestorations( @@ -338,32 +354,7 @@ class MasterSnapshotCoordinator::Impl { Result Restore( const TxnSnapshotId& snapshot_id, HybridTime restore_at) { auto restoration_id = TxnSnapshotRestorationId::GenerateRandom(); - TabletInfos tablet_infos; - bool restore_sys_catalog; - { - std::lock_guard lock(mutex_); - SnapshotState& snapshot = VERIFY_RESULT(FindSnapshot(snapshot_id)); - if (!VERIFY_RESULT(snapshot.Complete())) { - return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id.ToString(), - MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY)); - } - restore_sys_catalog = !snapshot.schedule_id().IsNil(); - - auto restoration = std::make_unique(&context_, restoration_id, &snapshot); - tablet_infos = restoration->PrepareOperations(); - restorations_.emplace(std::move(restoration)); - } - - auto snapshot_id_str = snapshot_id.AsSlice().ToBuffer(); - for (const auto& tablet : tablet_infos) { - context_.SendRestoreTabletSnapshotRequest( - tablet, snapshot_id_str, restore_at, - MakeDoneCallback(&mutex_, restorations_, restoration_id, tablet->tablet_id())); - } - if (restore_sys_catalog) { - SubmitRestore(snapshot_id, restore_at, nullptr); - } - + RETURN_NOT_OK(DoRestore(snapshot_id, restore_at, restoration_id, RestorePhase::kInitial)); return restoration_id; } @@ -413,6 +404,33 @@ class MasterSnapshotCoordinator::Impl { return Status::OK(); } + void SysCatalogLoaded(int64_t term) { + if (term == OpId::kUnknownTerm) { + // Do nothing on follower. + return; + } + std::vector postponed_restores; + { + std::lock_guard lock(mutex_); + auto filter = [term, &postponed_restores](const PostponedRestore& postponed_restore) { + if (postponed_restore.term == term) { + postponed_restores.push_back(postponed_restore); + } + // TODO(pitr) cancel restorations + return postponed_restore.term <= term; + }; + postponed_restores_.erase( + std::remove_if(postponed_restores_.begin(), postponed_restores_.end(), filter), + postponed_restores_.end()); + } + for (const auto& postponed_restore : postponed_restores) { + WARN_NOT_OK( + DoRestore(postponed_restore.snapshot_id, postponed_restore.restore_at, + postponed_restore.restoration_id, RestorePhase::kPostSysCatalogLoad), + Format("Failed to restore tablets for restoration $0", postponed_restore.restoration_id)); + } + } + Result MakeSnapshotSchedulesToTabletsMap() { std::vector> schedules; { @@ -739,6 +757,7 @@ class MasterSnapshotCoordinator::Impl { } void SubmitRestore(const TxnSnapshotId& snapshot_id, HybridTime restore_at, + const TxnSnapshotRestorationId& restoration_id, const std::shared_ptr& synchronizer) { auto operation_state = std::make_unique(nullptr); auto request = operation_state->AllocateRequest(); @@ -746,6 +765,9 @@ class MasterSnapshotCoordinator::Impl { request->set_operation(tserver::TabletSnapshotOpRequestPB::RESTORE_SYS_CATALOG); request->set_snapshot_id(snapshot_id.data(), snapshot_id.size()); request->set_snapshot_hybrid_time(restore_at.ToUint64()); + if (restoration_id) { + request->set_restoration_id(restoration_id.data(), restoration_id.size()); + } operation_state->set_completion_callback(std::make_unique< tablet::WeakSynchronizerOperationCompletionCallback>(synchronizer)); @@ -829,6 +851,48 @@ class MasterSnapshotCoordinator::Impl { return context_.CollectEntries(filter.tables().tables(), true, true, true); } + CHECKED_STATUS DoRestore( + const TxnSnapshotId& snapshot_id, HybridTime restore_at, + const TxnSnapshotRestorationId& restoration_id, RestorePhase phase) { + TabletInfos tablet_infos; + bool restore_sys_catalog; + { + std::lock_guard lock(mutex_); + SnapshotState& snapshot = VERIFY_RESULT(FindSnapshot(snapshot_id)); + if (!VERIFY_RESULT(snapshot.Complete())) { + return STATUS(IllegalState, "The snapshot state is not complete", snapshot_id.ToString(), + MasterError(MasterErrorPB::SNAPSHOT_IS_NOT_READY)); + } + restore_sys_catalog = phase == RestorePhase::kInitial && !snapshot.schedule_id().IsNil(); + RestorationState* restoration_ptr; + if (phase == RestorePhase::kInitial) { + auto restoration = std::make_unique(&context_, restoration_id, &snapshot); + restoration_ptr = restorations_.emplace(std::move(restoration)).first->get(); + } else { + restoration_ptr = &VERIFY_RESULT(FindRestoration(restoration_id)).get(); + } + if (!restore_sys_catalog) { + tablet_infos = restoration_ptr->PrepareOperations(); + } + } + + // If sys catalog is restored, then tablets data will be restored after that using postponed + // restores. + if (restore_sys_catalog) { + SubmitRestore(snapshot_id, restore_at, restoration_id, nullptr); + } else { + auto snapshot_id_str = snapshot_id.AsSlice().ToBuffer(); + SendMetadata send_metadata(phase == RestorePhase::kPostSysCatalogLoad); + for (const auto& tablet : tablet_infos) { + context_.SendRestoreTabletSnapshotRequest( + tablet, snapshot_id_str, restore_at, send_metadata, + MakeDoneCallback(&mutex_, restorations_, restoration_id, tablet->tablet_id())); + } + } + + return Status::OK(); + } + SnapshotCoordinatorContext& context_; std::mutex mutex_; class ScheduleTag; @@ -879,6 +943,15 @@ class MasterSnapshotCoordinator::Impl { Restorations restorations_ GUARDED_BY(mutex_); Schedules schedules_ GUARDED_BY(mutex_); rpc::Poller poller_; + + // Restores postponed until sys catalog is reloaed. + struct PostponedRestore { + int64_t term = OpId::kUnknownTerm; + TxnSnapshotId snapshot_id = TxnSnapshotId::Nil(); + HybridTime restore_at; + TxnSnapshotRestorationId restoration_id = TxnSnapshotRestorationId::Nil(); + }; + std::vector postponed_restores_ GUARDED_BY(mutex_); }; MasterSnapshotCoordinator::MasterSnapshotCoordinator(SnapshotCoordinatorContext* context) @@ -962,5 +1035,9 @@ Result return impl_->MakeSnapshotSchedulesToTabletsMap(); } +void MasterSnapshotCoordinator::SysCatalogLoaded(int64_t term) { + impl_->SysCatalogLoaded(term); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/master_snapshot_coordinator.h b/src/yb/master/master_snapshot_coordinator.h index b761c5827a1a..1afbbc00dd08 100644 --- a/src/yb/master/master_snapshot_coordinator.h +++ b/src/yb/master/master_snapshot_coordinator.h @@ -79,6 +79,8 @@ class MasterSnapshotCoordinator : public tablet::SnapshotCoordinator { CHECKED_STATUS FillHeartbeatResponse(TSHeartbeatResponsePB* resp); + void SysCatalogLoaded(int64_t term); + // For each returns map from schedule id to sorted vectors of tablets id in this schedule. Result MakeSnapshotSchedulesToTabletsMap(); diff --git a/src/yb/master/snapshot_coordinator_context.h b/src/yb/master/snapshot_coordinator_context.h index c684f9c1b01e..f3a5cf7565f4 100644 --- a/src/yb/master/snapshot_coordinator_context.h +++ b/src/yb/master/snapshot_coordinator_context.h @@ -39,6 +39,8 @@ namespace master { using TabletSnapshotOperationCallback = std::function)>; +YB_STRONGLY_TYPED_BOOL(SendMetadata); + // Context class for MasterSnapshotCoordinator. class SnapshotCoordinatorContext { public: @@ -54,7 +56,8 @@ class SnapshotCoordinatorContext { virtual void SendRestoreTabletSnapshotRequest( const scoped_refptr& tablet, const std::string& snapshot_id, - HybridTime restore_at, TabletSnapshotOperationCallback callback) = 0; + HybridTime restore_at, SendMetadata send_metadata, + TabletSnapshotOperationCallback callback) = 0; virtual void SendDeleteTabletSnapshotRequest( const scoped_refptr& tablet, const std::string& snapshot_id, @@ -67,6 +70,9 @@ class SnapshotCoordinatorContext { bool succeed_if_create_in_progress) = 0; virtual CHECKED_STATUS CreateSysCatalogSnapshot(const tablet::CreateSnapshotData& data) = 0; + virtual CHECKED_STATUS RestoreSysCatalog( + const TxnSnapshotId& snapshot_id, HybridTime restore_at, const OpId& op_id, + HybridTime write_time, const SnapshotScheduleFilterPB& filter) = 0; virtual const Schema& schema() = 0; diff --git a/src/yb/master/sys_catalog_writer.cc b/src/yb/master/sys_catalog_writer.cc index c4d01c249015..3c2bdda906a5 100644 --- a/src/yb/master/sys_catalog_writer.cc +++ b/src/yb/master/sys_catalog_writer.cc @@ -123,22 +123,13 @@ Status SysCatalogWriter::InsertPgsqlTableRow(const Schema& source_schema, } Status FillSysCatalogWriteRequest( - int8_t type, const std::string& item_id, const google::protobuf::Message& new_pb, + int8_t type, const std::string& item_id, const Slice& data, QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req) { - req->set_type(op_type); - if (IsWrite(op_type)) { - faststring metadata_buf; - - if (!pb_util::SerializeToString(new_pb, &metadata_buf)) { - return STATUS_FORMAT( - Corruption, "Unable to serialize SysCatalog entry $0", item_id); - } - // Add the metadata column. QLColumnValuePB* metadata = req->add_column_values(); RETURN_NOT_OK(SetColumnId(schema_with_ids, kSysCatalogTableColMetadata, metadata)); - SetBinaryValue(metadata_buf, metadata->mutable_expr()); + SetBinaryValue(data, metadata->mutable_expr()); } // Add column type. @@ -150,18 +141,42 @@ Status FillSysCatalogWriteRequest( return Status::OK(); } +Status FillSysCatalogWriteRequest( + int8_t type, const std::string& item_id, const google::protobuf::Message& new_pb, + QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req) { + req->set_type(op_type); + + if (IsWrite(op_type)) { + faststring metadata_buf; + + pb_util::SerializeToString(new_pb, &metadata_buf); + + return FillSysCatalogWriteRequest( + type, item_id, Slice(metadata_buf.data(), metadata_buf.size()), op_type, schema_with_ids, + req); + } + + return FillSysCatalogWriteRequest(type, item_id, Slice(), op_type, schema_with_ids, req); +} + Status EnumerateSysCatalog( tablet::Tablet* tablet, const Schema& schema, int8_t entry_type, - const std::function& callback) { + const EnumerationCallback& callback) { + auto iter = VERIFY_RESULT(tablet->NewRowIterator( + schema.CopyWithoutColumnIds(), boost::none, ReadHybridTime::Max(), /* table_id= */ "", + CoarseTimePoint::max(), tablet::AllowBootstrappingState::kTrue)); + + return EnumerateSysCatalog( + down_cast(iter.get()), schema, entry_type, callback); +} + +Status EnumerateSysCatalog( + docdb::DocRowwiseIterator* doc_iter, const Schema& schema, int8_t entry_type, + const EnumerationCallback& callback) { const int type_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColType)); const int entry_id_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColId)); const int metadata_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColMetadata)); - auto iter = VERIFY_RESULT(tablet->NewRowIterator( - schema.CopyWithoutColumnIds(), boost::none, ReadHybridTime::Max(), /* table_id= */"", - CoarseTimePoint::max(), tablet::AllowBootstrappingState::kTrue)); - - auto doc_iter = down_cast(iter.get()); QLConditionPB cond; cond.set_op(QL_OP_AND); QLAddInt8Condition(&cond, schema.column_id(type_col_idx), QL_OP_EQUAL, entry_type); @@ -173,8 +188,8 @@ Status EnumerateSysCatalog( QLTableRow value_map; QLValue found_entry_type, entry_id, metadata; - while (VERIFY_RESULT(iter->HasNext())) { - RETURN_NOT_OK(iter->NextRow(&value_map)); + while (VERIFY_RESULT(doc_iter->HasNext())) { + RETURN_NOT_OK(doc_iter->NextRow(&value_map)); RETURN_NOT_OK(value_map.GetValue(schema.column_id(type_col_idx), &found_entry_type)); SCHECK_EQ(found_entry_type.int8_value(), entry_type, Corruption, "Found wrong entry type"); RETURN_NOT_OK(value_map.GetValue(schema.column_id(entry_id_col_idx), &entry_id)); diff --git a/src/yb/master/sys_catalog_writer.h b/src/yb/master/sys_catalog_writer.h index a298635d4375..0c5bc9185eed 100644 --- a/src/yb/master/sys_catalog_writer.h +++ b/src/yb/master/sys_catalog_writer.h @@ -17,6 +17,8 @@ #include "yb/common/common_fwd.h" #include "yb/common/entity_ids.h" +#include "yb/docdb/docdb_fwd.h" + #include "yb/master/catalog_entity_info.h" #include "yb/tablet/tablet_fwd.h" @@ -77,11 +79,20 @@ CHECKED_STATUS FillSysCatalogWriteRequest( int8_t type, const std::string& item_id, const google::protobuf::Message& new_pb, QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req); +CHECKED_STATUS FillSysCatalogWriteRequest( + int8_t type, const std::string& item_id, const Slice& data, + QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req); + +using EnumerationCallback = std::function; + // Enumerate sys catalog calling provided callback for all entries of the specified type in sys // catalog. CHECKED_STATUS EnumerateSysCatalog( tablet::Tablet* tablet, const Schema& schema, int8_t entry_type, - const std::function& callback); + const EnumerationCallback& callback); +CHECKED_STATUS EnumerateSysCatalog( + docdb::DocRowwiseIterator* doc_iter, const Schema& schema, int8_t entry_type, + const EnumerationCallback& callback); } // namespace master } // namespace yb diff --git a/src/yb/tablet/operations/snapshot_operation.cc b/src/yb/tablet/operations/snapshot_operation.cc index 4aa528b12230..072c020703d3 100644 --- a/src/yb/tablet/operations/snapshot_operation.cc +++ b/src/yb/tablet/operations/snapshot_operation.cc @@ -53,15 +53,7 @@ Result SnapshotOperationState::GetSnapshotDir() const { snapshot_id_str = request.snapshot_id(); } - return JoinPathSegments(VERIFY_RESULT(TopSnapshotsDir()), snapshot_id_str); -} - -Result SnapshotOperationState::TopSnapshotsDir() const { - const string top_snapshots_dir = tablet()->metadata()->snapshots_dir(); - RETURN_NOT_OK_PREPEND( - tablet()->metadata()->fs_manager()->CreateDirIfMissingAndSync(top_snapshots_dir), - Format("Unable to create snapshots directory $0", top_snapshots_dir)); - return top_snapshots_dir; + return JoinPathSegments(VERIFY_RESULT(tablet()->metadata()->TopSnapshotsDir()), snapshot_id_str); } Status SnapshotOperationState::DoCheckOperationRequirements() { diff --git a/src/yb/tablet/operations/snapshot_operation.h b/src/yb/tablet/operations/snapshot_operation.h index 89a0f7dbb8b2..25d98ed07e16 100644 --- a/src/yb/tablet/operations/snapshot_operation.h +++ b/src/yb/tablet/operations/snapshot_operation.h @@ -55,7 +55,6 @@ class SnapshotOperationState : bool CheckOperationRequirements(); private: - Result TopSnapshotsDir() const; CHECKED_STATUS DoCheckOperationRequirements(); DISALLOW_COPY_AND_ASSIGN(SnapshotOperationState); diff --git a/src/yb/tablet/tablet_component.cc b/src/yb/tablet/tablet_component.cc index 9b3adbbede2e..3092e4beb169 100644 --- a/src/yb/tablet/tablet_component.cc +++ b/src/yb/tablet/tablet_component.cc @@ -65,5 +65,9 @@ rocksdb::Env& TabletComponent::rocksdb_env() const { return tablet_.rocksdb_env(); } +void TabletComponent::ResetYBMetaDataCache() { + tablet_.ResetYBMetaDataCache(); +} + } // namespace tablet } // namespace yb diff --git a/src/yb/tablet/tablet_component.h b/src/yb/tablet/tablet_component.h index c0e584713f33..30655e804012 100644 --- a/src/yb/tablet/tablet_component.h +++ b/src/yb/tablet/tablet_component.h @@ -54,6 +54,8 @@ class TabletComponent { rocksdb::Env& rocksdb_env() const; + void ResetYBMetaDataCache(); + private: Tablet& tablet_; }; diff --git a/src/yb/tablet/tablet_metadata.cc b/src/yb/tablet/tablet_metadata.cc index a481b0fd3d41..fa6e301ce2b9 100644 --- a/src/yb/tablet/tablet_metadata.cc +++ b/src/yb/tablet/tablet_metadata.cc @@ -841,6 +841,13 @@ Result RaftGroupMetadata::CreateSubtabletMetadata( return metadata; } +Result RaftGroupMetadata::TopSnapshotsDir() const { + auto result = snapshots_dir(); + RETURN_NOT_OK_PREPEND( + fs_manager()->CreateDirIfMissingAndSync(result), + Format("Unable to create snapshots directory $0", result)); + return result; +} namespace { // MigrateSuperblockForDXXXX functions are only needed for backward compatibility with diff --git a/src/yb/tablet/tablet_metadata.h b/src/yb/tablet/tablet_metadata.h index fe5574489b4b..0daf21e86e07 100644 --- a/src/yb/tablet/tablet_metadata.h +++ b/src/yb/tablet/tablet_metadata.h @@ -471,6 +471,8 @@ class RaftGroupMetadata : public RefCountedThreadSafe { bool colocated() const { return colocated_; } + Result TopSnapshotsDir() const; + // Return standard "T xxx P yyy" log prefix. std::string LogPrefix() const; diff --git a/src/yb/tablet/tablet_snapshots.cc b/src/yb/tablet/tablet_snapshots.cc index 5c570e70a79e..38fbf93fd82d 100644 --- a/src/yb/tablet/tablet_snapshots.cc +++ b/src/yb/tablet/tablet_snapshots.cc @@ -42,6 +42,12 @@ const std::string kTempSnapshotDirSuffix = ".tmp"; } // namespace +struct TabletSnapshots::RestoreMetadata { + boost::optional schema; + boost::optional index_map; + uint32_t schema_version; +}; + TabletSnapshots::TabletSnapshots(Tablet* tablet) : TabletComponent(tablet) {} std::string TabletSnapshots::SnapshotsDirName(const std::string& rocksdb_dir) { @@ -203,14 +209,22 @@ Status TabletSnapshots::Restore(SnapshotOperationState* tx_state) { docdb::ConsensusFrontier frontier; frontier.set_op_id(tx_state->op_id()); frontier.set_hybrid_time(tx_state->hybrid_time()); - const Status s = RestoreCheckpoint(snapshot_dir, restore_at, frontier); + RestoreMetadata restore_metadata; + if (tx_state->request()->has_schema()) { + restore_metadata.schema.emplace(); + RETURN_NOT_OK(SchemaFromPB(tx_state->request()->schema(), restore_metadata.schema.get_ptr())); + restore_metadata.index_map.emplace(tx_state->request()->indexes()); + restore_metadata.schema_version = tx_state->request()->schema_version(); + } + const Status s = RestoreCheckpoint(snapshot_dir, restore_at, restore_metadata, frontier); VLOG_WITH_PREFIX(1) << "Complete checkpoint restoring with result " << s << " in folder: " << metadata().rocksdb_dir(); return s; } Status TabletSnapshots::RestoreCheckpoint( - const std::string& dir, HybridTime restore_at, const docdb::ConsensusFrontier& frontier) { + const std::string& dir, HybridTime restore_at, const RestoreMetadata& restore_metadata, + const docdb::ConsensusFrontier& frontier) { // The following two lines can't just be changed to RETURN_NOT_OK(PauseReadWriteOperations()): // op_pause has to stay in scope until the end of the function. auto op_pause = PauseReadWriteOperations(); @@ -249,6 +263,15 @@ Status TabletSnapshots::RestoreCheckpoint( } } + if (restore_metadata.schema) { + // TODO(pitr) check deleted columns + tablet().metadata()->SetSchema( + *restore_metadata.schema, *restore_metadata.index_map, {} /* deleted_columns */, + restore_metadata.schema_version); + RETURN_NOT_OK(tablet().metadata()->Flush()); + ResetYBMetaDataCache(); + } + // Reopen database from copied checkpoint. // Note: db_dir == metadata()->rocksdb_dir() is still valid db dir. s = OpenRocksDBs(); @@ -273,6 +296,27 @@ Status TabletSnapshots::RestoreCheckpoint( return Status::OK(); } +Result TabletSnapshots::RestoreToTemporary( + const TxnSnapshotId& snapshot_id, HybridTime restore_at) { + auto source_dir = JoinPathSegments( + VERIFY_RESULT(metadata().TopSnapshotsDir()), snapshot_id.ToString()); + auto dest_dir = source_dir + kTempSnapshotDirSuffix; + RETURN_NOT_OK(CleanupSnapshotDir(dest_dir)); + RETURN_NOT_OK(CopyDirectory( + &rocksdb_env(), source_dir, dest_dir, UseHardLinks::kTrue, CreateIfMissing::kTrue)); + + { + rocksdb::Options rocksdb_options; + tablet().InitRocksDBOptions(&rocksdb_options, LogPrefix()); + docdb::RocksDBPatcher patcher(dest_dir, rocksdb_options); + + RETURN_NOT_OK(patcher.Load()); + RETURN_NOT_OK(patcher.SetHybridTimeFilter(restore_at)); + } + + return dest_dir; +} + Status TabletSnapshots::Delete(SnapshotOperationState* tx_state) { const std::string top_snapshots_dir = metadata().snapshots_dir(); const auto& snapshot_id = tx_state->request()->snapshot_id(); diff --git a/src/yb/tablet/tablet_snapshots.h b/src/yb/tablet/tablet_snapshots.h index fe1c115421cf..10a95baefdbf 100644 --- a/src/yb/tablet/tablet_snapshots.h +++ b/src/yb/tablet/tablet_snapshots.h @@ -63,6 +63,8 @@ class TabletSnapshots : public TabletComponent { // Prepares the operation context for a snapshot operation. CHECKED_STATUS Prepare(SnapshotOperation* operation); + Result RestoreToTemporary(const TxnSnapshotId& snapshot_id, HybridTime restore_at); + //------------------------------------------------------------------------------------------------ // Create a RocksDB checkpoint in the provided directory. Only used when table_type_ == // YQL_TABLE_TYPE. @@ -83,10 +85,13 @@ class TabletSnapshots : public TabletComponent { static bool IsTempSnapshotDir(const std::string& dir); private: + struct RestoreMetadata; + // Restore the RocksDB checkpoint from the provided directory. // Only used when table_type_ == YQL_TABLE_TYPE. CHECKED_STATUS RestoreCheckpoint( - const std::string& dir, HybridTime restore_at, const docdb::ConsensusFrontier& frontier); + const std::string& dir, HybridTime restore_at, const RestoreMetadata& metadata, + const docdb::ConsensusFrontier& frontier); // Applies specified snapshot operation. CHECKED_STATUS Apply(SnapshotOperationState* tx_state); diff --git a/src/yb/tserver/backup.proto b/src/yb/tserver/backup.proto index 0f8d577464ba..8eee1c061510 100644 --- a/src/yb/tserver/backup.proto +++ b/src/yb/tserver/backup.proto @@ -18,6 +18,7 @@ package yb.tserver; option java_package = "org.yb.tserver"; import "google/protobuf/any.proto"; +import "yb/common/common.proto"; import "yb/tserver/tserver.proto"; service TabletServerBackupService { @@ -61,6 +62,12 @@ message TabletSnapshotOpRequestPB { bool imported = 9; bytes schedule_id = 10; + + bytes restoration_id = 11; + + uint32 schema_version = 12; + SchemaPB schema = 13; + repeated IndexInfoPB indexes = 14; } message TabletSnapshotOpResponsePB { diff --git a/src/yb/util/env.cc b/src/yb/util/env.cc index db9668b33043..344907060872 100644 --- a/src/yb/util/env.cc +++ b/src/yb/util/env.cc @@ -107,4 +107,14 @@ Status ReadFileToString(Env* env, const std::string& fname, faststring* data) { EnvWrapper::~EnvWrapper() { } +Status DeleteIfExists(const std::string& path, Env* env) { + if (env->DirExists(path)) { + return env->DeleteRecursively(path); + } + if (env->FileExists(path)) { + return env->DeleteFile(path); + } + return Status::OK(); +} + } // namespace yb diff --git a/src/yb/util/env.h b/src/yb/util/env.h index 3bdba2dff96b..c86093afdda4 100644 --- a/src/yb/util/env.h +++ b/src/yb/util/env.h @@ -823,6 +823,8 @@ class EnvWrapper : public Env { Env* target_; }; +CHECKED_STATUS DeleteIfExists(const std::string& path, Env* env); + } // namespace yb #endif // YB_UTIL_ENV_H diff --git a/src/yb/util/pb_util.cc b/src/yb/util/pb_util.cc index b5151d41ed6d..02bfed601284 100644 --- a/src/yb/util/pb_util.cc +++ b/src/yb/util/pb_util.cc @@ -158,12 +158,12 @@ string InitializationErrorMessage(const char* action, } // anonymous namespace -bool AppendToString(const MessageLite &msg, faststring *output) { +void AppendToString(const MessageLite &msg, faststring *output) { DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); - return AppendPartialToString(msg, output); + AppendPartialToString(msg, output); } -bool AppendPartialToString(const MessageLite &msg, faststring* output) { +void AppendPartialToString(const MessageLite &msg, faststring* output) { int old_size = output->size(); int byte_size = msg.ByteSize(); @@ -174,12 +174,11 @@ bool AppendPartialToString(const MessageLite &msg, faststring* output) { if (end - start != byte_size) { ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start); } - return true; } -bool SerializeToString(const MessageLite &msg, faststring *output) { +void SerializeToString(const MessageLite &msg, faststring *output) { output->clear(); - return AppendToString(msg, output); + AppendToString(msg, output); } bool ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) { diff --git a/src/yb/util/pb_util.h b/src/yb/util/pb_util.h index 49da087a5c61..467b63ec543e 100644 --- a/src/yb/util/pb_util.h +++ b/src/yb/util/pb_util.h @@ -77,13 +77,13 @@ enum CreateMode { }; // See MessageLite::AppendToString -bool AppendToString(const MessageLite &msg, faststring *output); +void AppendToString(const MessageLite &msg, faststring *output); // See MessageLite::AppendPartialToString -bool AppendPartialToString(const MessageLite &msg, faststring *output); +void AppendPartialToString(const MessageLite &msg, faststring *output); // See MessageLite::SerializeToString. -bool SerializeToString(const MessageLite &msg, faststring *output); +void SerializeToString(const MessageLite &msg, faststring *output); // See MessageLite::ParseFromZeroCopyStream // TODO: change this to return Status - differentiate IO error from bad PB