diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 98504ab0782..20d2855e7fe 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -11,6 +11,8 @@ #include #include +#include + #include "clients/meta/FileBasedClusterIdMan.h" #include "common/base/Base.h" #include "common/base/MurmurHash2.h" @@ -55,6 +57,8 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool updateLeader(); bgThread_ = std::make_unique(); LOG(INFO) << "Create meta client to " << active_; + LOG(INFO) << folly::sformat( + "root path: {}, data path size: {}", options_.rootPath_, options_.dataPaths_.size()); } MetaClient::~MetaClient() { @@ -2430,6 +2434,20 @@ folly::Future> MetaClient::heartbeat() { } } + // info used in the agent, only set once + // TODO(spw): if we could add data path(disk) dynamically in the future, it should be + // reported every time it changes + if (!dirInfoReported_) { + nebula::cpp2::DirInfo dirInfo; + if (options_.role_ == cpp2::HostRole::GRAPH) { + dirInfo.set_root(options_.rootPath_); + } else if (options_.role_ == cpp2::HostRole::STORAGE) { + dirInfo.set_root(options_.rootPath_); + dirInfo.set_data(options_.dataPaths_); + } + req.set_dir(dirInfo); + } + folly::Promise> promise; auto future = promise.getFuture(); VLOG(1) << "Send heartbeat to " << leader_ << ", clusterId " << req.get_cluster_id(); @@ -2449,7 +2467,12 @@ folly::Future> MetaClient::heartbeat() { metadLastUpdateTime_ = resp.get_last_update_time_in_ms(); VLOG(1) << "Metad last update time: " << metadLastUpdateTime_; metaServerVersion_ = resp.get_meta_version(); - return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + + bool succeeded = resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED; + if (succeeded) { + dirInfoReported_ = true; + } + return succeeded; }, std::move(promise)); return future; diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 80f975e0181..5aa1d19f0f6 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -184,7 +184,9 @@ struct MetaClientOptions { serviceName_(opt.serviceName_), skipConfig_(opt.skipConfig_), role_(opt.role_), - gitInfoSHA_(opt.gitInfoSHA_) {} + gitInfoSHA_(opt.gitInfoSHA_), + dataPaths_(opt.dataPaths_), + rootPath_(opt.rootPath_) {} // Current host address HostAddr localHost_{"", 0}; @@ -200,6 +202,10 @@ struct MetaClientOptions { cpp2::HostRole role_ = cpp2::HostRole::UNKNOWN; // gitInfoSHA of Host using this client std::string gitInfoSHA_{""}; + // data path list, used in storaged + std::vector dataPaths_; + // install path, used in metad/graphd/storaged + std::string rootPath_; }; class MetaClient { @@ -756,6 +762,8 @@ class MetaClient { HostAddr leader_; HostAddr localHost_; + // Only report dir info once when started + bool dirInfoReported_ = false; struct ThreadLocalInfo { int64_t localLastUpdateTime_{-2}; LocalCache localCache_; diff --git a/src/common/graph/Response.h b/src/common/graph/Response.h index f3f0ef08268..811f5ebc2ff 100644 --- a/src/common/graph/Response.h +++ b/src/common/graph/Response.h @@ -120,6 +120,7 @@ X(E_GET_META_DIR_FAILURE, -2072) \ \ X(E_QUERY_NOT_FOUND, -2073) \ + X(E_AGENT_HB_FAILUE, -2074) \ /* 3xxx for storaged */ \ X(E_CONSENSUS_ERROR, -3001) \ X(E_KEY_HAS_EXISTS, -3002) \ diff --git a/src/common/utils/MetaKeyUtils.cpp b/src/common/utils/MetaKeyUtils.cpp index 4098bde0ccb..f6ee7dd0a13 100644 --- a/src/common/utils/MetaKeyUtils.cpp +++ b/src/common/utils/MetaKeyUtils.cpp @@ -22,6 +22,7 @@ static const std::unordered_map> syste
{"hosts", {"__hosts__", false}}, {"versions", {"__versions__", false}}, {"machines", {"__machines__", false}}, + {"host_dirs", {"__host_dirs__", false}}, {"snapshots", {"__snapshots__", false}}, {"configs", {"__configs__", true}}, {"groups", {"__groups__", true}}, @@ -62,6 +63,7 @@ static const std::string kPartsTable = tableMaps.at("parts").first; static const std::string kVersionsTable = systemTableMaps.at("versions").first; // NOLINT static const std::string kHostsTable = systemTableMaps.at("hosts").first; // NOLINT static const std::string kMachinesTable = systemTableMaps.at("machines").first; // NOLINT +static const std::string kHostDirsTable = systemTableMaps.at("host_dirs").first;// NOLINT static const std::string kTagsTable = tableMaps.at("tags").first; // NOLINT static const std::string kEdgesTable = tableMaps.at("edges").first; // NOLINT static const std::string kIndexesTable = tableMaps.at("indexes").first; // NOLINT @@ -97,7 +99,7 @@ const std::string kLastUpdateTimeTable = systemInfoMaps.at("lastUpdateTime").fir const int kMaxIpAddrLen = 15; // '255.255.255.255' -std::string MetaKeyUtils::getIndexTable() { return tableMaps.at("index").first; } +std::string MetaKeyUtils::getIndexTable() { return kIndexTable; } std::unordered_map>> @@ -256,6 +258,41 @@ HostAddr MetaKeyUtils::parseMachineKey(folly::StringPiece key) { return MetaKeyUtils::deserializeHostAddr(key); } +const std::string& MetaKeyUtils::hostDirPrefix() { return kHostDirsTable; } + +const std::string MetaKeyUtils::hostDirHostPrefix(std::string host) { + return kHostDirsTable + host; +} + +std::string MetaKeyUtils::hostDirKey(std::string host, Port port) { + std::string key; + key.reserve(kHostDirsTable.size() + host.size() + sizeof(port)); + key.append(kHostDirsTable.data(), kHostDirsTable.size()).append(host); + key.append(reinterpret_cast(&port), sizeof(Port)); + return key; +} + +HostAddr MetaKeyUtils::parseHostDirKey(folly::StringPiece key) { + HostAddr addr; + auto hostSize = key.size() - kHostDirsTable.size() - sizeof(Port); + addr.host = key.subpiece(kHostDirsTable.size(), hostSize).toString(); + key.advance(kHostDirsTable.size() + hostSize); + addr.port = *reinterpret_cast(key.begin()); + return addr; +} + +std::string MetaKeyUtils::hostDirVal(cpp2::DirInfo dir) { + std::string val; + apache::thrift::CompactSerializer::serialize(dir, &val); + return val; +} + +cpp2::DirInfo MetaKeyUtils::parseHostDir(folly::StringPiece val) { + cpp2::DirInfo dir; + apache::thrift::CompactSerializer::deserialize(val, dir); + return dir; +} + std::string MetaKeyUtils::hostKey(std::string addr, Port port) { return hostKeyV2(addr, port); } std::string MetaKeyUtils::hostKeyV2(std::string addr, Port port) { @@ -639,6 +676,16 @@ std::string MetaKeyUtils::indexSpaceKey(const std::string& name) { return key; } +std::string MetaKeyUtils::parseIndexSpaceKey(folly::StringPiece key) { + auto nameSize = key.size() - kIndexTable.size() - sizeof(EntryType); + return key.subpiece(kIndexTable.size() + sizeof(EntryType), nameSize).str(); +} + +EntryType MetaKeyUtils::parseIndexType(folly::StringPiece key) { + auto type = *reinterpret_cast(key.data() + kIndexTable.size()); + return type; +} + std::string MetaKeyUtils::indexTagKey(GraphSpaceID spaceId, const std::string& name) { EntryType type = EntryType::TAG; std::string key; diff --git a/src/common/utils/MetaKeyUtils.h b/src/common/utils/MetaKeyUtils.h index 3bd831726e6..bad713be151 100644 --- a/src/common/utils/MetaKeyUtils.h +++ b/src/common/utils/MetaKeyUtils.h @@ -116,9 +116,25 @@ class 
MetaKeyUtils final { static HostAddr parseMachineKey(folly::StringPiece key); - static std::string hostKey(std::string ip, Port port); + // hostDir stores the service(metad/storaged/graphd) address -> dir info(root path and data paths) + // the agent will use these to start/stop services and back up/restore data + static std::string hostDirKey(std::string ip); - static std::string hostKeyV2(std::string addr, Port port); + static std::string hostDirKey(std::string host, Port port); + + static HostAddr parseHostDirKey(folly::StringPiece key); + + static const std::string& hostDirPrefix(); + + static const std::string hostDirHostPrefix(std::string host); + + static std::string hostDirVal(cpp2::DirInfo dir); + + static cpp2::DirInfo parseHostDir(folly::StringPiece val); + + static std::string hostKey(std::string host, Port port); + + static std::string hostKeyV2(std::string host, Port port); static const std::string& hostPrefix(); @@ -213,6 +229,10 @@ class MetaKeyUtils final { static std::string indexSpaceKey(const std::string& name); + static std::string parseIndexSpaceKey(folly::StringPiece key); + + static EntryType parseIndexType(folly::StringPiece key); + static std::string indexTagKey(GraphSpaceID spaceId, const std::string& name); static std::string indexEdgeKey(GraphSpaceID spaceId, const std::string& name); diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index a4acb2f757e..d89f448bd12 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -8,6 +8,7 @@ #include "common/base/Base.h" #include "common/base/SignalHandler.h" +#include "common/fs/FileUtils.h" #include "common/hdfs/HdfsCommandHelper.h" #include "common/hdfs/HdfsHelper.h" #include "common/network/NetworkUtils.h" @@ -86,7 +87,10 @@ std::unique_ptr initKV(std::vector p threadManager->setNamePrefix("executor"); threadManager->start(); nebula::kvstore::KVOptions options; - options.dataPaths_ = {FLAGS_data_path}; + + auto absolute = boost::filesystem::absolute(FLAGS_data_path); + options.dataPaths_ = {absolute.string()}; + options.partMan_ = std::move(partMan); auto kvstore = std::make_unique( std::move(options), ioPool, localhost, threadManager); diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp index 37e5957c4f8..cf7722d2a1e 100644 --- a/src/daemons/StorageDaemon.cpp +++ b/src/daemons/StorageDaemon.cpp @@ -8,6 +8,7 @@ #include "common/base/Base.h" #include "common/base/SignalHandler.h" +#include "common/fs/FileUtils.h" #include "common/network/NetworkUtils.h" #include "common/process/ProcessUtils.h" #include "common/time/TimezoneInfo.h" @@ -132,9 +133,14 @@ int main(int argc, char *argv[]) { std::vector paths; folly::split(",", FLAGS_data_path, paths, true); - std::transform(paths.begin(), paths.end(), paths.begin(), [](auto &p) { - return folly::trimWhitespace(p).str(); - }); + // make the paths absolute + std::transform( + paths.begin(), paths.end(), paths.begin(), [](const std::string &p) -> std::string { + auto path = folly::trimWhitespace(p).str(); + path = boost::filesystem::absolute(path).string(); + LOG(INFO) << "data path = " << path; + return path; + }); if (paths.empty()) { LOG(ERROR) << "Bad data_path format:" << FLAGS_data_path; return EXIT_FAILURE; diff --git a/src/graph/service/GraphService.cpp b/src/graph/service/GraphService.cpp index 3b58c3be851..bacbed56595 100644 --- a/src/graph/service/GraphService.cpp +++ b/src/graph/service/GraphService.cpp @@ -5,6 +5,8 @@ #include "graph/service/GraphService.h" +#include + #include "clients/storage/StorageClient.h"
#include "common/base/Base.h" #include "common/encryption/MD5Utils.h" @@ -33,6 +35,7 @@ Status GraphService::init(std::shared_ptr ioExecuto options.role_ = meta::cpp2::HostRole::GRAPH; options.localHost_ = hostAddr; options.gitInfoSHA_ = gitInfoSha(); + options.rootPath_ = boost::filesystem::current_path().string(); metaClient_ = std::make_unique(ioExecutor, std::move(addrs.value()), options); diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 8fa6ed9a49e..49043bb04d2 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -237,19 +237,11 @@ struct DirInfo { 2: list data, } -struct NodeInfo { - 1: HostAddr host, - 2: DirInfo dir, -} - -struct PartitionBackupInfo { - 1: map (cpp.template = "std::unordered_map") info, -} - struct CheckpointInfo { - 1: PartitionBackupInfo partition_info, + 1: GraphSpaceID space_id, + 2: map (cpp.template = "std::unordered_map") parts, // storage checkpoint directory name - 2: binary path, + 3: binary path, } // used for raft and drainer @@ -402,6 +394,7 @@ enum ErrorCode { E_GET_META_DIR_FAILURE = -2072, E_QUERY_NOT_FOUND = -2073, + E_AGENT_HB_FAILUE = -2074, // 3xxx for storaged E_CONSENSUS_ERROR = -3001, diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 6b3fe96a08c..7f2e34593ca 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -554,7 +554,8 @@ enum HostRole { META = 0x01, STORAGE = 0x02, LISTENER = 0x03, - UNKNOWN = 0x04 + AGENT = 0x04, + UNKNOWN = 0x05 } (cpp.enum_strict) struct LeaderInfo { @@ -567,15 +568,40 @@ struct PartitionList { } struct HBReq { - 1: HostRole role, - 2: common.HostAddr host, - 3: ClusterID cluster_id, + 1: HostRole role, + 2: common.HostAddr host, + 3: ClusterID cluster_id, 4: optional map> (cpp.template = "std::unordered_map") leader_partIds; - 5: binary git_info_sha, + 5: binary git_info_sha, 6: optional map (cpp.template = "std::unordered_map")> (cpp.template = "std::unordered_map") disk_parts; + 7: optional common.DirInfo dir, + // version of binary + 8: optional binary version, +} + +// service(agent/metad/storaged/graphd) info +struct ServiceInfo { + 1: common.DirInfo dir, + 2: common.HostAddr addr, + 3: HostRole role, +} + +struct AgentHBReq { + 1: common.HostAddr host, + 2: binary git_info_sha, + // version of binary + 3: optional binary version, +} + +struct AgentHBResp { + 1: common.ErrorCode code, + 2: common.HostAddr leader, + // metad/graphd/storaged may in the same host + // do not include agent it self + 3: list service_list, } struct IndexFieldDef { @@ -915,25 +941,26 @@ struct GetStatsResp { 3: StatsItem stats, } -struct BackupInfo { - 1: common.HostAddr host, - 2: list info, +struct HostBackupInfo { + 1: common.HostAddr host, + // each for one data path + 2: list checkpoints, } struct SpaceBackupInfo { - 1: SpaceDesc space, - 2: list info, + 1: SpaceDesc space, + 2: list host_backups, } struct BackupMeta { // space_name => SpaceBackupInfo - 1: map (cpp.template = "std::unordered_map") backup_info, + 1: map(cpp.template = "std::unordered_map") space_backups, // sst file 2: list meta_files, // backup 3: binary backup_name, 4: bool full, - 5: bool include_system_space, + 5: bool all_spaces, 6: i64 create_time, } @@ -1098,10 +1125,9 @@ struct ReportTaskReq { } struct ListClusterInfoResp { - 1: common.ErrorCode code, - 2: common.HostAddr leader, - 3: list meta_servers, - 4: list storage_servers, + 1: common.ErrorCode code, + 2: common.HostAddr leader, + 3: map>(cpp.template = "std::unordered_map") host_services, } struct 
ListClusterInfoReq { @@ -1186,6 +1212,7 @@ service MetaService { ExecResp changePassword(1: ChangePasswordReq req); HBResp heartBeat(1: HBReq req); + AgentHBResp agentHeartbeat(1: AgentHBReq req); ExecResp regConfig(1: RegConfigReq req); GetConfigResp getConfig(1: GetConfigReq req); @@ -1205,10 +1232,8 @@ service MetaService { GetZoneResp getZone(1: GetZoneReq req); ListZonesResp listZones(1: ListZonesReq req); - CreateBackupResp createBackup(1: CreateBackupReq req); - ExecResp restoreMeta(1: RestoreMetaReq req); - ExecResp addListener(1: AddListenerReq req); - ExecResp removeListener(1: RemoveListenerReq req); + ExecResp addListener(1: AddListenerReq req); + ExecResp removeListener(1: RemoveListenerReq req); ListListenerResp listListener(1: ListListenerReq req); GetStatsResp getStats(1: GetStatsReq req); @@ -1229,6 +1254,9 @@ service MetaService { ExecResp reportTaskFinish(1: ReportTaskReq req); + // Interfaces for backup and restore + CreateBackupResp createBackup(1: CreateBackupReq req); + ExecResp restoreMeta(1: RestoreMetaReq req); ListClusterInfoResp listCluster(1: ListClusterInfoReq req); GetMetaDirInfoResp getMetaDirInfo(1: GetMetaDirInfoReq req); diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 07c56e2b2fa..f677c488bec 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -748,14 +748,14 @@ struct GetLeaderReq { } struct CreateCPRequest { - 1: common.GraphSpaceID space_id, - 2: binary name, + 1: list space_ids, + 2: binary name, } struct DropCPRequest { - 1: common.GraphSpaceID space_id, - 2: binary name, + 1: list space_ids, + 2: binary name, } @@ -766,8 +766,8 @@ enum EngineSignType { struct BlockingSignRequest { - 1: common.GraphSpaceID space_id, - 2: required EngineSignType sign, + 1: list space_ids, + 2: required EngineSignType sign, } @@ -843,8 +843,6 @@ service StorageAdminService { AdminExecResp addAdminTask(1: AddAdminTaskRequest req); AdminExecResp stopAdminTask(1: StopAdminTaskRequest req); - - ListClusterInfoResp listClusterInfo(1: ListClusterInfoReq req); } diff --git a/src/kvstore/DiskManager.cpp b/src/kvstore/DiskManager.cpp index 8070b64f51d..c6199a0907e 100644 --- a/src/kvstore/DiskManager.cpp +++ b/src/kvstore/DiskManager.cpp @@ -69,6 +69,7 @@ void DiskManager::addPartToPath(GraphSpaceID spaceId, PartitionID partId, const try { auto canonical = boost::filesystem::canonical(path); auto dataPath = canonical.parent_path().parent_path(); + dataPath = boost::filesystem::absolute(dataPath); auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath); CHECK(iter != dataPaths_.end()); partIndex_[spaceId][partId] = iter - dataPaths_.begin(); @@ -85,6 +86,7 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId, try { auto canonical = boost::filesystem::canonical(path); auto dataPath = canonical.parent_path().parent_path(); + dataPath = boost::filesystem::absolute(dataPath); auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath); CHECK(iter != dataPaths_.end()); partIndex_[spaceId].erase(partId); diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 5f902bc9665..d178c056f68 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -120,7 +120,7 @@ class KVEngine { virtual nebula::cpp2::ErrorCode flush() = 0; - virtual nebula::cpp2::ErrorCode createCheckpoint(const std::string& name) = 0; + virtual nebula::cpp2::ErrorCode createCheckpoint(const std::string& checkpointPath) = 0; // For meta virtual ErrorOr backupTable( diff --git a/src/kvstore/NebulaStore.cpp 
b/src/kvstore/NebulaStore.cpp index 6d6c841828c..86f29990d90 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -883,61 +883,85 @@ nebula::cpp2::ErrorCode NebulaStore::flush(GraphSpaceID spaceId) { ErrorOr> NebulaStore::createCheckpoint( GraphSpaceID spaceId, const std::string& name) { + /* + * The default checkpoint directory structure is : + * |--FLAGS_data_path + * |----nebula + * |------space1 + * |--------data + * |--------wal + * |--------checkpoints + * |----------snapshot1 + * |------------data + * |------------wal + * |----------snapshot2 + * |----------snapshot3 + * + */ auto spaceRet = space(spaceId); if (!ok(spaceRet)) { return error(spaceRet); } - auto space = nebula::value(spaceRet); - std::string cpPath; - std::unordered_map partitionInfo; - std::vector cpInfo; + std::vector cpInfoList; DCHECK(!space->engines_.empty()); - for (auto& engine : space->engines_) { - auto code = engine->createCheckpoint(name); + std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), name); + if (!fs::FileUtils::exist(path)) { + if (!fs::FileUtils::makeDir(path)) { + LOG(ERROR) << "Make checkpoint dir: " << path << " failed"; + return nebula::cpp2::ErrorCode::E_UNKNOWN; + } + } + + // create data checkpoint + std::string dataPath = folly::sformat("{}/data", path); + auto code = engine->createCheckpoint(dataPath); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - // Maybe there's a judgment call here. - cpPath = folly::stringPrintf("%s/checkpoints/%s", engine->getDataRoot(), name.c_str()); - // create wal hard link for all parts + + // create wal checkpoints: make hard link for all parts + std::unordered_map partsInfo; auto parts = engine->allParts(); - for (auto& part : parts) { - auto ret = this->part(spaceId, part); + for (auto& partId : parts) { + auto ret = this->part(spaceId, partId); if (!ok(ret)) { - LOG(ERROR) << "Part not found. space : " << spaceId << " Part : " << part; + LOG(ERROR) << "Part not found. 
space : " << spaceId << " Part : " << partId; return error(ret); } - auto walPath = - folly::stringPrintf("%s/checkpoints/%s/wal/%d", engine->getWalRoot(), name.c_str(), part); + auto p = nebula::value(ret); + auto walPath = folly::sformat("{}/wal/{}", path, partId); if (!p->linkCurrentWAL(walPath.data())) { return nebula::cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT; } + // return last wal info of each part if (p->isLeader()) { auto logInfo = p->lastLogInfo(); cpp2::LogInfo info; info.set_log_id(logInfo.first); info.set_term_id(logInfo.second); - partitionInfo.emplace(part, std::move(info)); + partsInfo.emplace(partId, std::move(info)); } } - auto result = nebula::fs::FileUtils::realPath(cpPath.c_str()); + + auto result = nebula::fs::FileUtils::realPath(path.c_str()); if (!result.ok()) { + LOG(ERROR) << "Failed to get path:" << path << "'s real path"; return nebula::cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT; } - nebula::cpp2::PartitionBackupInfo backupInfo; - nebula::cpp2::CheckpointInfo info; - backupInfo.set_info(std::move(partitionInfo)); - info.set_path(std::move(result.value())); - info.set_partition_info(std::move(backupInfo)); - cpInfo.emplace_back(std::move(info)); + + nebula::cpp2::CheckpointInfo cpInfo; + cpInfo.set_path(std::move(result.value())); + cpInfo.set_parts(std::move(partsInfo)); + cpInfo.set_space_id(spaceId); + cpInfoList.emplace_back(std::move(cpInfo)); } - return cpInfo; + return cpInfoList; } nebula::cpp2::ErrorCode NebulaStore::dropCheckpoint(GraphSpaceID spaceId, const std::string& name) { @@ -950,12 +974,12 @@ nebula::cpp2::ErrorCode NebulaStore::dropCheckpoint(GraphSpaceID spaceId, const /** * Drop checkpoint and wal together **/ - auto checkpointPath = - folly::stringPrintf("%s/checkpoints/%s", engine->getDataRoot(), name.c_str()); + auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), name); LOG(INFO) << "Drop checkpoint : " << checkpointPath; if (!fs::FileUtils::exist(checkpointPath)) { continue; } + if (!fs::FileUtils::remove(checkpointPath.data(), true)) { LOG(ERROR) << "Drop checkpoint dir failed : " << checkpointPath; return nebula::cpp2::ErrorCode::E_STORE_FAILURE; diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index e386743c415..82f85ca9fd9 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -523,48 +523,21 @@ void RocksEngine::openBackupEngine(GraphSpaceID spaceId) { } } -nebula::cpp2::ErrorCode RocksEngine::createCheckpoint(const std::string& name) { - LOG(INFO) << "Begin checkpoint : " << dataPath_; - - /* - * The default checkpoint directory structure is : - * |--FLAGS_data_path - * |----nebula - * |------space1 - * |--------data - * |--------wal - * |--------checkpoints - * |----------snapshot1 - * |------------data - * |------------wal - * |----------snapshot2 - * |----------snapshot3 - * - */ - - auto checkpointPath = - folly::stringPrintf("%s/checkpoints/%s/data", dataPath_.c_str(), name.c_str()); - LOG(INFO) << "Target checkpoint path : " << checkpointPath; +nebula::cpp2::ErrorCode RocksEngine::createCheckpoint(const std::string& checkpointPath) { + LOG(INFO) << "Target checkpoint data path : " << checkpointPath; if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) { - LOG(ERROR) << "Remove exist dir failed of checkpoint : " << checkpointPath; + LOG(ERROR) << "Remove exist checkpoint data dir failed: " << checkpointPath; return nebula::cpp2::ErrorCode::E_STORE_FAILURE; } - auto parent = checkpointPath.substr(0, 
checkpointPath.rfind('/')); - if (!FileUtils::exist(parent)) { - if (!FileUtils::makeDir(parent)) { - LOG(ERROR) << "Make dir " << parent << " failed"; - return nebula::cpp2::ErrorCode::E_UNKNOWN; - } - } - rocksdb::Checkpoint* checkpoint; rocksdb::Status status = rocksdb::Checkpoint::Create(db_.get(), &checkpoint); - std::unique_ptr cp(checkpoint); if (!status.ok()) { LOG(ERROR) << "Init checkpoint Failed: " << status.ToString(); return nebula::cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT; } + + std::unique_ptr cp(checkpoint); status = cp->CreateCheckpoint(checkpointPath, 0); if (!status.ok()) { LOG(ERROR) << "Create checkpoint Failed: " << status.ToString(); @@ -590,47 +563,43 @@ ErrorOr RocksEngine::backupTable( } } - rocksdb::Options options; - options.file_checksum_gen_factory = rocksdb::GetFileChecksumGenCrc32cFactory(); - rocksdb::SstFileWriter sstFileWriter(rocksdb::EnvOptions(), options); - std::unique_ptr iter; auto ret = prefix(tablePrefix, &iter); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { return nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE; } - if (!iter->valid()) { return nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE; } + rocksdb::Options options; + options.file_checksum_gen_factory = rocksdb::GetFileChecksumGenCrc32cFactory(); + rocksdb::SstFileWriter sstFileWriter(rocksdb::EnvOptions(), options); auto s = sstFileWriter.Open(backupPath); if (!s.ok()) { LOG(ERROR) << "BackupTable failed, path: " << backupPath << ", error: " << s.ToString(); return nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED; } - while (iter->valid()) { + for (; iter->valid(); iter->next()) { if (filter && filter(iter->key())) { - iter->next(); continue; } + s = sstFileWriter.Put(iter->key().toString(), iter->val().toString()); if (!s.ok()) { LOG(ERROR) << "BackupTable failed, path: " << backupPath << ", error: " << s.ToString(); sstFileWriter.Finish(); return nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED; } - iter->next(); } s = sstFileWriter.Finish(); if (!s.ok()) { - LOG(WARNING) << "Failure to insert data when backupTable, " << backupPath + LOG(WARNING) << "Failed to insert data when backupTable, " << backupPath << ", error: " << s.ToString(); return nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE; } - if (sstFileWriter.FileSize() == 0) { return nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE; } @@ -638,8 +607,7 @@ ErrorOr RocksEngine::backupTable( if (backupPath[0] == '/') { return backupPath; } - - auto result = nebula::fs::FileUtils::realPath(backupPath.c_str()); + auto result = FileUtils::realPath(backupPath.c_str()); if (!result.ok()) { return nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED; } diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 06226b5789c..157ff3724cc 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -203,7 +203,7 @@ class RocksEngine : public KVEngine { /********************* * Checkpoint operation ********************/ - nebula::cpp2::ErrorCode createCheckpoint(const std::string& path) override; + nebula::cpp2::ErrorCode createCheckpoint(const std::string& checkpointPath) override; ErrorOr backupTable( const std::string& path, diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 5a9b243de3c..af94e214051 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -92,6 +92,31 @@ bool ActiveHostsMan::machineRegisted(kvstore::KVStore* kv, const HostAddr& hostA return code == nebula::cpp2::ErrorCode::SUCCEEDED; } +ErrorOr>> +ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, std::string 
hostname) { + const auto& prefix = MetaKeyUtils::hostPrefix(); + std::unique_ptr iter; + auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Failed to get services in the host: " << hostname << ", error " + << apache::thrift::util::enumNameSafe(retCode); + return retCode; + } + + std::vector> hosts; + while (iter->valid()) { + auto addr = MetaKeyUtils::parseHostKey(iter->key()); + HostInfo info = HostInfo::decode(iter->val()); + + if (addr.host == hostname) { + hosts.emplace_back(addr, info.role_); + } + iter->next(); + } + + return hosts; +} + ErrorOr> ActiveHostsMan::getActiveHosts( kvstore::KVStore* kv, int32_t expiredTTL, cpp2::HostRole role) { const auto& machinePrefix = MetaKeyUtils::machinePrefix(); diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index 97b70d35577..ac89b6d4932 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -115,6 +115,9 @@ class ActiveHostsMan final { static ErrorOr> getActiveHosts( kvstore::KVStore* kv, int32_t expiredTTL = 0, cpp2::HostRole role = cpp2::HostRole::STORAGE); + static ErrorOr>> + getServicesInHost(kvstore::KVStore* kv, std::string hostname); + static ErrorOr> getActiveHostsInZone( kvstore::KVStore* kv, const std::string& zoneName, int32_t expiredTTL = 0); diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 77449574b61..0fde9f97036 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -43,6 +43,7 @@ nebula_add_library( processors/kv/RemoveRangeProcessor.cpp processors/kv/ScanProcessor.cpp processors/admin/HBProcessor.cpp + processors/admin/AgentHBProcessor.cpp processors/user/AuthenticationProcessor.cpp processors/admin/CreateSnapshotProcessor.cpp processors/admin/DropSnapshotProcessor.cpp diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index cc78fcbf2c6..9293e4634d2 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -6,6 +6,7 @@ #include "meta/MetaServiceHandler.h" #include "common/utils/MetaKeyUtils.h" +#include "meta/processors/admin/AgentHBProcessor.h" #include "meta/processors/admin/CreateBackupProcessor.h" #include "meta/processors/admin/CreateSnapshotProcessor.h" #include "meta/processors/admin/DropSnapshotProcessor.h" @@ -334,6 +335,12 @@ folly::Future MetaServiceHandler::future_heartBeat(const cpp2::HBR RETURN_FUTURE(processor); } +folly::Future MetaServiceHandler::future_agentHeartbeat( + const cpp2::AgentHBReq& req) { + auto* processor = AgentHBProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + folly::Future MetaServiceHandler::future_createUser( const cpp2::CreateUserReq& req) { auto* processor = CreateUserProcessor::instance(kvstore_); @@ -501,7 +508,7 @@ folly::Future MetaServiceHandler::future_getStats( folly::Future MetaServiceHandler::future_listCluster( const cpp2::ListClusterInfoReq& req) { - auto* processor = ListClusterInfoProcessor::instance(kvstore_, adminClient_.get()); + auto* processor = ListClusterInfoProcessor::instance(kvstore_); RETURN_FUTURE(processor); } diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index a1d7332fbf8..74f8dcb9d3c 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -10,6 +10,7 @@ #include "interface/gen-cpp2/MetaService.h" #include "kvstore/KVStore.h" #include "meta/processors/admin/AdminClient.h" +#include "meta/processors/admin/AgentHBProcessor.h" #include "meta/processors/admin/HBProcessor.h" 
namespace nebula { @@ -23,6 +24,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { // Initialize counters kHBCounters.init(); + kAgentHBCounters.init(); } /** @@ -158,6 +160,8 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { * */ folly::Future future_heartBeat(const cpp2::HBReq& req) override; + folly::Future future_agentHeartbeat(const cpp2::AgentHBReq& req) override; + folly::Future future_regConfig(const cpp2::RegConfigReq& req) override; folly::Future future_getConfig(const cpp2::GetConfigReq& req) override; diff --git a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index 96a5bb857a9..e1645519ee9 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -17,18 +17,19 @@ nebula::cpp2::ErrorCode backupTable(kvstore::KVStore* kvstore, const std::string& tableName, std::vector& files, std::function filter) { - auto backupFilePath = kvstore->backupTable(kDefaultSpaceId, backupName, tableName, filter); - if (!ok(backupFilePath)) { - auto result = error(backupFilePath); - if (result == nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE) { + auto backupRet = kvstore->backupTable(kDefaultSpaceId, backupName, tableName, filter); + if (!ok(backupRet)) { + auto code = error(backupRet); + if (code == nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE) { return nebula::cpp2::ErrorCode::SUCCEEDED; } - return result; + return code; } + auto backupTableFiles = std::move(value(backupRet)); files.insert(files.end(), - std::make_move_iterator(value(backupFilePath).begin()), - std::make_move_iterator(value(backupFilePath).end())); + std::make_move_iterator(backupTableFiles.begin()), + std::make_move_iterator(backupTableFiles.end())); return nebula::cpp2::ErrorCode::SUCCEEDED; } } // namespace @@ -174,70 +175,73 @@ std::function MetaServiceUtils::spaceFilter ErrorOr> MetaServiceUtils::backupIndex( kvstore::KVStore* kvstore, - const std::unordered_set& spaces, + const std::unordered_set& spaceIds, const std::string& backupName, - const std::vector* spaceName) { + const std::vector* spaceNames) { auto indexTable = MetaKeyUtils::getIndexTable(); - return kvstore->backupTable( - kDefaultSpaceId, - backupName, - indexTable, - [spaces, spaceName, indexTable](const folly::StringPiece& key) -> bool { - if (spaces.empty()) { - return false; - } + return kvstore->backupTable(kDefaultSpaceId, + backupName, + indexTable, + // will filter out the index table when this function returns true + [spaceIds, spaceNames](const folly::StringPiece& key) -> bool { + if (spaceIds.empty()) { + return false; + } - auto type = *reinterpret_cast(key.data() + indexTable.size()); - if (type == EntryType::SPACE) { - if (spaceName == nullptr) { - return false; - } - auto sn = key.subpiece(indexTable.size() + sizeof(EntryType), - key.size() - indexTable.size() - sizeof(EntryType)) - .str(); - LOG(INFO) << "sn was " << sn; - auto it = std::find_if( - spaceName->cbegin(), spaceName->cend(), [&sn](auto& name) { return sn == name; }); + // space index: space name -> space id + auto type = MetaKeyUtils::parseIndexType(key); + if (type == EntryType::SPACE) { + if (spaceNames == nullptr || spaceNames->empty()) { + return false; + } - if (it == spaceName->cend()) { - return true; - } - return false; - } - - auto id = MetaKeyUtils::parseIndexKeySpaceID(key); - auto it = spaces.find(id); - if (it == spaces.end()) { - return true; - } + auto spaceName = MetaKeyUtils::parseIndexSpaceKey(key); + LOG(INFO) << "Space name was " << spaceName; + auto it = std::find_if( + 
spaceNames->cbegin(), + spaceNames->cend(), + [&spaceName](auto& name) { return spaceName == name; }); + if (it == spaceNames->cend()) { + return true; + } + return false; + } - return false; - }); + // other index: space id -> values + auto id = MetaKeyUtils::parseIndexKeySpaceID(key); + auto it = spaceIds.find(id); + if (it == spaceIds.end()) { + return true; + } + return false; + }); } -ErrorOr> MetaServiceUtils::backupSpaces( +ErrorOr> MetaServiceUtils::backupTables( kvstore::KVStore* kvstore, - const std::unordered_set& spaces, + const std::unordered_set& spaceIds, const std::string& backupName, const std::vector* spaceNames) { std::vector files; - auto tables = MetaKeyUtils::getTableMaps(); - files.reserve(tables.size()); + // backup space relative tables + auto tables = MetaKeyUtils::getTableMaps(); for (const auto& table : tables) { if (table.second.second == nullptr) { LOG(INFO) << table.first << " table skipped"; continue; } auto result = backupTable( - kvstore, backupName, table.second.first, files, spaceFilter(spaces, table.second.second)); + kvstore, backupName, table.second.first, files, spaceFilter(spaceIds, table.second.second)); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { return result; } LOG(INFO) << table.first << " table backup succeeded"; } - if (spaceNames == nullptr) { + // backup system tables if backup all spaces + bool allSpaces = spaceNames == nullptr || spaceNames->empty(); + if (allSpaces) { auto sysTables = MetaKeyUtils::getSystemTableMaps(); for (const auto& table : sysTables) { if (!table.second.second) { @@ -252,6 +256,7 @@ ErrorOr> MetaServiceUtils::bac } } + // backup system info tables auto sysInfos = MetaKeyUtils::getSystemInfoMaps(); for (const auto& table : sysInfos) { if (!table.second.second) { @@ -265,8 +270,9 @@ ErrorOr> MetaServiceUtils::bac LOG(INFO) << table.first << " table backup succeeded"; } - // The mapping of space name and space id needs to be handled separately. 
- auto ret = backupIndex(kvstore, spaces, backupName, spaceNames); + // backup the mapping of space name and space id separately, + // which skipped in space relative tables + auto ret = backupIndex(kvstore, spaceIds, backupName, spaceNames); if (!ok(ret)) { auto result = error(ret); if (result == nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE) { diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index f6e9d2e17f7..e70091e27eb 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -43,11 +43,11 @@ class MetaServiceUtils final { const std::unordered_set& spaces, std::function parseSpace); - static ErrorOr> backupSpaces( + static ErrorOr> backupTables( kvstore::KVStore* kvstore, - const std::unordered_set& spaces, + const std::unordered_set& spaceIds, const std::string& backupName, - const std::vector* spaceName); + const std::vector* spaceNames); }; } // namespace meta diff --git a/src/meta/processors/BaseProcessor-inl.h b/src/meta/processors/BaseProcessor-inl.h index 04bcb80b232..2851ac50e28 100644 --- a/src/meta/processors/BaseProcessor-inl.h +++ b/src/meta/processors/BaseProcessor-inl.h @@ -116,27 +116,6 @@ ErrorOr> BaseProcessor:: return values; } -template -ErrorOr> BaseProcessor::allHosts() { - std::vector hosts; - const auto& prefix = MetaKeyUtils::hostPrefix(); - std::unique_ptr iter; - auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - VLOG(2) << "Can't find any hosts"; - return code; - } - - while (iter->valid()) { - HostAddr h; - auto hostAddrPiece = iter->key().subpiece(prefix.size()); - memcpy(&h, hostAddrPiece.data(), hostAddrPiece.size()); - hosts.emplace_back(std::move(h)); - iter->next(); - } - return hosts; -} - template ErrorOr BaseProcessor::autoIncrementId() { folly::SharedMutex::WriteHolder holder(LockUtils::idLock()); diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index d09f4d73327..a8bd52d71e3 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -149,11 +149,6 @@ class BaseProcessor { **/ void doMultiRemove(std::vector keys); - /** - * Get all hosts - * */ - ErrorOr> allHosts(); - /** * Get one auto-increment Id. 
* */ diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp index 61fe5cf001e..e2915c8be27 100644 --- a/src/meta/processors/admin/AdminClient.cpp +++ b/src/meta/processors/admin/AdminClient.cpp @@ -618,18 +618,19 @@ folly::Future AdminClient::getLeaderDist(HostLeaderMap* result) { return future; } -folly::Future> AdminClient::createSnapshot(GraphSpaceID spaceId, - const std::string& name, - const HostAddr& host) { - folly::Promise> pro; +folly::Future> AdminClient::createSnapshot( + const std::set& spaceIds, const std::string& name, const HostAddr& host) { + folly::Promise> pro; auto f = pro.getFuture(); auto* evb = ioThreadPool_->getEventBase(); auto storageHost = Utils::getAdminAddrFromStoreAddr(host); - folly::via(evb, [evb, storageHost, host, pro = std::move(pro), spaceId, name, this]() mutable { + folly::via(evb, [evb, storageHost, host, pro = std::move(pro), spaceIds, name, this]() mutable { auto client = clientsMan_->client(storageHost, evb); storage::cpp2::CreateCPRequest req; - req.set_space_id(spaceId); + std::vector idList; + idList.insert(idList.end(), spaceIds.begin(), spaceIds.end()); + req.set_space_ids(idList); req.set_name(name); client->future_createCheckpoint(std::move(req)) .via(evb) @@ -644,10 +645,10 @@ folly::Future> AdminClient::createSnapshot(GraphSpace auto&& resp = std::move(t).value(); auto&& result = resp.get_result(); if (result.get_failed_parts().empty()) { - cpp2::BackupInfo backupInfo; - backupInfo.set_host(host); - backupInfo.set_info(std::move(resp.get_info())); - p.setValue(std::move(backupInfo)); + cpp2::HostBackupInfo hostBackupInfo; + hostBackupInfo.set_host(host); + hostBackupInfo.set_checkpoints(std::move(resp.get_info())); + p.setValue(std::move(hostBackupInfo)); return; } p.setValue(Status::Error("create checkpoint failed")); @@ -657,11 +658,13 @@ folly::Future> AdminClient::createSnapshot(GraphSpace return f; } -folly::Future AdminClient::dropSnapshot(GraphSpaceID spaceId, +folly::Future AdminClient::dropSnapshot(const std::set& spaceIds, const std::string& name, const HostAddr& host) { storage::cpp2::DropCPRequest req; - req.set_space_id(spaceId); + std::vector idList; + idList.insert(idList.end(), spaceIds.begin(), spaceIds.end()); + req.set_space_ids(idList); req.set_name(name); folly::Promise pro; auto f = pro.getFuture(); @@ -676,11 +679,13 @@ folly::Future AdminClient::dropSnapshot(GraphSpaceID spaceId, return f; } -folly::Future AdminClient::blockingWrites(GraphSpaceID spaceId, +folly::Future AdminClient::blockingWrites(const std::set& spaceIds, storage::cpp2::EngineSignType sign, const HostAddr& host) { storage::cpp2::BlockingSignRequest req; - req.set_space_id(spaceId); + std::vector idList; + idList.insert(idList.end(), spaceIds.begin(), spaceIds.end()); + req.set_space_ids(idList); req.set_sign(sign); folly::Promise pro; auto f = pro.getFuture(); @@ -783,37 +788,5 @@ folly::Future AdminClient::stopTask(const std::vector& target, return f; } -folly::Future> AdminClient::listClusterInfo(const HostAddr& host) { - folly::Promise> pro; - auto f = pro.getFuture(); - - auto* evb = ioThreadPool_->getEventBase(); - auto storageHost = Utils::getAdminAddrFromStoreAddr(host); - folly::via(evb, [evb, storageHost, pro = std::move(pro), this]() mutable { - auto client = clientsMan_->client(storageHost, evb); - storage::cpp2::ListClusterInfoReq req; - client->future_listClusterInfo(std::move(req)) - .via(evb) - .then([p = std::move(pro), - storageHost](folly::Try&& t) mutable { - if (t.hasException()) { - 
LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s", - t.exception().what().c_str()); - p.setValue(Status::Error("RPC failure in listClusterInfo")); - return; - } - auto&& resp = std::move(t).value(); - auto&& result = resp.get_result(); - if (result.get_failed_parts().empty()) { - p.setValue(resp.get_dir()); - return; - } - p.setValue(Status::Error("list clusterInfo failed")); - }); - }); - - return f; -} - } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h index e43394560e2..a3f7bfdb19a 100644 --- a/src/meta/processors/admin/AdminClient.h +++ b/src/meta/processors/admin/AdminClient.h @@ -81,15 +81,14 @@ class AdminClient { virtual folly::Future getLeaderDist(HostLeaderMap* result); - virtual folly::Future> createSnapshot(GraphSpaceID spaceId, - const std::string& name, - const HostAddr& host); + virtual folly::Future> createSnapshot( + const std::set& spaceIds, const std::string& name, const HostAddr& host); - virtual folly::Future dropSnapshot(GraphSpaceID spaceId, + virtual folly::Future dropSnapshot(const std::set& spaceIds, const std::string& name, const HostAddr& host); - virtual folly::Future blockingWrites(GraphSpaceID spaceId, + virtual folly::Future blockingWrites(const std::set& spaceIds, storage::cpp2::EngineSignType sign, const HostAddr& host); @@ -107,8 +106,6 @@ class AdminClient { int32_t jobId, int32_t taskId); - virtual folly::Future> listClusterInfo(const HostAddr& host); - private: template folly::Future getResponse(const HostAddr& host, diff --git a/src/meta/processors/admin/AgentHBProcessor.cpp b/src/meta/processors/admin/AgentHBProcessor.cpp new file mode 100644 index 00000000000..ddd19906421 --- /dev/null +++ b/src/meta/processors/admin/AgentHBProcessor.cpp @@ -0,0 +1,137 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +#include "meta/processors/admin/AgentHBProcessor.h" + +#include "common/time/WallClock.h" +#include "meta/ActiveHostsMan.h" +#include "meta/KVBasedClusterIdMan.h" +#include "meta/MetaVersionMan.h" + +namespace nebula { +namespace meta { + +AgentHBCounters kAgentHBCounters; + +void AgentHBProcessor::onFinished() { + if (counters_) { + stats::StatsManager::addValue(counters_->numCalls_); + stats::StatsManager::addValue(counters_->latency_, this->duration_.elapsedInUSec()); + } + + Base::onFinished(); +} + +// Agent heartbeat: register the agent to meta and pull the info of all services in the agent's host +void AgentHBProcessor::process(const cpp2::AgentHBReq& req) { + HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port); + LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT"; + + nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; + do { + // update agent host info + HostInfo info( + time::WallClock::fastNowInMilliSec(), cpp2::HostRole::AGENT, req.get_git_info_sha()); + ret = ActiveHostsMan::updateHostInfo(kvstore_, agentAddr, info); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << folly::sformat("Put agent {} info failed: {}", + agentAddr.toString(), + apache::thrift::util::enumNameSafe(ret)); + break; + } + + // get services in the agent host + auto servicesRet = ActiveHostsMan::getServicesInHost(kvstore_, agentAddr.host); + if (!nebula::ok(servicesRet)) { + ret = nebula::error(servicesRet); + LOG(ERROR) << folly::sformat("Get active services for {} failed: {}", + agentAddr.host, + apache::thrift::util::enumNameSafe(ret)); + break; + } + + // get dir info for the services in agent host + std::unordered_map serviceDirinfo; + std::string hostDirHostPrefix = MetaKeyUtils::hostDirHostPrefix(agentAddr.host); + auto dirIterRet = doPrefix(hostDirHostPrefix); + if (!nebula::ok(dirIterRet)) { + ret = nebula::error(dirIterRet); + LOG(ERROR) << folly::sformat("Get host {} dir prefix iterator failed: {}", + agentAddr.host, + apache::thrift::util::enumNameSafe(ret)); + break; + } + auto dirIter = std::move(nebula::value(dirIterRet)); + for (; dirIter->valid(); dirIter->next()) { + HostAddr addr = MetaKeyUtils::parseHostDirKey(dirIter->key()); + nebula::cpp2::DirInfo dir = MetaKeyUtils::parseHostDir(dirIter->val()); + serviceDirinfo[addr] = dir; + } + + // join the service host info and dir info + auto services = std::move(nebula::value(servicesRet)); + std::vector serviceList; + for (const auto& [addr, role] : services) { + if (addr == agentAddr) { + // skip itself + continue; + } + + if (role == cpp2::HostRole::AGENT) { + LOG(INFO) << folly::sformat("There is another agent: {} in the host", addr.toString()); + continue; + } + + auto it = serviceDirinfo.find(addr); + if (it == serviceDirinfo.end()) { + LOG(ERROR) << folly::sformat("{} dir info not found", addr.toString()); + break; + } + + cpp2::ServiceInfo serviceInfo; + serviceInfo.set_addr(addr); + serviceInfo.set_dir(it->second); + serviceInfo.set_role(role); + serviceList.emplace_back(serviceInfo); + } + if (serviceList.size() != services.size() - 1) { + ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE; + // missing some services' dir info + LOG(ERROR) << folly::sformat( + "Missing some services' dir info, expected {}, but only got {}", + services.size() - 1, + serviceList.size()); + break; + } + + // add the meta service if present; the agent should get meta dir info in a separate rpc + // because a follower metad can't report its dir info to the leader metad + auto partRet =
kvstore_->part(kDefaultSpaceId, kDefaultPartId); + if (!nebula::ok(partRet)) { + ret = nebula::error(partRet); + LOG(ERROR) << "Get meta part store failed, error: " + << apache::thrift::util::enumNameSafe(ret); + return; + } + auto raftPeers = nebula::value(partRet)->peers(); + for (auto& raftAddr : raftPeers) { + auto metaAddr = Utils::getStoreAddrFromRaftAddr(raftAddr); + if (metaAddr.host == agentAddr.host) { + cpp2::ServiceInfo serviceInfo; + serviceInfo.set_addr(metaAddr); + serviceInfo.set_role(cpp2::HostRole::META); + serviceList.emplace_back(serviceInfo); + } + } + + resp_.set_service_list(serviceList); + } while (false); + + handleErrorCode(ret); + onFinished(); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/admin/AgentHBProcessor.h b/src/meta/processors/admin/AgentHBProcessor.h new file mode 100644 index 00000000000..0119bb730fb --- /dev/null +++ b/src/meta/processors/admin/AgentHBProcessor.h @@ -0,0 +1,57 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#pragma once + +#include + +#include "common/stats/StatsManager.h" +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +struct AgentHBCounters final { + stats::CounterId numCalls_; + stats::CounterId latency_; + + void init() { + if (!numCalls_.valid()) { + numCalls_ = stats::StatsManager::registerStats("num_agent_heartbeats", "rate, sum"); + latency_ = stats::StatsManager::registerHisto( + "agent_heartbeat_latency_us", 1000, 0, 20000, "avg, p75, p95, p99"); + VLOG(1) << "Succeeded in initializing the AgentHBCounters"; + } else { + VLOG(1) << "AgentHBCounters has been initialized"; + } + } +}; +extern AgentHBCounters kAgentHBCounters; + +class AgentHBProcessor : public BaseProcessor { + FRIEND_TEST(AgentHBProcessorTest, AgentHBTest); + + using Base = BaseProcessor; + + public: + static AgentHBProcessor* instance(kvstore::KVStore* kvstore, + const AgentHBCounters* counters = &kAgentHBCounters) { + return new AgentHBProcessor(kvstore, counters); + } + + void process(const cpp2::AgentHBReq& req); + + protected: + void onFinished() override; + + private: + explicit AgentHBProcessor(kvstore::KVStore* kvstore, const AgentHBCounters* counters) + : BaseProcessor(kvstore), counters_(counters) {} + + const AgentHBCounters* counters_{nullptr}; +}; + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/admin/CreateBackupProcessor.cpp b/src/meta/processors/admin/CreateBackupProcessor.cpp index e2454366a5e..1a343a53076 100644 --- a/src/meta/processors/admin/CreateBackupProcessor.cpp +++ b/src/meta/processors/admin/CreateBackupProcessor.cpp @@ -18,11 +18,10 @@ CreateBackupProcessor::spaceNameToId(const std::vector* backupSpace folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); std::unordered_set spaces; - if (backupSpaces != nullptr) { - DCHECK(!backupSpaces->empty()); + bool allSpaces = backupSpaces == nullptr || backupSpaces->empty(); + if (!allSpaces) { std::vector keys; keys.reserve(backupSpaces->size()); - std::transform( backupSpaces->begin(), backupSpaces->end(), std::back_inserter(keys), [](auto& name) { return MetaKeyUtils::indexSpaceKey(name); @@ -84,6 +83,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { } JobManager* jobMgr = JobManager::getInstance(); + // make sure there is no index job auto result = jobMgr->checkIndexJobRunning(); if (!nebula::ok(result)) { LOG(ERROR) << "get Index status failed, not allowed to create 
backup."; @@ -91,7 +91,6 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { onFinished(); return; } - if (nebula::value(result)) { LOG(ERROR) << "Index is rebuilding, not allowed to create backup."; handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_BUILDING_INDEX); @@ -101,6 +100,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock()); + // get active storage host list auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_); if (!nebula::ok(activeHostsRet)) { handleErrorCode(nebula::error(activeHostsRet)); @@ -108,7 +108,6 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { return; } auto hosts = std::move(nebula::value(activeHostsRet)); - if (hosts.empty()) { LOG(ERROR) << "There has some offline hosts"; handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS); @@ -116,25 +115,32 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { return; } + // transform space names to id list auto spaceIdRet = spaceNameToId(backupSpaces); if (!nebula::ok(spaceIdRet)) { handleErrorCode(nebula::error(spaceIdRet)); onFinished(); return; } - auto spaces = nebula::value(spaceIdRet); // The entire process follows mostly snapshot logic. + // step 1 : write a flag key to handle backup failed std::vector data; auto backupName = folly::format("BACKUP_{}", MetaKeyUtils::genTimestampStr()).str(); - data.emplace_back( MetaKeyUtils::snapshotKey(backupName), MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, NetworkUtils::toHostsStr(hosts))); - Snapshot::instance(kvstore_, client_)->setSpaces(spaces); + auto putRet = doSyncPut(data); + if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Write backup meta error"; + handleErrorCode(putRet); + onFinished(); + return; + } - // step 1 : Blocking all writes action for storage engines. + Snapshot::instance(kvstore_, client_)->setSpaces(spaces); + // step 2 : Blocking all writes action for storage engines. auto ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Send blocking sign to storage engine error"; @@ -150,7 +156,8 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { // step 3 : Create checkpoint for all storage engines. auto sret = Snapshot::instance(kvstore_, client_)->createSnapshot(backupName); if (!nebula::ok(sret)) { - LOG(ERROR) << "Checkpoint create error on storage engine"; + LOG(ERROR) << "Checkpoint create error on storage engine: " + << apache::thrift::util::enumNameSafe(nebula::error(sret)); handleErrorCode(nebula::error(sret)); ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -161,7 +168,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { } // step 4 created backup for meta(export sst). - auto backupFiles = MetaServiceUtils::backupSpaces(kvstore_, spaces, backupName, backupSpaces); + auto backupFiles = MetaServiceUtils::backupTables(kvstore_, spaces, backupName, backupSpaces); if (!nebula::ok(backupFiles)) { LOG(ERROR) << "Failed backup meta"; handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_FAILED); @@ -178,12 +185,12 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { return; } - // step 6 : update snapshot status from INVALID to VALID. + // step 6 : update backup status from INVALID to VALID. 
+ data.clear(); data.emplace_back( MetaKeyUtils::snapshotKey(backupName), MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::VALID, NetworkUtils::toHostsStr(hosts))); - - auto putRet = doSyncPut(std::move(data)); + putRet = doSyncPut(std::move(data)); if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "All checkpoint creations are done, " "but update checkpoint status error. " @@ -194,13 +201,20 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { return; } - std::unordered_map backupInfo; - - // set backup info + // space backup info: + // - space desc + // - backup info list: + // - backup info: + // - host + // - checkpoint info: + // - space id + // - parts + // - path + std::unordered_map backups; auto snapshotInfo = std::move(nebula::value(sret)); for (auto id : spaces) { LOG(INFO) << "backup space " << id; - cpp2::SpaceBackupInfo spaceInfo; + cpp2::SpaceBackupInfo spaceBackup; auto spaceKey = MetaKeyUtils::spaceKey(id); auto spaceRet = doGet(spaceKey); if (!nebula::ok(spaceRet)) { @@ -208,24 +222,27 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { onFinished(); return; } - auto properties = MetaKeyUtils::parseSpace(nebula::value(spaceRet)); + auto spaceDesc = MetaKeyUtils::parseSpace(nebula::value(spaceRet)); // todo we should save partition info. auto it = snapshotInfo.find(id); DCHECK(it != snapshotInfo.end()); - spaceInfo.set_info(std::move(it->second)); - spaceInfo.set_space(std::move(properties)); - backupInfo.emplace(id, std::move(spaceInfo)); + + spaceBackup.set_host_backups(std::move(it->second)); + spaceBackup.set_space(std::move(spaceDesc)); + backups.emplace(id, std::move(spaceBackup)); } + cpp2::BackupMeta backup; LOG(INFO) << "sst files count was:" << nebula::value(backupFiles).size(); backup.set_meta_files(std::move(nebula::value(backupFiles))); - backup.set_backup_info(std::move(backupInfo)); + backup.set_space_backups(std::move(backups)); backup.set_backup_name(std::move(backupName)); backup.set_full(true); - if (backupSpaces == nullptr) { - backup.set_include_system_space(true); + bool allSpaces = backupSpaces == nullptr || backupSpaces->empty(); + if (allSpaces) { + backup.set_all_spaces(true); } else { - backup.set_include_system_space(false); + backup.set_all_spaces(false); } backup.set_create_time(time::WallClock::fastNowInMilliSec()); diff --git a/src/meta/processors/admin/DropSnapshotProcessor.cpp b/src/meta/processors/admin/DropSnapshotProcessor.cpp index a4d05a4d4b3..140ae8c4bfe 100644 --- a/src/meta/processors/admin/DropSnapshotProcessor.cpp +++ b/src/meta/processors/admin/DropSnapshotProcessor.cpp @@ -17,9 +17,14 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { // Check snapshot is exists auto key = MetaKeyUtils::snapshotKey(snapshot); - auto ret = doGet(std::move(key)); + auto ret = doGet(key); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); + if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + LOG(INFO) << "Snapshot " << snapshot << " does not exist or already dropped."; + onFinished(); + return; + } LOG(ERROR) << "Get snapshot " << snapshot << " failed, error " << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); @@ -76,7 +81,7 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { return; } // Delete metadata of checkpoint - doRemove(MetaKeyUtils::snapshotKey(snapshot)); + doRemove(key); LOG(INFO) << "Drop snapshot " << snapshot << " successfully"; } diff --git 
a/src/meta/processors/admin/GetMetaDirInfoProcessor.cpp b/src/meta/processors/admin/GetMetaDirInfoProcessor.cpp index cc16d2679b5..79692bf6753 100644 --- a/src/meta/processors/admin/GetMetaDirInfoProcessor.cpp +++ b/src/meta/processors/admin/GetMetaDirInfoProcessor.cpp @@ -15,33 +15,9 @@ namespace meta { void GetMetaDirInfoProcessor::process(const cpp2::GetMetaDirInfoReq& req) { UNUSED(req); - auto data_root = kvstore_->getDataRoot(); - - std::vector realpaths; - bool failed = false; - std::transform(std::make_move_iterator(data_root.begin()), - std::make_move_iterator(data_root.end()), - std::back_inserter(realpaths), - [&failed](auto f) { - if (f[0] == '/') { - return f; - } else { - auto result = nebula::fs::FileUtils::realPath(f.c_str()); - if (!result.ok()) { - failed = true; - LOG(ERROR) << "Failed to get the absolute path of file: " << f; - return f; - } - return std::string(result.value()); - } - }); - if (failed) { - resp_.set_code(nebula::cpp2::ErrorCode::E_GET_META_DIR_FAILURE); - onFinished(); - return; - } + auto datapaths = kvstore_->getDataRoot(); nebula::cpp2::DirInfo dir; - dir.set_data(realpaths); + dir.set_data(datapaths); dir.set_root(boost::filesystem::current_path().string()); resp_.set_dir(std::move(dir)); diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index ef2e05c5e6c..562ab4ec025 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -41,6 +41,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { return; } + // set or check storaged's cluster id ClusterID peerClusterId = req.get_cluster_id(); if (peerClusterId == 0) { LOG(INFO) << "Set clusterId for new host " << host << "!"; @@ -52,6 +53,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { return; } + // set disk parts map if (req.disk_parts_ref().has_value()) { for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) { for (const auto& [path, partList] : partDiskMap) { @@ -74,19 +76,34 @@ void HBProcessor::process(const cpp2::HBReq& req) { } } + // update host info HostInfo info(time::WallClock::fastNowInMilliSec(), role, req.get_git_info_sha()); if (req.leader_partIds_ref().has_value()) { ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info, &*req.leader_partIds_ref()); } else { ret = ActiveHostsMan::updateHostInfo(kvstore_, host, info); } - if (ret == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - auto leaderRet = kvstore_->partLeader(kDefaultSpaceId, kDefaultPartId); - if (nebula::ok(leaderRet)) { - resp_.set_leader(toThriftHost(nebula::value(leaderRet))); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleErrorCode(ret); + onFinished(); + return; + } + + // update host dir info + if (req.get_role() == cpp2::HostRole::STORAGE || req.get_role() == cpp2::HostRole::GRAPH) { + if (req.dir_ref().has_value()) { + std::vector data; + LOG(INFO) << folly::sformat("Update host {} dir info, root path: {}, data path size: {}", + host.toString(), + req.get_dir()->get_root(), + req.get_dir()->get_data().size()); + data.emplace_back(std::make_pair(MetaKeyUtils::hostDirKey(host.host, host.port), + MetaKeyUtils::hostDirVal(*req.get_dir()))); + ret = doSyncPut(data); } } + // set update time and meta version auto lastUpdateTimeRet = LastUpdateTimeMan::get(kvstore_); if (nebula::ok(lastUpdateTimeRet)) { resp_.set_last_update_time_in_ms(nebula::value(lastUpdateTimeRet)); diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.cpp b/src/meta/processors/admin/ListClusterInfoProcessor.cpp index 
3776ef2cd7b..0dd623a859b 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.cpp +++ b/src/meta/processors/admin/ListClusterInfoProcessor.cpp @@ -22,52 +22,96 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { return; } - const auto& prefix = MetaKeyUtils::hostPrefix(); - std::unique_ptr iter; - auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter, true); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "prefix failed:" << apache::thrift::util::enumNameSafe(ret); + // services(include agent service) group by ip/hostname + std::unordered_map> hostServices; + + // non-meta services, may include inactive services + const auto& hostPrefix = MetaKeyUtils::hostPrefix(); + auto iterRet = doPrefix(hostPrefix); + if (!nebula::ok(iterRet)) { + LOG(ERROR) << "get host prefix failed:" + << apache::thrift::util::enumNameSafe(nebula::error(iterRet)); handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); onFinished(); return; } - - std::vector storages; - while (iter->valid()) { - auto host = MetaKeyUtils::parseHostKey(iter->key()); + auto iter = nebula::value(iterRet).get(); + for (; iter->valid(); iter->next()) { + HostAddr addr = MetaKeyUtils::parseHostKey(iter->key()); HostInfo info = HostInfo::decode(iter->val()); - if (info.role_ != cpp2::HostRole::STORAGE) { - iter->next(); - continue; - } + cpp2::ServiceInfo service; + service.set_role(info.role_); + service.set_addr(addr); - auto status = client_->listClusterInfo(host).get(); - if (!status.ok()) { - LOG(ERROR) << "listcluster info from storage failed, host: " << host; - handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); - onFinished(); - return; + // fill the dir info + if (info.role_ == meta::cpp2::HostRole::GRAPH || info.role_ == meta::cpp2::HostRole::STORAGE) { + auto dirKey = MetaKeyUtils::hostDirKey(addr.host, addr.port); + auto dirRet = doGet(dirKey); + if (!nebula::ok(dirRet)) { + LOG(ERROR) << folly::sformat("Get host {} dir info for {} failed: {}", + addr.toString(), + apache::thrift::util::enumNameSafe(info.role_), + apache::thrift::util::enumNameSafe(nebula::error(dirRet))); + handleErrorCode(nebula::error(dirRet)); + onFinished(); + return; + } + auto dir = MetaKeyUtils::parseHostDir(std::move(nebula::value(dirRet))); + service.set_dir(std::move(dir)); } - storages.emplace_back(apache::thrift::FragileConstructor(), std::move(host), status.value()); - iter->next(); + if (hostServices.find(addr.host) == hostServices.end()) { + hostServices[addr.host] = std::vector(); + } + hostServices[addr.host].emplace_back(std::move(service)); } - auto* pm = store->partManager(); - auto* mpm = dynamic_cast(pm); - if (mpm == nullptr) { - resp_.set_code(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); + // meta service + auto partRet = kvstore_->part(kDefaultSpaceId, kDefaultPartId); + if (!nebula::ok(partRet)) { + auto code = nebula::error(partRet); + LOG(ERROR) << "get meta part store failed, error: " << apache::thrift::util::enumNameSafe(code); + handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); onFinished(); return; } - auto& map = mpm->partsMap(); - auto hosts = map[kDefaultSpaceId][kDefaultPartId].hosts_; - LOG(INFO) << "meta servers count: " << hosts.size(); - resp_.set_meta_servers(std::move(hosts)); + auto raftPeers = nebula::value(partRet)->peers(); + for (auto& raftAddr : raftPeers) { + auto metaAddr = Utils::getStoreAddrFromRaftAddr(raftAddr); + cpp2::ServiceInfo service; + service.set_role(cpp2::HostRole::META); + 
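// each raft peer of the meta part is translated back to its store address and added to hostServices as a META service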
service.set_addr(metaAddr); + + if (hostServices.find(metaAddr.host) == hostServices.end()) { + hostServices[metaAddr.host] = std::vector(); + } + hostServices[metaAddr.host].push_back(service); + } + + // check: there should be only one agent in each host + for (const auto& [host, services] : hostServices) { + int agentCount = 0; + for (auto& s : services) { + if (s.get_role() == cpp2::HostRole::AGENT) { + agentCount++; + } + } + if (agentCount != 1) { + LOG(ERROR) << folly::sformat("There are {} agents in host {}, expected exactly one", agentCount, host); + handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); + onFinished(); + return; + } + + if (services.size() <= 1) { + LOG(ERROR) << "There is no service other than the agent in host: " << host; + continue; + } + } + resp_.set_host_services(hostServices); resp_.set_code(nebula::cpp2::ErrorCode::SUCCEEDED); - resp_.set_storage_servers(std::move(storages)); onFinished(); } } // namespace meta diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.h b/src/meta/processors/admin/ListClusterInfoProcessor.h index 1b754c192de..a62b098905d 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.h +++ b/src/meta/processors/admin/ListClusterInfoProcessor.h @@ -14,15 +14,14 @@ namespace meta { class ListClusterInfoProcessor : public BaseProcessor { public: - static ListClusterInfoProcessor* instance(kvstore::KVStore* kvstore, AdminClient* client) { - return new ListClusterInfoProcessor(kvstore, client); + static ListClusterInfoProcessor* instance(kvstore::KVStore* kvstore) { + return new ListClusterInfoProcessor(kvstore); } void process(const cpp2::ListClusterInfoReq& req); private: - explicit ListClusterInfoProcessor(kvstore::KVStore* kvstore, AdminClient* client) - : BaseProcessor(kvstore), client_(client) {} - AdminClient* client_; + explicit ListClusterInfoProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} }; } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/SnapShot.cpp b/src/meta/processors/admin/SnapShot.cpp index 3c0c90f8872..733b9cae007 100644 --- a/src/meta/processors/admin/SnapShot.cpp +++ b/src/meta/processors/admin/SnapShot.cpp @@ -15,11 +15,12 @@ namespace nebula { namespace meta { -ErrorOr>> +ErrorOr>> Snapshot::createSnapshot(const std::string& name) { - auto retSpacesHostsRet = getSpacesHosts(); - if (!nebula::ok(retSpacesHostsRet)) { - auto retcode = nebula::error(retSpacesHostsRet); + auto hostSpacesRet = getHostSpaces(); + if (!nebula::ok(hostSpacesRet)) { + auto retcode = nebula::error(hostSpacesRet); if (retcode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { retcode = nebula::cpp2::ErrorCode::E_STORE_FAILURE; } @@ -27,21 +28,37 @@ Snapshot::createSnapshot(const std::string& name) { } // This structure is used for the subsequent construction of the // common.PartitionBackupInfo - std::unordered_map> info; - - auto spacesHosts = nebula::value(retSpacesHostsRet); - for (const auto& spaceHosts : spacesHosts) { - for (const auto& host : spaceHosts.second) { - auto status = client_->createSnapshot(spaceHosts.first, name, host).get(); - if (!status.ok()) { - return nebula::cpp2::ErrorCode::E_RPC_FAILURE; + std::unordered_map> info; + + auto hostSpaces = nebula::value(hostSpacesRet); + for (auto const& [host, spaces] : hostSpaces) { + auto snapshotRet = client_->createSnapshot(spaces, name, host).get(); + if (!snapshotRet.ok()) { + return nebula::cpp2::ErrorCode::E_RPC_FAILURE; + } + auto backupInfo = snapshotRet.value(); + + // split backup info by space id + std::unordered_map 
spaceBackup; + for (auto& ck : backupInfo.get_checkpoints()) { + auto it = spaceBackup.find(ck.get_space_id()); + if (it == spaceBackup.cend()) { + cpp2::HostBackupInfo hostBackup; + hostBackup.set_host(host); + hostBackup.checkpoints_ref().value().push_back(ck); + spaceBackup[ck.get_space_id()] = std::move(hostBackup); + } else { + it->second.checkpoints_ref().value().push_back(ck); } - auto backupInfo = status.value(); - auto it = info.find(spaceHosts.first); - if (it != info.cend()) { - it->second.emplace_back(backupInfo); + } + + // insert to global result + for (auto& [spaceId, binfo] : spaceBackup) { + auto it = info.find(spaceId); + if (it == info.cend()) { + info[spaceId] = {std::move(binfo)}; } else { - info[spaceHosts.first] = {std::move(backupInfo)}; + it->second.emplace_back(binfo); } } } @@ -50,62 +67,61 @@ Snapshot::createSnapshot(const std::string& name) { nebula::cpp2::ErrorCode Snapshot::dropSnapshot(const std::string& name, const std::vector& hosts) { - auto retSpacesHostsRet = getSpacesHosts(); - if (!nebula::ok(retSpacesHostsRet)) { - auto retcode = nebula::error(retSpacesHostsRet); + auto hostSpacesRet = getHostSpaces(); + if (!nebula::ok(hostSpacesRet)) { + auto retcode = nebula::error(hostSpacesRet); if (retcode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { retcode = nebula::cpp2::ErrorCode::E_STORE_FAILURE; } return retcode; } - auto spacesHosts = nebula::value(retSpacesHostsRet); - for (const auto& spaceHosts : spacesHosts) { - for (const auto& host : spaceHosts.second) { - if (std::find(hosts.begin(), hosts.end(), host) != hosts.end()) { - auto status = client_->dropSnapshot(spaceHosts.first, name, host).get(); - if (!status.ok()) { - auto msg = "failed drop checkpoint : \"%s\". on host %s. error %s"; - auto error = folly::stringPrintf( - msg, name.c_str(), host.toString().c_str(), status.toString().c_str()); - LOG(ERROR) << error; - } - } + auto hostSpaces = nebula::value(hostSpacesRet); + for (const auto& [host, spaces] : hostSpaces) { + if (std::find(hosts.begin(), hosts.end(), host) == hosts.end()) { + continue; + } + + auto status = client_->dropSnapshot(spaces, name, host).get(); + if (!status.ok()) { + auto msg = "failed drop checkpoint : \"%s\". on host %s. 
error %s"; + auto error = folly::stringPrintf( + msg, name.c_str(), host.toString().c_str(), status.toString().c_str()); + LOG(ERROR) << error; } } return nebula::cpp2::ErrorCode::SUCCEEDED; } nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType sign) { - auto retSpacesHostsRet = getSpacesHosts(); - if (!nebula::ok(retSpacesHostsRet)) { - auto retcode = nebula::error(retSpacesHostsRet); + auto hostSpacesRet = getHostSpaces(); + if (!nebula::ok(hostSpacesRet)) { + auto retcode = nebula::error(hostSpacesRet); if (retcode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { retcode = nebula::cpp2::ErrorCode::E_STORE_FAILURE; } return retcode; } - auto spacesHosts = nebula::value(retSpacesHostsRet); + auto hostSpaces = nebula::value(hostSpacesRet); auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; - for (const auto& spaceHosts : spacesHosts) { - for (const auto& host : spaceHosts.second) { - LOG(INFO) << "will block write host: " << host; - auto status = client_->blockingWrites(spaceHosts.first, sign, host).get(); - if (!status.ok()) { - LOG(ERROR) << "Send blocking sign error on host : " << host; - ret = nebula::cpp2::ErrorCode::E_BLOCK_WRITE_FAILURE; - if (sign == storage::cpp2::EngineSignType::BLOCK_ON) { - break; - } + for (const auto& [host, spaces] : hostSpaces) { + LOG(INFO) << "will block write host: " << host; + auto status = client_->blockingWrites(spaces, sign, host).get(); + if (!status.ok()) { + LOG(ERROR) << "Send blocking sign error on host " << host + << ", errorcode: " << status.message(); + ret = nebula::cpp2::ErrorCode::E_BLOCK_WRITE_FAILURE; + if (sign == storage::cpp2::EngineSignType::BLOCK_ON) { + break; } } } return ret; } -ErrorOr>> -Snapshot::getSpacesHosts() { +ErrorOr>> +Snapshot::getHostSpaces() { folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); const auto& prefix = MetaKeyUtils::partPrefix(); std::unique_ptr iter; @@ -116,23 +132,24 @@ Snapshot::getSpacesHosts() { return retCode; } - std::map> hostsByspaces; - + std::map> hostSpaces; for (; iter->valid(); iter->next()) { auto partHosts = MetaKeyUtils::parsePartVal(iter->val()); - auto space = MetaKeyUtils::parsePartKeySpaceId(iter->key()); - if (!spaces_.empty()) { - auto it = spaces_.find(space); + auto spaceId = MetaKeyUtils::parsePartKeySpaceId(iter->key()); + + bool allSpaces = spaces_.empty(); + if (!allSpaces) { + auto it = spaces_.find(spaceId); if (it == spaces_.end()) { continue; } } - for (auto& ph : partHosts) { - hostsByspaces[space].emplace(std::move(ph)); + for (auto& host : partHosts) { + hostSpaces[host].emplace(spaceId); } } - return hostsByspaces; + return hostSpaces; } } // namespace meta diff --git a/src/meta/processors/admin/SnapShot.h b/src/meta/processors/admin/SnapShot.h index 82e9ef5bf47..b8147a2a0ed 100644 --- a/src/meta/processors/admin/SnapShot.h +++ b/src/meta/processors/admin/SnapShot.h @@ -28,7 +28,8 @@ class Snapshot { inline void setSpaces(std::unordered_set spaces) { spaces_ = std::move(spaces); } - ErrorOr>> + ErrorOr>> createSnapshot(const std::string& name); nebula::cpp2::ErrorCode dropSnapshot(const std::string& name, const std::vector& hosts); @@ -40,7 +41,7 @@ class Snapshot { executor_.reset(new folly::CPUThreadPoolExecutor(1)); } - ErrorOr>> getSpacesHosts(); + ErrorOr>> getHostSpaces(); private: kvstore::KVStore* kv_{nullptr}; diff --git a/src/meta/test/AdminClientTest.cpp b/src/meta/test/AdminClientTest.cpp index 3a4ac07c134..b1259f33279 100644 --- a/src/meta/test/AdminClientTest.cpp +++ b/src/meta/test/AdminClientTest.cpp @@ -318,24 
+318,28 @@ TEST(AdminClientTest, SnapshotTest) { auto client = std::make_unique(kv.get()); { LOG(INFO) << "Test Blocking Writes On..."; + std::set ids{1}; auto status = - client->blockingWrites(1, storage::cpp2::EngineSignType::BLOCK_ON, storageHost).get(); + client->blockingWrites(ids, storage::cpp2::EngineSignType::BLOCK_ON, storageHost).get(); ASSERT_TRUE(status.ok()); } { LOG(INFO) << "Test Create Snapshot..."; - auto status = client->createSnapshot(1, "test_snapshot", storageHost).get(); + std::set ids{1}; + auto status = client->createSnapshot(ids, "test_snapshot", storageHost).get(); ASSERT_TRUE(status.ok()); } { LOG(INFO) << "Test Drop Snapshot..."; - auto status = client->dropSnapshot(1, "test_snapshot", storageHost).get(); + std::set ids{1}; + auto status = client->dropSnapshot(ids, "test_snapshot", storageHost).get(); ASSERT_TRUE(status.ok()); } { LOG(INFO) << "Test Blocking Writes Off..."; + std::set ids{1}; auto status = - client->blockingWrites(1, storage::cpp2::EngineSignType::BLOCK_OFF, storageHost).get(); + client->blockingWrites(ids, storage::cpp2::EngineSignType::BLOCK_OFF, storageHost).get(); ASSERT_TRUE(status.ok()); } } diff --git a/src/meta/test/AgentHBProcessorTest.cpp b/src/meta/test/AgentHBProcessorTest.cpp new file mode 100644 index 00000000000..edda9d32521 --- /dev/null +++ b/src/meta/test/AgentHBProcessorTest.cpp @@ -0,0 +1,97 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include + +#include "common/base/Base.h" +#include "common/fs/TempDir.h" +#include "common/utils/MetaKeyUtils.h" +#include "meta/processors/admin/AgentHBProcessor.h" +#include "meta/test/TestUtils.h" + +namespace nebula { +namespace meta { + +TEST(AgentHBProcessorTest, AgentHBTest) { + fs::TempDir rootPath("/tmp/AgentHBTest.XXXXXX"); + std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); + + // mock 5 storage service in 5 hosts + { + // register storage machines so their heartbeat can be accepted + std::vector machines; + for (auto i = 0; i < 5; i++) { + machines.emplace_back(nebula::MetaKeyUtils::machineKey(std::to_string(i), i), ""); + } + folly::Baton baton; + kv->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(machines), [&](auto) { baton.post(); }); + baton.wait(); + + // mock one heartbeat for each storage service + const ClusterID kClusterId = 10; + for (auto i = 0; i < 5; i++) { + cpp2::HBReq req; + req.set_host(HostAddr(std::to_string(i), i)); + req.set_cluster_id(kClusterId); + req.set_role(cpp2::HostRole::STORAGE); + nebula::cpp2::DirInfo dir; + dir.set_root("/tmp/nebula"); + std::vector ds; + ds.push_back("/tmp/nebula/data"); + dir.set_data(ds); + req.set_dir(dir); + + auto* processor = HBProcessor::instance(kv.get(), nullptr, kClusterId); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + } + + // mock an agent in each host + for (auto i = 0; i < 5; i++) { + cpp2::AgentHBReq req; + req.set_host(HostAddr(std::to_string(i), 10 + i)); + auto* processor = AgentHBProcessor::instance(kv.get(), nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + ASSERT_EQ(1, resp.get_service_list().size()); + + auto s = resp.get_service_list()[0]; + ASSERT_EQ(cpp2::HostRole::STORAGE, s.get_role()); + ASSERT_EQ(HostAddr(std::to_string(i), i), s.get_addr()); + 
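// the single service expected below is the storage heartbeat mocked above; HBProcessor stored its reported DirInfo via MetaKeyUtils::hostDirKey()/hostDirVal(), so one data path under root "/tmp/nebula" should come back in the agent's service list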
ASSERT_EQ(1, s.get_dir().get_data().size()); + } + + auto hostsRet = ActiveHostsMan::getActiveHosts(kv.get(), 1, cpp2::HostRole::STORAGE); + ASSERT_TRUE(nebula::ok(hostsRet)); + ASSERT_EQ(5, nebula::value(hostsRet).size()); + + hostsRet = ActiveHostsMan::getActiveHosts(kv.get(), 1, cpp2::HostRole::AGENT); + ASSERT_TRUE(nebula::ok(hostsRet)); + ASSERT_EQ(5, nebula::value(hostsRet).size()); + + sleep(3); + hostsRet = ActiveHostsMan::getActiveHosts(kv.get(), 1, cpp2::HostRole::STORAGE); + ASSERT_TRUE(nebula::ok(hostsRet)); + ASSERT_EQ(0, nebula::value(hostsRet).size()); + hostsRet = ActiveHostsMan::getActiveHosts(kv.get(), 1, cpp2::HostRole::AGENT); + ASSERT_TRUE(nebula::ok(hostsRet)); + ASSERT_EQ(0, nebula::value(hostsRet).size()); +} + +} // namespace meta +} // namespace nebula + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 83c2851bff8..958cba2a3b9 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -47,6 +47,21 @@ nebula_add_test( gtest ) +nebula_add_test( + NAME + agent_hb_processor_test + SOURCES + AgentHBProcessorTest.cpp + OBJECTS + ${meta_test_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} + wangle + gtest +) + nebula_add_test( NAME meta_client_test diff --git a/src/meta/test/CreateBackupProcessorTest.cpp b/src/meta/test/CreateBackupProcessorTest.cpp index b06341894d3..4da7ded6132 100644 --- a/src/meta/test/CreateBackupProcessorTest.cpp +++ b/src/meta/test/CreateBackupProcessorTest.cpp @@ -47,18 +47,17 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { storage::cpp2::CreateCPResp resp; storage::cpp2::ResponseCommon result; std::vector partRetCode; - nebula::cpp2::PartitionBackupInfo partitionInfo; std::unordered_map info; nebula::cpp2::LogInfo logInfo; logInfo.set_log_id(logId); logInfo.set_term_id(termId); info.emplace(1, std::move(logInfo)); - partitionInfo.set_info(std::move(info)); result.set_failed_parts(partRetCode); resp.set_result(result); nebula::cpp2::CheckpointInfo cpInfo; cpInfo.set_path("snapshot_path"); - cpInfo.set_partition_info(std::move(partitionInfo)); + cpInfo.set_parts(std::move(info)); + cpInfo.set_space_id(req.get_space_ids()[0]); resp.set_info({cpInfo}); pro.setValue(std::move(resp)); return f; @@ -81,32 +80,24 @@ TEST(ProcessorTest, CreateBackupTest) { rpcServer->start("storage-admin", 0, handler); LOG(INFO) << "Start storage server on " << rpcServer->port_; - std::string localIp("127.0.0.1"); - LOG(INFO) << "Now test interfaces with retry to leader!"; - fs::TempDir rootPath("/tmp/create_backup_test.XXXXXX"); std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); + // register machines std::vector machines; + std::string localIp("127.0.0.1"); machines.emplace_back(nebula::MetaKeyUtils::machineKey(localIp, rpcServer->port_), ""); - folly::Baton b; kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(machines), [&](auto) { b.post(); }); b.wait(); + // register active hosts, same as heartbeat auto now = time::WallClock::fastNowInMilliSec(); HostAddr host(localIp, rpcServer->port_); ActiveHostsMan::updateHostInfo(kv.get(), host, HostInfo(now, meta::cpp2::HostRole::STORAGE, "")); - HostAddr storageHost = Utils::getStoreAddrFromAdminAddr(host); - - auto client = std::make_unique(kv.get()); - std::vector hosts; - hosts.emplace_back(host); - 
meta::TestUtils::registerHB(kv.get(), hosts); - - // mock admin client + // mock space 1: test_space bool ret = false; cpp2::SpaceDesc properties; GraphSpaceID id = 1; @@ -119,6 +110,7 @@ TEST(ProcessorTest, CreateBackupTest) { std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties)); + // mock space 2: test_space2 cpp2::SpaceDesc properties2; GraphSpaceID id2 = 2; properties2.set_space_name("test_space2"); @@ -126,12 +118,12 @@ TEST(ProcessorTest, CreateBackupTest) { properties2.set_replica_factor(1); spaceVal = MetaKeyUtils::spaceVal(properties2); data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space2"), - std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + std::string(reinterpret_cast(&id2), sizeof(GraphSpaceID))); data.emplace_back(MetaKeyUtils::spaceKey(id2), MetaKeyUtils::spaceVal(properties2)); + // mock index data std::string indexName = "test_space_index"; int32_t tagIndex = 2; - cpp2::IndexItem item; item.set_index_id(tagIndex); item.set_index_name(indexName); @@ -145,9 +137,10 @@ TEST(ProcessorTest, CreateBackupTest) { std::string(reinterpret_cast(&tagIndex), sizeof(IndexID))); data.emplace_back(MetaKeyUtils::indexKey(id, tagIndex), MetaKeyUtils::indexVal(item)); + // mock partition data std::vector allHosts; + HostAddr storageHost = Utils::getStoreAddrFromAdminAddr(host); allHosts.emplace_back(storageHost); - for (auto partId = 1; partId <= 1; partId++) { std::vector hosts2; size_t idx = partId; @@ -164,6 +157,7 @@ TEST(ProcessorTest, CreateBackupTest) { }); baton.wait(); + auto client = std::make_unique(kv.get()); { cpp2::CreateBackupReq req; std::vector spaces = {"test_space"}; @@ -204,19 +198,19 @@ TEST(ProcessorTest, CreateBackupTest) { }); ASSERT_EQ(it, metaFiles.cend()); - ASSERT_EQ(1, meta.get_backup_info().size()); - for (auto s : meta.get_backup_info()) { - ASSERT_EQ(1, s.first); - ASSERT_EQ(1, s.second.get_info().size()); - ASSERT_EQ(1, s.second.get_info()[0].get_info().size()); + ASSERT_EQ(1, meta.get_space_backups().size()); + for (auto s : meta.get_space_backups()) { + auto spaceBackup = s.second; + ASSERT_EQ(1, spaceBackup.get_host_backups().size()); + ASSERT_EQ(1, spaceBackup.get_host_backups()[0].get_checkpoints().size()); - auto checkInfo = s.second.get_info()[0].get_info()[0]; + auto checkInfo = spaceBackup.get_host_backups()[0].get_checkpoints()[0]; ASSERT_EQ("snapshot_path", checkInfo.get_path()); ASSERT_TRUE(meta.get_full()); - ASSERT_FALSE(meta.get_include_system_space()); - auto partitionInfo = checkInfo.get_partition_info().get_info(); - ASSERT_EQ(partitionInfo.size(), 1); - for (auto p : partitionInfo) { + ASSERT_FALSE(meta.get_all_spaces()); + auto parts = checkInfo.get_parts(); + ASSERT_EQ(parts.size(), 1); + for (auto p : parts) { ASSERT_EQ(p.first, 1); auto logInfo = p.second; ASSERT_EQ(logInfo.get_log_id(), logId); diff --git a/src/meta/test/HBProcessorTest.cpp b/src/meta/test/HBProcessorTest.cpp index 77a4d515e44..0e8ca9d1cf1 100644 --- a/src/meta/test/HBProcessorTest.cpp +++ b/src/meta/test/HBProcessorTest.cpp @@ -32,7 +32,6 @@ TEST(HBProcessorTest, HBTest) { { for (auto i = 0; i < 5; i++) { cpp2::HBReq req; - req.set_role(cpp2::HostRole::STORAGE); req.set_host(HostAddr(std::to_string(i), i)); req.set_cluster_id(kClusterId); req.set_role(cpp2::HostRole::STORAGE); diff --git a/src/meta/test/ListClusterInfoTest.cpp b/src/meta/test/ListClusterInfoTest.cpp index 3ba124f2626..1a6cdbd1b02 100644 --- a/src/meta/test/ListClusterInfoTest.cpp +++ 
b/src/meta/test/ListClusterInfoTest.cpp @@ -19,26 +19,7 @@ const char root_dir[] = "/tmp/create_backup_test.XXXXXX"; const char data_dir[] = "/tmp/create_backup_test.XXXXXX/data"; } // namespace -class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { - public: - folly::Future future_listClusterInfo( - const storage::cpp2::ListClusterInfoReq& req) override { - UNUSED(req); - folly::Promise pro; - auto f = pro.getFuture(); - storage::cpp2::ListClusterInfoResp resp; - storage::cpp2::ResponseCommon result; - std::vector partRetCode; - result.set_failed_parts(partRetCode); - resp.set_result(result); - nebula::cpp2::DirInfo dir; - dir.set_root(root_dir); - dir.set_data({data_dir}); - resp.set_dir(std::move(dir)); - pro.setValue(std::move(resp)); - return f; - } -}; +class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf {}; TEST(ProcessorTest, ListClusterInfoTest) { auto rpcServer = std::make_unique(); @@ -46,33 +27,61 @@ TEST(ProcessorTest, ListClusterInfoTest) { rpcServer->start("storage-admin", 0, handler); LOG(INFO) << "Start storage server on " << rpcServer->port_; - std::string localIp("127.0.0.1"); - LOG(INFO) << "Now test interfaces with retry to leader!"; - + std::string localIp("127.0.0.1"); fs::TempDir rootPath(root_dir); - std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); - HostAddr host(localIp, rpcServer->port_); - HostAddr storageHost = Utils::getStoreAddrFromAdminAddr(host); + HostAddr kvAddr(localIp, 10079); + std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path(), kvAddr)); + HostAddr rpcHost(localIp, rpcServer->port_); - auto client = std::make_unique(kv.get()); + // register storage and save its dir info + HostAddr storageHost = Utils::getStoreAddrFromAdminAddr(rpcHost); std::vector hosts; hosts.emplace_back(storageHost); meta::TestUtils::registerHB(kv.get(), hosts); + std::vector dirs; + nebula::cpp2::DirInfo dir; + dir.set_root(std::string(root_dir)); + std::vector ds; + ds.push_back(std::string(data_dir)); + dir.set_data(ds); + dirs.emplace_back(std::make_pair(MetaKeyUtils::hostDirKey(storageHost.host, storageHost.port), + MetaKeyUtils::hostDirVal(dir))); + folly::Baton b; + kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(dirs), [&](auto) { b.post(); }); + b.wait(); + + // register agent + Port randomPort = 10080; + HostAddr agentHost(localIp, randomPort); + hosts.clear(); + hosts.emplace_back(agentHost); + meta::TestUtils::setupHB(kv.get(), hosts, cpp2::HostRole::AGENT, gitInfoSha()); + { cpp2::ListClusterInfoReq req; - auto* processor = ListClusterInfoProcessor::instance(kv.get(), client.get()); + auto* processor = ListClusterInfoProcessor::instance(kv.get()); auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); LOG(INFO) << folly::to(resp.get_code()); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); - for (auto s : resp.get_storage_servers()) { - ASSERT_EQ(storageHost, s.get_host()); - ASSERT_EQ(s.get_dir().get_root(), root_dir); - ASSERT_EQ(s.get_dir().get_data()[0], data_dir); + ASSERT_EQ(resp.get_host_services().size(), 1); + for (auto iter : resp.get_host_services()) { + auto host = iter.first; + auto services = iter.second; + + ASSERT_EQ(services.size(), 3); + + std::unordered_map m; + for (auto s : services) { + m[s.get_role()] = s; + } + ASSERT_NE(m.find(cpp2::HostRole::META), m.end()); + ASSERT_NE(m.find(cpp2::HostRole::STORAGE), m.end()); + ASSERT_NE(m.find(cpp2::HostRole::AGENT), m.end()); } } } diff --git a/src/meta/test/MockAdminClient.h 
b/src/meta/test/MockAdminClient.h index 077b98f9f63..25715351004 100644 --- a/src/meta/test/MockAdminClient.h +++ b/src/meta/test/MockAdminClient.h @@ -29,13 +29,17 @@ class MockAdminClient : public AdminClient { MOCK_METHOD2(checkPeers, folly::Future(GraphSpaceID, PartitionID)); MOCK_METHOD1(getLeaderDist, folly::Future(HostLeaderMap*)); MOCK_METHOD3(createSnapshot, - folly::Future>(GraphSpaceID, - const std::string&, - const HostAddr&)); + folly::Future>(const std::set&, + const std::string&, + const HostAddr&)); MOCK_METHOD3(dropSnapshot, - folly::Future(GraphSpaceID, const std::string&, const HostAddr&)); + folly::Future(const std::set&, + const std::string&, + const HostAddr&)); MOCK_METHOD3(blockingWrites, - folly::Future(GraphSpaceID, storage::cpp2::EngineSignType, const HostAddr&)); + folly::Future(const std::set&, + storage::cpp2::EngineSignType, + const HostAddr&)); MOCK_METHOD9(addTask, folly::Future(cpp2::AdminCmd, int32_t, diff --git a/src/meta/test/RestoreProcessorTest.cpp b/src/meta/test/RestoreProcessorTest.cpp index 704698a4605..cda83b1bdba 100644 --- a/src/meta/test/RestoreProcessorTest.cpp +++ b/src/meta/test/RestoreProcessorTest.cpp @@ -100,7 +100,7 @@ TEST(RestoreProcessorTest, RestoreTest) { auto backupName = folly::format("BACKUP_{}", MetaKeyUtils::genTimestampStr()).str(); auto spaceNames = std::make_unique>(); spaceNames->emplace_back("test_space"); - auto backupFiles = MetaServiceUtils::backupSpaces(kv.get(), spaces, backupName, spaceNames.get()); + auto backupFiles = MetaServiceUtils::backupTables(kv.get(), spaces, backupName, spaceNames.get()); DCHECK(nebula::hasValue(backupFiles)); { cpp2::RestoreMetaReq req; @@ -302,7 +302,7 @@ TEST(RestoreProcessorTest, RestoreFullTest) { std::unordered_set spaces = {id}; auto backupName = folly::format("BACKUP_{}", MetaKeyUtils::genTimestampStr()).str(); - auto backupFiles = MetaServiceUtils::backupSpaces(kv.get(), spaces, backupName, nullptr); + auto backupFiles = MetaServiceUtils::backupTables(kv.get(), spaces, backupName, nullptr); DCHECK(nebula::hasValue(backupFiles)); { cpp2::RestoreMetaReq req; diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 4f263476edc..869390f1eca 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -21,7 +21,6 @@ nebula_add_library( admin/RebuildEdgeIndexTask.cpp admin/RebuildFTIndexTask.cpp admin/StatsTask.cpp - admin/ListClusterInfoProcessor.cpp ) nebula_add_library( diff --git a/src/storage/StorageAdminServiceHandler.cpp b/src/storage/StorageAdminServiceHandler.cpp index 125a7c60e82..a81bde93052 100644 --- a/src/storage/StorageAdminServiceHandler.cpp +++ b/src/storage/StorageAdminServiceHandler.cpp @@ -9,7 +9,6 @@ #include "storage/admin/AdminTaskProcessor.h" #include "storage/admin/CreateCheckpointProcessor.h" #include "storage/admin/DropCheckpointProcessor.h" -#include "storage/admin/ListClusterInfoProcessor.h" #include "storage/admin/SendBlockSignProcessor.h" #include "storage/admin/StopAdminTaskProcessor.h" @@ -99,11 +98,5 @@ folly::Future StorageAdminServiceHandler::future_stopAdminT RETURN_FUTURE(processor); } -folly::Future StorageAdminServiceHandler::future_listClusterInfo( - const cpp2::ListClusterInfoReq& req) { - auto* processor = ListClusterInfoProcessor::instance(env_); - RETURN_FUTURE(processor); -} - } // namespace storage } // namespace nebula diff --git a/src/storage/StorageAdminServiceHandler.h b/src/storage/StorageAdminServiceHandler.h index 4e754a74ea3..6f8bf994cc3 100644 --- a/src/storage/StorageAdminServiceHandler.h +++ 
b/src/storage/StorageAdminServiceHandler.h @@ -50,9 +50,6 @@ class StorageAdminServiceHandler final : public cpp2::StorageAdminServiceSvIf { folly::Future future_stopAdminTask( const cpp2::StopAdminTaskRequest& req) override; - folly::Future future_listClusterInfo( - const cpp2::ListClusterInfoReq& req) override; - private: StorageEnv* env_{nullptr}; }; diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index c03ec525dcc..a1978008aba 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -7,6 +7,8 @@ #include +#include + #include "clients/storage/InternalStorageClient.h" #include "common/hdfs/HdfsCommandHelper.h" #include "common/meta/ServerBasedIndexManager.h" @@ -160,6 +162,8 @@ bool StorageServer::start() { options.role_ = nebula::meta::cpp2::HostRole::LISTENER; } options.gitInfoSHA_ = gitInfoSha(); + options.rootPath_ = boost::filesystem::current_path().string(); + options.dataPaths_ = dataPaths_; metaClient_ = std::make_unique(ioThreadPool_, metaAddrs_, options); if (!metaClient_->waitForMetadReady()) { diff --git a/src/storage/admin/CreateCheckpointProcessor.cpp b/src/storage/admin/CreateCheckpointProcessor.cpp index f33e04374e0..1b04e08591f 100644 --- a/src/storage/admin/CreateCheckpointProcessor.cpp +++ b/src/storage/admin/CreateCheckpointProcessor.cpp @@ -10,18 +10,30 @@ namespace storage { void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) { CHECK_NOTNULL(env_); - auto spaceId = req.get_space_id(); + auto spaceIdList = req.get_space_ids(); auto& name = req.get_name(); - auto ret = env_->kvstore_->createCheckpoint(spaceId, std::move(name)); - if (!ok(ret)) { - cpp2::PartitionResult thriftRet; - thriftRet.set_code(error(ret)); - codes_.emplace_back(std::move(thriftRet)); - onFinished(); - return; + + std::vector ckInfoList; + for (auto& spaceId : spaceIdList) { + auto ckRet = env_->kvstore_->createCheckpoint(spaceId, name); + if (!ok(ckRet) && error(ckRet) == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { + LOG(ERROR) << folly::sformat("Space {} to create backup is not found", spaceId); + continue; + } + + if (!ok(ckRet)) { + cpp2::PartitionResult thriftRet; + thriftRet.set_code(error(ckRet)); + codes_.emplace_back(std::move(thriftRet)); + onFinished(); + return; + } + + auto spaceCkList = std::move(nebula::value(ckRet)); + ckInfoList.insert(ckInfoList.end(), spaceCkList.begin(), spaceCkList.end()); } - resp_.set_info(std::move(nebula::value(ret))); + resp_.set_info(ckInfoList); onFinished(); } diff --git a/src/storage/admin/DropCheckpointProcessor.cpp b/src/storage/admin/DropCheckpointProcessor.cpp index c3bef9948fc..b0226b8e1c4 100644 --- a/src/storage/admin/DropCheckpointProcessor.cpp +++ b/src/storage/admin/DropCheckpointProcessor.cpp @@ -10,13 +10,21 @@ namespace storage { void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) { CHECK_NOTNULL(env_); - auto spaceId = req.get_space_id(); + auto spaceIdList = req.get_space_ids(); auto& name = req.get_name(); - auto retCode = env_->kvstore_->dropCheckpoint(spaceId, std::move(name)); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - cpp2::PartitionResult thriftRet; - thriftRet.set_code(retCode); - codes_.emplace_back(std::move(thriftRet)); + for (auto spaceId : spaceIdList) { + auto code = env_->kvstore_->dropCheckpoint(spaceId, std::move(name)); + + if (code == nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND) { + continue; + } + + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + cpp2::PartitionResult res; + res.set_code(code); + 
codes_.emplace_back(std::move(res)); + break; + } } onFinished(); diff --git a/src/storage/admin/ListClusterInfoProcessor.cpp b/src/storage/admin/ListClusterInfoProcessor.cpp deleted file mode 100644 index fa2e402af4a..00000000000 --- a/src/storage/admin/ListClusterInfoProcessor.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "storage/admin/ListClusterInfoProcessor.h" - -#include - -#include "common/fs/FileUtils.h" - -namespace nebula { -namespace storage { - -void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { - UNUSED(req); - CHECK_NOTNULL(env_); - - auto data_root = env_->kvstore_->getDataRoot(); - - std::vector realpaths; - bool failed = false; - std::transform(std::make_move_iterator(data_root.begin()), - std::make_move_iterator(data_root.end()), - std::back_inserter(realpaths), - [&failed](auto f) { - if (f[0] == '/') { - return f; - } - auto result = nebula::fs::FileUtils::realPath(f.c_str()); - if (!result.ok()) { - LOG(ERROR) << "Failed to get the absolute path of file: " << f; - failed = true; - return f; - } - return std::string(result.value()); - }); - if (failed) { - cpp2::PartitionResult thriftRet; - thriftRet.set_code(nebula::cpp2::ErrorCode::E_FAILED_GET_ABS_PATH); - codes_.emplace_back(std::move(thriftRet)); - onFinished(); - return; - } - nebula::cpp2::DirInfo dir; - dir.set_data(std::move(realpaths)); - dir.set_root(boost::filesystem::current_path().string()); - - resp_.set_dir(std::move(dir)); - - onFinished(); -} - -} // namespace storage -} // namespace nebula diff --git a/src/storage/admin/ListClusterInfoProcessor.h b/src/storage/admin/ListClusterInfoProcessor.h deleted file mode 100644 index 5628831f4f1..00000000000 --- a/src/storage/admin/ListClusterInfoProcessor.h +++ /dev/null @@ -1,32 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#ifndef STORAGE_ADMIN_LISTCLUSTERINFO_H_ -#define STORAGE_ADMIN_LISTCLUSTERINFO_H_ - -#include "common/base/Base.h" -#include "kvstore/KVEngine.h" -#include "kvstore/NebulaStore.h" -#include "storage/BaseProcessor.h" - -namespace nebula { -namespace storage { - -class ListClusterInfoProcessor : public BaseProcessor { - public: - static ListClusterInfoProcessor* instance(StorageEnv* env) { - return new ListClusterInfoProcessor(env); - } - - void process(const cpp2::ListClusterInfoReq& req); - - private: - explicit ListClusterInfoProcessor(StorageEnv* env) - : BaseProcessor(env) {} -}; - -} // namespace storage -} // namespace nebula -#endif // STORAGE_ADMIN_LISTCLUSTERINFO_H_ diff --git a/src/storage/admin/SendBlockSignProcessor.cpp b/src/storage/admin/SendBlockSignProcessor.cpp index 070f42c595e..66b239420e2 100644 --- a/src/storage/admin/SendBlockSignProcessor.cpp +++ b/src/storage/admin/SendBlockSignProcessor.cpp @@ -9,17 +9,22 @@ namespace nebula { namespace storage { void SendBlockSignProcessor::process(const cpp2::BlockingSignRequest& req) { - auto spaceId = req.get_space_id(); + auto spaceIds = req.get_space_ids(); auto sign = req.get_sign() == cpp2::EngineSignType::BLOCK_ON; - LOG(INFO) << "Receive block sign for space " << req.get_space_id() << ", block: " << sign; - auto code = env_->kvstore_->setWriteBlocking(spaceId, sign); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - cpp2::PartitionResult thriftRet; - thriftRet.set_code(code); - codes_.emplace_back(std::move(thriftRet)); - LOG(INFO) << "set block sign failed, error: " << apache::thrift::util::enumNameSafe(code); + for (auto spaceId : spaceIds) { + LOG(INFO) << "Receive block sign for space " << spaceId << ", block: " << sign; + auto code = env_->kvstore_->setWriteBlocking(spaceId, sign); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + cpp2::PartitionResult thriftRet; + thriftRet.set_code(code); + codes_.emplace_back(std::move(thriftRet)); + LOG(INFO) << "set block sign failed, error: " << apache::thrift::util::enumNameSafe(code); + onFinished(); + return; + } } + onFinished(); } diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 2c6e0ea5a3c..1a3582aa559 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -678,21 +678,6 @@ nebula_add_executable( boost_regex ) -nebula_add_test( - NAME - list_cluster_info_test - SOURCES - ListClusterInfoTest.cpp - OBJECTS - ${storage_test_deps} - LIBRARIES - ${ROCKSDB_LIBRARIES} - ${THRIFT_LIBRARIES} - ${PROXYGEN_LIBRARIES} - wangle - gtest -) - nebula_add_test( NAME chain_add_edge_test diff --git a/src/storage/test/CheckpointTest.cpp b/src/storage/test/CheckpointTest.cpp index 8c50a07213e..c6f4f0e50b8 100644 --- a/src/storage/test/CheckpointTest.cpp +++ b/src/storage/test/CheckpointTest.cpp @@ -37,7 +37,8 @@ TEST(CheckpointTest, simpleTest) { { auto* processor = CreateCheckpointProcessor::instance(env); cpp2::CreateCPRequest req; - req.set_space_id(1); + std::vector ids{1}; + req.set_space_ids(ids); req.set_name("checkpoint_test"); auto fut = processor->getFuture(); processor->process(req); diff --git a/src/storage/test/ListClusterInfoTest.cpp b/src/storage/test/ListClusterInfoTest.cpp deleted file mode 100644 index 3a3e3105eba..00000000000 --- a/src/storage/test/ListClusterInfoTest.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "mock/MockCluster.h" -#include "mock/MockData.h" -#include "storage/admin/ListClusterInfoProcessor.h" - -namespace nebula { -namespace storage { -TEST(ListClusterInfoTest, simpleTest) { - fs::TempDir dataPath("/tmp/ListClusterInfo_Test_src.XXXXXX"); - mock::MockCluster cluster; - cluster.initStorageKV(dataPath.path()); - auto* env = cluster.storageEnv_.get(); - - // Begin list clusterinfo - { - auto* processor = ListClusterInfoProcessor::instance(env); - cpp2::ListClusterInfoReq req; - auto fut = processor->getFuture(); - processor->process(req); - auto resp = std::move(fut).get(); - EXPECT_EQ(0, resp.result.failed_parts.size()); - auto data_root = env->kvstore_->getDataRoot(); - auto dir = resp.get_dir(); - ASSERT_EQ(dir.get_data().size(), data_root.size()); - int i = 0; - for (auto d : dir.get_data()) { - ASSERT_EQ(d, data_root[i]); - i++; - } - std::cout << dir.get_root() << std::endl; - ASSERT_FALSE(dir.get_data().empty()); - } -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - return RUN_ALL_TESTS(); -}
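With the storage admin interface moving from a single space_id to a space_ids list throughout this change, call sites are updated along the lines of the CheckpointTest hunk above. The fragment below sketches the migrated request construction; it assumes the thrift-generated request types are in scope as in the storage tests, and that setters follow the usual set_<field> pattern (for example, DropCPRequest::set_space_ids is inferred from the get_space_ids accessor used in DropCheckpointProcessor).

// before this change: req.set_space_id(1);
std::vector<GraphSpaceID> ids{1, 2};

cpp2::CreateCPRequest createReq;
createReq.set_space_ids(ids);          // checkpoint every listed space in one request
createReq.set_name("checkpoint_test");

cpp2::DropCPRequest dropReq;
dropReq.set_space_ids(ids);            // dropping tolerates spaces that no longer exist
dropReq.set_name("checkpoint_test");

cpp2::BlockingSignRequest blockReq;
blockReq.set_space_ids(ids);           // block or unblock writes for all listed spaces
blockReq.set_sign(cpp2::EngineSignType::BLOCK_ON);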