Skip to content

Commit

Permalink
tidy the create checkpoint rpc
Browse files Browse the repository at this point in the history
stash

add datapath/rootpath in heartbeat && modify list cluster processor

store dir info by address

group snapshot rpc call in one host

fix snapshot parameter bug

fix canocial path bug

decrease lastUpdateTime not found info log level

stats init && options copy

rename ndoe to service

rename create backup fields

first runnable version

rename include system spaces

add balance thrift

resolve

resolve conflicts

fix test

modify tests

remove unrelated lines

remove unralted files

fix create backup test

add some comments & fix list cluster test

only report dir info once

improve code

add agent hb processor test && fix comments

fix copy right and log

add copyright
  • Loading branch information
pengweisong committed Dec 23, 2021
1 parent 6b7560b commit 0d497a0
Show file tree
Hide file tree
Showing 57 changed files with 1,035 additions and 662 deletions.
25 changes: 24 additions & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <folly/hash/Hash.h>
#include <thrift/lib/cpp/util/EnumUtils.h>

#include <boost/filesystem.hpp>

#include "clients/meta/FileBasedClusterIdMan.h"
#include "common/base/Base.h"
#include "common/base/MurmurHash2.h"
Expand Down Expand Up @@ -55,6 +57,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool
updateLeader();
bgThread_ = std::make_unique<thread::GenericWorker>();
LOG(INFO) << "Create meta client to " << active_;
LOG(INFO) << folly::sformat(
"root path: {}, data path size: {}", options_.rootPath_, options_.dataPaths_.size());
}

MetaClient::~MetaClient() {
Expand Down Expand Up @@ -2430,6 +2434,20 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
}
}

// info used in the agent, only set once
// TOOD(spw): if we could add data path(disk) dynamicly in the future, it should be
// reported every time it changes
if (!dirInfoReported_) {
nebula::cpp2::DirInfo dirInfo;
if (options_.role_ == cpp2::HostRole::GRAPH) {
dirInfo.set_root(options_.rootPath_);
} else if (options_.role_ == cpp2::HostRole::STORAGE) {
dirInfo.set_root(options_.rootPath_);
dirInfo.set_data(options_.dataPaths_);
}
req.set_dir(dirInfo);
}

folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
VLOG(1) << "Send heartbeat to " << leader_ << ", clusterId " << req.get_cluster_id();
Expand All @@ -2449,7 +2467,12 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
metadLastUpdateTime_ = resp.get_last_update_time_in_ms();
VLOG(1) << "Metad last update time: " << metadLastUpdateTime_;
metaServerVersion_ = resp.get_meta_version();
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;

bool succeeded = resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
if (succeeded) {
dirInfoReported_ = true;
}
return succeeded;
},
std::move(promise));
return future;
Expand Down
10 changes: 9 additions & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ struct MetaClientOptions {
serviceName_(opt.serviceName_),
skipConfig_(opt.skipConfig_),
role_(opt.role_),
gitInfoSHA_(opt.gitInfoSHA_) {}
gitInfoSHA_(opt.gitInfoSHA_),
dataPaths_(opt.dataPaths_),
rootPath_(opt.rootPath_) {}

// Current host address
HostAddr localHost_{"", 0};
Expand All @@ -200,6 +202,10 @@ struct MetaClientOptions {
cpp2::HostRole role_ = cpp2::HostRole::UNKNOWN;
// gitInfoSHA of Host using this client
std::string gitInfoSHA_{""};
// data path list, used in storaged
std::vector<std::string> dataPaths_;
// install path, used in metad/graphd/storaged
std::string rootPath_;
};

class MetaClient {
Expand Down Expand Up @@ -756,6 +762,8 @@ class MetaClient {
HostAddr leader_;
HostAddr localHost_;

// Only report dir info once when started
bool dirInfoReported_ = false;
struct ThreadLocalInfo {
int64_t localLastUpdateTime_{-2};
LocalCache localCache_;
Expand Down
1 change: 1 addition & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
X(E_GET_META_DIR_FAILURE, -2072) \
\
X(E_QUERY_NOT_FOUND, -2073) \
X(E_AGENT_HB_FAILUE, -2074) \
/* 3xxx for storaged */ \
X(E_CONSENSUS_ERROR, -3001) \
X(E_KEY_HAS_EXISTS, -3002) \
Expand Down
49 changes: 48 additions & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"machines", {"__machines__", false}},
{"host_dirs", {"__host_dirs__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
Expand Down Expand Up @@ -62,6 +63,7 @@ static const std::string kPartsTable = tableMaps.at("parts").first;
static const std::string kVersionsTable = systemTableMaps.at("versions").first; // NOLINT
static const std::string kHostsTable = systemTableMaps.at("hosts").first; // NOLINT
static const std::string kMachinesTable = systemTableMaps.at("machines").first; // NOLINT
static const std::string kHostDirsTable = systemTableMaps.at("host_dirs").first;// NOLINT
static const std::string kTagsTable = tableMaps.at("tags").first; // NOLINT
static const std::string kEdgesTable = tableMaps.at("edges").first; // NOLINT
static const std::string kIndexesTable = tableMaps.at("indexes").first; // NOLINT
Expand Down Expand Up @@ -97,7 +99,7 @@ const std::string kLastUpdateTimeTable = systemInfoMaps.at("lastUpdateTime").fir

const int kMaxIpAddrLen = 15; // '255.255.255.255'

std::string MetaKeyUtils::getIndexTable() { return tableMaps.at("index").first; }
std::string MetaKeyUtils::getIndexTable() { return kIndexTable; }

std::unordered_map<std::string,
std::pair<std::string, std::function<decltype(MetaKeyUtils::spaceId)>>>
Expand Down Expand Up @@ -256,6 +258,41 @@ HostAddr MetaKeyUtils::parseMachineKey(folly::StringPiece key) {
return MetaKeyUtils::deserializeHostAddr(key);
}

const std::string& MetaKeyUtils::hostDirPrefix() { return kHostDirsTable; }

const std::string MetaKeyUtils::hostDirHostPrefix(std::string host) {
return kHostDirsTable + host;
}

std::string MetaKeyUtils::hostDirKey(std::string host, Port port) {
std::string key;
key.reserve(kHostDirsTable.size() + host.size() + sizeof(port));
key.append(kHostDirsTable.data(), kHostDirsTable.size()).append(host);
key.append(reinterpret_cast<const char*>(&port), sizeof(Port));
return key;
}

HostAddr MetaKeyUtils::parseHostDirKey(folly::StringPiece key) {
HostAddr addr;
auto hostSize = key.size() - kHostDirsTable.size() - sizeof(Port);
addr.host = key.subpiece(kHostDirsTable.size(), hostSize).toString();
key.advance(kHostDirsTable.size() + hostSize);
addr.port = *reinterpret_cast<const Port*>(key.begin());
return addr;
}

std::string MetaKeyUtils::hostDirVal(cpp2::DirInfo dir) {
std::string val;
apache::thrift::CompactSerializer::serialize(dir, &val);
return val;
}

cpp2::DirInfo MetaKeyUtils::parseHostDir(folly::StringPiece val) {
cpp2::DirInfo dir;
apache::thrift::CompactSerializer::deserialize(val, dir);
return dir;
}

std::string MetaKeyUtils::hostKey(std::string addr, Port port) { return hostKeyV2(addr, port); }

std::string MetaKeyUtils::hostKeyV2(std::string addr, Port port) {
Expand Down Expand Up @@ -639,6 +676,16 @@ std::string MetaKeyUtils::indexSpaceKey(const std::string& name) {
return key;
}

std::string MetaKeyUtils::parseIndexSpaceKey(folly::StringPiece key) {
auto nameSize = key.size() - kIndexTable.size() - sizeof(EntryType);
return key.subpiece(kIndexTable.size() + sizeof(EntryType), nameSize).str();
}

EntryType MetaKeyUtils::parseIndexType(folly::StringPiece key) {
auto type = *reinterpret_cast<const EntryType*>(key.data() + kIndexTable.size());
return type;
}

std::string MetaKeyUtils::indexTagKey(GraphSpaceID spaceId, const std::string& name) {
EntryType type = EntryType::TAG;
std::string key;
Expand Down
24 changes: 22 additions & 2 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,25 @@ class MetaKeyUtils final {

static HostAddr parseMachineKey(folly::StringPiece key);

static std::string hostKey(std::string ip, Port port);
// hostDir store service(metad/storaged/graphd) address -> dir info(root path and data paths)
// agent will use these to start/stop service and backup/restore data
static std::string hostDirKey(std::string ip);

static std::string hostKeyV2(std::string addr, Port port);
static std::string hostDirKey(std::string host, Port port);

static HostAddr parseHostDirKey(folly::StringPiece key);

static const std::string& hostDirPrefix();

static const std::string hostDirHostPrefix(std::string host);

static std::string hostDirVal(cpp2::DirInfo dir);

static cpp2::DirInfo parseHostDir(folly::StringPiece val);

static std::string hostKey(std::string host, Port port);

static std::string hostKeyV2(std::string host, Port port);

static const std::string& hostPrefix();

Expand Down Expand Up @@ -213,6 +229,10 @@ class MetaKeyUtils final {

static std::string indexSpaceKey(const std::string& name);

static std::string parseIndexSpaceKey(folly::StringPiece key);

static EntryType parseIndexType(folly::StringPiece key);

static std::string indexTagKey(GraphSpaceID spaceId, const std::string& name);

static std::string indexEdgeKey(GraphSpaceID spaceId, const std::string& name);
Expand Down
6 changes: 5 additions & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/fs/FileUtils.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "common/hdfs/HdfsHelper.h"
#include "common/network/NetworkUtils.h"
Expand Down Expand Up @@ -86,7 +87,10 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
threadManager->setNamePrefix("executor");
threadManager->start();
nebula::kvstore::KVOptions options;
options.dataPaths_ = {FLAGS_data_path};

auto absolute = boost::filesystem::absolute(FLAGS_data_path);
options.dataPaths_ = {absolute.string()};

options.partMan_ = std::move(partMan);
auto kvstore = std::make_unique<nebula::kvstore::NebulaStore>(
std::move(options), ioPool, localhost, threadManager);
Expand Down
12 changes: 9 additions & 3 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/fs/FileUtils.h"
#include "common/network/NetworkUtils.h"
#include "common/process/ProcessUtils.h"
#include "common/time/TimezoneInfo.h"
Expand Down Expand Up @@ -132,9 +133,14 @@ int main(int argc, char *argv[]) {

std::vector<std::string> paths;
folly::split(",", FLAGS_data_path, paths, true);
std::transform(paths.begin(), paths.end(), paths.begin(), [](auto &p) {
return folly::trimWhitespace(p).str();
});
// make the paths absolute
std::transform(
paths.begin(), paths.end(), paths.begin(), [](const std::string &p) -> std::string {
auto path = folly::trimWhitespace(p).str();
path = boost::filesystem::absolute(path).string();
LOG(INFO) << "data path= " << path;
return path;
});
if (paths.empty()) {
LOG(ERROR) << "Bad data_path format:" << FLAGS_data_path;
return EXIT_FAILURE;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "graph/service/GraphService.h"

#include <boost/filesystem.hpp>

#include "clients/storage/StorageClient.h"
#include "common/base/Base.h"
#include "common/encryption/MD5Utils.h"
Expand Down Expand Up @@ -33,6 +35,7 @@ Status GraphService::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecuto
options.role_ = meta::cpp2::HostRole::GRAPH;
options.localHost_ = hostAddr;
options.gitInfoSHA_ = gitInfoSha();
options.rootPath_ = boost::filesystem::current_path().string();

metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()), options);

Expand Down
15 changes: 4 additions & 11 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -237,19 +237,11 @@ struct DirInfo {
2: list<binary> data,
}

struct NodeInfo {
1: HostAddr host,
2: DirInfo dir,
}

struct PartitionBackupInfo {
1: map<PartitionID, LogInfo> (cpp.template = "std::unordered_map") info,
}

struct CheckpointInfo {
1: PartitionBackupInfo partition_info,
1: GraphSpaceID space_id,
2: map<PartitionID, LogInfo> (cpp.template = "std::unordered_map") parts,
// storage checkpoint directory name
2: binary path,
3: binary path,
}

// used for raft and drainer
Expand Down Expand Up @@ -402,6 +394,7 @@ enum ErrorCode {
E_GET_META_DIR_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,
E_AGENT_HB_FAILUE = -2074,

// 3xxx for storaged
E_CONSENSUS_ERROR = -3001,
Expand Down
Loading

0 comments on commit 0d497a0

Please sign in to comment.