Skip to content

Commit

Permalink
address @wenhaocs's comments, rebased (vesoft-inc#670)
Browse files Browse the repository at this point in the history
## What type of PR is this?
- [ ] bug
- [ ] feature
- [X] enhancement

## What problem(s) does this PR solve?
#### Issue(s) number: 

#### Description:

This is some preparatory work for the later data balance test, with three main changes:
1. Follow-up to vesoft-inc#3717. (In that PR we used a snapshot when scanning data, but the commit log id/term was not taken care of.) So in this PR, we first create a rocksdb snapshot, read the commit log id/term from it, and then use the same snapshot to scan the data.
2. We can't simply skip the special raft command, because if we skip the special log after rebooting, the state of the raft peer will be wrong.
3. Add some metrics in raft, remove the `SlowOpTracker`.

## How do you solve it?

## Special notes for your reviewer, ex. impact of this fix, design document, etc:

## Checklist:
Tests:
- [X] Unit test(positive and negative cases)
- [ ] Function test
- [ ] Performance test
- [ ] N/A

Affects:
- [ ] Documentation affected (Please add the label if documentation needs to be modified.)
- [ ] Incompatibility (If it breaks the compatibility, please describe it and add the label.)
- [ ] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).)
- [ ] Performance impacted: Consumes more CPU/Memory


## Release notes:
not related



Migrated from vesoft-inc#3846

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
  • Loading branch information
nebula-bots and critical27 committed Mar 15, 2022
1 parent 7a8529b commit 8227aab
Show file tree
Hide file tree
Showing 25 changed files with 173 additions and 212 deletions.
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"v",
"heartbeat_interval_secs",
"meta_client_retry_times",
"slow_op_threshold_ms",
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
Expand Down
1 change: 0 additions & 1 deletion src/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ nebula_add_library(
Status.cpp
SanitizerOptions.cpp
SignalHandler.cpp
SlowOpTracker.cpp
${gdb_debug_script}
)

Expand Down
11 changes: 0 additions & 11 deletions src/common/base/SlowOpTracker.cpp

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/base/SlowOpTracker.h

This file was deleted.

7 changes: 0 additions & 7 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ nebula_add_test(
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME slow_op_tracker_test
SOURCES SlowOpTrackerTest.cpp
OBJECTS $<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:time_obj>
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME lru_test
SOURCES ConcurrentLRUCacheTest.cpp
Expand Down
28 changes: 0 additions & 28 deletions src/common/base/test/SlowOpTrackerTest.cpp

This file was deleted.

1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ class KVEngine {
* @param value Pointer of value
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
virtual nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class KVStore {
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;

/**
* @brief Release snapshot.
*
Expand All @@ -137,7 +138,8 @@ class KVStore {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
Expand Down
64 changes: 50 additions & 14 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,69 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
static constexpr LogID kInvalidLogId = -1;
static constexpr TermID kInvalidLogTerm = -1;
std::vector<std::string> data;
int64_t totalSize = 0;
int64_t totalCount = 0;
LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot, rate limited to {}, batch size is {}",
spaceId,
partId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
CHECK_NOTNULL(store_);
auto partRet = store_->part(spaceId, partId);
if (!ok(partRet)) {
LOG(INFO) << folly::sformat("Failed to find space {} part {]", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
// Create a rocksdb snapshot
auto snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};
auto part = nebula::value(partRet);
// Get the commit log id and commit log term of specified partition
std::string val;
auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot);
if (commitRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << folly::sformat(
"Cannot fetch the commit log id and term of space {} part {}", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID));
LogID commitLogId;
TermID commitLogTerm;
memcpy(reinterpret_cast<void*>(&commitLogId), val.data(), sizeof(LogID));
memcpy(reinterpret_cast<void*>(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID));

LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to "
"{}, batch size is {}",
spaceId,
partId,
commitLogId,
commitLogTerm,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
for (const auto& prefix : tables) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
}

// Promise is set in callback. Access part of the data, and try to send to
Expand All @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand All @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return false;
}
data.reserve(kReserveNum);
Expand All @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
if (cb(commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand Down
15 changes: 9 additions & 6 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,8 @@ nebula::cpp2::ErrorCode NebulaStore::getFromKVEngine(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -631,17 +632,19 @@ nebula::cpp2::ErrorCode NebulaStore::getFromKVEngine(GraphSpaceID spaceId,
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
return part->engine()->get(key, value, snapshot);
}

nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
// Currently nebula only reads from leader. We may have to revisit this if we support reading from
// follower.
if (storageCache_ && (storageCache_->vertexPoolExists() || storageCache_->emptyKeyPoolExists())) {
if (storageCache_ && (storageCache_->vertexPoolExists() || storageCache_->emptyKeyPoolExists()) &&
snapshot == nullptr) {
auto cacheKey = NebulaKeyUtils::cacheKey(spaceId, key);
auto exist = storageCache_->getVertexProp(cacheKey, value);
if (exist) {
Expand All @@ -654,7 +657,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
}
} else {
// cache miss
auto ret = getFromKVEngine(spaceId, partId, key, value, canReadFromFollower);
auto ret = getFromKVEngine(spaceId, partId, key, value, canReadFromFollower, nullptr);
if (storageCache_->vertexPoolExists() && ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
// write to vertex pool when the tag is found
folly::Baton<true, std::atomic> baton;
Expand All @@ -676,7 +679,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
}
}

return getFromKVEngine(spaceId, partId, key, value, canReadFromFollower);
return getFromKVEngine(spaceId, partId, key, value, canReadFromFollower, snapshot);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
Expand Down
6 changes: 4 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

/**
* @brief Read a list of keys
Expand Down Expand Up @@ -843,7 +844,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower);
bool canReadFromFollower,
const void* snapshot);

private:
// The lock used to protect spaces_
Expand Down
Loading

0 comments on commit 8227aab

Please sign in to comment.