Skip to content

Commit

Permalink
Support custom compactionFilter and mergeOperator in rocksdb engine (v…
Browse files Browse the repository at this point in the history
  • Loading branch information
dangleptr authored and dutor committed Jan 16, 2019
1 parent 93e041c commit 2bd40ef
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 24 deletions.
5 changes: 4 additions & 1 deletion src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ int main(int argc, char *argv[]) {
CHECK(NetworkUtils::ipv4ToInt(result.value(), localIP));

std::unique_ptr<KVStore> kvstore;
kvstore.reset(KVStore::instance(HostAddr(localIP, FLAGS_port), std::move(paths)));
nebula::kvstore::KVOptions options;
options.local_ = HostAddr(localIP, FLAGS_port);
options.dataPaths_ = std::move(paths);
kvstore.reset(KVStore::instance(std::move(options)));
std::unique_ptr<SchemaManager> schemaMan(SchemaManager::instance());

auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), schemaMan.get());
Expand Down
14 changes: 8 additions & 6 deletions src/kvstore/KVStoreImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@ namespace nebula {
namespace kvstore {

// static
KVStore* KVStore::instance(HostAddr local, std::vector<std::string> paths) {
auto* instance = new KVStoreImpl(local, std::move(paths));
reinterpret_cast<KVStoreImpl*>(instance)->init();
KVStore* KVStore::instance(KVOptions options) {
auto* instance = new KVStoreImpl(options);
static_cast<KVStoreImpl*>(instance)->init();
return instance;
}

std::vector<Engine> KVStoreImpl::initEngines(GraphSpaceID spaceId) {
decltype(kvs_[spaceId]->engines_) engines;
for (auto& path : paths_) {
for (auto& path : options_.dataPaths_) {
if (FLAGS_engine_type == "rocksdb") {
engines.emplace_back(
new RocksdbEngine(spaceId,
folly::stringPrintf("%s/nebula/%d/data",
path.c_str(), spaceId)),
path.c_str(), spaceId),
options_.mergeOp_,
options_.cfFactory_),
path);
} else {
LOG(FATAL) << "Unknown engine type " << FLAGS_engine_type;
Expand All @@ -64,7 +66,7 @@ std::vector<Engine> KVStoreImpl::initEngines(GraphSpaceID spaceId) {
}

void KVStoreImpl::init() {
auto partsMap = partMan_->parts(local_);
auto partsMap = partMan_->parts(options_.local_);
LOG(INFO) << "Init all parts, total graph space " << partsMap.size();
std::for_each(partsMap.begin(), partsMap.end(), [this](auto& idPart) {
auto spaceId = idPart.first;
Expand Down
10 changes: 4 additions & 6 deletions src/kvstore/KVStoreImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ class KVStoreImpl : public KVStore {
FRIEND_TEST(KVStoreTest, SimpleTest);

public:
KVStoreImpl(HostAddr local, std::vector<std::string> paths)
: partMan_(PartManager::instance())
, local_(local)
, paths_(std::move(paths)) {}
explicit KVStoreImpl(KVOptions options)
: partMan_(PartManager::instance())
, options_(std::move(options)) {}

~KVStoreImpl() = default;

Expand Down Expand Up @@ -77,8 +76,7 @@ class KVStoreImpl : public KVStore {
private:
std::unordered_map<GraphSpaceID, std::unique_ptr<GraphSpaceKV>> kvs_;
PartManager* partMan_ = nullptr;
HostAddr local_;
std::vector<std::string> paths_;
KVOptions options_;
};

} // namespace kvstore
Expand Down
10 changes: 9 additions & 1 deletion src/kvstore/RocksdbEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ DEFINE_uint32(batch_reserved_bytes, 4 * 1024, "default reserved bytes for one ba
namespace nebula {
namespace kvstore {

RocksdbEngine::RocksdbEngine(GraphSpaceID spaceId, const std::string& dataPath)
RocksdbEngine::RocksdbEngine(GraphSpaceID spaceId, const std::string& dataPath,
std::shared_ptr<rocksdb::MergeOperator> mergeOp,
std::shared_ptr<rocksdb::CompactionFilterFactory> cfFactory)
: StorageEngine(spaceId)
, dataPath_(dataPath) {
LOG(INFO) << "open rocksdb on " << dataPath;
Expand All @@ -22,6 +24,12 @@ RocksdbEngine::RocksdbEngine(GraphSpaceID spaceId, const std::string& dataPath)
}
rocksdb::Options options;
options.create_if_missing = true;
if (mergeOp != nullptr) {
options.merge_operator = mergeOp;
}
if (cfFactory != nullptr) {
options.compaction_filter_factory = cfFactory;
}
rocksdb::DB* db = nullptr;
rocksdb::Status status = rocksdb::DB::Open(options, dataPath_, &db);
CHECK(status.ok());
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/RocksdbEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class RocksdbEngine : public StorageEngine {
FRIEND_TEST(RocksdbEngineTest, SimpleTest);

public:
RocksdbEngine(GraphSpaceID spaceId, const std::string& dataPath);
RocksdbEngine(GraphSpaceID spaceId, const std::string& dataPath,
std::shared_ptr<rocksdb::MergeOperator> mergeOp = nullptr,
std::shared_ptr<rocksdb::CompactionFilterFactory> cfFactory = nullptr);

~RocksdbEngine();

Expand Down
28 changes: 25 additions & 3 deletions src/kvstore/include/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,43 @@
#include "base/Base.h"
#include "kvstore/include/ResultCode.h"
#include "kvstore/include/Iterator.h"
#include <rocksdb/merge_operator.h>
#include <rocksdb/compaction_filter.h>

namespace nebula {
namespace kvstore {


using KVCallback = std::function<void(ResultCode code, HostAddr hostAddr)>;

struct KVOptions {
/**
* Local address, it would be used for search related meta information on meta server.
* */
HostAddr local_;
/**
* Paths for data. It would be used by rocksdb engine.
* Be careful! We should ensure each "paths" has only one instance, otherwise
* it would mix up the data on disk.
* */
std::vector<std::string> dataPaths_;
/**
* Custom MergeOperator used in rocksdb.merge method.
* */
std::shared_ptr<rocksdb::MergeOperator> mergeOp_{nullptr};
/**
* Custom CompactionFilter used in compaction.
* */
std::shared_ptr<rocksdb::CompactionFilterFactory> cfFactory_{nullptr};
};


class KVStore {
public:
/**
* Create one new instance each time.
* Be careful! We should ensure each "paths" has only one instance, otherwise
* it would mix up the data on disk.
* */
static KVStore* instance(HostAddr local, std::vector<std::string> paths);
static KVStore* instance(KVOptions options);

virtual ~KVStore() = default;

Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/test/KVStoreTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ TEST(KVStoreTest, SimpleTest) {
paths.push_back(folly::stringPrintf("%s/disk2", rootPath.path()));

std::unique_ptr<KVStoreImpl> kv;
kv.reset(reinterpret_cast<KVStoreImpl*>(KVStore::instance(HostAddr(0, 0),
std::move(paths))));
KVOptions options;
options.local_ = HostAddr(0, 0);
options.dataPaths_ = std::move(paths);
kv.reset(static_cast<KVStoreImpl*>(KVStore::instance(std::move(options))));
EXPECT_EQ(2, kv->kvs_.size());

EXPECT_EQ(6, kv->kvs_[1]->parts_.size());
Expand Down
43 changes: 43 additions & 0 deletions src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#ifndef STORAGE_COMPACTIONFILTER_H_
#define STORAGE_COMPACTIONFILTER_H_

#include "base/Base.h"
#include <rocksdb/compaction_filter.h>

namespace nebula {
namespace storage {

class NebulaCompactionFilterFactory final : public rocksdb::CompactionFilterFactory {
public:
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context& context) override {
return std::make_unique<NebulaCompactionFilter>();
}

const char* Name() const override {
return "NebulaCompactionFilterFactory";
}
};

class NebulaCompactionFilter final : public rocksdb::CompactionFilter {
public:
bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& old_val,
std::string* new_val, bool* value_changed) const override {
return false;
}

const char* Name() const override {
return "NebulaCompactionFilter";
}
};

} // namespace storage
} // namespace nebula
#endif // STORAGE_COMPACTIONFILTER_H_

42 changes: 42 additions & 0 deletions src/storage/MergeOperator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#ifndef KVSTORE_MERGEOPERATOR_H_
#define KVSTORE_MERGEOPERATOR_H_

#include "base/Base.h"
#include <rocksdb/merge_operator.h>

namespace nebula {
namespace kvstore {

class NebulaOperator : public rocksdb::MergeOperator {
public:
const char* Name() const override {
return "NebulaOperator";
}

private:
// Default implementations of the MergeOperator functions
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
LOG(FATAL) << "NOT Implement!";
return false;
}

bool PartialMerge(const Slice& key, const Slice& left_operand,
const Slice& right_operand, std::string* new_value,
Logger* logger) const override {
LOG(FATL) << "NOT implement!";
return false;
}
};


} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_MERGEOPERATOR_H_

12 changes: 8 additions & 4 deletions src/storage/test/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestUtils {
public:
static kvstore::KVStore* initKV(const char* rootPath) {
FLAGS_part_man_type = "memory"; // Use MemPartManager.
kvstore::MemPartManager* partMan = reinterpret_cast<kvstore::MemPartManager*>(
kvstore::MemPartManager* partMan = static_cast<kvstore::MemPartManager*>(
kvstore::PartManager::instance());
// GraphSpaceID => {PartitionIDs}
// 0 => {0, 1, 2, 3, 4, 5}
Expand All @@ -32,9 +32,13 @@ class TestUtils {
std::vector<std::string> paths;
paths.push_back(folly::stringPrintf("%s/disk1", rootPath));
paths.push_back(folly::stringPrintf("%s/disk2", rootPath));
kvstore::KVStoreImpl* kv = reinterpret_cast<kvstore::KVStoreImpl*>(
kvstore::KVStore::instance(HostAddr(0, 0),
std::move(paths)));

kvstore::KVOptions options;
options.local_ = HostAddr(0, 0);
options.dataPaths_ = std::move(paths);

kvstore::KVStoreImpl* kv = static_cast<kvstore::KVStoreImpl*>(
kvstore::KVStore::instance(std::move(options)));
return kv;
}

Expand Down

0 comments on commit 2bd40ef

Please sign in to comment.