Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use rcu in DiskManager #3917

Merged
merged 2 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 53 additions & 28 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
try {
// atomic is not copy-constructible
std::vector<std::atomic_uint64_t> freeBytes(dataPaths.size() + 1);
Paths* paths = new Paths();
paths_.store(paths);
size_t index = 0;
for (const auto& path : dataPaths) {
auto absolute = boost::filesystem::absolute(path);
Expand All @@ -27,7 +29,7 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
}
auto canonical = boost::filesystem::canonical(path);
auto info = boost::filesystem::space(canonical);
dataPaths_.emplace_back(std::move(canonical));
paths->dataPaths_.emplace_back(std::move(canonical));
freeBytes[index++] = info.available;
}
freeBytes_ = std::move(freeBytes);
Expand All @@ -39,21 +41,31 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
}
}

StatusOr<std::vector<std::string>> DiskManager::path(GraphSpaceID spaceId) {
auto spaceIt = partPath_.find(spaceId);
if (spaceIt == partPath_.end()) {
DiskManager::~DiskManager() {
std::lock_guard<std::mutex> lg(lock_);
Paths* paths = paths_.load(std::memory_order_acquire);
folly::rcu_retire(paths, std::default_delete<Paths>());
}

StatusOr<std::vector<std::string>> DiskManager::path(GraphSpaceID spaceId) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
auto spaceIt = paths->partPath_.find(spaceId);
if (spaceIt == paths->partPath_.end()) {
return Status::Error("Space not found");
}
std::vector<std::string> paths;
std::vector<std::string> pathsRes;
for (const auto& partEntry : spaceIt->second) {
paths.emplace_back(partEntry.first);
pathsRes.emplace_back(partEntry.first);
}
return paths;
return pathsRes;
}

StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId) {
auto spaceIt = partPath_.find(spaceId);
if (spaceIt == partPath_.end()) {
StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
auto spaceIt = paths->partPath_.find(spaceId);
if (spaceIt == paths->partPath_.end()) {
return Status::Error("Space not found");
}
for (const auto& [path, parts] : spaceIt->second) {
Expand All @@ -67,13 +79,17 @@ StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId
void DiskManager::addPartToPath(GraphSpaceID spaceId, PartitionID partId, const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
Paths* oldPaths = paths_.load(std::memory_order_acquire);
Paths* newPaths = new Paths(*oldPaths);
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
dataPath = boost::filesystem::absolute(dataPath);
auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath);
CHECK(iter != dataPaths_.end());
partIndex_[spaceId][partId] = iter - dataPaths_.begin();
partPath_[spaceId][canonical.string()].emplace(partId);
auto iter = std::find(newPaths->dataPaths_.begin(), newPaths->dataPaths_.end(), dataPath);
CHECK(iter != newPaths->dataPaths_.end());
newPaths->partIndex_[spaceId][partId] = iter - newPaths->dataPaths_.begin();
newPaths->partPath_[spaceId][canonical.string()].emplace(partId);
paths_.store(newPaths, std::memory_order_release);
folly::rcu_retire(oldPaths, std::default_delete<Paths>());
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "Invalid path: " << e.what();
}
Expand All @@ -84,21 +100,26 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId,
const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
Paths* oldPaths = paths_.load(std::memory_order_acquire);
Paths* newPaths = new Paths(*oldPaths);
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
dataPath = boost::filesystem::absolute(dataPath);
auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath);
CHECK(iter != dataPaths_.end());
partIndex_[spaceId].erase(partId);
partPath_[spaceId][canonical.string()].erase(partId);
auto iter = std::find(newPaths->dataPaths_.begin(), newPaths->dataPaths_.end(), dataPath);
CHECK(iter != newPaths->dataPaths_.end());
newPaths->partIndex_[spaceId].erase(partId);
newPaths->partPath_[spaceId][canonical.string()].erase(partId);
paths_.store(newPaths, std::memory_order_release);
folly::rcu_retire(oldPaths, std::default_delete<Paths>());
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "Invalid path: " << e.what();
}
}

void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) {
std::lock_guard<std::mutex> lg(lock_);
for (const auto& [space, partDiskMap] : partPath_) {
void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
for (const auto& [space, partDiskMap] : paths->partPath_) {
std::unordered_map<std::string, meta::cpp2::PartitionList> tmpPartPaths;
for (const auto& [path, partitions] : partDiskMap) {
std::vector<PartitionID> tmpPartitions;
Expand All @@ -113,9 +134,11 @@ void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) {
}
}

bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {
auto spaceIt = partIndex_.find(spaceId);
if (spaceIt == partIndex_.end()) {
bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
auto spaceIt = paths->partIndex_.find(spaceId);
if (spaceIt == paths->partIndex_.end()) {
return false;
}
auto partIt = spaceIt->second.find(partId);
Expand All @@ -127,14 +150,16 @@ bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {

void DiskManager::refresh() {
// refresh the available bytes of each data path, skip the dummy path
for (size_t i = 0; i < dataPaths_.size(); i++) {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
for (size_t i = 0; i < paths->dataPaths_.size(); i++) {
boost::system::error_code ec;
auto info = boost::filesystem::space(dataPaths_[i], ec);
auto info = boost::filesystem::space(paths->dataPaths_[i], ec);
if (!ec) {
VLOG(2) << "Refresh filesystem info of " << dataPaths_[i];
VLOG(2) << "Refresh filesystem info of " << paths->dataPaths_[i];
freeBytes_[i] = info.available;
} else {
LOG(WARNING) << "Get filesystem info of " << dataPaths_[i] << " failed";
LOG(WARNING) << "Get filesystem info of " << paths->dataPaths_[i] << " failed";
}
}
}
Expand Down
29 changes: 17 additions & 12 deletions src/kvstore/DiskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/base/StatusOr.h"
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftTypes.h"
#include "folly/synchronization/Rcu.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
Expand All @@ -41,14 +42,16 @@ class DiskManager {
DiskManager(const std::vector<std::string>& dataPaths,
std::shared_ptr<thread::GenericWorker> bgThread = nullptr);

~DiskManager();

/**
* @brief return canonical data path of given space
*
* @param spaceId
* @return StatusOr<std::vector<std::string>> Canonical path of all which contains the specified
* space, e.g. {"/DataPath1/nebula/spaceId", "/DataPath2/nebula/spaceId" ... }
*/
StatusOr<std::vector<std::string>> path(GraphSpaceID spaceId);
StatusOr<std::vector<std::string>> path(GraphSpaceID spaceId) const;

/**
* @brief Canonical path which contains the specified space and part, e.g.
Expand All @@ -60,7 +63,7 @@ class DiskManager {
* @param partId
* @return StatusOr<std::string> data path of given partId if found, else return error status
*/
StatusOr<std::string> path(GraphSpaceID spaceId, PartitionID partId);
StatusOr<std::string> path(GraphSpaceID spaceId, PartitionID partId) const;

/**
* @brief Add a partition to a given path, called when add a partiton in NebulaStore
Expand Down Expand Up @@ -90,35 +93,37 @@ class DiskManager {
* @return true Data path remains enough space
* @return false Data path does not remain enough space
*/
bool hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId);
bool hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) const;

/**
* @brief Get all partitions grouped by data path and spaceId
*
* @param diskParts Get all space data path and all partition in the path
*/
void getDiskParts(SpaceDiskPartsMap& diskParts);
void getDiskParts(SpaceDiskPartsMap& diskParts) const;

private:
/**
* @brief Refresh free bytes of data path periodically
*/
void refresh();

struct Paths {
// canonical path of data_path flag
std::vector<boost::filesystem::path> dataPaths_;
// given a space and data path, return all parts in the path
std::unordered_map<GraphSpaceID, PartDiskMap> partPath_;
// the index in dataPaths_ for a given space + part
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, size_t>> partIndex_;
};

private:
std::shared_ptr<thread::GenericWorker> bgThread_;

// canonical path of data_path flag
std::vector<boost::filesystem::path> dataPaths_;
std::atomic<Paths*> paths_;
// free space available to a non-privileged process, in bytes
std::vector<std::atomic_uint64_t> freeBytes_;

// given a space and data path, return all parts in the path
std::unordered_map<GraphSpaceID, PartDiskMap> partPath_;

// the index in dataPaths_ for a given space + part
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, size_t>> partIndex_;

// lock used to protect partPath_ and partIndex_
std::mutex lock_;
critical27 marked this conversation as resolved.
Show resolved Hide resolved
};
Expand Down