From 8227aabad9fa1fbfc4bf1dc0759ea8f45ea490d4 Mon Sep 17 00:00:00 2001 From: nebula-bots <88429921+nebula-bots@users.noreply.github.com> Date: Tue, 15 Mar 2022 17:38:31 +0800 Subject: [PATCH] address @wenhaocs's comments, rebased (#670) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What type of PR is this? - [ ] bug - [ ] feature - [X] enhancement ## What problem(s) does this PR solve? #### Issue(s) number: #### Description: This is some preparatory work for the later data balance test, with three main changes: 1. Follow-up of #3717 (in that PR we use a snapshot when scanning data, but the commit log id/term is not taken care of). So in this PR, we first create a rocksdb snapshot, read the commit log id/term from it, and use the same snapshot to scan data. 2. We can't simply skip the special raft commands, because if we skip the special logs after rebooting, the state of the raft peer will be wrong. 3. Add some metrics in raft, and remove the `SlowOpTracker`. ## How do you solve it? ## Special notes for your reviewer, ex. impact of this fix, design document, etc: ## Checklist: Tests: - [X] Unit test(positive and negative cases) - [ ] Function test - [ ] Performance test - [ ] N/A Affects: - [ ] Documentation affected (Please add the label if documentation needs to be modified.) - [ ] Incompatibility (If it breaks the compatibility, please describe it and add the label.) - [ ] If it's needed to cherry-pick (If cherry-pick to some branches is required, please label the destination version(s).) 
- [ ] Performance impacted: Consumes more CPU/Memory ## Release notes: not related Migrated from vesoft-inc/nebula#3846 Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> --- resources/gflags.json | 1 - src/common/base/CMakeLists.txt | 1 - src/common/base/SlowOpTracker.cpp | 11 ---- src/common/base/SlowOpTracker.h | 40 -------------- src/common/base/test/CMakeLists.txt | 7 --- src/common/base/test/SlowOpTrackerTest.cpp | 28 ---------- src/common/meta/GflagsManager.cpp | 1 - src/kvstore/KVEngine.h | 4 +- src/kvstore/KVStore.h | 4 +- src/kvstore/NebulaSnapshotManager.cpp | 64 +++++++++++++++++----- src/kvstore/NebulaSnapshotManager.h | 2 + src/kvstore/NebulaStore.cpp | 15 +++-- src/kvstore/NebulaStore.h | 6 +- src/kvstore/Part.cpp | 59 ++++---------------- src/kvstore/RocksEngine.cpp | 9 ++- src/kvstore/RocksEngine.h | 4 +- src/kvstore/raftex/Host.cpp | 9 ++- src/kvstore/raftex/RaftPart.cpp | 48 ++++++++-------- src/kvstore/raftex/SnapshotManager.cpp | 18 ++---- src/kvstore/raftex/SnapshotManager.h | 4 +- src/kvstore/raftex/test/TestShard.h | 15 ++++- src/kvstore/stats/KVStats.cpp | 17 +++++- src/kvstore/stats/KVStats.h | 7 ++- src/storage/test/IndexTestUtil.h | 9 ++- tests/admin/test_configs.py | 2 - 25 files changed, 173 insertions(+), 212 deletions(-) delete mode 100644 src/common/base/SlowOpTracker.cpp delete mode 100644 src/common/base/SlowOpTracker.h delete mode 100644 src/common/base/test/SlowOpTrackerTest.cpp diff --git a/resources/gflags.json b/resources/gflags.json index 59d164ea12d..52fd60b2786 100644 --- a/resources/gflags.json +++ b/resources/gflags.json @@ -4,7 +4,6 @@ "v", "heartbeat_interval_secs", "meta_client_retry_times", - "slow_op_threshold_ms", "clean_wal_interval_secs", "wal_ttl", "clean_wal_interval_secs", diff --git a/src/common/base/CMakeLists.txt b/src/common/base/CMakeLists.txt index 22db6d19c43..d32bebf795c 100644 --- a/src/common/base/CMakeLists.txt +++ b/src/common/base/CMakeLists.txt @@ -13,7 +13,6 @@ 
nebula_add_library( Status.cpp SanitizerOptions.cpp SignalHandler.cpp - SlowOpTracker.cpp ${gdb_debug_script} ) diff --git a/src/common/base/SlowOpTracker.cpp b/src/common/base/SlowOpTracker.cpp deleted file mode 100644 index 0986643d9fc..00000000000 --- a/src/common/base/SlowOpTracker.cpp +++ /dev/null @@ -1,11 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/base/SlowOpTracker.h" - -#include "common/base/Base.h" -#include "common/time/WallClock.h" - -DEFINE_int64(slow_op_threshold_ms, 100, "default threshold for slow operation"); diff --git a/src/common/base/SlowOpTracker.h b/src/common/base/SlowOpTracker.h deleted file mode 100644 index 63c86597298..00000000000 --- a/src/common/base/SlowOpTracker.h +++ /dev/null @@ -1,40 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_BASE_SLOWOPTRACKER_H_ -#define COMMON_BASE_SLOWOPTRACKER_H_ - -#include "common/base/Base.h" -#include "common/time/WallClock.h" - -DECLARE_int64(slow_op_threshold_ms); - -namespace nebula { - -class SlowOpTracker { - public: - SlowOpTracker() : startMs_(time::WallClock::fastNowInMilliSec()) {} - - ~SlowOpTracker() = default; - - bool slow(int64_t threshold = 0) { - dur_ = time::WallClock::fastNowInMilliSec() - startMs_; - if (dur_ < 0) { - dur_ = 0; - } - return threshold > 0 ? 
dur_ > threshold : dur_ > FLAGS_slow_op_threshold_ms; - } - - void output(const std::string& prefix, const std::string& msg) { - LOG(INFO) << prefix << "total time:" << dur_ << "ms, " << msg; - } - - private: - int64_t startMs_ = 0; - int64_t dur_ = 0; -}; - -} // namespace nebula -#endif // COMMON_BASE_SLOWOPTRACKER_H_ diff --git a/src/common/base/test/CMakeLists.txt b/src/common/base/test/CMakeLists.txt index 15f89608d67..bfe1e4d5dd0 100644 --- a/src/common/base/test/CMakeLists.txt +++ b/src/common/base/test/CMakeLists.txt @@ -79,13 +79,6 @@ nebula_add_test( LIBRARIES gtest gtest_main ) -nebula_add_test( - NAME slow_op_tracker_test - SOURCES SlowOpTrackerTest.cpp - OBJECTS $ $ - LIBRARIES gtest gtest_main -) - nebula_add_test( NAME lru_test SOURCES ConcurrentLRUCacheTest.cpp diff --git a/src/common/base/test/SlowOpTrackerTest.cpp b/src/common/base/test/SlowOpTrackerTest.cpp deleted file mode 100644 index 5d12762be1e..00000000000 --- a/src/common/base/test/SlowOpTrackerTest.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include - -#include "common/base/Base.h" -#include "common/base/SlowOpTracker.h" - -namespace nebula { - -TEST(SlowOpTrackerTest, SimpleTest) { - SlowOpTracker tracker; - usleep(500000); - CHECK(tracker.slow()); - tracker.output("PREFIX", "This is a prefix msg"); -} - -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - return RUN_ALL_TESTS(); -} diff --git a/src/common/meta/GflagsManager.cpp b/src/common/meta/GflagsManager.cpp index 4661dabef95..9c2ba419c96 100644 --- a/src/common/meta/GflagsManager.cpp +++ b/src/common/meta/GflagsManager.cpp @@ -55,7 +55,6 @@ std::unordered_map> GflagsManager {"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}}, - {"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}}, {"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}}, {"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 71902c12b89..ea8911e79c9 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -117,7 +117,9 @@ class KVEngine { * @param value Pointer of value * @return nebula::cpp2::ErrorCode */ - virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0; + virtual nebula::cpp2::ErrorCode get(const std::string& key, + std::string* value, + const void* snapshot = nullptr) = 0; /** * @brief Read a list of keys diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index f97e47719d4..9a8f56332ad 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -114,6 +114,7 @@ class KVStore { virtual const void* GetSnapshot(GraphSpaceID spaceId, PartitionID partID, bool canReadFromFollower = false) = 0; + 
/** * @brief Release snapshot. * @@ -137,7 +138,8 @@ class KVStore { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower = false) = 0; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = 0; /** * @brief Read a list of keys diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 4bff03a0a40..15704e668fd 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -29,32 +29,61 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) { void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, PartitionID partId, raftex::SnapshotCallback cb) { - auto rateLimiter = std::make_unique(); - CHECK_NOTNULL(store_); - auto tables = NebulaKeyUtils::snapshotPrefix(partId); + static constexpr LogID kInvalidLogId = -1; + static constexpr TermID kInvalidLogTerm = -1; std::vector data; int64_t totalSize = 0; int64_t totalCount = 0; - LOG(INFO) << folly::sformat( - "Space {} Part {} start send snapshot, rate limited to {}, batch size is {}", - spaceId, - partId, - FLAGS_snapshot_part_rate_limit, - FLAGS_snapshot_batch_size); - - const void* snapshot = store_->GetSnapshot(spaceId, partId); + CHECK_NOTNULL(store_); + auto partRet = store_->part(spaceId, partId); + if (!ok(partRet)) { + LOG(INFO) << folly::sformat("Failed to find space {} part {}", spaceId, partId); + cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + return; + } + // Create a rocksdb snapshot + auto snapshot = store_->GetSnapshot(spaceId, partId); SCOPE_EXIT { if (snapshot != nullptr) { store_->ReleaseSnapshot(spaceId, partId, snapshot); } }; + auto part = nebula::value(partRet); + // Get the commit log id and commit log term of specified partition + std::string val; + auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot); + if (commitRet != 
nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << folly::sformat( + "Cannot fetch the commit log id and term of space {} part {}", spaceId, partId); + cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + return; + } + CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID)); + LogID commitLogId; + TermID commitLogTerm; + memcpy(reinterpret_cast(&commitLogId), val.data(), sizeof(LogID)); + memcpy(reinterpret_cast(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID)); + + LOG(INFO) << folly::sformat( + "Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to " + "{}, batch size is {}", + spaceId, + partId, + commitLogId, + commitLogTerm, + FLAGS_snapshot_part_rate_limit, + FLAGS_snapshot_batch_size); + auto rateLimiter = std::make_unique(); + auto tables = NebulaKeyUtils::snapshotPrefix(partId); for (const auto& prefix : tables) { if (!accessTable(spaceId, partId, snapshot, prefix, cb, + commitLogId, + commitLogTerm, data, totalCount, totalSize, @@ -62,7 +91,7 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, return; } } - cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE); + cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE); } // Promise is set in callback. 
Access part of the data, and try to send to @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, + LogID commitLogId, + TermID commitLogTerm, std::vector& data, int64_t& totalCount, int64_t& totalSize, @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed" << ", error code:" << static_cast(ret); - cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); return false; } data.reserve(kReserveNum); @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, rateLimiter->consume(static_cast(batchSize), // toConsume static_cast(FLAGS_snapshot_part_rate_limit), // rate static_cast(FLAGS_snapshot_part_rate_limit)); // burstSize - if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) { + if (cb(commitLogId, + commitLogTerm, + data, + totalCount, + totalSize, + raftex::SnapshotStatus::IN_PROGRESS)) { data.clear(); batchSize = 0; } else { diff --git a/src/kvstore/NebulaSnapshotManager.h b/src/kvstore/NebulaSnapshotManager.h index 1d1bced436c..73447ccbead 100644 --- a/src/kvstore/NebulaSnapshotManager.h +++ b/src/kvstore/NebulaSnapshotManager.h @@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager { const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, + LogID commitLogId, + TermID commitLogTerm, std::vector& data, int64_t& totalCount, int64_t& totalSize, diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index dd678cad9c7..09c5568a01f 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -621,7 +621,8 @@ nebula::cpp2::ErrorCode NebulaStore::getFromKVEngine(GraphSpaceID 
spaceId, PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { auto ret = part(spaceId, partId); if (!ok(ret)) { return error(ret); @@ -631,17 +632,19 @@ nebula::cpp2::ErrorCode NebulaStore::getFromKVEngine(GraphSpaceID spaceId, return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED : nebula::cpp2::ErrorCode::E_LEADER_CHANGED; } - return part->engine()->get(key, value); + return part->engine()->get(key, value, snapshot); } nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { // Currently nebula only reads from leader. We may have to revisit this if we support reading from // follower. - if (storageCache_ && (storageCache_->vertexPoolExists() || storageCache_->emptyKeyPoolExists())) { + if (storageCache_ && (storageCache_->vertexPoolExists() || storageCache_->emptyKeyPoolExists()) && + snapshot == nullptr) { auto cacheKey = NebulaKeyUtils::cacheKey(spaceId, key); auto exist = storageCache_->getVertexProp(cacheKey, value); if (exist) { @@ -654,7 +657,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, } } else { // cache miss - auto ret = getFromKVEngine(spaceId, partId, key, value, canReadFromFollower); + auto ret = getFromKVEngine(spaceId, partId, key, value, canReadFromFollower, nullptr); if (storageCache_->vertexPoolExists() && ret == nebula::cpp2::ErrorCode::SUCCEEDED) { // write to vertex pool when the tag is found folly::Baton baton; @@ -676,7 +679,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, } } - return getFromKVEngine(spaceId, partId, key, value, canReadFromFollower); + return getFromKVEngine(spaceId, partId, key, value, canReadFromFollower, snapshot); } const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaStore.h 
b/src/kvstore/NebulaStore.h index 4623f6dff3a..c7123825155 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -227,7 +227,8 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; /** * @brief Read a list of keys @@ -843,7 +844,8 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower); + bool canReadFromFollower, + const void* snapshot); private: // The lock used to protect spaces_ diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index cfaccb009b4..44c4f1caff5 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -331,26 +331,12 @@ std::tuple Part::commitLogs( } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - commitTransLeader(newLeader); - } else { - VLOG(1) << idStr_ << "Skip commit stale transfer leader " << newLeader - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + commitTransLeader(newLeader); break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - commitRemovePeer(peer); - } else { - VLOG(1) << idStr_ << "Skip commit stale remove peer " << peer - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + commitRemovePeer(peer); break; } default: { @@ -434,51 +420,26 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const switch (log[sizeof(int64_t)]) { case OP_ADD_LEARNER: { auto learner = decodeHost(OP_ADD_LEARNER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess add learner " << learner; - addLearner(learner); - } else 
{ - VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess add learner " << learner; + addLearner(learner); break; } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess trans leader " << newLeader; - preProcessTransLeader(newLeader); - } else { - VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader; + preProcessTransLeader(newLeader); break; } case OP_ADD_PEER: { auto peer = decodeHost(OP_ADD_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess add peer " << peer; - addPeer(peer); - } else { - VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess add peer " << peer; + addPeer(peer); break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess remove peer " << peer; - preProcessRemovePeer(peer); - } else { - VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess remove peer " << peer; + preProcessRemovePeer(peer); break; } default: { diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 43e92cca92f..49695483171 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -175,8 +175,13 @@ nebula::cpp2::ErrorCode RocksEngine::commitBatchWrite(std::unique_ptr(snapshot); + } rocksdb::Status status = 
db_->Get(options, rocksdb::Slice(key), value); if (status.ok()) { return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -240,7 +245,7 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; - if (snapshot != nullptr) { + if (UNLIKELY(snapshot != nullptr)) { options.snapshot = reinterpret_cast(snapshot); } options.prefix_same_as_start = true; diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 0fb9ecf2c11..62fd2283d8d 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -220,7 +220,9 @@ class RocksEngine : public KVEngine { * @param value Pointer of value * @return nebula::cpp2::ErrorCode */ - nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) override; + nebula::cpp2::ErrorCode get(const std::string& key, + std::string* value, + const void* snapshot = nullptr) override; /** * @brief Read a list of keys diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 685c70ebd28..499752f846e 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -10,7 +10,10 @@ #include #include "common/network/NetworkUtils.h" +#include "common/stats/StatsManager.h" +#include "common/time/WallClock.h" #include "kvstore/raftex/RaftPart.h" +#include "kvstore/stats/KVStats.h" #include "kvstore/wal/FileBasedWal.h" DEFINE_uint32(max_appendlog_batch_size, @@ -158,9 +161,12 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) { void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) { using TransportException = apache::thrift::transport::TTransportException; + auto beforeRpcUs = time::WallClock::fastNowInMicroSec(); sendAppendLogRequest(eb, req) .via(eb) - .thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) { + .thenValue([eb, beforeRpcUs, self = shared_from_this()](cpp2::AppendLogResponse&& resp) { + 
stats::StatsManager::addValue(kAppendLogLatencyUs, + time::WallClock::fastNowInMicroSec() - beforeRpcUs); VLOG_IF(1, FLAGS_trace_raft) << self->idStr_ << "AppendLogResponse " << "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm " @@ -333,6 +339,7 @@ nebula::cpp2::ErrorCode Host::startSendSnapshot() { << ", firstLogId in wal = " << part_->wal()->firstLogId() << ", lastLogId in wal = " << part_->wal()->lastLogId(); sendingSnapshot_ = true; + stats::StatsManager::addValue(kNumSendSnapshot); part_->snapshot_->sendSnapshot(part_, addr_) .thenValue([self = shared_from_this()](auto&& status) { std::lock_guard g(self->lock_); diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 34d3c7e7f42..d2eb5ac2e51 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -12,11 +12,11 @@ #include "common/base/Base.h" #include "common/base/CollectNSucceeded.h" -#include "common/base/SlowOpTracker.h" #include "common/network/NetworkUtils.h" #include "common/stats/StatsManager.h" #include "common/thread/NamedThread.h" #include "common/thrift/ThriftClientManager.h" +#include "common/time/ScopedTimer.h" #include "common/time/WallClock.h" #include "common/utils/LogStrListIterator.h" #include "interface/gen-cpp2/RaftexServiceAsyncClient.h" @@ -741,18 +741,18 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { prevLogTerm = lastLogTerm_; committed = committedLogId_; // Step 1: Write WAL - SlowOpTracker tracker; - if (!wal_->appendLogs(iter)) { - VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL"; - res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL; - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - break; + { + SCOPED_TIMER(&execTime_); + if (!wal_->appendLogs(iter)) { + VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL"; + res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL; + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = 
wal_->lastLogTerm(); + break; + } } + stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_); lastId = wal_->lastLogId(); - if (tracker.slow()) { - tracker.output(idStr_, folly::stringPrintf("Write WAL, total %ld", lastId - prevLogId + 1)); - } VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId << "] to WAL"; } while (false); @@ -797,7 +797,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb, << iter.firstLogId() << ", " << lastLogId << "] to all peer hosts"; lastMsgSentDur_.reset(); - SlowOpTracker tracker; + auto beforeAppendLogUs = time::WallClock::fastNowInMicroSec(); collectNSucceeded(gen::from(hosts) | gen::map([self = shared_from_this(), eb, @@ -830,13 +830,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb, prevLogId, prevLogTerm, pHosts = std::move(hosts), - tracker](folly::Try&& result) mutable { + beforeAppendLogUs](folly::Try&& result) mutable { VLOG(4) << self->idStr_ << "Received enough response"; CHECK(!result.hasException()); - if (tracker.slow()) { - tracker.output(self->idStr_, - folly::stringPrintf("Total send logs: %ld", lastLogId - prevLogId + 1)); - } + stats::StatsManager::addValue(kReplicateLogLatencyUs, + time::WallClock::fastNowInMicroSec() - beforeAppendLogUs); self->processAppendLogResponses(*result, eb, std::move(it), @@ -918,7 +916,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, { auto walIt = wal_->iterator(committedId + 1, lastLogId); - SlowOpTracker tracker; // Step 3: Commit the batch auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true); if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -938,10 +935,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, } else { LOG(FATAL) << idStr_ << "Failed to commit logs"; } - if (tracker.slow()) { - tracker.output(idStr_, - folly::stringPrintf("Total commit: %ld", committedLogId_ - committedId)); - } VLOG(4) << idStr_ << "Leader succeeded in committing the logs 
" << committedId + 1 << " to " << lastLogId; } @@ -1190,6 +1183,7 @@ folly::Future RaftPart::leaderElection(bool isPreVote) { auto proposedTerm = voteReq.get_term(); auto resps = ElectionResponses(); + stats::StatsManager::addValue(kNumStartElect); if (hosts.empty()) { auto ret = handleElectionResponses(resps, hosts, proposedTerm, isPreVote); inElection_ = false; @@ -1467,7 +1461,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req, // Reset the last message time lastMsgRecvDur_.reset(); isBlindFollower_ = false; - stats::StatsManager::addValue(kNumRaftVotes); + stats::StatsManager::addValue(kNumGrantVotes); return; } @@ -1590,7 +1584,13 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, std::make_move_iterator(req.get_log_str_list().begin() + diffIndex), std::make_move_iterator(req.get_log_str_list().end())); RaftLogIterator logIter(firstId, std::move(logEntries)); - if (wal_->appendLogs(logIter)) { + bool result = false; + { + SCOPED_TIMER(&execTime_); + result = wal_->appendLogs(logIter); + } + stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_); + if (result) { CHECK_EQ(lastId, wal_->lastLogId()); lastLogId_ = wal_->lastLogId(); lastLogTerm_ = wal_->lastLogTerm(); diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp index 0182d46a3ad..576f76e3cc9 100644 --- a/src/kvstore/raftex/SnapshotManager.cpp +++ b/src/kvstore/raftex/SnapshotManager.cpp @@ -34,19 +34,13 @@ folly::Future>> SnapshotManager::sendSnapshot( auto spaceId = part->spaceId_; auto partId = part->partId_; auto termId = part->term_; - // TODO(heng): maybe the committedLogId is less than the real one in the - // snapshot. It will not loss the data, but maybe some record will be - // committed twice. 
- auto commitLogIdAndTerm = part->lastCommittedLogId(); const auto& localhost = part->address(); - std::vector> results; - VLOG(1) << part->idStr_ << "Begin to send the snapshot to the host " << dst - << ", commitLogId = " << commitLogIdAndTerm.first - << ", commitLogTerm = " << commitLogIdAndTerm.second; accessAllRowsInSnapshot( spaceId, partId, - [&, this, p = std::move(p)](const std::vector& data, + [&, this, p = std::move(p)](LogID commitLogId, + TermID commitLogTerm, + const std::vector& data, int64_t totalCount, int64_t totalSize, SnapshotStatus status) mutable -> bool { @@ -60,8 +54,8 @@ folly::Future>> SnapshotManager::sendSnapshot( auto f = send(spaceId, partId, termId, - commitLogIdAndTerm.first, - commitLogIdAndTerm.second, + commitLogId, + commitLogTerm, localhost, data, totalSize, @@ -77,7 +71,7 @@ folly::Future>> SnapshotManager::sendSnapshot( if (status == SnapshotStatus::DONE) { VLOG(1) << part->idStr_ << "Finished, totalCount " << totalCount << ", totalSize " << totalSize; - p.setValue(commitLogIdAndTerm); + p.setValue(std::make_pair(commitLogId, commitLogTerm)); } return true; } else { diff --git a/src/kvstore/raftex/SnapshotManager.h b/src/kvstore/raftex/SnapshotManager.h index fc082708c46..ff15e1de07e 100644 --- a/src/kvstore/raftex/SnapshotManager.h +++ b/src/kvstore/raftex/SnapshotManager.h @@ -26,7 +26,9 @@ enum SnapshotStatus { }; // Return false if send snapshot failed, will not send the rest of it. 
-using SnapshotCallback = folly::Function& rows, +using SnapshotCallback = folly::Function& rows, int64_t totalCount, int64_t totalSize, SnapshotStatus status)>; diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index 518ccf67b55..868f44aed7e 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -160,13 +160,19 @@ class NebulaSnapshotManager : public SnapshotManager { int64_t totalCount = 0; int64_t totalSize = 0; std::vector data; + auto commitLogIdAndTerm = part->lastCommittedLogId(); folly::RWSpinLock::ReadHolder rh(&part->lock_); for (auto& row : part->data_) { if (data.size() > 100) { LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() << ", total count sended " << totalCount << ", total size sended " << totalSize << ", finished false"; - cb(data, totalCount, totalSize, SnapshotStatus::IN_PROGRESS); + cb(commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + data, + totalCount, + totalSize, + SnapshotStatus::IN_PROGRESS); data.clear(); } auto encoded = encodeSnapshotRow(row.first, row.second); @@ -177,7 +183,12 @@ class NebulaSnapshotManager : public SnapshotManager { LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() << ", total count sended " << totalCount << ", total size sended " << totalSize << ", finished true"; - cb(data, totalCount, totalSize, SnapshotStatus::DONE); + cb(commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + data, + totalCount, + totalSize, + SnapshotStatus::DONE); } RaftexService* service_; diff --git a/src/kvstore/stats/KVStats.cpp b/src/kvstore/stats/KVStats.cpp index 6b8996f9b58..a6a152e404e 100644 --- a/src/kvstore/stats/KVStats.cpp +++ b/src/kvstore/stats/KVStats.cpp @@ -12,17 +12,30 @@ namespace nebula { stats::CounterId kCommitLogLatencyUs; stats::CounterId kCommitSnapshotLatencyUs; +stats::CounterId kAppendWalLatencyUs; +stats::CounterId kReplicateLogLatencyUs; +stats::CounterId kAppendLogLatencyUs; 
stats::CounterId kTransferLeaderLatencyUs; -stats::CounterId kNumRaftVotes; +stats::CounterId kNumStartElect; +stats::CounterId kNumGrantVotes; +stats::CounterId kNumSendSnapshot; void initKVStats() { kCommitLogLatencyUs = stats::StatsManager::registerHisto( "commit_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); kCommitSnapshotLatencyUs = stats::StatsManager::registerHisto( "commit_snapshot_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); + kAppendWalLatencyUs = stats::StatsManager::registerHisto( + "append_wal_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); + kReplicateLogLatencyUs = stats::StatsManager::registerHisto( + "replicate_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); + kAppendLogLatencyUs = stats::StatsManager::registerHisto( + "append_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); kTransferLeaderLatencyUs = stats::StatsManager::registerHisto( "transfer_leader_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); - kNumRaftVotes = stats::StatsManager::registerStats("num_raft_votes", "rate, sum"); + kNumStartElect = stats::StatsManager::registerStats("num_start_elect", "rate, sum"); + kNumGrantVotes = stats::StatsManager::registerStats("num_grant_votes", "rate, sum"); + kNumSendSnapshot = stats::StatsManager::registerStats("num_send_snapshot", "rate, sum"); } } // namespace nebula diff --git a/src/kvstore/stats/KVStats.h b/src/kvstore/stats/KVStats.h index 0cda92a301f..b1bcd283f94 100644 --- a/src/kvstore/stats/KVStats.h +++ b/src/kvstore/stats/KVStats.h @@ -14,8 +14,13 @@ namespace nebula { // Raft related stats extern stats::CounterId kCommitLogLatencyUs; extern stats::CounterId kCommitSnapshotLatencyUs; +extern stats::CounterId kAppendWalLatencyUs; +extern stats::CounterId kReplicateLogLatencyUs; +extern stats::CounterId kAppendLogLatencyUs; extern stats::CounterId kTransferLeaderLatencyUs; -extern stats::CounterId kNumRaftVotes; +extern stats::CounterId kNumStartElect; +extern 
stats::CounterId kNumGrantVotes; +extern stats::CounterId kNumSendSnapshot; void initKVStats(); diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h index 0b0fa6e10b8..59839cd0e16 100644 --- a/src/storage/test/IndexTestUtil.h +++ b/src/storage/test/IndexTestUtil.h @@ -76,10 +76,14 @@ class MockKVStore : public ::nebula::kvstore::KVStore { void ReleaseSnapshot(GraphSpaceID, PartitionID, const void*) override {} // Read a single key nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, - PartitionID, + PartitionID partId, const std::string& key, std::string* value, - bool) override { + bool canReadFromFollower = false, + const void* snapshot = nullptr) override { + UNUSED(canReadFromFollower); + UNUSED(partId); + UNUSED(snapshot); CHECK_EQ(spaceId, spaceId_); auto iter = kv_.lower_bound(key); if (iter != kv_.end() && iter->first == key) { @@ -129,6 +133,7 @@ class MockKVStore : public ::nebula::kvstore::KVStore { (*iter) = std::move(mockIter); return ::nebula::cpp2::ErrorCode::SUCCEEDED; } + nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, diff --git a/tests/admin/test_configs.py b/tests/admin/test_configs.py index 600d4f0eee2..b9c1e4129f6 100644 --- a/tests/admin/test_configs.py +++ b/tests/admin/test_configs.py @@ -66,7 +66,6 @@ def test_configs(self): expected_result = [ ['GRAPH', 'v', 'int', 'MUTABLE', v], ['GRAPH', 'minloglevel', 'int', 'MUTABLE', 0], - ['GRAPH', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100], ['GRAPH', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['GRAPH', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['GRAPH', 'accept_partial_success', 'bool', 'MUTABLE', False], @@ -86,7 +85,6 @@ def test_configs(self): ['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400], ['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0], ['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400], - ['STORAGE', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100], ['STORAGE', 
'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],