From f771fd8e55d42be80cd57041ca43e43c0de43ab7 Mon Sep 17 00:00:00 2001 From: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com> Date: Sat, 29 Jan 2022 16:29:32 +0800 Subject: [PATCH] refactor LOG and add comment for meta, meta/processors/job (#3686) * refactor LOG and add comment for meta, meta/processors/job * refactor LOG and add comment for meta/processors/zone Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> --- src/meta/ActiveHostsMan.cpp | 34 ++-- src/meta/ActiveHostsMan.h | 83 +++++++++- src/meta/KVBasedClusterIdMan.h | 50 +++++- src/meta/MetaServiceUtils.cpp | 18 +-- src/meta/MetaServiceUtils.h | 49 +++++- src/meta/MetaVersionMan.cpp | 18 +-- src/meta/MetaVersionMan.h | 6 + src/meta/RootUserMan.h | 2 +- src/meta/processors/job/AdminJobProcessor.cpp | 12 +- src/meta/processors/job/AdminJobProcessor.h | 3 + .../processors/job/BalanceJobExecutor.cpp | 16 +- src/meta/processors/job/BalanceJobExecutor.h | 76 ++++++++- src/meta/processors/job/BalancePlan.cpp | 12 +- src/meta/processors/job/BalancePlan.h | 47 +++++- src/meta/processors/job/BalanceTask.cpp | 38 +++-- src/meta/processors/job/BalanceTask.h | 12 ++ src/meta/processors/job/CompactJobExecutor.h | 10 ++ .../processors/job/DataBalanceJobExecutor.cpp | 8 +- .../processors/job/DataBalanceJobExecutor.h | 28 ++++ src/meta/processors/job/FlushJobExecutor.h | 10 ++ src/meta/processors/job/GetStatsProcessor.cpp | 10 +- src/meta/processors/job/GetStatsProcessor.h | 3 + src/meta/processors/job/JobDescription.cpp | 6 +- src/meta/processors/job/JobDescription.h | 147 ++++++++++++------ src/meta/processors/job/JobExecutor.cpp | 4 +- src/meta/processors/job/JobExecutor.h | 56 ++++++- src/meta/processors/job/JobManager.cpp | 54 +++---- src/meta/processors/job/JobManager.h | 136 ++++++++++++++-- src/meta/processors/job/JobUtils.cpp | 8 +- src/meta/processors/job/JobUtils.h | 14 ++ .../job/LeaderBalanceJobExecutor.cpp | 50 +++--- 
.../processors/job/LeaderBalanceJobExecutor.h | 36 +++++ .../job/ListEdgeIndexStatusProcessor.cpp | 2 +- .../job/ListEdgeIndexStatusProcessor.h | 3 + .../job/ListTagIndexStatusProcessor.cpp | 2 +- .../job/ListTagIndexStatusProcessor.h | 3 + src/meta/processors/job/MetaJobExecutor.cpp | 2 +- src/meta/processors/job/MetaJobExecutor.h | 26 +++- .../processors/job/RebuildJobExecutor.cpp | 12 +- .../job/SimpleConcurrentJobExecutor.cpp | 4 +- src/meta/processors/job/StatsJobExecutor.cpp | 12 +- src/meta/processors/job/StatsJobExecutor.h | 19 ++- .../processors/job/StorageJobExecutor.cpp | 20 +-- src/meta/processors/job/StorageJobExecutor.h | 26 +++- src/meta/processors/job/TaskDescription.h | 67 +++++--- .../processors/job/ZoneBalanceJobExecutor.cpp | 12 +- .../processors/job/ZoneBalanceJobExecutor.h | 32 ++++ .../zone/AddHostsIntoZoneProcessor.cpp | 12 +- .../zone/AddHostsIntoZoneProcessor.h | 4 + src/meta/processors/zone/AddHostsProcessor.h | 3 + .../processors/zone/DivideZoneProcessor.cpp | 21 +-- .../processors/zone/DivideZoneProcessor.h | 11 ++ .../processors/zone/DropHostsProcessor.cpp | 22 +-- src/meta/processors/zone/DropHostsProcessor.h | 13 ++ .../processors/zone/DropZoneProcessor.cpp | 16 +- src/meta/processors/zone/DropZoneProcessor.h | 17 +- src/meta/processors/zone/GetZoneProcessor.cpp | 4 +- src/meta/processors/zone/GetZoneProcessor.h | 3 + .../processors/zone/ListZonesProcessor.cpp | 2 +- src/meta/processors/zone/ListZonesProcessor.h | 3 + .../processors/zone/MergeZoneProcessor.cpp | 28 ++-- src/meta/processors/zone/MergeZoneProcessor.h | 11 +- .../processors/zone/RenameZoneProcessor.cpp | 6 +- .../processors/zone/RenameZoneProcessor.h | 3 + 64 files changed, 1117 insertions(+), 360 deletions(-) diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 38d51162941..05c11df24dc 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -48,7 +48,7 @@ nebula::cpp2::ErrorCode 
ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv, if (statusVec[i].ok()) { std::tie(std::ignore, term, code) = MetaKeyUtils::parseLeaderValV3(vals[i]); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(WARNING) << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << apache::thrift::util::enumNameSafe(code); continue; } if (terms[i] <= term) { @@ -90,13 +90,13 @@ bool ActiveHostsMan::machineRegisted(kvstore::KVStore* kv, const HostAddr& hostA } ErrorOr>> -ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, std::string hostname) { +ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, const std::string& hostname) { const auto& prefix = MetaKeyUtils::hostPrefix(); std::unique_ptr iter; auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Failed to get services in the host: " << hostname << ", error " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Failed to get services in the host: " << hostname << ", error " + << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -120,7 +120,7 @@ ErrorOr> ActiveHostsMan::getActiv std::unique_ptr machineIter; auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, machinePrefix, &machineIter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -135,8 +135,8 @@ ErrorOr> ActiveHostsMan::getActiv std::unique_ptr iter; retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Failed to get active hosts, error " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Failed to get active hosts, error " + << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -155,7 +155,7 @@ 
ErrorOr> ActiveHostsMan::getActiv if (info.role_ == cpp2::HostRole::STORAGE && std::find(machines.begin(), machines.end(), host) == machines.end()) { retCode = nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND; - LOG(ERROR) << "Machine not found " << host; + LOG(INFO) << "Machine not found " << host; break; } @@ -177,8 +177,8 @@ ErrorOr> ActiveHostsMan::getActiv auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zone " << zoneName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get zone " << zoneName + << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -208,7 +208,7 @@ ErrorOr> ActiveHostsMan::getActiv auto spaceKey = MetaKeyUtils::spaceKey(spaceId); auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get space failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get space failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -243,8 +243,8 @@ ErrorOr ActiveHostsMan::getHostInfo(kvstore:: std::string machineValue; auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, machineKey, &machineValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get machine info " << host - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get machine info " << host + << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -252,8 +252,8 @@ ErrorOr ActiveHostsMan::getHostInfo(kvstore:: std::string hostValue; retCode = kv->get(kDefaultSpaceId, kDefaultPartId, hostKey, &hostValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get host info " << host - << " failed, error: " << 
apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get host info " << host + << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } return HostInfo::decode(hostValue); @@ -284,8 +284,8 @@ ErrorOr LastUpdateTimeMan::get(kvstore::KVStor std::string val; auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get last update time failed, error: " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get last update time failed, error: " + << apache::thrift::util::enumNameSafe(retCode); return retCode; } return *reinterpret_cast(val.data()); diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index b79fca44197..5154e1be16c 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -50,13 +50,17 @@ struct HostInfo { return info; } - /* + /** + * @brief * int8_t dataVer * int64_t timestamp * sizeof(HostRole) hostRole * size_t lenth of gitInfoSha * string gitInfoSha - * */ + * + * @param info + * @return + */ static std::string encodeV2(const HostInfo& info) { std::string encode; int8_t dataVer = 2; @@ -74,6 +78,12 @@ struct HostInfo { return encode; } + /** + * @brief Parse a serialized value to HostInfo + * + * @param data + * @return + */ static HostInfo decodeV2(const folly::StringPiece& data) { HostInfo info; size_t offset = sizeof(int8_t); @@ -107,27 +117,92 @@ class ActiveHostsMan final { ~ActiveHostsMan() = default; using AllLeaders = std::unordered_map>; + + /** + * @brief Save host info and leader info into kvStore + * If partition leader info was updated, it will update LastUpdateTime, indicating the MetaClient + * update local cache. 
+ * + * @param kv Where to save + * @param hostAddr Which host to save + * @param info Information of the host + * @param leaderParts Leader info of all parts, null means don't need to update leader info + * @return + */ static nebula::cpp2::ErrorCode updateHostInfo(kvstore::KVStore* kv, const HostAddr& hostAddr, const HostInfo& info, const AllLeaders* leaderParts = nullptr); + /** + * @brief Check if the host is registered + * + * @param kv From where to get + * @param hostAddr Which host to register + * @return + */ static bool machineRegisted(kvstore::KVStore* kv, const HostAddr& hostAddr); + /** + * @brief Get all registered host + * + * @param kv From where to get + * @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL + * @param role Which role of the host to find. maybe GRAPH META STORAGE LISTENER AGENT + * @return + */ static ErrorOr> getActiveHosts( kvstore::KVStore* kv, int32_t expiredTTL = 0, cpp2::HostRole role = cpp2::HostRole::STORAGE); + /** + * @brief Get services in the agent host + * + * @param kv From where to get + * @param hostname Hostname or ip + * @return + */ static ErrorOr>> - getServicesInHost(kvstore::KVStore* kv, std::string hostname); - + getServicesInHost(kvstore::KVStore* kv, const std::string& hostname); + + /** + * @brief Get hosts in the zone + * + * @param kv From where to get + * @param zoneName Name of the zone + * @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL + * @return + */ static ErrorOr> getActiveHostsInZone( kvstore::KVStore* kv, const std::string& zoneName, int32_t expiredTTL = 0); + /** + * @brief Get hosts in the space + * + * @param kv From where to get + * @param spaceId Id of the space + * @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL + * @return + */ static ErrorOr> getActiveHostsWithZones( kvstore::KVStore* kv, GraphSpaceID spaceId, int32_t expiredTTL = 0); + /** + * @brief Check if a host is 
alived, by checking if the host send heartbeat within the default + * time + * + * @param kv From where to get the host's information + * @param host Which host to check + * @return + */ static ErrorOr isLived(kvstore::KVStore* kv, const HostAddr& host); + /** + * @brief Get hostInfo for a host + * + * @param kv From where to get + * @param host + * @return + */ static ErrorOr getHostInfo(kvstore::KVStore* kv, const HostAddr& host); diff --git a/src/meta/KVBasedClusterIdMan.h b/src/meta/KVBasedClusterIdMan.h index b791d2ae971..78c853d3c9b 100644 --- a/src/meta/KVBasedClusterIdMan.h +++ b/src/meta/KVBasedClusterIdMan.h @@ -22,6 +22,12 @@ namespace meta { * */ class ClusterIdMan { public: + /** + * @brief Create a ClusterID by hash function + * + * @param metaAddrs + * @return + */ static ClusterID create(const std::string& metaAddrs) { std::hash hash_fn; auto clusterId = hash_fn(metaAddrs); @@ -31,10 +37,17 @@ class ClusterIdMan { return clusterId; } + /** + * @brief Write the cluster id in a file + * + * @param clusterId + * @param filename + * @return + */ static bool persistInFile(ClusterID clusterId, const std::string& filename) { auto dirname = fs::FileUtils::dirname(filename.c_str()); if (!fs::FileUtils::makeDir(dirname)) { - LOG(ERROR) << "Failed mkdir " << dirname; + LOG(INFO) << "Failed mkdir " << dirname; return false; } if (fs::FileUtils::remove(filename.c_str())) { @@ -42,12 +55,12 @@ class ClusterIdMan { } int fd = ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); if (fd < 0) { - LOG(ERROR) << "Open file error, file " << filename << ", error " << strerror(errno); + LOG(INFO) << "Open file error, file " << filename << ", error " << strerror(errno); return false; } int bytes = ::write(fd, reinterpret_cast(&clusterId), sizeof(ClusterID)); if (bytes != sizeof(clusterId)) { - LOG(ERROR) << "Write clusterId failed!"; + LOG(INFO) << "Write clusterId failed!"; ::close(fd); return false; } @@ -56,17 +69,23 @@ class ClusterIdMan { return true; } + /** 
+ * @brief Get cluster id from a file + * + * @param filename + * @return + */ static ClusterID getClusterIdFromFile(const std::string& filename) { LOG(INFO) << "Try to open " << filename; int fd = ::open(filename.c_str(), O_RDONLY); if (fd < 0) { - LOG(WARNING) << "Open file failed, error " << strerror(errno); + LOG(INFO) << "Open file failed, error " << strerror(errno); return 0; } ClusterID clusterId = 0; int len = ::read(fd, reinterpret_cast(&clusterId), sizeof(ClusterID)); if (len != sizeof(ClusterID)) { - LOG(ERROR) << "Get clusterId failed!"; + LOG(INFO) << "Get clusterId failed!"; ::close(fd); return 0; } @@ -75,6 +94,13 @@ class ClusterIdMan { return clusterId; } + /** + * @brief Get cluster id from kvStore + * + * @param kv + * @param key + * @return + */ static ClusterID getClusterIdFromKV(kvstore::KVStore* kv, const std::string& key) { CHECK_NOTNULL(kv); std::string value; @@ -84,16 +110,24 @@ class ClusterIdMan { return 0; } else if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { if (value.size() != sizeof(ClusterID)) { - LOG(ERROR) << "Bad clusterId " << value; + LOG(INFO) << "Bad clusterId " << value; return 0; } return *reinterpret_cast(value.data()); } else { - LOG(ERROR) << "Error in kvstore, err " << static_cast(code); + LOG(INFO) << "Error in kvstore, err " << static_cast(code); return 0; } } + /** + * @brief Save cluster id into kvStore + * + * @param kv + * @param key + * @param clusterId + * @return + */ static bool persistInKV(kvstore::KVStore* kv, const std::string& key, ClusterID clusterId) { CHECK_NOTNULL(kv); std::vector data; @@ -102,7 +136,7 @@ class ClusterIdMan { folly::Baton baton; kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) { if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Put failed, error " << static_cast(code); + LOG(INFO) << "Put failed, error " << static_cast(code); ret = false; } else { LOG(INFO) << "Put key " << key << ", val " << clusterId; diff --git 
a/src/meta/MetaServiceUtils.cpp b/src/meta/MetaServiceUtils.cpp index a6367dff496..3189cf955e3 100644 --- a/src/meta/MetaServiceUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -43,7 +43,7 @@ nebula::cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(std::vectorget_name() == col.get_name()) { - LOG(ERROR) << "Column existing: " << col.get_name(); + LOG(INFO) << "Column existing: " << col.get_name(); return nebula::cpp2::ErrorCode::E_EXISTED; } } @@ -55,14 +55,14 @@ nebula::cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(std::vectorget_name()) { // If this col is ttl_col, change not allowed if (prop.get_ttl_col() && (*prop.get_ttl_col() == colName)) { - LOG(ERROR) << "Column: " << colName << " as ttl_col, change not allowed"; + LOG(INFO) << "Column: " << colName << " as ttl_col, change not allowed"; return nebula::cpp2::ErrorCode::E_UNSUPPORTED; } *it = col; return nebula::cpp2::ErrorCode::SUCCEEDED; } } - LOG(ERROR) << "Column not found: " << col.get_name(); + LOG(INFO) << "Column not found: " << col.get_name(); if (isEdge) { return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; } @@ -79,13 +79,13 @@ nebula::cpp2::ErrorCode MetaServiceUtils::alterColumnDefs(std::vectorempty()))) { - LOG(WARNING) << "Implicit ttl_col not support"; + LOG(INFO) << "Implicit ttl_col not support"; return nebula::cpp2::ErrorCode::E_UNSUPPORTED; } diff --git a/src/meta/MetaServiceUtils.h b/src/meta/MetaServiceUtils.h index e70091e27eb..f2bf045afae 100644 --- a/src/meta/MetaServiceUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -20,29 +20,76 @@ class MetaServiceUtils final { public: MetaServiceUtils() = delete; + /** + * @brief Change column definition in memory + * + * @param cols Existed columns in schema + * @param prop Schema property, mainly used to check if the col is ttl_col + * @param col The column that will be operated + * @param op + * ADD: add col to cols + * CHANGE: replace the column in cols with col + * DROP: remove the col from cols + * @param isEdge Is edge or tag + * 
@return + */ static nebula::cpp2::ErrorCode alterColumnDefs(std::vector& cols, cpp2::SchemaProp& prop, const cpp2::ColumnDef col, const cpp2::AlterSchemaOp op, bool isEdge = false); + /** + * @brief Change schema property, mainly set ttl_col + * + * @param cols Column infomartion, mainly used to check if the colType is INT64 or TIMESTAMP + * @param schemaProp Which schema property to change + * @param alterSchemaProp Where to get ttl_col + * @param existIndex If the column has index + * @param isEdge Edge or tag? + * @return + */ static nebula::cpp2::ErrorCode alterSchemaProp(std::vector& cols, cpp2::SchemaProp& schemaProp, cpp2::SchemaProp alterSchemaProp, bool existIndex, bool isEdge = false); - // backup + /** + * @brief Backup index + * + * @param kvstore Which kvStore to backup + * @param spaces Some keys need to filter by space id + * @param backupName Be used to make the name of backup file + * @param spaceName Some keys need to filter by space name + * @return Backup files + */ static ErrorOr> backupIndex( kvstore::KVStore* kvstore, const std::unordered_set& spaces, const std::string& backupName, const std::vector* spaceName); + /** + * @brief Make a function that filter spaces + * + * @param spaces Be used to filter keys don't contained + * @param parseSpace Funtion that parse the key to space id + * @return + */ static std::function spaceFilter( const std::unordered_set& spaces, std::function parseSpace); + /** + * @brief Backup tables, backup system tables if backup all spaces + * + * @param kvstore Which kvStore to backup + * @param spaceIds Skip keys not belong to these spaces + * @param backupName Be used to make the name of backup file + * @param spaceNames Null means all spaces, also used in backup indexes + * @return Backup files + */ static ErrorOr> backupTables( kvstore::KVStore* kvstore, const std::unordered_set& spaceIds, diff --git a/src/meta/MetaVersionMan.cpp b/src/meta/MetaVersionMan.cpp index 86421f05596..8e22c0d8c34 100644 --- 
a/src/meta/MetaVersionMan.cpp +++ b/src/meta/MetaVersionMan.cpp @@ -65,14 +65,14 @@ Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVEngine* engine) { std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot); if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) { - LOG(ERROR) << "Make checkpoint dir: " << path << " failed"; + LOG(INFO) << "Make checkpoint dir: " << path << " failed"; return Status::Error("Create snapshot file failed"); } std::string dataPath = folly::sformat("{}/data", path); auto code = engine->createCheckpoint(dataPath); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Create snapshot failed: " << snapshot; + LOG(INFO) << "Create snapshot failed: " << snapshot; return Status::Error("Create snapshot failed"); } @@ -85,7 +85,7 @@ Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVEngine* engine) { // delete snapshot file auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot); if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) { - LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually"; + LOG(INFO) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually"; } return Status::OK(); } @@ -100,8 +100,8 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { std::unique_ptr zoneIter; auto code = engine->prefix(zonePrefix, &zoneIter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zones failed"; - return Status::Error("Get zones failed"); + LOG(INFO) << "Get active hosts failed"; + return Status::Error("Get hosts failed"); } while (zoneIter->valid()) { @@ -116,7 +116,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { std::unique_ptr iter; code = engine->prefix(prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get active hosts failed"; + LOG(INFO) << "Get active 
hosts failed"; return Status::Error("Get hosts failed"); } @@ -143,7 +143,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { } auto status = upgrader.saveMachineAndZone(std::move(data)); if (!status.ok()) { - LOG(ERROR) << status; + LOG(INFO) << status; return status; } } @@ -154,7 +154,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { std::unique_ptr iter; auto code = engine->prefix(prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get spaces failed"; + LOG(INFO) << "Get spaces failed"; return Status::Error("Get spaces failed"); } @@ -164,7 +164,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { } auto status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val()); if (!status.ok()) { - LOG(ERROR) << status; + LOG(INFO) << status; return status; } iter->next(); diff --git a/src/meta/MetaVersionMan.h b/src/meta/MetaVersionMan.h index 8ff45d91bfc..99a9c297df9 100644 --- a/src/meta/MetaVersionMan.h +++ b/src/meta/MetaVersionMan.h @@ -28,6 +28,12 @@ class MetaVersionMan final { public: MetaVersionMan() = delete; + /** + * @brief Get meta version + * + * @param kv + * @return + */ static MetaVersion getMetaVersionFromKV(kvstore::KVStore* kv); static bool setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version); diff --git a/src/meta/RootUserMan.h b/src/meta/RootUserMan.h index c61a1faf70d..8cc883b0a55 100644 --- a/src/meta/RootUserMan.h +++ b/src/meta/RootUserMan.h @@ -48,7 +48,7 @@ class RootUserMan { kv->asyncMultiPut( kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Put failed, error " << static_cast(code); + LOG(INFO) << "Put failed, error " << static_cast(code); ret = false; } baton.post(); diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 755179ae7a8..5e68ea361c4 100644 --- 
a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -35,7 +35,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { if (cmd == cpp2::AdminCmd::REBUILD_TAG_INDEX || cmd == cpp2::AdminCmd::REBUILD_EDGE_INDEX || cmd == cpp2::AdminCmd::STATS) { if (paras.empty()) { - LOG(ERROR) << "Parameter should be not empty"; + LOG(INFO) << "Parameter should be not empty"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -49,8 +49,8 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { break; } - // Job not exists auto jobId = autoIncrementId(); + // check if Job not exists if (!nebula::ok(jobId)) { errorCode = nebula::error(jobId); break; @@ -75,14 +75,14 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { case nebula::meta::cpp2::AdminJobOp::SHOW: { static const size_t kShowArgsNum = 2; if (req.get_paras().size() != kShowArgsNum) { - LOG(ERROR) << "Parameter number not matched"; + LOG(INFO) << "Parameter number not matched"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } int iJob = atoi(req.get_paras()[0].c_str()); if (iJob == 0) { - LOG(ERROR) << "Show job should have parameter as the job ID"; + LOG(INFO) << "Show job should have parameter as the job ID"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -98,13 +98,13 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { case nebula::meta::cpp2::AdminJobOp::STOP: { static const size_t kStopJobArgsNum = 2; if (req.get_paras().size() != kStopJobArgsNum) { - LOG(ERROR) << "Parameter number not matched"; + LOG(INFO) << "Parameter number not matched"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } int iJob = atoi(req.get_paras()[0].c_str()); if (iJob == 0) { - LOG(ERROR) << "Stop job should have parameter as the job ID"; + LOG(INFO) << "Stop job should have parameter as the job ID"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } diff --git 
a/src/meta/processors/job/AdminJobProcessor.h b/src/meta/processors/job/AdminJobProcessor.h index 1b70482f90a..bfb340acb67 100644 --- a/src/meta/processors/job/AdminJobProcessor.h +++ b/src/meta/processors/job/AdminJobProcessor.h @@ -13,6 +13,9 @@ namespace nebula { namespace meta { +/** + * @brief Make admin job operation, including ADD SHOW_All SHOW STOP RECOVER + */ class AdminJobProcessor : public BaseProcessor { public: static AdminJobProcessor* instance(kvstore::KVStore* kvstore, AdminClient* adminClient) { diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index d354fff681c..3c4aab94c61 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -47,7 +47,7 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() { std::string value; auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, jobKey, &value); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't access kvstore, ret = " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Can't access kvstore, ret = " << apache::thrift::util::enumNameSafe(retCode); return retCode; } auto optJobRet = JobDescription::makeJobDescription(jobKey, value); @@ -56,7 +56,7 @@ nebula::cpp2::ErrorCode BalanceJobExecutor::recovery() { plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { if (LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()) != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Balance plan " << plan_->id() << " update meta failed"; + LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed"; } executorOnFinished_(status); }); @@ -124,8 +124,8 @@ nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVSto std::string spaceVal; auto rc = kvstore->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceVal); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get space info " << 
spaceId - << " failed, error: " << apache::thrift::util::enumNameSafe(rc); + LOG(INFO) << "Get space info " << spaceId + << " failed, error: " << apache::thrift::util::enumNameSafe(rc); return rc; } meta::cpp2::SpaceDesc properties = MetaKeyUtils::parseSpace(spaceVal); @@ -137,8 +137,8 @@ nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVSto auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto retCode = kvstore->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zone " << zoneName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get zone " << zoneName + << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } std::vector hosts = MetaKeyUtils::parseZoneHosts(std::move(zoneValue)); @@ -152,8 +152,8 @@ nebula::cpp2::ErrorCode SpaceInfo::loadInfo(GraphSpaceID spaceId, kvstore::KVSto std::unique_ptr iter; auto retCode = kvstore->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId << " " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Access kvstore failed, spaceId " << spaceId << " " + << apache::thrift::util::enumNameSafe(retCode); return retCode; } for (; iter->valid(); iter->next()) { diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h index 64c9c7286b3..cc2ccc84746 100644 --- a/src/meta/processors/job/BalanceJobExecutor.h +++ b/src/meta/processors/job/BalanceJobExecutor.h @@ -27,10 +27,30 @@ struct Host { struct Zone { Zone() = default; explicit Zone(const std::string name) : zoneName_(name) {} + + /** + * @brief Check if this zone contains the host + * + * @param ha + * @return + */ bool hasHost(const HostAddr& ha) { return hosts_.find(ha) != hosts_.end(); } + + /** + * @brief Get part number in the zone 
+ * + * @return + */ int32_t calPartNum(); + + /** + * @brief Check if the part exists in the zone + * + * @param partId + * @return + */ bool partExist(PartitionID partId); std::string zoneName_; @@ -39,7 +59,21 @@ struct Zone { }; struct SpaceInfo { + /** + * @brief Load hosts and zones info of this space from kvStore + * + * @param spaceId + * @param kvstore + * @return + */ nebula::cpp2::ErrorCode loadInfo(GraphSpaceID spaceId, kvstore::KVStore* kvstore); + + /** + * @brief Check if this space contains the host + * + * @param ha + * @return + */ bool hasHost(const HostAddr& ha); std::string name_; @@ -49,26 +83,66 @@ struct SpaceInfo { std::map zones_; }; +/** + * @brief The base class for balance + */ class BalanceJobExecutor : public MetaJobExecutor { public: BalanceJobExecutor(JobID jobId, kvstore::KVStore* kvstore, AdminClient* adminClient, const std::vector& params); - + /** + * @brief Check if paras are legal + * + * @return + */ bool check() override; + /** + * @brief See implementation in child class + * + * @return + */ nebula::cpp2::ErrorCode prepare() override; + /** + * @brief Stop this job + * + * @return + */ nebula::cpp2::ErrorCode stop() override; + /** + * @brief Finish this job + * + * @param ret True means succeed, false means failed + * @return + */ nebula::cpp2::ErrorCode finish(bool ret = true) override; + /** + * @brief Read balance plan and balance tasks from kvStore + * + * @return + */ nebula::cpp2::ErrorCode recovery() override; protected: + /** + * @brief Save one kv pair into kvStore + * + * @param k key + * @param v value + * @return + */ nebula::cpp2::ErrorCode save(const std::string& k, const std::string& v); + /** + * @brief See implementation in child class + * + * @return + */ virtual Status buildBalancePlan() { return Status::OK(); } diff --git a/src/meta/processors/job/BalancePlan.cpp b/src/meta/processors/job/BalancePlan.cpp index d4c86ba9e17..0bf77e5bb9a 100644 --- a/src/meta/processors/job/BalancePlan.cpp +++ 
b/src/meta/processors/job/BalancePlan.cpp @@ -47,7 +47,7 @@ void BalancePlan::invoke() { { std::lock_guard lg(lock_); finishedTaskNum_++; - VLOG(1) << "Balance " << id() << " has completed " << finishedTaskNum_ << " task"; + LOG(INFO) << "Balance " << id() << " has completed " << finishedTaskNum_ << " task"; if (finishedTaskNum_ == tasks_.size()) { finished = true; if (status() == meta::cpp2::JobStatus::RUNNING) { @@ -90,7 +90,7 @@ void BalancePlan::invoke() { std::lock_guard lg(lock_); finishedTaskNum_++; failed_ = true; - VLOG(1) << "Balance " << id() << " has completed " << finishedTaskNum_ << " task"; + LOG(INFO) << "Balance " << id() << " has completed " << finishedTaskNum_ << " task"; setStatus(meta::cpp2::JobStatus::FAILED); if (finishedTaskNum_ == tasks_.size()) { finished = true; @@ -151,7 +151,7 @@ nebula::cpp2::ErrorCode BalancePlan::saveInStore() { [&baton, &ret](nebula::cpp2::ErrorCode code) { if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { ret = code; - LOG(ERROR) + LOG(INFO) << "Can't write the kvstore, ret = " << static_cast(code); } baton.post(); @@ -204,7 +204,7 @@ ErrorOr> BalancePlan::getBalan std::unique_ptr iter; auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't access kvstore, ret = " << static_cast(ret); + LOG(INFO) << "Can't access kvstore, ret = " << static_cast(ret); return ret; } std::vector tasks; @@ -236,12 +236,12 @@ ErrorOr> BalancePlan::getBalan auto activeHostRet = ActiveHostsMan::isLived(kv, task.dst_); if (!nebula::ok(activeHostRet)) { auto retCode = nebula::error(activeHostRet); - LOG(ERROR) << "Get active hosts failed, error: " << static_cast(retCode); + LOG(INFO) << "Get active hosts failed, error: " << static_cast(retCode); return retCode; } else { auto isLive = nebula::value(activeHostRet); if (!isLive) { - LOG(ERROR) << "The destination is not lived"; + LOG(INFO) << "The destination is not lived"; task.ret_ = 
BalanceTaskResult::INVALID; } } diff --git a/src/meta/processors/job/BalancePlan.h b/src/meta/processors/job/BalancePlan.h index 902633ef161..ad0b32eba66 100644 --- a/src/meta/processors/job/BalancePlan.h +++ b/src/meta/processors/job/BalancePlan.h @@ -16,6 +16,9 @@ namespace nebula { namespace meta { +/** + * @brief A balance plan contains some balance tasks, and could parallel run the tasks across parts + */ class BalancePlan { friend class DataBalanceJobExecutor; FRIEND_TEST(BalanceTest, BalancePlanTest); @@ -40,6 +43,11 @@ class BalancePlan { tasks_(plan.tasks_), finishedTaskNum_(plan.finishedTaskNum_) {} + /** + * @brief Add a task + * + * @param task + */ void addTask(BalanceTask task) { tasks_.emplace_back(std::move(task)); } @@ -47,12 +55,13 @@ class BalancePlan { void invoke(); /** + * @brief * TODO(heng): How to rollback if the some tasks failed. * For the tasks before UPDATE_META, they will go back to the original state * before balance. For the tasks after UPDATE_META, they will go on until * succeeded. NOTES: update_meta should be an atomic operation. There is no * middle state inside. 
- * */ + */ void rollback() {} meta::cpp2::JobStatus status() { @@ -63,6 +72,11 @@ class BalancePlan { jobDescription_.setStatus(status); } + /** + * @brief Save balance tasks into kvStore + * + * @return + */ nebula::cpp2::ErrorCode saveInStore(); JobID id() const { @@ -82,17 +96,48 @@ class BalancePlan { stopped_ = true; } + /** + * @brief Load balance tasks from kvStore for rerun, rerun stopped or failed tasks if resume + * + * @param resume + * @return + */ nebula::cpp2::ErrorCode recovery(bool resume = true); + /** + * @brief Dispatch tasks to buckets for parallel execution + */ void dispatchTasks(); + /** + * @brief Load balance tasks from kvStore + * + * @param jobId The job that balance tasks belong to + * @param kv The kvStore + * @param client Client to make raft operation + * @param resume If rerun the failed or stopped tasks + * @return + */ static ErrorOr> getBalanceTasks( JobID jobId, kvstore::KVStore* kv, AdminClient* client, bool resume = true); + /** + * @brief Load balance tasks and convert to cpp2::BalanceTask to make them serializable + * + * @param jobId + * @param kv + * @param client + * @return + */ static ErrorOr> show(JobID jobId, kvstore::KVStore* kv, AdminClient* client); + /** + * @brief Set a callback function, which would be called when job finished + * + * @param func + */ void setFinishCallBack(std::function func); template diff --git a/src/meta/processors/job/BalanceTask.cpp b/src/meta/processors/job/BalanceTask.cpp index fabf2c25344..8bd9c56941d 100644 --- a/src/meta/processors/job/BalanceTask.cpp +++ b/src/meta/processors/job/BalanceTask.cpp @@ -26,8 +26,8 @@ void BalanceTask::invoke() { if (ret_ == BalanceTaskResult::INVALID) { endTimeMs_ = time::WallClock::fastNowInSec(); saveInStore(); - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Task invalid, status " - << static_cast(status_); + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Task invalid, status " + << static_cast(status_); // When a plan is stopped or dst is not 
alive any more, a task will be // marked as INVALID, the task will not be executed again. Balancer will // start a new plan instead. @@ -36,12 +36,12 @@ void BalanceTask::invoke() { } else if (ret_ == BalanceTaskResult::FAILED) { endTimeMs_ = time::WallClock::fastNowInSec(); saveInStore(); - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Task failed, status " - << static_cast(status_); + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Task failed, status " + << static_cast(status_); onError_(); return; } else { - VLOG(3) << taskIdStr_ + "," + commandStr_ << " still in processing"; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " still in processing"; } switch (status_) { @@ -52,8 +52,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->checkPeers(spaceId_, partId_).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Check the peers failed, status " - << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Check the peers failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::CHANGE_LEADER; @@ -73,11 +72,11 @@ void BalanceTask::invoke() { if (resp == nebula::Status::PartNotFound()) { // if the partition has been removed before, regard as // succeeded - LOG(WARNING) << "Can't find part " << partId_ << " on " << src_; + LOG(INFO) << "Can't find part " << partId_ << " on " << src_; status_ = BalanceTaskStatus::ADD_PART_ON_DST; } else { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Transfer leader failed, status " - << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Transfer leader failed, status " + << resp; ret_ = BalanceTaskResult::FAILED; } } else { @@ -98,7 +97,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->addPart(spaceId_, partId_, dst_, true).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Open part failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Open 
part failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::ADD_LEARNER; @@ -112,7 +111,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->addLearner(spaceId_, partId_, dst_).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Add learner failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Add learner failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::CATCH_UP_DATA; @@ -126,7 +125,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->waitingForCatchUpData(spaceId_, partId_, dst_).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Catchup data failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Catchup data failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::MEMBER_CHANGE_ADD; @@ -141,7 +140,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->memberChange(spaceId_, partId_, dst_, true).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Add peer failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Add peer failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::MEMBER_CHANGE_REMOVE; @@ -156,7 +155,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->memberChange(spaceId_, partId_, src_, false).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Remove peer failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Remove peer failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::UPDATE_PART_META; @@ -172,7 +171,7 @@ void BalanceTask::invoke() { // The callback will be called inside raft set value. So don't call // invoke directly here. 
if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Update meta failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Update meta failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Update meta succeeded!"; @@ -189,7 +188,7 @@ void BalanceTask::invoke() { if (nebula::ok(srcLivedRet) && nebula::value(srcLivedRet)) { client_->removePart(spaceId_, partId_, src_).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Remove part failed, status " << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Remove part failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::CHECK; @@ -208,8 +207,7 @@ void BalanceTask::invoke() { SAVE_STATE(); client_->checkPeers(spaceId_, partId_).thenValue([this](auto&& resp) { if (!resp.ok()) { - LOG(ERROR) << taskIdStr_ + "," + commandStr_ << " Check the peers failed, status " - << resp; + LOG(INFO) << taskIdStr_ + "," + commandStr_ << " Check the peers failed, status " << resp; ret_ = BalanceTaskResult::FAILED; } else { status_ = BalanceTaskStatus::END; @@ -251,7 +249,7 @@ bool BalanceTask::saveInStore() { [this, &ret, &baton](nebula::cpp2::ErrorCode code) { if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { ret = false; - LOG(ERROR) << taskIdStr_ << " Can't persist task!"; + LOG(INFO) << taskIdStr_ << " Can't persist task!"; } baton.post(); }); diff --git a/src/meta/processors/job/BalanceTask.h b/src/meta/processors/job/BalanceTask.h index a07d4869c2a..dd8e6067933 100644 --- a/src/meta/processors/job/BalanceTask.h +++ b/src/meta/processors/job/BalanceTask.h @@ -18,6 +18,10 @@ namespace nebula { namespace meta { +/** + * @brief A balance task mainly include jobID, spaceId, partId, srcHost, dstHost, status, result + * The jobID, spaceId, partId, srcHost, dstHost make an unique id for the task + */ class BalanceTask { friend class BalancePlan; 
FRIEND_TEST(BalanceTest, BalanceTaskTest); @@ -61,6 +65,9 @@ class BalanceTask { return commandStr_; } + /** + * @brief Running this task + */ void invoke(); void rollback(); @@ -99,6 +106,11 @@ class BalanceTask { "%s:%d->%s:%d", src_.host.c_str(), src_.port, dst_.host.c_str(), dst_.port); } + /** + * @brief Save this task into kvStore + * + * @return + */ bool saveInStore(); int64_t startTime() const { diff --git a/src/meta/processors/job/CompactJobExecutor.h b/src/meta/processors/job/CompactJobExecutor.h index 50862227a4a..9f3969933fa 100644 --- a/src/meta/processors/job/CompactJobExecutor.h +++ b/src/meta/processors/job/CompactJobExecutor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Executor for compact job, always called by job manager + */ class CompactJobExecutor : public SimpleConcurrentJobExecutor { public: CompactJobExecutor(JobID jobId, @@ -18,6 +21,13 @@ class CompactJobExecutor : public SimpleConcurrentJobExecutor { AdminClient* adminClient, const std::vector& params); + /** + * @brief + * + * @param address the host that task command send to + * @param parts parts that the host contains + * @return + */ folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; }; diff --git a/src/meta/processors/job/DataBalanceJobExecutor.cpp b/src/meta/processors/job/DataBalanceJobExecutor.cpp index e5666f94202..c38007b5709 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.cpp +++ b/src/meta/processors/job/DataBalanceJobExecutor.cpp @@ -24,7 +24,7 @@ folly::Future DataBalanceJobExecutor::executeInternal() { plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { if (LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()) != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Balance plan " << plan_->id() << " update meta failed"; + LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed"; } executorOnFinished_(status); }); @@ -102,8 +102,8 @@ Status 
DataBalanceJobExecutor::buildBalancePlan() { for (Host* h : hostVec) { totalPartNum += h->parts_.size(); } - if (hostVec.empty()) { - LOG(ERROR) << "rebalance error: zone has no host"; + if (hostVec.size() == 0) { + LOG(INFO) << "rebalance error: zone has no host"; return; } avgPartNum = totalPartNum / hostVec.size(); @@ -196,7 +196,7 @@ nebula::cpp2::ErrorCode DataBalanceJobExecutor::stop() { nebula::cpp2::ErrorCode DataBalanceJobExecutor::prepare() { auto spaceRet = getSpaceIdFromName(paras_.back()); if (!nebula::ok(spaceRet)) { - LOG(ERROR) << "Can't find the space: " << paras_.back(); + LOG(INFO) << "Can't find the space: " << paras_.back(); return nebula::error(spaceRet); } GraphSpaceID spaceId = nebula::value(spaceRet); diff --git a/src/meta/processors/job/DataBalanceJobExecutor.h b/src/meta/processors/job/DataBalanceJobExecutor.h index fe292160967..a6b27d789a0 100644 --- a/src/meta/processors/job/DataBalanceJobExecutor.h +++ b/src/meta/processors/job/DataBalanceJobExecutor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Executor for balance in zone, always called by job manager + */ class DataBalanceJobExecutor : public BalanceJobExecutor { FRIEND_TEST(BalanceTest, BalanceDataPlanTest); FRIEND_TEST(BalanceTest, NormalDataTest); @@ -24,11 +27,36 @@ class DataBalanceJobExecutor : public BalanceJobExecutor { const std::vector& params) : BalanceJobExecutor(jobDescription.getJobId(), kvstore, adminClient, params), jobDescription_(jobDescription) {} + + /** + * @brief Parse paras to lost hosts + * + * @return + */ nebula::cpp2::ErrorCode prepare() override; + + /** + * @brief Mark the job as stopped + * + * @return + */ nebula::cpp2::ErrorCode stop() override; protected: + /** + * @brief Build a balance plan and run + * + * @return + */ folly::Future executeInternal() override; + + /** + * @brief Build a balance plan, which balance data in each zone + * First, move parts from lost hosts to active hosts + * Second, rebalance the active hosts 
in each zone + * + * @return + */ Status buildBalancePlan() override; private: diff --git a/src/meta/processors/job/FlushJobExecutor.h b/src/meta/processors/job/FlushJobExecutor.h index 22e40059b65..a1ff6081ed1 100644 --- a/src/meta/processors/job/FlushJobExecutor.h +++ b/src/meta/processors/job/FlushJobExecutor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Executor for flush job, always called by job manager + */ class FlushJobExecutor : public SimpleConcurrentJobExecutor { public: FlushJobExecutor(JobID jobId, @@ -18,6 +21,13 @@ class FlushJobExecutor : public SimpleConcurrentJobExecutor { AdminClient* adminClient, const std::vector& params); + /** + * @brief + * + * @param address The host that task command send to + * @param parts Parts that the host contains + * @return + */ folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; }; diff --git a/src/meta/processors/job/GetStatsProcessor.cpp b/src/meta/processors/job/GetStatsProcessor.cpp index 355f0d67600..b71e257cab1 100644 --- a/src/meta/processors/job/GetStatsProcessor.cpp +++ b/src/meta/processors/job/GetStatsProcessor.cpp @@ -19,10 +19,10 @@ void GetStatsProcessor::process(const cpp2::GetStatsReq& req) { if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { ret = nebula::cpp2::ErrorCode::E_STATS_NOT_FOUND; - LOG(ERROR) << "SpaceId " << spaceId - << " no stats info, please execute `submit job stats' under space firstly."; + LOG(INFO) << "SpaceId " << spaceId + << " no stats info, please execute `submit job stats' under space firstly."; } else { - LOG(ERROR) << "Show stats failed, error " << apache::thrift::util::enumNameSafe(ret); + LOG(INFO) << "Show stats failed, error " << apache::thrift::util::enumNameSafe(ret); } handleErrorCode(ret); @@ -32,8 +32,8 @@ void GetStatsProcessor::process(const cpp2::GetStatsReq& req) { auto statsItem = MetaKeyUtils::parseStatsVal(val); auto statsJobStatus = 
statsItem.get_status(); if (statsJobStatus != cpp2::JobStatus::FINISHED) { - LOG(ERROR) << "SpaceId " << spaceId - << " stats job is running or failed, please execute `show jobs' firstly."; + LOG(INFO) << "SpaceId " << spaceId + << " stats job is running or failed, please execute `show jobs' firstly."; handleErrorCode(nebula::cpp2::ErrorCode::E_JOB_NOT_FINISHED); onFinished(); return; diff --git a/src/meta/processors/job/GetStatsProcessor.h b/src/meta/processors/job/GetStatsProcessor.h index cb2800c9d94..3d7d6214dbe 100644 --- a/src/meta/processors/job/GetStatsProcessor.h +++ b/src/meta/processors/job/GetStatsProcessor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Get stats of a job, return error if the job is running or failed + */ class GetStatsProcessor : public BaseProcessor { public: static GetStatsProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/job/JobDescription.cpp b/src/meta/processors/job/JobDescription.cpp index 230da7e7f2e..b2b43fdf36c 100644 --- a/src/meta/processors/job/JobDescription.cpp +++ b/src/meta/processors/job/JobDescription.cpp @@ -46,7 +46,7 @@ ErrorOr JobDescription::makeJobDescript auto key = parseKey(rawkey); if (!isSupportedValue(rawval)) { - LOG(ERROR) << "not supported data ver of job " << key; + LOG(INFO) << "not supported data ver of job " << key; return nebula::cpp2::ErrorCode::E_INVALID_JOB; } auto tup = parseVal(rawval); @@ -61,7 +61,7 @@ ErrorOr JobDescription::makeJobDescript auto stopTime = std::get<4>(tup); return JobDescription(key, cmd, paras, status, startTime, stopTime); } catch (std::exception& ex) { - LOG(ERROR) << ex.what(); + LOG(INFO) << ex.what(); } return nebula::cpp2::ErrorCode::E_INVALID_JOB; } @@ -178,7 +178,7 @@ ErrorOr JobDescription::loadJobDescript std::string val; auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Loading Job Description Failed" << 
apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Loading Job Description Failed" << apache::thrift::util::enumNameSafe(retCode); return retCode; } return makeJobDescription(key, val); diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index 0e09ec554d6..6d95cc80c71 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -45,9 +45,13 @@ class JobDescription { int64_t startTime = 0, int64_t stopTime = 0); - /* - * return the JobDescription if both key & val is valid - * */ + /** + * @brief Return the JobDescription if both key & val is valid + * + * @param key + * @param val + * @return + */ static ErrorOr makeJobDescription( folly::StringPiece key, folly::StringPiece val); @@ -55,65 +59,92 @@ class JobDescription { return id_; } - /* - * return the command for this job. (e.g. compact, flush ...) - * */ + /** + * @brief Return the command for this job. (e.g. compact, flush ...) + * + * @return + */ cpp2::AdminCmd getCmd() const { return cmd_; } - /* - * return the paras for this job. (e.g. space name for compact/flush) - * */ + /** + * @brief Return the paras for this job. (e.g. space name for compact/flush) + * + * @return + */ std::vector getParas() const { return paras_; } - /* - * return the status (e.g. Queue, running, finished, failed, stopped); - * */ + /** + * @brief Return the status (e.g. 
Queue, running, finished, failed, stopped); + * + * @return + */ Status getStatus() const { return status_; } - /* - * return the key write to kv store - * */ + /** + * @brief Return the key write to kv store + * + * @return + */ std::string jobKey() const; - /* - * return the val write to kv store - * */ + /** + * @brief Return the val write to kv store + * + * @return + */ std::string jobVal() const; - /* - * return the key write to kv store, while calling "backup job" - * */ + /** + * @brief Return the key write to kv store, while calling "backup job" + * + * @return + */ std::string archiveKey(); - /* - * set the internal status - * will check if newStatus is later than curr Status + /** + * @brief + * Set the internal status + * Will check if newStatus is later than curr Status * e.g. set running to a finished job is forbidden * - * will set start time if newStatus is running - * will set stop time if newStatus is finished / failed / stopped - * */ + * Will set start time if newStatus is running + * Will set stop time if newStatus is finished / failed / stopped + * + * @param newStatus + * @param force Set status forcibly and ignore checking + * @return + */ bool setStatus(Status newStatus, bool force = false); - /* - * get a existed job from kvstore, return folly::none if there isn't - * */ + /** + * @brief + * Get an existing job from kvstore, return folly::none if there isn't + * + * @param iJob Id of the job we would load + * @param kv Where we load the job from + * @return + */ static ErrorOr loadJobDescription( JobID iJob, nebula::kvstore::KVStore* kv); - /* - * compose a key write to kvstore, according to the given job id - * */ + /** + * @brief + * Compose a key write to kvstore, according to the given job id + * + * @param iJob + * @return + */ static std::string makeJobKey(JobID iJob); - /* - * write out all job details in human readable strings + /** + * @brief + * Write out all job details in human readable strings * if a job is *
===================================================================================== * | Job Id(TaskId) | Command(Dest) | Status | Start Time | Stop Time @@ -122,26 +153,39 @@ class JobDescription { * | 27 | flush nba | finished | 12/09/19 11:09:40 | 12/09/19 * 11:09:40 | * ------------------------------------------------------------------------------------- - * then, the vector should be + * Then, the vector should be * {27, flush nba, finished, 12/09/19 11:09:40, 12/09/19 11:09:40} - * */ + * + * @return + */ cpp2::JobDesc toJobDesc(); - /* - * decode key from kvstore, return job id - * */ + /** + * @brief Decode key from kvstore, return job id + * + * @param rawKey + * @return + */ static int32_t parseKey(const folly::StringPiece& rawKey); - /* - * decode val from kvstore, return + /** + * @brief + * Decode val from kvstore, return * {command, paras, status, start time, stop time} - * */ + * + * @param rawVal + * @return + */ static std::tuple, Status, int64_t, int64_t> parseVal( const folly::StringPiece& rawVal); - /* - * check if the given rawKey is a valid JobKey - * */ + /** + * @brief + * Check if the given rawKey is a valid JobKey + * + * @param rawKey + * @return + */ static bool isJobKey(const folly::StringPiece& rawKey); bool operator==(const JobDescription& that) const { @@ -154,10 +198,15 @@ class JobDescription { private: static bool isSupportedValue(const folly::StringPiece& val); - /* - * decode val if it stored in data ver.1, return + + /** + * @brief + * Decode val if it stored in data ver.1, return * {command, paras, status, start time, stop time} - * */ + * + * @param rawVal + * @return + */ static std::tuple, Status, int64_t, int64_t> decodeValV1( const folly::StringPiece& rawVal); diff --git a/src/meta/processors/job/JobExecutor.cpp b/src/meta/processors/job/JobExecutor.cpp index ec20a1f6602..e44d2847610 100644 --- a/src/meta/processors/job/JobExecutor.cpp +++ b/src/meta/processors/job/JobExecutor.cpp @@ -36,8 +36,8 @@ ErrorOr 
JobExecutor::getSpaceIdFromName( std::string val; auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, indexKey, &val); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get space failed, space name: " << spaceName - << " error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get space failed, space name: " << spaceName + << " error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } return *reinterpret_cast(val.c_str()); diff --git a/src/meta/processors/job/JobExecutor.h b/src/meta/processors/job/JobExecutor.h index 166464f3f52..a5819a57e7f 100644 --- a/src/meta/processors/job/JobExecutor.h +++ b/src/meta/processors/job/JobExecutor.h @@ -22,32 +22,74 @@ class JobExecutor { explicit JobExecutor(kvstore::KVStore* kv) : kvstore_(kv) {} virtual ~JobExecutor() = default; - // Check the arguments about the job. + /** + * @brief Check the arguments about the job. + * + * @return + */ virtual bool check() = 0; - // Prepare the Job info from the arguments. + /** + * @brief Prepare the Job info from the arguments. + * + * @return + */ virtual nebula::cpp2::ErrorCode prepare() = 0; - // The skeleton to run the job. - // You should rewrite the executeInternal to trigger the calling. + /** + * @brief The skeleton to run the job. + * You should rewrite the executeInternal to trigger the calling. + * + * @return + */ virtual nebula::cpp2::ErrorCode execute() = 0; - // Stop the job when the user cancel it. + /** + * @brief Stop the job when the user cancel it. 
+ * + * @return + */ virtual nebula::cpp2::ErrorCode stop() = 0; - virtual nebula::cpp2::ErrorCode finish(bool) = 0; - + /** + * @brief Called when job finished or failed + * + * @param ret True means finished and false means failed + * @return + */ + virtual nebula::cpp2::ErrorCode finish(bool ret) = 0; + + /** + * @brief Called when recover a job + * + * @return + */ virtual nebula::cpp2::ErrorCode recovery() = 0; + /** + * @brief Set id of the space + * + * @param spaceId + */ virtual void setSpaceId(GraphSpaceID spaceId) = 0; virtual bool isMetaJob() = 0; + /** + * @brief Set a callback which will be called when job finished, storage executor don't need it, + * + * @param func + */ virtual void setFinishCallBack( std::function func) { UNUSED(func); } + /** + * @brief Provide an extra status for some special tasks + * + * @return + */ virtual nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq&) = 0; protected: diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index aa5d32a7fa9..3da0f853c49 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -73,7 +73,7 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() { return nebula::cpp2::ErrorCode::SUCCEEDED; } if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } std::vector jds; @@ -125,7 +125,7 @@ void JobManager::scheduleThread() { auto jobDescRet = JobDescription::loadJobDescription(opJobId.second, kvStore_); if (!nebula::ok(jobDescRet)) { - LOG(ERROR) << "[JobManager] load an invalid job from queue " << opJobId.second; + LOG(INFO) << "[JobManager] load an invalid job from queue " << opJobId.second; continue; // leader change or archive happened } auto jobDesc = nebula::value(jobDescRet); @@ -149,7 +149,7 
@@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { JobExecutor* jobExec = je.get(); runningJobs_.emplace(jobDesc.getJobId(), std::move(je)); if (jobExec == nullptr) { - LOG(ERROR) << "unreconized job cmd " << apache::thrift::util::enumNameSafe(jobDesc.getCmd()); + LOG(INFO) << "unreconized job cmd " << apache::thrift::util::enumNameSafe(jobDesc.getCmd()); return false; } @@ -159,12 +159,12 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { } if (!jobExec->check()) { - LOG(ERROR) << "Job Executor check failed"; + LOG(INFO) << "Job Executor check failed"; return false; } if (jobExec->prepare() != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Job Executor prepare failed"; + LOG(INFO) << "Job Executor prepare failed"; return false; } if (op == JbOp::RECOVER) { @@ -182,7 +182,7 @@ bool JobManager::runJobInternal(const JobDescription& jobDesc, JbOp op) { }); } if (jobExec->execute() != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Job dispatch failed"; + LOG(INFO) << "Job dispatch failed"; return false; } return true; @@ -208,7 +208,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job std::lock_guard lk(muJobFinished_); auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_); if (!nebula::ok(optJobDescRet)) { - LOG(WARNING) << folly::sformat("can't load job, jobId={}", jobId); + LOG(INFO) << folly::sformat("can't load job, jobId={}", jobId); if (jobStatus != cpp2::JobStatus::STOPPED) { // there is a rare condition, that when job finished, // the job description is deleted(default more than a week) @@ -232,7 +232,7 @@ nebula::cpp2::ErrorCode JobManager::jobFinished(JobID jobId, cpp2::JobStatus job auto it = runningJobs_.find(jobId); if (it == runningJobs_.end()) { - LOG(WARNING) << folly::sformat("can't find jobExecutor, jobId={}", jobId); + LOG(INFO) << folly::sformat("can't find jobExecutor, jobId={}", jobId); return nebula::cpp2::ErrorCode::E_UNKNOWN; 
} std::unique_ptr& jobExec = it->second; @@ -276,8 +276,8 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, auto optJobDescRet = JobDescription::loadJobDescription(jobId, kvStore_); if (!nebula::ok(optJobDescRet)) { auto retCode = nebula::error(optJobDescRet); - LOG(WARNING) << "LoadJobDesc failed, jobId " << jobId - << " error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "LoadJobDesc failed, jobId " << jobId + << " error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -285,7 +285,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, auto jobExec = JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_); if (!jobExec) { - LOG(WARNING) << folly::sformat("createMetaJobExecutor failed(), jobId={}", jobId); + LOG(INFO) << folly::sformat("createMetaJobExecutor failed(), jobId={}", jobId); return nebula::cpp2::ErrorCode::E_TASK_REPORT_OUT_DATE; } @@ -300,8 +300,8 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td, auto spaceIdRet = getSpaceId(spaceName); if (!nebula::ok(spaceIdRet)) { auto retCode = nebula::error(spaceIdRet); - LOG(WARNING) << "Get spaceName " << spaceName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get spaceName " << spaceName + << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); } else { spaceId = nebula::value(spaceIdRet); jobExec->setSpaceId(spaceId); @@ -345,7 +345,7 @@ nebula::cpp2::ErrorCode JobManager::reportTaskFinish(const cpp2::ReportTaskReq& return it.getJobId() == jobId && it.getTaskId() == taskId; }); if (task == tasks.end()) { - LOG(WARNING) << folly::sformat( + LOG(INFO) << folly::sformat( "report an invalid or outdate task, will ignore this report, job={}, " "task={}", jobId, @@ -398,7 +398,7 @@ nebula::cpp2::ErrorCode JobManager::addJob(const JobDescription& jobDesc, AdminC // Add job to jobMap inFlightJobs_.emplace(jobId, jobDesc); } else { - 
LOG(ERROR) << "Add Job Failed"; + LOG(INFO) << "Add Job Failed"; if (rc != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { rc = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE; } @@ -434,7 +434,7 @@ ErrorOr> JobManager::showJob std::unique_ptr iter; auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Fetch Jobs Failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Fetch Jobs Failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -473,7 +473,7 @@ ErrorOr> JobManager::showJob retCode = removeExpiredJobs(std::move(expiredJobKeys)); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Remove Expired Jobs Failed"; + LOG(INFO) << "Remove Expired Jobs Failed"; return retCode; } @@ -509,8 +509,8 @@ nebula::cpp2::ErrorCode JobManager::removeExpiredJobs( std::move(expiredJobsAndTasks), [&](nebula::cpp2::ErrorCode code) { if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "kvstore asyncRemoveRange failed: " - << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "kvstore asyncRemoveRange failed: " + << apache::thrift::util::enumNameSafe(code); } ret = code; baton.post(); @@ -557,7 +557,7 @@ JobManager::showJob(JobID iJob, const std::string& spaceName) { } auto optJob = nebula::value(optJobRet); if (optJob.getParas().back() != spaceName) { - LOG(WARNING) << "Show job " << iJob << " not in current space " << spaceName; + LOG(INFO) << "Show job " << iJob << " not in current space " << spaceName; return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE; } ret.first = optJob.toJobDesc(); @@ -590,13 +590,13 @@ nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob, const std::string& space auto optJobDescRet = JobDescription::loadJobDescription(iJob, kvStore_); if (!nebula::ok(optJobDescRet)) { auto retCode = nebula::error(optJobDescRet); - LOG(WARNING) << "LoadJobDesc failed, jobId " << 
iJob - << " error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "LoadJobDesc failed, jobId " << iJob + << " error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } auto optJobDesc = nebula::value(optJobDescRet); if (optJobDesc.getParas().back() != spaceName) { - LOG(WARNING) << "Stop job " << iJob << " not in space " << spaceName; + LOG(INFO) << "Stop job " << iJob << " not in space " << spaceName; return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE; } return jobFinished(iJob, cpp2::JobStatus::STOPPED); @@ -614,7 +614,7 @@ ErrorOr JobManager::recoverJob( std::unique_ptr iter; auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } for (; iter->valid(); iter->next()) { @@ -632,7 +632,7 @@ ErrorOr JobManager::recoverJob( std::vector values; auto retCode = kvStore_->multiGet(kDefaultSpaceId, kDefaultPartId, keys, &values); if (retCode.first != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode.first); + LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode.first); return retCode.first; } for (size_t i = 0; i < keys.size(); i++) { @@ -687,7 +687,7 @@ ErrorOr JobManager::getSpaceId(const std: if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { retCode = nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND; } - LOG(ERROR) << "KVStore error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "KVStore error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } return *reinterpret_cast(val.c_str()); @@ -697,7 +697,7 @@ ErrorOr JobManager::checkIndexJobRunning() { std::unique_ptr iter; auto retCode = 
kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Fetch Jobs Failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Fetch Jobs Failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index f8532b61dfa..ad741b052b9 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -64,56 +64,134 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab RECOVER, }; + /** + * @brief Init task queue, kvStore and schedule thread + * + * @param store + * @return + */ bool init(nebula::kvstore::KVStore* store); + /** + * @brief Called when receive a system signal + */ void shutDown(); - /* - * Load job description from kvstore - * */ + /** + * @brief Load job description from kvstore + * + * @param jobDesc + * @param client + * @return + */ nebula::cpp2::ErrorCode addJob(const JobDescription& jobDesc, AdminClient* client); - /* - * The same job is in jobMap + /** + * @brief The same job is in jobMap + * + * @param cmd + * @param paras + * @param iJob + * @return */ bool checkJobExist(const cpp2::AdminCmd& cmd, const std::vector& paras, JobID& iJob); + /** + * @brief Load all jobs of the space from kvStore and convert to cpp2::JobDesc + * + * @param spaceName + * @return + */ ErrorOr> showJobs( const std::string& spaceName); + /** + * @brief Load one job and related tasks from kvStore and convert to cpp2::JobDesc + * + * @param iJob + * @param spaceName + * @return + */ ErrorOr>> showJob( JobID iJob, const std::string& spaceName); + /** + * @brief Stop a job when user cancel it + * + * @param iJob + * @param spaceName + * @return + */ nebula::cpp2::ErrorCode stopJob(JobID iJob, const std::string& spaceName); - // return error/recovered job num + /** + * @brief + * + * @param 
spaceName + * @param client + * @param jobIds + * @return Return error/recovered job num + */ ErrorOr recoverJob(const std::string& spaceName, AdminClient* client, const std::vector& jobIds = {}); /** - * @brief persist job executed result, and do the cleanup + * @brief Persist job executed result, and do the cleanup + * + * @param jobId + * @param jobStatus * @return cpp2::ErrorCode if error when write to kv store */ nebula::cpp2::ErrorCode jobFinished(JobID jobId, cpp2::JobStatus jobStatus); - // report task finished. + /** + * @brief Report task finished. + * + * @param req + * @return + */ nebula::cpp2::ErrorCode reportTaskFinish(const cpp2::ReportTaskReq& req); - // Only used for Test - // The number of jobs in lowPriorityQueue_ a and highPriorityQueue_ + /** + * @brief Only used for Test + * The number of jobs in lowPriorityQueue_ a and highPriorityQueue_ + * + * @return + */ size_t jobSize() const; - // Tries to extract an element from the front of the highPriorityQueue_, - // if failed, then extract an element from lowPriorityQueue_. - // If the element is obtained, return true, otherwise return false. + /** + * @brief Tries to extract an element from the front of the highPriorityQueue_, + * If failed, then extract an element from lowPriorityQueue_. + * If the element is obtained, return true, otherwise return false. 
+ * + * @param opJobId + * @return + */ bool try_dequeue(std::pair& opJobId); - // Enter different priority queues according to the command type + /** + * @brief Enter different priority queues according to the command type + * + * @param op Recover a job or add a new one + * @param jobId Id of the job + * @param cmd Cmd type of the job + */ void enqueue(const JbOp& op, const JobID& jobId, const cpp2::AdminCmd& cmd); + /** + * @brief Check if there is a rebuild_tag_index or rebuild_edge_index running + * + * @return + */ ErrorOr checkIndexJobRunning(); + /** + * @brief Load jobs that are running before crashed and add them into queue + * + * @return + */ nebula::cpp2::ErrorCode handleRemainingJobs(); private: @@ -133,14 +211,44 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab static bool isRunningJob(const JobDescription& jobDesc); + /** + * @brief Remove jobs of the given keys + * + * @param jobKeys + * @return + */ nebula::cpp2::ErrorCode removeExpiredJobs(std::vector&& jobKeys); + /** + * @brief Get all tasks of this job + * + * @param jobId + * @return + */ ErrorOr> getAllTasks(JobID jobId); + /** + * @brief Remove a job from the queue and running map + * + * @param jobId + */ void cleanJob(JobID jobId); + /** + * @brief Save a task into kvStore + * + * @param td + * @param req + * @return + */ nebula::cpp2::ErrorCode saveTaskStatus(TaskDescription& td, const cpp2::ReportTaskReq& req); + /** + * @brief Cas operation to set status + * + * @param expected + * @param desired + */ void compareChangeStatus(JbmgrStatus expected, JbmgrStatus desired); private: diff --git a/src/meta/processors/job/JobUtils.cpp b/src/meta/processors/job/JobUtils.cpp index 0cf484c14fa..1cab6e0c4f7 100644 --- a/src/meta/processors/job/JobUtils.cpp +++ b/src/meta/processors/job/JobUtils.cpp @@ -55,15 +55,15 @@ const std::string& JobUtil::archivePrefix() { std::string JobUtil::parseString(folly::StringPiece rawVal, size_t offset) { if (rawVal.size() < 
offset + sizeof(size_t)) { - LOG(ERROR) << "Error: rawVal: " << toHexStr(rawVal) << ", offset: " << offset; + LOG(INFO) << "Error: rawVal: " << toHexStr(rawVal) << ", offset: " << offset; throw std::runtime_error( folly::stringPrintf("%s: offset=%zu, rawVal.size()=%zu", __func__, offset, rawVal.size())); } auto len = *reinterpret_cast(rawVal.data() + offset); offset += sizeof(size_t); if (rawVal.size() < offset + len) { - LOG(ERROR) << "Error: rawVal: " << toHexStr(rawVal) << ", len: " << len - << ", offset: " << offset; + LOG(INFO) << "Error: rawVal: " << toHexStr(rawVal) << ", len: " << len + << ", offset: " << offset; throw std::runtime_error( folly::stringPrintf("%s: offset=%zu, rawVal.size()=%zu", __func__, offset, rawVal.size())); } @@ -73,7 +73,7 @@ std::string JobUtil::parseString(folly::StringPiece rawVal, size_t offset) { std::vector JobUtil::parseStrVector(folly::StringPiece rawVal, size_t* offset) { std::vector ret; if (rawVal.size() < *offset + sizeof(size_t)) { - LOG(ERROR) << "Error: rawVal: " << toHexStr(rawVal) << ", offset: " << offset; + LOG(INFO) << "Error: rawVal: " << toHexStr(rawVal) << ", offset: " << offset; throw std::runtime_error( folly::stringPrintf("%s: offset=%zu, rawVal.size()=%zu", __func__, *offset, rawVal.size())); } diff --git a/src/meta/processors/job/JobUtils.h b/src/meta/processors/job/JobUtils.h index 12f057d96b3..81c0e21a511 100644 --- a/src/meta/processors/job/JobUtils.h +++ b/src/meta/processors/job/JobUtils.h @@ -33,8 +33,22 @@ class JobUtil { return *reinterpret_cast(rawVal.data() + offset); } + /** + * @brief Get a string from a serialized value + * + * @param rawVal string to read + * @param offset from where to read the string + * @return + */ static std::string parseString(folly::StringPiece rawVal, size_t offset); + /** + * @brief Get vector of string from a serialized value + * + * @param rawVal + * @param offset + * @return + */ static std::vector parseStrVector(folly::StringPiece rawVal, size_t* offset); }; diff 
--git a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp index d55144d6bc3..6f950ada72f 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.cpp +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.cpp @@ -27,7 +27,7 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::getAllSpaces( std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get all spaces failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get all spaces failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -50,8 +50,8 @@ ErrorOr LeaderBalanceJobExecutor::getHostParts(Gr std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId << " " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Access kvstore failed, spaceId " << spaceId << " " + << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -72,15 +72,15 @@ ErrorOr LeaderBalanceJobExecutor::getHostParts(Gr std::string value; retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, key, &value); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Access kvstore failed, spaceId " << spaceId + << apache::thrift::util::enumNameSafe(retCode); return retCode; } auto properties = MetaKeyUtils::parseSpace(value); if (totalParts != properties.get_partition_num()) { - LOG(ERROR) << "Partition number not equals " << totalParts << " : " - << properties.get_partition_num(); + LOG(INFO) << "Partition number not equals " << totalParts << " : " + << properties.get_partition_num(); return false; } @@ -113,7 +113,7 
@@ ErrorOr LeaderBalanceJobExecutor::getHostParts(Gr auto zonePartsRet = assembleZoneParts(zoneNames, confirmedHostParts); if (zonePartsRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Assemble Zone Parts failed"; + LOG(INFO) << "Assemble Zone Parts failed"; return zonePartsRet; } } @@ -132,8 +132,8 @@ nebula::cpp2::ErrorCode LeaderBalanceJobExecutor::assembleZoneParts( std::string zoneValue; auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zone " << zoneName - << " failed: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get zone " << zoneName + << " failed: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -199,14 +199,14 @@ void LeaderBalanceJobExecutor::calDiff(const HostParts& hostParts, std::vector& expand, std::vector& lost) { for (auto it = hostParts.begin(); it != hostParts.end(); it++) { - VLOG(1) << "Original Host " << it->first << ", parts " << it->second.size(); + LOG(INFO) << "Original Host " << it->first << ", parts " << it->second.size(); if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end() && std::find(lost.begin(), lost.end(), it->first) == lost.end()) { lost.emplace_back(it->first); } } for (auto& h : activeHosts) { - VLOG(1) << "Active host " << h; + LOG(INFO) << "Active host " << h; if (hostParts.find(h) == hostParts.end()) { expand.emplace_back(h); } @@ -259,8 +259,8 @@ folly::Future LeaderBalanceJobExecutor::executeInternal() { auto balanceResult = buildLeaderBalancePlan( hostLeaderMap_.get(), spaceId, replicaFactor, dependentOnZone, plan); if (!nebula::ok(balanceResult) || !nebula::value(balanceResult)) { - LOG(ERROR) << "Building leader balance plan failed " - << "Space: " << spaceId; + LOG(INFO) << "Building leader balance plan failed " + << "Space: " << spaceId; continue; } simplifyLeaderBalancePlan(spaceId, plan); @@ -312,7 +312,7 @@ ErrorOr 
LeaderBalanceJobExecutor::buildLeaderBala std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId << static_cast(retCode); + LOG(INFO) << "Access kvstore failed, spaceId " << spaceId << static_cast(retCode); return retCode; } @@ -334,7 +334,7 @@ ErrorOr LeaderBalanceJobExecutor::buildLeaderBala } else { auto retVal = nebula::value(result); if (!retVal || totalParts == 0 || allHostParts.empty()) { - LOG(ERROR) << "Invalid space " << spaceId; + LOG(INFO) << "Invalid space " << spaceId; return false; } } @@ -349,14 +349,14 @@ ErrorOr LeaderBalanceJobExecutor::buildLeaderBala } if (activeHosts.empty()) { - LOG(ERROR) << "No active hosts"; + LOG(INFO) << "No active hosts"; return false; } if (dependentOnZone) { for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) { auto min = it->second.size() / replicaFactor; - VLOG(3) << "Host: " << it->first << " Bounds: " << min << " : " << min + 1; + LOG(INFO) << "Host: " << it->first << " Bounds: " << min << " : " << min + 1; hostBounds_[it->first] = std::make_pair(min, min + 1); } } else { @@ -374,8 +374,8 @@ ErrorOr LeaderBalanceJobExecutor::buildLeaderBala globalMax = std::floor(static_cast(leaderParts) / activeSize * (1 + FLAGS_leader_balance_deviation)); } - VLOG(3) << "Build leader balance plan, expected min load: " << globalMin - << ", max load: " << globalMax << " avg: " << globalAvg; + LOG(INFO) << "Build leader balance plan, expected min load: " << globalMin + << ", max load: " << globalMax << " avg: " << globalAvg; for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) { hostBounds_[it->first] = std::make_pair(globalMin, globalMax); @@ -391,8 +391,8 @@ ErrorOr LeaderBalanceJobExecutor::buildLeaderBala auto& hostMaxLoad = hostBounds_[host].second; int32_t partSize = hostEntry.second.size(); if (hostMinLoad <= partSize && partSize <= 
hostMaxLoad) { - VLOG(3) << partSize << " is between min load " << hostMinLoad << " and max load " - << hostMaxLoad; + LOG(INFO) << partSize << " is between min load " << hostMinLoad << " and max load " + << hostMaxLoad; continue; } @@ -440,7 +440,7 @@ int32_t LeaderBalanceJobExecutor::acquireLeaders(HostParts& allHostParts, auto& targetLeaders = leaderHostParts[target]; size_t minLoad = hostBounds_[target].first; for (const auto& partId : diff) { - VLOG(3) << "Try acquire leader for part " << partId; + LOG(INFO) << "Try acquire leader for part " << partId; // find the leader of partId auto sources = peersMap[partId]; for (const auto& source : sources) { @@ -451,8 +451,8 @@ int32_t LeaderBalanceJobExecutor::acquireLeaders(HostParts& allHostParts, // if peer is the leader of partId and can transfer, then transfer it to // host auto& sourceLeaders = leaderHostParts[source]; - VLOG(3) << "Check peer: " << source << " min load: " << minLoad - << " peerLeaders size: " << sourceLeaders.size(); + LOG(INFO) << "Check peer: " << source << " min load: " << minLoad + << " peerLeaders size: " << sourceLeaders.size(); auto it = std::find(sourceLeaders.begin(), sourceLeaders.end(), partId); if (it != sourceLeaders.end() && minLoad < sourceLeaders.size()) { sourceLeaders.erase(it); diff --git a/src/meta/processors/job/LeaderBalanceJobExecutor.h b/src/meta/processors/job/LeaderBalanceJobExecutor.h index e4b7352dc2c..2c07ff4d581 100644 --- a/src/meta/processors/job/LeaderBalanceJobExecutor.h +++ b/src/meta/processors/job/LeaderBalanceJobExecutor.h @@ -36,8 +36,24 @@ class LeaderBalanceJobExecutor : public MetaJobExecutor { nebula::cpp2::ErrorCode finish(bool ret = true) override; protected: + /** + * @brief Build balance plan and run + * + * @return + */ folly::Future executeInternal() override; + /** + * @brief Build a plan to balance leader + * + * @param hostLeaderMap + * @param spaceId + * @param replicaFactor + * @param dependentOnZone + * @param plan + * @param useDeviation + 
* @return + */ ErrorOr buildLeaderBalancePlan(HostLeaderMap* hostLeaderMap, GraphSpaceID spaceId, int32_t replicaFactor, @@ -45,6 +61,18 @@ class LeaderBalanceJobExecutor : public MetaJobExecutor { LeaderBalancePlan& plan, bool useDeviation = true); + /** + * @brief Host will loop for the partition which is not leader, and try to acuire the leader + * + * @param allHostParts + * @param leaderHostParts + * @param peersMap + * @param activeHosts + * @param target + * @param plan + * @param spaceId + * @return + */ int32_t acquireLeaders(HostParts& allHostParts, HostParts& leaderHostParts, PartAllocation& peersMap, @@ -70,6 +98,14 @@ class LeaderBalanceJobExecutor : public MetaJobExecutor { HostParts& hostParts, int32_t& totalParts); + /** + * @brief Compare and get the lost hosts and expand hosts + * + * @param hostParts + * @param activeHosts + * @param expand + * @param lost + */ void calDiff(const HostParts& hostParts, const std::vector& activeHosts, std::vector& expand, diff --git a/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp b/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp index 0e2b5daa2bc..62796909f07 100644 --- a/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp +++ b/src/meta/processors/job/ListEdgeIndexStatusProcessor.cpp @@ -14,7 +14,7 @@ void ListEdgeIndexStatusProcessor::process(const cpp2::ListIndexStatusReq& req) std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return; diff --git a/src/meta/processors/job/ListEdgeIndexStatusProcessor.h b/src/meta/processors/job/ListEdgeIndexStatusProcessor.h index 105db2ebaa3..ba6257b07b1 100644 --- a/src/meta/processors/job/ListEdgeIndexStatusProcessor.h +++ 
b/src/meta/processors/job/ListEdgeIndexStatusProcessor.h @@ -13,6 +13,9 @@ namespace nebula { namespace meta { +/** + * @brief Show status all rebuild-edge-index jobs + */ class ListEdgeIndexStatusProcessor : public BaseProcessor { public: static ListEdgeIndexStatusProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/job/ListTagIndexStatusProcessor.cpp b/src/meta/processors/job/ListTagIndexStatusProcessor.cpp index 860b1a4c51a..d7f184a5690 100644 --- a/src/meta/processors/job/ListTagIndexStatusProcessor.cpp +++ b/src/meta/processors/job/ListTagIndexStatusProcessor.cpp @@ -14,7 +14,7 @@ void ListTagIndexStatusProcessor::process(const cpp2::ListIndexStatusReq& req) { std::unique_ptr iter; auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Loading Job Failed" << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return; diff --git a/src/meta/processors/job/ListTagIndexStatusProcessor.h b/src/meta/processors/job/ListTagIndexStatusProcessor.h index 0fc01689257..14513f33a55 100644 --- a/src/meta/processors/job/ListTagIndexStatusProcessor.h +++ b/src/meta/processors/job/ListTagIndexStatusProcessor.h @@ -13,6 +13,9 @@ namespace nebula { namespace meta { +/** + * @brief Show status all rebuild-tag-index jobs + */ class ListTagIndexStatusProcessor : public BaseProcessor { public: static ListTagIndexStatusProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/job/MetaJobExecutor.cpp b/src/meta/processors/job/MetaJobExecutor.cpp index 643aaa20921..57056a700a1 100644 --- a/src/meta/processors/job/MetaJobExecutor.cpp +++ b/src/meta/processors/job/MetaJobExecutor.cpp @@ -28,7 +28,7 @@ nebula::cpp2::ErrorCode MetaJobExecutor::execute() { auto rc = nebula::cpp2::ErrorCode::SUCCEEDED; future.wait(); 
if (!future.value().ok()) { - LOG(ERROR) << future.value().toString(); + LOG(INFO) << future.value().toString(); rc = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE; } return rc; diff --git a/src/meta/processors/job/MetaJobExecutor.h b/src/meta/processors/job/MetaJobExecutor.h index 7c461488aff..3ccf9678f24 100644 --- a/src/meta/processors/job/MetaJobExecutor.h +++ b/src/meta/processors/job/MetaJobExecutor.h @@ -29,17 +29,33 @@ class MetaJobExecutor : public JobExecutor { virtual ~MetaJobExecutor() = default; - // Check the arguments about the job. + /** + * @brief Check the arguments about the job. + * + * @return + */ bool check() override; - // Prepare the Job info from the arguments. + /** + * @brief Prepare the Job info from the arguments. + * + * @return + */ nebula::cpp2::ErrorCode prepare() override; - // The skeleton to run the job. - // You should rewrite the executeInternal to trigger the calling. + /** + * @brief The skeleton to run the job. + * You should rewrite the executeInternal to trigger the calling. + * + * @return + */ nebula::cpp2::ErrorCode execute() override; - // Stop the job when the user cancel it. + /** + * @brief Stop the job when the user cancel it. 
+ * + * @return + */ nebula::cpp2::ErrorCode stop() override; nebula::cpp2::ErrorCode finish(bool) override; diff --git a/src/meta/processors/job/RebuildJobExecutor.cpp b/src/meta/processors/job/RebuildJobExecutor.cpp index a1f826fafa2..e8377a21488 100644 --- a/src/meta/processors/job/RebuildJobExecutor.cpp +++ b/src/meta/processors/job/RebuildJobExecutor.cpp @@ -24,7 +24,7 @@ nebula::cpp2::ErrorCode RebuildJobExecutor::prepare() { // the last value of paras_ is the space name, others are index name auto spaceRet = getSpaceIdFromName(paras_.back()); if (!nebula::ok(spaceRet)) { - LOG(ERROR) << "Can't find the space: " << paras_.back(); + LOG(INFO) << "Can't find the space: " << paras_.back(); return nebula::error(spaceRet); } space_ = nebula::value(spaceRet); @@ -35,8 +35,8 @@ nebula::cpp2::ErrorCode RebuildJobExecutor::prepare() { auto indexKey = MetaKeyUtils::indexIndexKey(space_, paras_[i]); auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, indexKey, &indexValue); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get indexKey error indexName: " << paras_[i] - << " error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get indexKey error indexName: " << paras_[i] + << " error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -50,7 +50,7 @@ nebula::cpp2::ErrorCode RebuildJobExecutor::prepare() { nebula::cpp2::ErrorCode RebuildJobExecutor::stop() { auto errOrTargetHost = getTargetHost(space_); if (!nebula::ok(errOrTargetHost)) { - LOG(ERROR) << "Get target host failed"; + LOG(INFO) << "Get target host failed"; auto retCode = nebula::error(errOrTargetHost); if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { retCode = nebula::cpp2::ErrorCode::E_NO_HOSTS; @@ -68,12 +68,12 @@ nebula::cpp2::ErrorCode RebuildJobExecutor::stop() { auto tries = folly::collectAll(std::move(futures)).get(); if (std::any_of(tries.begin(), tries.end(), [](auto& t) { return t.hasException(); })) { - LOG(ERROR) << 
"RebuildJobExecutor::stop() RPC failure."; + LOG(INFO) << "RebuildJobExecutor::stop() RPC failure."; return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } for (const auto& t : tries) { if (!t.value().ok()) { - LOG(ERROR) << "Stop Build Index Failed"; + LOG(INFO) << "Stop Build Index Failed"; return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } } diff --git a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp index a69ea88c9e1..35c2a4ba897 100644 --- a/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp +++ b/src/meta/processors/job/SimpleConcurrentJobExecutor.cpp @@ -26,14 +26,14 @@ nebula::cpp2::ErrorCode SimpleConcurrentJobExecutor::prepare() { std::string spaceName = paras_.back(); auto errOrSpaceId = getSpaceIdFromName(spaceName); if (!nebula::ok(errOrSpaceId)) { - LOG(ERROR) << "Can't find the space: " << spaceName; + LOG(INFO) << "Can't find the space: " << spaceName; return nebula::error(errOrSpaceId); } space_ = nebula::value(errOrSpaceId); ErrOrHosts errOrHost = getTargetHost(space_); if (!nebula::ok(errOrHost)) { - LOG(ERROR) << "Can't get any host according to space"; + LOG(INFO) << "Can't get any host according to space"; return nebula::error(errOrHost); } diff --git a/src/meta/processors/job/StatsJobExecutor.cpp b/src/meta/processors/job/StatsJobExecutor.cpp index 705d38eb982..b533db664cd 100644 --- a/src/meta/processors/job/StatsJobExecutor.cpp +++ b/src/meta/processors/job/StatsJobExecutor.cpp @@ -44,7 +44,7 @@ nebula::cpp2::ErrorCode StatsJobExecutor::doRemove(const std::string& key) { nebula::cpp2::ErrorCode StatsJobExecutor::prepare() { auto spaceRet = getSpaceIdFromName(paras_[0]); if (!nebula::ok(spaceRet)) { - LOG(ERROR) << "Can't find the space: " << paras_[0]; + LOG(INFO) << "Can't find the space: " << paras_[0]; return nebula::error(spaceRet); } space_ = nebula::value(spaceRet); @@ -167,7 +167,7 @@ nebula::cpp2::ErrorCode StatsJobExecutor::finish(bool exeSuccessed) { 
std::string val; auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, tempKey, &val); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't find the stats data, spaceId : " << space_; + LOG(INFO) << "Can't find the stats data, spaceId : " << space_; return ret; } auto statsItem = MetaKeyUtils::parseStatsVal(val); @@ -179,7 +179,7 @@ nebula::cpp2::ErrorCode StatsJobExecutor::finish(bool exeSuccessed) { auto statsVal = MetaKeyUtils::statsVal(statsItem); auto retCode = save(statsKey, statsVal); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Sace stats data failed, error " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Sace stats data failed, error " << apache::thrift::util::enumNameSafe(retCode); return retCode; } return doRemove(tempKey); @@ -188,7 +188,7 @@ nebula::cpp2::ErrorCode StatsJobExecutor::finish(bool exeSuccessed) { nebula::cpp2::ErrorCode StatsJobExecutor::stop() { auto errOrTargetHost = getTargetHost(space_); if (!nebula::ok(errOrTargetHost)) { - LOG(ERROR) << "Get target host failed"; + LOG(INFO) << "Get target host failed"; auto retCode = nebula::error(errOrTargetHost); if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { retCode = nebula::cpp2::ErrorCode::E_NO_HOSTS; @@ -206,13 +206,13 @@ nebula::cpp2::ErrorCode StatsJobExecutor::stop() { auto tries = folly::collectAll(std::move(futures)).get(); if (std::any_of(tries.begin(), tries.end(), [](auto& t) { return t.hasException(); })) { - LOG(ERROR) << "stats job stop() RPC failure."; + LOG(INFO) << "stats job stop() RPC failure."; return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } for (const auto& t : tries) { if (!t.value().ok()) { - LOG(ERROR) << "Stop stats job Failed"; + LOG(INFO) << "Stop stats job Failed"; return nebula::cpp2::ErrorCode::E_BALANCER_FAILURE; } } diff --git a/src/meta/processors/job/StatsJobExecutor.h b/src/meta/processors/job/StatsJobExecutor.h index 9b6797753d2..ace89e0aa43 100644 --- 
a/src/meta/processors/job/StatsJobExecutor.h +++ b/src/meta/processors/job/StatsJobExecutor.h @@ -32,15 +32,26 @@ class StatsJobExecutor : public StorageJobExecutor { folly::Future executeInternal(HostAddr&& address, std::vector&& parts) override; - // Summarize the results of statsItem_ + /** + * @brief Summarize the results of statsItem_ + * + * @param exeSuccessed + * @return + */ nebula::cpp2::ErrorCode finish(bool exeSuccessed) override; nebula::cpp2::ErrorCode saveSpecialTaskStatus(const cpp2::ReportTaskReq& req) override; private: - // Stats job writes an additional data. - // The additional data is written when the stats job passes the check - // function. Update this additional data when job finishes. + /** + * @brief Stats job writes an additional data. + * The additional data is written when the stats job passes the check + * function. Update this additional data when job finishes. + * + * @param key + * @param val + * @return + */ nebula::cpp2::ErrorCode save(const std::string& key, const std::string& val); void addStats(cpp2::StatsItem& lhs, const cpp2::StatsItem& rhs); diff --git a/src/meta/processors/job/StorageJobExecutor.cpp b/src/meta/processors/job/StorageJobExecutor.cpp index e00b3493ad1..6933a0e6c3f 100644 --- a/src/meta/processors/job/StorageJobExecutor.cpp +++ b/src/meta/processors/job/StorageJobExecutor.cpp @@ -32,7 +32,7 @@ ErrOrHosts StorageJobExecutor::getTargetHost(GraphSpaceID spaceId) { const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId); auto retCode = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Fetch Parts Failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Fetch Parts Failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -58,8 +58,8 @@ ErrOrHosts StorageJobExecutor::getLeaderHost(GraphSpaceID space) { std::unique_ptr leaderIter; auto retCode = 
kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, hostPrefix, &leaderIter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get space " << space - << "'s part failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get space " << space + << "'s part failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } @@ -89,8 +89,8 @@ ErrOrHosts StorageJobExecutor::getListenerHost(GraphSpaceID space, cpp2::Listene std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get space " << space - << "'s listener failed, error: " << apache::thrift::util::enumNameSafe(ret); + LOG(INFO) << "Get space " << space + << "'s listener failed, error: " << apache::thrift::util::enumNameSafe(ret); return ret; } @@ -109,7 +109,7 @@ ErrOrHosts StorageJobExecutor::getListenerHost(GraphSpaceID space, cpp2::Listene auto host = MetaKeyUtils::deserializeHostAddr(iter->val()); auto part = MetaKeyUtils::parseListenerPart(iter->key()); if (std::find(activeHosts.begin(), activeHosts.end(), host) == activeHosts.end()) { - LOG(ERROR) << "Invalid host : " << network::NetworkUtils::toHostsStr({host}); + LOG(INFO) << "Invalid host : " << network::NetworkUtils::toHostsStr({host}); return nebula::cpp2::ErrorCode::E_INVALID_HOST; } auto it = std::find_if( @@ -145,7 +145,7 @@ nebula::cpp2::ErrorCode StorageJobExecutor::execute() { } if (!nebula::ok(addressesRet)) { - LOG(ERROR) << "Can't get hosts"; + LOG(INFO) << "Can't get hosts"; return nebula::error(addressesRet); } @@ -165,7 +165,7 @@ nebula::cpp2::ErrorCode StorageJobExecutor::execute() { }); baton.wait(); if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "write to kv store failed, error: " << apache::thrift::util::enumNameSafe(rc); + LOG(INFO) << "write to kv store failed, error: " << apache::thrift::util::enumNameSafe(rc); return rc; } } @@ -180,12 +180,12 @@ 
nebula::cpp2::ErrorCode StorageJobExecutor::execute() { auto tries = folly::collectAll(std::move(futures)).get(); for (auto& t : tries) { if (t.hasException()) { - LOG(ERROR) << t.exception().what(); + LOG(INFO) << t.exception().what(); rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE; continue; } if (!t.value().ok()) { - LOG(ERROR) << t.value().toString(); + LOG(INFO) << t.value().toString(); rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE; continue; } diff --git a/src/meta/processors/job/StorageJobExecutor.h b/src/meta/processors/job/StorageJobExecutor.h index c5eb0d32170..fdc1964b696 100644 --- a/src/meta/processors/job/StorageJobExecutor.h +++ b/src/meta/processors/job/StorageJobExecutor.h @@ -32,23 +32,39 @@ class StorageJobExecutor : public JobExecutor { virtual ~StorageJobExecutor() = default; - // Check the arguments about the job. + /** + * @brief Check the arguments about the job. + * + * @return + */ bool check() override { return true; } - // Prepare the Job info from the arguments. + /** + * @brief Prepare the Job info from the arguments. + * + * @return + */ nebula::cpp2::ErrorCode prepare() override { return nebula::cpp2::ErrorCode::SUCCEEDED; } - // The skeleton to run the job. - // You should rewrite the executeInternal to trigger the calling. + /** + * @brief The skeleton to run the job. + * You should rewrite the executeInternal to trigger the calling. + * + * @return + */ nebula::cpp2::ErrorCode execute() override; void interruptExecution(JobID jobId); - // Stop the job when the user cancel it. + /** + * @brief Stop the job when the user cancel it. 
+ * + * @return + */ nebula::cpp2::ErrorCode stop() override { return nebula::cpp2::ErrorCode::SUCCEEDED; } diff --git a/src/meta/processors/job/TaskDescription.h b/src/meta/processors/job/TaskDescription.h index f9f05a77950..6743a6a9794 100644 --- a/src/meta/processors/job/TaskDescription.h +++ b/src/meta/processors/job/TaskDescription.h @@ -45,37 +45,50 @@ class TaskDescription { TaskDescription(JobID iJob, TaskID iTask, std::string addr, int32_t port); TaskDescription(const folly::StringPiece& key, const folly::StringPiece& val); - /* - * encoded key going to write to kvstore + /** + * @brief Encoded key going to write to kvstore * kJobKey+jobid+taskid - * */ + * + * @return + */ std::string taskKey(); - /* - * decode jobid and taskid from kv store - * */ + /** + * @brief Decode jobid and taskid from kv store + * + * @param rawKey + * @return + */ static std::pair parseKey(const folly::StringPiece& rawKey); - /* - * encode task val to write to kvstore - * */ + /** + * @brief Encode task val to write to kvstore + * + * @return + */ std::string taskVal(); - /* - * decode task val from kvstore + /** + * @brief Decode task val from kvstore * should be * {host, status, start time, stop time} - * */ + * + * @param rawVal + * @return + */ static std::tuple parseVal( const folly::StringPiece& rawVal); - /* - * encoded key when dba called "backup jobs" - * */ + /** + * @brief Encoded key when dba called "backup jobs" + * + * @return + */ std::string archiveKey(); - /* - * write out task details in human readable strings + /** + * @brief + * Write out task details in human readable strings * if a task is * ===================================================================================== * | Job Id(TaskId) | Command(Dest) | Status | Start Time | Stop Time @@ -86,17 +99,23 @@ class TaskDescription { * ------------------------------------------------------------------------------------- * then the vector should be * {27-0, 192.168.8.5, finished, 12/09/19 11:09:40, 
12/09/19 11:09:40} - * */ + * + * @return + */ cpp2::TaskDesc toTaskDesc(); - /* - * set the internal status - * will check if newStatus is later than curr Status + /** + * @brief + * Set the internal status + * Will check if newStatus is later than curr Status * e.g. set running to a finished job is forbidden * - * will set start time if newStatus is running - * will set stop time if newStatus is finished / failed / stopped - * */ + * Will set start time if newStatus is running + * Will set stop time if newStatus is finished / failed / stopped + * + * @param newStatus + * @return + */ bool setStatus(cpp2::JobStatus newStatus); JobID getJobId() { diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp index 14a514e2dd6..33b557ccc55 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.cpp +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.cpp @@ -17,7 +17,7 @@ namespace meta { nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::prepare() { auto spaceRet = getSpaceIdFromName(paras_.back()); if (!nebula::ok(spaceRet)) { - LOG(ERROR) << "Can't find the space: " << paras_.back(); + LOG(INFO) << "Can't find the space: " << paras_.back(); return nebula::error(spaceRet); } GraphSpaceID spaceId = nebula::value(spaceRet); @@ -47,7 +47,7 @@ folly::Future ZoneBalanceJobExecutor::executeInternal() { plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { if (LastUpdateTimeMan::update(kvstore_, time::WallClock::fastNowInMilliSec()) != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Balance plan " << plan_->id() << " update meta failed"; + LOG(INFO) << "Balance plan " << plan_->id() << " update meta failed"; } if (status == meta::cpp2::JobStatus::FINISHED) { nebula::cpp2::ErrorCode ret = updateMeta(); @@ -85,8 +85,8 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::updateMeta() { [&baton, &ret](nebula::cpp2::ErrorCode code) { if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { ret = code; - LOG(ERROR) 
<< "Can't write the kvstore, ret = " - << static_cast(code); + LOG(INFO) << "Can't write the kvstore, ret = " + << static_cast(code); } baton.post(); }); @@ -121,8 +121,8 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones( for (auto& z : sortedActiveZonesRef) { totalPartNum += z->partNum_; } if (sortedActiveZonesRef.empty()) { - LOG(ERROR) << "rebalance error: no active zones"; + LOG(INFO) << "rebalance error: no active zones"; return nebula::cpp2::ErrorCode::E_NO_HOSTS; } avgPartNum = totalPartNum / sortedActiveZonesRef.size(); diff --git a/src/meta/processors/job/ZoneBalanceJobExecutor.h b/src/meta/processors/job/ZoneBalanceJobExecutor.h index e264e7b822a..52207b08987 100644 --- a/src/meta/processors/job/ZoneBalanceJobExecutor.h +++ b/src/meta/processors/job/ZoneBalanceJobExecutor.h @@ -33,11 +33,43 @@ class ZoneBalanceJobExecutor : public BalanceJobExecutor { protected: folly::Future executeInternal() override; + + /** + * @brief + * First, move the lostZones' parts to the active zones + * Second, make balance for the active zones + * + * @return + */ Status buildBalancePlan() override; + + /** + * @brief If removed zones, update zone-space info when job finished + * + * @return + */ nebula::cpp2::ErrorCode updateMeta(); + + /** + * @brief For a given zone, choose a host of the zone to insert the given part + * + * @param sortedZoneHosts + * @param zone + * @param partId + * @return + */ HostAddr insertPartIntoZone(std::map>* sortedZoneHosts, Zone* zone, PartitionID partId); + + /** + * @brief Give a zone vector sorted by part number, make balance for each zone + * + * @param sortedActiveZones + * @param sortedZoneHosts + * @param tasks + * @return + */ nebula::cpp2::ErrorCode rebalanceActiveZones( std::vector* sortedActiveZones, std::map>* sortedZoneHosts, diff --git a/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp b/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp index 
93480117cad..b5274dd332f 100644 --- a/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp +++ b/src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp @@ -15,7 +15,7 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) { // Confirm that there are no duplicates in the parameters. if (std::unique(hosts.begin(), hosts.end()) != hosts.end()) { - LOG(ERROR) << "Hosts have duplicated element"; + LOG(INFO) << "Hosts have duplicated element"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -23,7 +23,7 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) { // Confirm that the parameter is not empty. if (hosts.empty()) { - LOG(ERROR) << "Hosts is empty"; + LOG(INFO) << "Hosts is empty"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -37,7 +37,7 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) { // Ensure that the node is not registered. auto machineKey = MetaKeyUtils::machineKey(host.host, host.port); if (machineExist(machineKey) == nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "The host " << host << " have existed!"; + LOG(INFO) << "The host " << host << " have existed!"; code = nebula::cpp2::ErrorCode::E_EXISTED; break; } @@ -56,7 +56,7 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) { if (isNew) { // If you are creating a new zone, should make sure the zone not existed. 
if (nebula::ok(zoneValueRet)) { - LOG(ERROR) << "Zone " << zoneName << " have existed"; + LOG(INFO) << "Zone " << zoneName << " have existed"; handleErrorCode(nebula::cpp2::ErrorCode::E_EXISTED); onFinished(); return; @@ -68,8 +68,8 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) { if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND; } - LOG(ERROR) << "Get zone " << zoneName << " failed, error " - << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "Get zone " << zoneName << " failed, error " + << apache::thrift::util::enumNameSafe(code); handleErrorCode(code); onFinished(); return; diff --git a/src/meta/processors/zone/AddHostsIntoZoneProcessor.h b/src/meta/processors/zone/AddHostsIntoZoneProcessor.h index 93929601f4b..0ba768ea568 100644 --- a/src/meta/processors/zone/AddHostsIntoZoneProcessor.h +++ b/src/meta/processors/zone/AddHostsIntoZoneProcessor.h @@ -11,6 +11,10 @@ namespace nebula { namespace meta { +/** + * @brief Add hosts into a given zone which is existing or a new one + * The hosts should be unregistered hosts + */ class AddHostsIntoZoneProcessor : public BaseProcessor { public: static AddHostsIntoZoneProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/zone/AddHostsProcessor.h b/src/meta/processors/zone/AddHostsProcessor.h index 0e43023ae79..ed975cd743c 100644 --- a/src/meta/processors/zone/AddHostsProcessor.h +++ b/src/meta/processors/zone/AddHostsProcessor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief For each host, make a default zone and add it into the zone. 
+ */ class AddHostsProcessor : public BaseProcessor { public: static AddHostsProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/zone/DivideZoneProcessor.cpp b/src/meta/processors/zone/DivideZoneProcessor.cpp index ec1084f81b5..1badc76ec58 100644 --- a/src/meta/processors/zone/DivideZoneProcessor.cpp +++ b/src/meta/processors/zone/DivideZoneProcessor.cpp @@ -16,8 +16,8 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { auto zoneValueRet = doGet(zoneKey); // Check the source zone exist or not if (!nebula::ok(zoneValueRet)) { - LOG(ERROR) << "Zone " << zoneName << " not existed error: " - << apache::thrift::util::enumNameSafe(nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND); + LOG(INFO) << "Zone " << zoneName << " not existed error: " + << apache::thrift::util::enumNameSafe(nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND); handleErrorCode(nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND); onFinished(); return; @@ -43,7 +43,7 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { } if (zoneItems.size() > zoneHosts.size()) { - LOG(ERROR) << "Zone Item should not greater than hosts size"; + LOG(INFO) << "Zone Item should not greater than hosts size"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -61,13 +61,13 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { auto hosts = iter->second; auto valueRet = doGet(MetaKeyUtils::zoneKey(zone)); if (nebula::ok(valueRet) && zone != zoneName) { - LOG(ERROR) << "Zone " << zone << " have existed"; + LOG(INFO) << "Zone " << zone << " have existed"; code = nebula::cpp2::ErrorCode::E_EXISTED; break; } auto it = std::find(zoneNames.begin(), zoneNames.end(), zone); if (it == zoneNames.end()) { zoneNames.emplace_back(zone); } else { - LOG(ERROR) << "Zone have duplicated name " << zone; + LOG(INFO) << "Zone have duplicated name " << zone; @@ -76,12 +76,12 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { } if (hosts.empty()) { 
- LOG(ERROR) << "Hosts should not be empty"; + LOG(INFO) << "Hosts should not be empty"; code = nebula::cpp2::ErrorCode::E_INVALID_PARM; } if (std::unique(hosts.begin(), hosts.end()) != hosts.end()) { - LOG(ERROR) << "Zone have duplicated host"; + LOG(INFO) << "Zone have duplicated host"; code = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -101,14 +102,14 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { } if (totalHosts.size() != zoneHosts.size()) { - LOG(ERROR) << "The total host is not all hosts"; + LOG(INFO) << "The total host is not all hosts"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; } if (totalHostsSize != totalHosts.size()) { - LOG(ERROR) << "The host in zone list have duplicate element"; + LOG(INFO) << "The host in zone list have duplicate element"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -117,7 +118,7 @@ void DivideZoneProcessor::process(const cpp2::DivideZoneReq& req) { for (auto& host : totalHosts) { auto iter = std::find(zoneHosts.begin(), zoneHosts.end(), host); if (iter == zoneHosts.end()) { - LOG(ERROR) << "Host " << host << " not exist in original zone"; + LOG(INFO) << "Host " << host << " not exist in original zone"; code = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -148,7 +149,7 @@ nebula::cpp2::ErrorCode DivideZoneProcessor::updateSpacesZone( auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { - LOG(ERROR) << "List spaces failed"; + LOG(INFO) << "List spaces failed"; return nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND; } diff --git a/src/meta/processors/zone/DivideZoneProcessor.h b/src/meta/processors/zone/DivideZoneProcessor.h index d6df91685c6..3a3aa0e32b2 100644 --- a/src/meta/processors/zone/DivideZoneProcessor.h +++ b/src/meta/processors/zone/DivideZoneProcessor.h @@ -12,6 +12,9 @@ namespace nebula { namespace meta { +/** + * @brief Divide an existing zone to several zones + */ class DivideZoneProcessor : public 
BaseProcessor { public: static DivideZoneProcessor* instance(kvstore::KVStore* kvstore) { @@ -24,6 +27,14 @@ class DivideZoneProcessor : public BaseProcessor { explicit DivideZoneProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} + /** + * @brief remove the originalZoneName and add the zoneNames in batchHolder + * + * @param batchHolder + * @param originalZoneName + * @param zoneNames + * @return + */ nebula::cpp2::ErrorCode updateSpacesZone(kvstore::BatchHolder* batchHolder, const std::string& originalZoneName, const std::vector& zoneNames); diff --git a/src/meta/processors/zone/DropHostsProcessor.cpp b/src/meta/processors/zone/DropHostsProcessor.cpp index c27bc9f53c6..4f11f6f0922 100644 --- a/src/meta/processors/zone/DropHostsProcessor.cpp +++ b/src/meta/processors/zone/DropHostsProcessor.cpp @@ -15,14 +15,14 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { folly::SharedMutex::WriteHolder mHolder(LockUtils::machineLock()); auto hosts = req.get_hosts(); if (std::unique(hosts.begin(), hosts.end()) != hosts.end()) { - LOG(ERROR) << "Hosts have duplicated element"; + LOG(INFO) << "Hosts have duplicated element"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; } if (hosts.empty()) { - LOG(ERROR) << "Hosts is empty"; + LOG(INFO) << "Hosts is empty"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -43,8 +43,8 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { auto ret = doGet(spaceKey); if (!nebula::ok(ret)) { code = nebula::error(ret); - LOG(ERROR) << "Get Space " << spaceId - << " error: " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "Get Space " << spaceId + << " error: " << apache::thrift::util::enumNameSafe(code); break; } @@ -55,7 +55,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { auto partHosts = MetaKeyUtils::parsePartVal(partIter->val()); for (auto& h : partHosts) { if (std::find(hosts.begin(), 
hosts.end(), h) != hosts.end()) { - LOG(ERROR) << h << " is related with partition"; + LOG(INFO) << h << " is related with partition"; code = nebula::cpp2::ErrorCode::E_CONFLICT; break; } @@ -76,7 +76,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { auto retCode = nebula::error(iterRet); - LOG(ERROR) << "List zones failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "List zones failed, error: " << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return; @@ -94,7 +94,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { LOG(INFO) << "Drop zone " << zoneName; code = checkRelatedSpaceAndCollect(zoneName, holder.get()); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Check related space failed"; + LOG(INFO) << "Check related space failed"; break; } @@ -127,7 +127,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { auto machineKey = MetaKeyUtils::machineKey(host.host, host.port); auto ret = machineExist(machineKey); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "The machine " << host << " not existed!"; + LOG(INFO) << "The machine " << host << " not existed!"; code = nebula::cpp2::ErrorCode::E_NO_HOSTS; break; } @@ -136,7 +136,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { auto hostKey = MetaKeyUtils::hostKey(host.host, host.port); ret = hostExist(hostKey); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "The host " << host << " not existed!"; + LOG(INFO) << "The host " << host << " not existed!"; } else { holder->remove(std::move(hostKey)); } @@ -159,7 +159,7 @@ nebula::cpp2::ErrorCode DropHostsProcessor::checkRelatedSpaceAndCollect( auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); - LOG(ERROR) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(retCode); 
+ LOG(INFO) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(retCode); return nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND; } @@ -172,7 +172,7 @@ nebula::cpp2::ErrorCode DropHostsProcessor::checkRelatedSpaceAndCollect( auto it = std::find(zones.begin(), zones.end(), zoneName); if (it != zones.end()) { if (zones.size() == replicaFactor) { - LOG(ERROR) << "Zone size is same with replica factor"; + LOG(INFO) << "Zone size is same with replica factor"; return nebula::cpp2::ErrorCode::E_CONFLICT; } else { zones.erase(it); diff --git a/src/meta/processors/zone/DropHostsProcessor.h b/src/meta/processors/zone/DropHostsProcessor.h index 8d3f8cd8806..fbb189d0c5f 100644 --- a/src/meta/processors/zone/DropHostsProcessor.h +++ b/src/meta/processors/zone/DropHostsProcessor.h @@ -12,6 +12,12 @@ namespace nebula { namespace meta { +/** + * @brief Drop hosts from this cluster + * The hosts should not hold any parts + * It will remove the hosts from zones they belong to, + * and detach the machine from cluster + */ class DropHostsProcessor : public BaseProcessor { public: static DropHostsProcessor* instance(kvstore::KVStore* kvstore) { @@ -23,6 +29,13 @@ class DropHostsProcessor : public BaseProcessor { private: explicit DropHostsProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} + /** + * @brief check all spaces to find the zone, and remove it from the space + * + * @param zoneName + * @param holder + * @return + */ nebula::cpp2::ErrorCode checkRelatedSpaceAndCollect(const std::string& zoneName, kvstore::BatchHolder* holder); }; diff --git a/src/meta/processors/zone/DropZoneProcessor.cpp b/src/meta/processors/zone/DropZoneProcessor.cpp index 8b9284b20e4..aee55e19e19 100644 --- a/src/meta/processors/zone/DropZoneProcessor.cpp +++ b/src/meta/processors/zone/DropZoneProcessor.cpp @@ -15,7 +15,7 @@ void DropZoneProcessor::process(const cpp2::DropZoneReq& req) { auto zoneValueRet = doGet(std::move(zoneKey)); if (!nebula::ok(zoneValueRet)) { auto code = 
nebula::error(zoneValueRet); - LOG(ERROR) << "Drop Zone Failed, error: " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "Drop Zone Failed, error: " << apache::thrift::util::enumNameSafe(code); handleErrorCode(nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND); onFinished(); return; @@ -51,7 +51,7 @@ void DropZoneProcessor::process(const cpp2::DropZoneReq& req) { auto machineKey = MetaKeyUtils::machineKey(host.host, host.port); auto ret = machineExist(machineKey); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "The host " << host << " not existed!"; + LOG(INFO) << "The host " << host << " not existed!"; code = nebula::cpp2::ErrorCode::E_NO_HOSTS; break; } @@ -66,7 +66,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkSpaceReplicaZone() { nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; if (!nebula::ok(ret)) { code = nebula::error(ret); - LOG(ERROR) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(code); return code; } @@ -78,7 +78,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkSpaceReplicaZone() { auto spaceZones = properties.get_zone_names(); size_t replicaFactor = properties.get_replica_factor(); if (replicaFactor == spaceZones.size()) { - LOG(ERROR) << "Space " << spaceId << " replica factor and zone size are the same"; + LOG(INFO) << "Space " << spaceId << " replica factor and zone size are the same"; code = nebula::cpp2::ErrorCode::E_CONFLICT; break; } @@ -92,7 +92,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkHostPartition(const HostAddr& ad auto spaceIterRet = doPrefix(spacePrefix); if (!nebula::ok(spaceIterRet)) { auto result = nebula::error(spaceIterRet); - LOG(ERROR) << "Get Spaces Failed, error " << apache::thrift::util::enumNameSafe(result); + LOG(INFO) << "Get Spaces Failed, error " << apache::thrift::util::enumNameSafe(result); return result; } @@ -104,8 +104,8 @@ nebula::cpp2::ErrorCode 
DropZoneProcessor::checkHostPartition(const HostAddr& ad auto partIterRet = doPrefix(partPrefix); if (!nebula::ok(partIterRet)) { code = nebula::error(partIterRet); - LOG(ERROR) << "List part failed in list hosts, error: " - << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "List part failed in list hosts, error: " + << apache::thrift::util::enumNameSafe(code); return code; } auto& partIter = nebula::value(partIterRet); @@ -113,7 +113,7 @@ nebula::cpp2::ErrorCode DropZoneProcessor::checkHostPartition(const HostAddr& ad auto hosts = MetaKeyUtils::parsePartVal(partIter->val()); for (auto& host : hosts) { if (host == address) { - LOG(ERROR) << "Host " << address << " have partition on it"; + LOG(INFO) << "Host " << address << " have partition on it"; code = nebula::cpp2::ErrorCode::E_CONFLICT; break; } diff --git a/src/meta/processors/zone/DropZoneProcessor.h b/src/meta/processors/zone/DropZoneProcessor.h index 25752a35294..3dd6ea3c9b6 100644 --- a/src/meta/processors/zone/DropZoneProcessor.h +++ b/src/meta/processors/zone/DropZoneProcessor.h @@ -11,6 +11,11 @@ namespace nebula { namespace meta { +/** + * @brief Drop zone from the cluster + * The hosts that belong to the zone should not contain any parts + * It will drop the hosts too + */ class DropZoneProcessor : public BaseProcessor { public: static DropZoneProcessor* instance(kvstore::KVStore* kvstore) { @@ -22,9 +27,19 @@ class DropZoneProcessor : public BaseProcessor { private: explicit DropZoneProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} + /** + * @brief check all spaces if they have enough zones to hold replica when dropping one zone + * + * @return + */ nebula::cpp2::ErrorCode checkSpaceReplicaZone(); - // Check whether the node holds zones on each space + /** + * @brief Check whether the node holds zones on each space + * + * @param address + * @return + */ nebula::cpp2::ErrorCode checkHostPartition(const HostAddr& address); }; diff --git 
a/src/meta/processors/zone/GetZoneProcessor.cpp b/src/meta/processors/zone/GetZoneProcessor.cpp index f5445b456b4..7de7965de53 100644 --- a/src/meta/processors/zone/GetZoneProcessor.cpp +++ b/src/meta/processors/zone/GetZoneProcessor.cpp @@ -18,8 +18,8 @@ void GetZoneProcessor::process(const cpp2::GetZoneReq& req) { if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { retCode = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND; } - LOG(ERROR) << "Get zone " << zoneName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Get zone " << zoneName + << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return; diff --git a/src/meta/processors/zone/GetZoneProcessor.h b/src/meta/processors/zone/GetZoneProcessor.h index 5ac22298b6c..141f6c310ca 100644 --- a/src/meta/processors/zone/GetZoneProcessor.h +++ b/src/meta/processors/zone/GetZoneProcessor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Get a zone and its hosts' info + */ class GetZoneProcessor : public BaseProcessor { public: static GetZoneProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/zone/ListZonesProcessor.cpp b/src/meta/processors/zone/ListZonesProcessor.cpp index 282468ecf13..3bc2efab614 100644 --- a/src/meta/processors/zone/ListZonesProcessor.cpp +++ b/src/meta/processors/zone/ListZonesProcessor.cpp @@ -14,7 +14,7 @@ void ListZonesProcessor::process(const cpp2::ListZonesReq&) { auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { auto retCode = nebula::error(iterRet); - LOG(ERROR) << "List zones failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "List zones failed, error: " << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return; diff --git a/src/meta/processors/zone/ListZonesProcessor.h b/src/meta/processors/zone/ListZonesProcessor.h index 89d8a36156c..fdc8c9baa80 100644 --- 
a/src/meta/processors/zone/ListZonesProcessor.h +++ b/src/meta/processors/zone/ListZonesProcessor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Get all zones info + */ class ListZonesProcessor : public BaseProcessor { public: static ListZonesProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/zone/MergeZoneProcessor.cpp b/src/meta/processors/zone/MergeZoneProcessor.cpp index 98c6b4b4379..9d4117a4e4e 100644 --- a/src/meta/processors/zone/MergeZoneProcessor.cpp +++ b/src/meta/processors/zone/MergeZoneProcessor.cpp @@ -18,14 +18,14 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { // Confirm that the parameter is not empty. if (zones.empty()) { - LOG(ERROR) << "Zones is empty"; + LOG(INFO) << "Zones is empty"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; } if (zones.size() == 1) { - LOG(ERROR) << "Only one zone is no need to merge"; + LOG(INFO) << "Only one zone is no need to merge"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -33,7 +33,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { // Confirm that there are no duplicates in the parameters. 
if (std::unique(zones.begin(), zones.end()) != zones.end()) { - LOG(ERROR) << "Zones have duplicated element"; + LOG(INFO) << "Zones have duplicated element"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -47,7 +47,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { auto zoneValueRet = doGet(std::move(zoneKey)); if (!nebula::ok(zoneValueRet)) { code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND; - LOG(ERROR) << "Zone " << zone << " not existed"; + LOG(INFO) << "Zone " << zone << " not existed"; break; } } @@ -63,7 +63,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { auto ret = doPrefix(spacePrefix); if (!nebula::ok(ret)) { code = nebula::error(ret); - LOG(ERROR) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(code); handleErrorCode(code); onFinished(); return; @@ -87,7 +87,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { std::back_inserter(intersectionZones)); if (spaceZones.size() - intersectionZones.size() + 1 < replicaFactor) { - LOG(ERROR) << "Merge Zone will cause replica number not enough"; + LOG(INFO) << "Merge Zone will cause replica number not enough"; code = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -95,7 +95,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { auto hostPartsRet = assembleHostParts(spaceId); if (!nebula::ok(hostPartsRet)) { code = nebula::error(hostPartsRet); - LOG(ERROR) << "Assemble host parts failed: " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "Assemble host parts failed: " << apache::thrift::util::enumNameSafe(code); break; } @@ -108,7 +108,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { for (auto& host : hosts) { auto hp = hostParts.find(host); if (hp == hostParts.end()) { - LOG(ERROR) << "Host " << host << " not found"; + LOG(INFO) << "Host " << host << " not 
found"; code = nebula::cpp2::ErrorCode::E_NO_HOSTS; break; } @@ -117,7 +117,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { for (auto part : parts) { auto it = std::find(totalParts.begin(), totalParts.end(), part); if (it != totalParts.end()) { - LOG(ERROR) << "Part " << part << " have exist"; + LOG(INFO) << "Part " << part << " have exist"; code = nebula::cpp2::ErrorCode::E_CONFLICT; break; } @@ -133,7 +133,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { } // space if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Check parts failed, error " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "Check parts failed, error " << apache::thrift::util::enumNameSafe(code); handleErrorCode(code); onFinished(); return; @@ -173,7 +173,7 @@ void MergeZoneProcessor::process(const cpp2::MergeZoneReq& req) { if (nebula::ok(valueRet)) { auto it = std::find(zones.begin(), zones.end(), zoneName); if (it == zones.end()) { - LOG(ERROR) << "The target zone should exist in merge zone list"; + LOG(INFO) << "The target zone should exist in merge zone list"; handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); onFinished(); return; @@ -205,7 +205,7 @@ ErrorOr MergeZoneProcessor::assembleHostPart std::unordered_map> hostParts; auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_); if (!nebula::ok(activeHostsRet)) { - LOG(ERROR) << "Get active hosts failed"; + LOG(INFO) << "Get active hosts failed"; return nebula::error(activeHostsRet); } @@ -218,8 +218,8 @@ ErrorOr MergeZoneProcessor::assembleHostPart const auto& prefix = MetaKeyUtils::partPrefix(spaceId); auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId << " " - << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "Access kvstore failed, spaceId " << spaceId << " " + << apache::thrift::util::enumNameSafe(code); 
return code; } diff --git a/src/meta/processors/zone/MergeZoneProcessor.h b/src/meta/processors/zone/MergeZoneProcessor.h index 1b5074ca9db..aeef888606c 100644 --- a/src/meta/processors/zone/MergeZoneProcessor.h +++ b/src/meta/processors/zone/MergeZoneProcessor.h @@ -13,6 +13,9 @@ namespace meta { using HostParts = std::unordered_map>; +/** + * @brief Merge several zones to one zone + */ class MergeZoneProcessor : public BaseProcessor { public: static MergeZoneProcessor* instance(kvstore::KVStore* kvstore) { @@ -24,9 +27,13 @@ class MergeZoneProcessor : public BaseProcessor { private: explicit MergeZoneProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} + /** + * @brief Get all parts of a space in each host + * + * @param spaceId Which space's parts to get + * @return ErrorCode or parts of each host + */ ErrorOr assembleHostParts(GraphSpaceID spaceId); - - nebula::cpp2::ErrorCode updateSpaceProperties(); }; } // namespace meta diff --git a/src/meta/processors/zone/RenameZoneProcessor.cpp b/src/meta/processors/zone/RenameZoneProcessor.cpp index 11176c2fc6f..0d785e0b4bc 100644 --- a/src/meta/processors/zone/RenameZoneProcessor.cpp +++ b/src/meta/processors/zone/RenameZoneProcessor.cpp @@ -18,7 +18,7 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) { auto originalZoneKey = MetaKeyUtils::zoneKey(originalZoneName); auto originalZoneValueRet = doGet(std::move(originalZoneKey)); if (!nebula::ok(originalZoneValueRet)) { - LOG(ERROR) << "Zone " << originalZoneName << " not existed"; + LOG(INFO) << "Zone " << originalZoneName << " not existed"; handleErrorCode(nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND); onFinished(); return; @@ -29,7 +29,7 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) { auto zoneKey = MetaKeyUtils::zoneKey(zoneName); auto zoneValueRet = doGet(std::move(zoneKey)); if (nebula::ok(zoneValueRet)) { - LOG(ERROR) << "Zone " << zoneName << " have existed"; + LOG(INFO) << "Zone " << zoneName << " have existed"; 
handleErrorCode(nebula::cpp2::ErrorCode::E_EXISTED); onFinished(); return; @@ -38,7 +38,7 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) { const auto& prefix = MetaKeyUtils::spacePrefix(); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { - LOG(ERROR) << "List spaces failed"; + LOG(INFO) << "List spaces failed"; handleErrorCode(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND); onFinished(); return; diff --git a/src/meta/processors/zone/RenameZoneProcessor.h b/src/meta/processors/zone/RenameZoneProcessor.h index f74033acaf3..70649e2f489 100644 --- a/src/meta/processors/zone/RenameZoneProcessor.h +++ b/src/meta/processors/zone/RenameZoneProcessor.h @@ -11,6 +11,9 @@ namespace nebula { namespace meta { +/** + * @brief Rename a zone + */ class RenameZoneProcessor : public BaseProcessor { public: static RenameZoneProcessor* instance(kvstore::KVStore* kvstore) {