Skip to content

Commit

Permalink
refactor LOG and add comment for meta, meta/processors/job (vesoft-in…
Browse files Browse the repository at this point in the history
…c#3686)

* refactor LOG and add comment for meta, meta/processors/job

* refactor LOG and add comment for meta/processors/zone

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
liwenhui-soul and critical27 committed Jan 29, 2022
1 parent e821ba3 commit f771fd8
Show file tree
Hide file tree
Showing 64 changed files with 1,117 additions and 360 deletions.
34 changes: 17 additions & 17 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
if (statusVec[i].ok()) {
std::tie(std::ignore, term, code) = MetaKeyUtils::parseLeaderValV3(vals[i]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << apache::thrift::util::enumNameSafe(code);
LOG(INFO) << apache::thrift::util::enumNameSafe(code);
continue;
}
if (terms[i] <= term) {
Expand Down Expand Up @@ -90,13 +90,13 @@ bool ActiveHostsMan::machineRegisted(kvstore::KVStore* kv, const HostAddr& hostA
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::pair<HostAddr, cpp2::HostRole>>>
ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, std::string hostname) {
ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, const std::string& hostname) {
const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to get services in the host: " << hostname << ", error "
<< apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Failed to get services in the host: " << hostname << ", error "
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand All @@ -120,7 +120,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
std::unique_ptr<kvstore::KVIterator> machineIter;
auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, machinePrefix, &machineIter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand All @@ -135,8 +135,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
std::unique_ptr<kvstore::KVIterator> iter;
retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to get active hosts, error "
<< apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Failed to get active hosts, error "
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand All @@ -155,7 +155,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
if (info.role_ == cpp2::HostRole::STORAGE &&
std::find(machines.begin(), machines.end(), host) == machines.end()) {
retCode = nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND;
LOG(ERROR) << "Machine not found " << host;
LOG(INFO) << "Machine not found " << host;
break;
}

Expand All @@ -177,8 +177,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
auto zoneKey = MetaKeyUtils::zoneKey(zoneName);
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get zone " << zoneName
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get zone " << zoneName
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand Down Expand Up @@ -208,7 +208,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get space failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get space failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand Down Expand Up @@ -243,17 +243,17 @@ ErrorOr<nebula::cpp2::ErrorCode, HostInfo> ActiveHostsMan::getHostInfo(kvstore::
std::string machineValue;
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, machineKey, &machineValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get machine info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get machine info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

auto hostKey = MetaKeyUtils::hostKey(host.host, host.port);
std::string hostValue;
retCode = kv->get(kDefaultSpaceId, kDefaultPartId, hostKey, &hostValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get host info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get host info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
return HostInfo::decode(hostValue);
Expand Down Expand Up @@ -284,8 +284,8 @@ ErrorOr<nebula::cpp2::ErrorCode, int64_t> LastUpdateTimeMan::get(kvstore::KVStor
std::string val;
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get last update time failed, error: "
<< apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get last update time failed, error: "
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
return *reinterpret_cast<const int64_t*>(val.data());
Expand Down
83 changes: 79 additions & 4 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ struct HostInfo {
return info;
}

/*
/**
* @brief
* int8_t dataVer
* int64_t timestamp
* sizeof(HostRole) hostRole
* size_t lenth of gitInfoSha
* string gitInfoSha
* */
*
* @param info
* @return
*/
static std::string encodeV2(const HostInfo& info) {
std::string encode;
int8_t dataVer = 2;
Expand All @@ -74,6 +78,12 @@ struct HostInfo {
return encode;
}

/**
* @brief Parse a serialized value to HostInfo
*
* @param data
* @return
*/
static HostInfo decodeV2(const folly::StringPiece& data) {
HostInfo info;
size_t offset = sizeof(int8_t);
Expand Down Expand Up @@ -107,27 +117,92 @@ class ActiveHostsMan final {
~ActiveHostsMan() = default;

using AllLeaders = std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>;

/**
* @brief Save host info and leader info into kvStore
* If partition leader info was updated, it will update LastUpdateTime, indicating the MetaClient
* update local cache.
*
* @param kv Where to save
* @param hostAddr Which host to save
* @param info Information of the host
* @param leaderParts Leader info of all parts, null means don't need to update leader info
* @return
*/
static nebula::cpp2::ErrorCode updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info,
const AllLeaders* leaderParts = nullptr);

/**
* @brief Check if the host is registered
*
* @param kv From where to get
* @param hostAddr Which host to register
* @return
*/
static bool machineRegisted(kvstore::KVStore* kv, const HostAddr& hostAddr);

/**
* @brief Get all registered host
*
* @param kv From where to get
* @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL
* @param role Which role of the host to find. maybe GRAPH META STORAGE LISTENER AGENT
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> getActiveHosts(
kvstore::KVStore* kv, int32_t expiredTTL = 0, cpp2::HostRole role = cpp2::HostRole::STORAGE);

/**
* @brief Get services in the agent host
*
* @param kv From where to get
* @param hostname Hostname or ip
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::pair<HostAddr, cpp2::HostRole>>>
getServicesInHost(kvstore::KVStore* kv, std::string hostname);

getServicesInHost(kvstore::KVStore* kv, const std::string& hostname);

/**
* @brief Get hosts in the zone
*
* @param kv From where to get
* @param zoneName Name of the zone
* @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> getActiveHostsInZone(
kvstore::KVStore* kv, const std::string& zoneName, int32_t expiredTTL = 0);

/**
* @brief Get hosts in the space
*
* @param kv From where to get
* @param spaceId Id of the space
* @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> getActiveHostsWithZones(
kvstore::KVStore* kv, GraphSpaceID spaceId, int32_t expiredTTL = 0);

/**
* @brief Check if a host is alived, by checking if the host send heartbeat within the default
* time
*
* @param kv From where to get the host's information
* @param host Which host to check
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, bool> isLived(kvstore::KVStore* kv, const HostAddr& host);

/**
* @brief Get hostInfo for a host
*
* @param kv From where to get
* @param host
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, HostInfo> getHostInfo(kvstore::KVStore* kv,
const HostAddr& host);

Expand Down
50 changes: 42 additions & 8 deletions src/meta/KVBasedClusterIdMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ namespace meta {
* */
class ClusterIdMan {
public:
/**
* @brief Create a ClusterID by hash function
*
* @param metaAddrs
* @return
*/
static ClusterID create(const std::string& metaAddrs) {
std::hash<std::string> hash_fn;
auto clusterId = hash_fn(metaAddrs);
Expand All @@ -31,23 +37,30 @@ class ClusterIdMan {
return clusterId;
}

/**
* @brief Write the cluster id in a file
*
* @param clusterId
* @param filename
* @return
*/
static bool persistInFile(ClusterID clusterId, const std::string& filename) {
auto dirname = fs::FileUtils::dirname(filename.c_str());
if (!fs::FileUtils::makeDir(dirname)) {
LOG(ERROR) << "Failed mkdir " << dirname;
LOG(INFO) << "Failed mkdir " << dirname;
return false;
}
if (fs::FileUtils::remove(filename.c_str())) {
LOG(INFO) << "Remove the existed file " << filename;
}
int fd = ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) {
LOG(ERROR) << "Open file error, file " << filename << ", error " << strerror(errno);
LOG(INFO) << "Open file error, file " << filename << ", error " << strerror(errno);
return false;
}
int bytes = ::write(fd, reinterpret_cast<const char*>(&clusterId), sizeof(ClusterID));
if (bytes != sizeof(clusterId)) {
LOG(ERROR) << "Write clusterId failed!";
LOG(INFO) << "Write clusterId failed!";
::close(fd);
return false;
}
Expand All @@ -56,17 +69,23 @@ class ClusterIdMan {
return true;
}

/**
* @brief Get cluster id from a file
*
* @param filename
* @return
*/
static ClusterID getClusterIdFromFile(const std::string& filename) {
LOG(INFO) << "Try to open " << filename;
int fd = ::open(filename.c_str(), O_RDONLY);
if (fd < 0) {
LOG(WARNING) << "Open file failed, error " << strerror(errno);
LOG(INFO) << "Open file failed, error " << strerror(errno);
return 0;
}
ClusterID clusterId = 0;
int len = ::read(fd, reinterpret_cast<char*>(&clusterId), sizeof(ClusterID));
if (len != sizeof(ClusterID)) {
LOG(ERROR) << "Get clusterId failed!";
LOG(INFO) << "Get clusterId failed!";
::close(fd);
return 0;
}
Expand All @@ -75,6 +94,13 @@ class ClusterIdMan {
return clusterId;
}

/**
* @brief Get cluster id from kvStore
*
* @param kv
* @param key
* @return
*/
static ClusterID getClusterIdFromKV(kvstore::KVStore* kv, const std::string& key) {
CHECK_NOTNULL(kv);
std::string value;
Expand All @@ -84,16 +110,24 @@ class ClusterIdMan {
return 0;
} else if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (value.size() != sizeof(ClusterID)) {
LOG(ERROR) << "Bad clusterId " << value;
LOG(INFO) << "Bad clusterId " << value;
return 0;
}
return *reinterpret_cast<const ClusterID*>(value.data());
} else {
LOG(ERROR) << "Error in kvstore, err " << static_cast<int32_t>(code);
LOG(INFO) << "Error in kvstore, err " << static_cast<int32_t>(code);
return 0;
}
}

/**
* @brief Save cluster id into kvStore
*
* @param kv
* @param key
* @param clusterId
* @return
*/
static bool persistInKV(kvstore::KVStore* kv, const std::string& key, ClusterID clusterId) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
Expand All @@ -102,7 +136,7 @@ class ClusterIdMan {
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Put failed, error " << static_cast<int32_t>(code);
LOG(INFO) << "Put failed, error " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Put key " << key << ", val " << clusterId;
Expand Down
Loading

0 comments on commit f771fd8

Please sign in to comment.