diff --git a/resources/gflags.json b/resources/gflags.json index 59d164ea12d..52fd60b2786 100644 --- a/resources/gflags.json +++ b/resources/gflags.json @@ -4,7 +4,6 @@ "v", "heartbeat_interval_secs", "meta_client_retry_times", - "slow_op_threshold_ms", "clean_wal_interval_secs", "wal_ttl", "clean_wal_interval_secs", diff --git a/src/common/base/CMakeLists.txt b/src/common/base/CMakeLists.txt index 22db6d19c43..d32bebf795c 100644 --- a/src/common/base/CMakeLists.txt +++ b/src/common/base/CMakeLists.txt @@ -13,7 +13,6 @@ nebula_add_library( Status.cpp SanitizerOptions.cpp SignalHandler.cpp - SlowOpTracker.cpp ${gdb_debug_script} ) diff --git a/src/common/base/SlowOpTracker.cpp b/src/common/base/SlowOpTracker.cpp deleted file mode 100644 index 0986643d9fc..00000000000 --- a/src/common/base/SlowOpTracker.cpp +++ /dev/null @@ -1,11 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/base/SlowOpTracker.h" - -#include "common/base/Base.h" -#include "common/time/WallClock.h" - -DEFINE_int64(slow_op_threshold_ms, 100, "default threshold for slow operation"); diff --git a/src/common/base/SlowOpTracker.h b/src/common/base/SlowOpTracker.h deleted file mode 100644 index 63c86597298..00000000000 --- a/src/common/base/SlowOpTracker.h +++ /dev/null @@ -1,40 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_BASE_SLOWOPTRACKER_H_ -#define COMMON_BASE_SLOWOPTRACKER_H_ - -#include "common/base/Base.h" -#include "common/time/WallClock.h" - -DECLARE_int64(slow_op_threshold_ms); - -namespace nebula { - -class SlowOpTracker { - public: - SlowOpTracker() : startMs_(time::WallClock::fastNowInMilliSec()) {} - - ~SlowOpTracker() = default; - - bool slow(int64_t threshold = 0) { - dur_ = time::WallClock::fastNowInMilliSec() - startMs_; - if (dur_ < 0) { - dur_ = 0; - } - return threshold > 0 ? 
dur_ > threshold : dur_ > FLAGS_slow_op_threshold_ms; - } - - void output(const std::string& prefix, const std::string& msg) { - LOG(INFO) << prefix << "total time:" << dur_ << "ms, " << msg; - } - - private: - int64_t startMs_ = 0; - int64_t dur_ = 0; -}; - -} // namespace nebula -#endif // COMMON_BASE_SLOWOPTRACKER_H_ diff --git a/src/common/base/test/CMakeLists.txt b/src/common/base/test/CMakeLists.txt index 15f89608d67..bfe1e4d5dd0 100644 --- a/src/common/base/test/CMakeLists.txt +++ b/src/common/base/test/CMakeLists.txt @@ -79,13 +79,6 @@ nebula_add_test( LIBRARIES gtest gtest_main ) -nebula_add_test( - NAME slow_op_tracker_test - SOURCES SlowOpTrackerTest.cpp - OBJECTS $ $ - LIBRARIES gtest gtest_main -) - nebula_add_test( NAME lru_test SOURCES ConcurrentLRUCacheTest.cpp diff --git a/src/common/base/test/SlowOpTrackerTest.cpp b/src/common/base/test/SlowOpTrackerTest.cpp deleted file mode 100644 index 5d12762be1e..00000000000 --- a/src/common/base/test/SlowOpTrackerTest.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include - -#include "common/base/Base.h" -#include "common/base/SlowOpTracker.h" - -namespace nebula { - -TEST(SlowOpTrackerTest, SimpleTest) { - SlowOpTracker tracker; - usleep(500000); - CHECK(tracker.slow()); - tracker.output("PREFIX", "This is a prefix msg"); -} - -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - return RUN_ALL_TESTS(); -} diff --git a/src/common/meta/GflagsManager.cpp b/src/common/meta/GflagsManager.cpp index 4661dabef95..9c2ba419c96 100644 --- a/src/common/meta/GflagsManager.cpp +++ b/src/common/meta/GflagsManager.cpp @@ -55,7 +55,6 @@ std::unordered_map> GflagsManager {"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}}, - {"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}}, {"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}}, {"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 71902c12b89..ea8911e79c9 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -117,7 +117,9 @@ class KVEngine { * @param value Pointer of value * @return nebula::cpp2::ErrorCode */ - virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0; + virtual nebula::cpp2::ErrorCode get(const std::string& key, + std::string* value, + const void* snapshot = nullptr) = 0; /** * @brief Read a list of keys diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index f97e47719d4..9a8f56332ad 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -114,6 +114,7 @@ class KVStore { virtual const void* GetSnapshot(GraphSpaceID spaceId, PartitionID partID, bool canReadFromFollower = false) = 0; + 
/** * @brief Release snapshot. * @@ -137,7 +138,8 @@ class KVStore { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower = false) = 0; + bool canReadFromFollower = false, + const void* snapshot = nullptr) = 0; /** * @brief Read a list of keys diff --git a/src/kvstore/NebulaSnapshotManager.cpp b/src/kvstore/NebulaSnapshotManager.cpp index 4bff03a0a40..15704e668fd 100644 --- a/src/kvstore/NebulaSnapshotManager.cpp +++ b/src/kvstore/NebulaSnapshotManager.cpp @@ -29,32 +29,61 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) { void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, PartitionID partId, raftex::SnapshotCallback cb) { - auto rateLimiter = std::make_unique(); - CHECK_NOTNULL(store_); - auto tables = NebulaKeyUtils::snapshotPrefix(partId); + static constexpr LogID kInvalidLogId = -1; + static constexpr TermID kInvalidLogTerm = -1; std::vector data; int64_t totalSize = 0; int64_t totalCount = 0; - LOG(INFO) << folly::sformat( - "Space {} Part {} start send snapshot, rate limited to {}, batch size is {}", - spaceId, - partId, - FLAGS_snapshot_part_rate_limit, - FLAGS_snapshot_batch_size); - - const void* snapshot = store_->GetSnapshot(spaceId, partId); + CHECK_NOTNULL(store_); + auto partRet = store_->part(spaceId, partId); + if (!ok(partRet)) { + LOG(INFO) << folly::sformat("Failed to find space {} part {}", spaceId, partId); + cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + return; + } + // Create a rocksdb snapshot + auto snapshot = store_->GetSnapshot(spaceId, partId); SCOPE_EXIT { if (snapshot != nullptr) { store_->ReleaseSnapshot(spaceId, partId, snapshot); } }; + auto part = nebula::value(partRet); + // Get the commit log id and commit log term of specified partition + std::string val; + auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot); + if (commitRet != 
nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << folly::sformat( + "Cannot fetch the commit log id and term of space {} part {}", spaceId, partId); + cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + return; + } + CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID)); + LogID commitLogId; + TermID commitLogTerm; + memcpy(reinterpret_cast(&commitLogId), val.data(), sizeof(LogID)); + memcpy(reinterpret_cast(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID)); + + LOG(INFO) << folly::sformat( + "Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to " + "{}, batch size is {}", + spaceId, + partId, + commitLogId, + commitLogTerm, + FLAGS_snapshot_part_rate_limit, + FLAGS_snapshot_batch_size); + auto rateLimiter = std::make_unique(); + auto tables = NebulaKeyUtils::snapshotPrefix(partId); for (const auto& prefix : tables) { if (!accessTable(spaceId, partId, snapshot, prefix, cb, + commitLogId, + commitLogTerm, data, totalCount, totalSize, @@ -62,7 +91,7 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId, return; } } - cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE); + cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE); } // Promise is set in callback. 
Access part of the data, and try to send to @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, + LogID commitLogId, + TermID commitLogTerm, std::vector& data, int64_t& totalCount, int64_t& totalSize, @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed" << ", error code:" << static_cast(ret); - cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); + cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED); return false; } data.reserve(kReserveNum); @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId, rateLimiter->consume(static_cast(batchSize), // toConsume static_cast(FLAGS_snapshot_part_rate_limit), // rate static_cast(FLAGS_snapshot_part_rate_limit)); // burstSize - if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) { + if (cb(commitLogId, + commitLogTerm, + data, + totalCount, + totalSize, + raftex::SnapshotStatus::IN_PROGRESS)) { data.clear(); batchSize = 0; } else { diff --git a/src/kvstore/NebulaSnapshotManager.h b/src/kvstore/NebulaSnapshotManager.h index 1d1bced436c..73447ccbead 100644 --- a/src/kvstore/NebulaSnapshotManager.h +++ b/src/kvstore/NebulaSnapshotManager.h @@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager { const void* snapshot, const std::string& prefix, raftex::SnapshotCallback& cb, + LogID commitLogId, + TermID commitLogTerm, std::vector& data, int64_t& totalCount, int64_t& totalSize, diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index dd678cad9c7..09c5568a01f 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -621,7 +621,8 @@ nebula::cpp2::ErrorCode NebulaStore::getFromKVEngine(GraphSpaceID 
spaceId, PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { auto ret = part(spaceId, partId); if (!ok(ret)) { return error(ret); @@ -631,17 +632,19 @@ nebula::cpp2::ErrorCode NebulaStore::getFromKVEngine(GraphSpaceID spaceId, return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED : nebula::cpp2::ErrorCode::E_LEADER_CHANGED; } - return part->engine()->get(key, value); + return part->engine()->get(key, value, snapshot); } nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower) { + bool canReadFromFollower, + const void* snapshot) { // Currently nebula only reads from leader. We may have to revisit this if we support reading from // follower. - if (storageCache_ && (storageCache_->vertexPoolExists() || storageCache_->emptyKeyPoolExists())) { + if (storageCache_ && (storageCache_->vertexPoolExists() || storageCache_->emptyKeyPoolExists()) && + snapshot == nullptr) { auto cacheKey = NebulaKeyUtils::cacheKey(spaceId, key); auto exist = storageCache_->getVertexProp(cacheKey, value); if (exist) { @@ -654,7 +657,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, } } else { // cache miss - auto ret = getFromKVEngine(spaceId, partId, key, value, canReadFromFollower); + auto ret = getFromKVEngine(spaceId, partId, key, value, canReadFromFollower, nullptr); if (storageCache_->vertexPoolExists() && ret == nebula::cpp2::ErrorCode::SUCCEEDED) { // write to vertex pool when the tag is found folly::Baton baton; @@ -676,7 +679,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId, } } - return getFromKVEngine(spaceId, partId, key, value, canReadFromFollower); + return getFromKVEngine(spaceId, partId, key, value, canReadFromFollower, snapshot); } const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId, diff --git a/src/kvstore/NebulaStore.h 
b/src/kvstore/NebulaStore.h index 4623f6dff3a..c7123825155 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -227,7 +227,8 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower = false) override; + bool canReadFromFollower = false, + const void* snapshot = nullptr) override; /** * @brief Read a list of keys @@ -843,7 +844,8 @@ class NebulaStore : public KVStore, public Handler { PartitionID partId, const std::string& key, std::string* value, - bool canReadFromFollower); + bool canReadFromFollower, + const void* snapshot); private: // The lock used to protect spaces_ diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index cfaccb009b4..44c4f1caff5 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -331,26 +331,12 @@ std::tuple Part::commitLogs( } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - commitTransLeader(newLeader); - } else { - VLOG(1) << idStr_ << "Skip commit stale transfer leader " << newLeader - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + commitTransLeader(newLeader); break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - commitRemovePeer(peer); - } else { - VLOG(1) << idStr_ << "Skip commit stale remove peer " << peer - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + commitRemovePeer(peer); break; } default: { @@ -434,51 +420,26 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const switch (log[sizeof(int64_t)]) { case OP_ADD_LEARNER: { auto learner = decodeHost(OP_ADD_LEARNER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess add learner " << learner; - addLearner(learner); - } else 
{ - VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess add learner " << learner; + addLearner(learner); break; } case OP_TRANS_LEADER: { auto newLeader = decodeHost(OP_TRANS_LEADER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess trans leader " << newLeader; - preProcessTransLeader(newLeader); - } else { - VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader - << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " - << ts; - } + LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader; + preProcessTransLeader(newLeader); break; } case OP_ADD_PEER: { auto peer = decodeHost(OP_ADD_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess add peer " << peer; - addPeer(peer); - } else { - VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess add peer " << peer; + addPeer(peer); break; } case OP_REMOVE_PEER: { auto peer = decodeHost(OP_REMOVE_PEER, log); - auto ts = getTimestamp(log); - if (ts > startTimeMs_) { - VLOG(1) << idStr_ << "preprocess remove peer " << peer; - preProcessRemovePeer(peer); - } else { - VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at " - << startTimeMs_ << ", but the log timestamp is " << ts; - } + LOG(INFO) << idStr_ << "preprocess remove peer " << peer; + preProcessRemovePeer(peer); break; } default: { diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 43e92cca92f..49695483171 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -175,8 +175,13 @@ nebula::cpp2::ErrorCode RocksEngine::commitBatchWrite(std::unique_ptr(snapshot); + } rocksdb::Status status = 
db_->Get(options, rocksdb::Slice(key), value); if (status.ok()) { return nebula::cpp2::ErrorCode::SUCCEEDED; @@ -240,7 +245,7 @@ nebula::cpp2::ErrorCode RocksEngine::prefixWithExtractor(const std::string& pref const void* snapshot, std::unique_ptr* storageIter) { rocksdb::ReadOptions options; - if (snapshot != nullptr) { + if (UNLIKELY(snapshot != nullptr)) { options.snapshot = reinterpret_cast(snapshot); } options.prefix_same_as_start = true; diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 0fb9ecf2c11..62fd2283d8d 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -220,7 +220,9 @@ class RocksEngine : public KVEngine { * @param value Pointer of value * @return nebula::cpp2::ErrorCode */ - nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) override; + nebula::cpp2::ErrorCode get(const std::string& key, + std::string* value, + const void* snapshot = nullptr) override; /** * @brief Read a list of keys diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 685c70ebd28..499752f846e 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -10,7 +10,10 @@ #include #include "common/network/NetworkUtils.h" +#include "common/stats/StatsManager.h" +#include "common/time/WallClock.h" #include "kvstore/raftex/RaftPart.h" +#include "kvstore/stats/KVStats.h" #include "kvstore/wal/FileBasedWal.h" DEFINE_uint32(max_appendlog_batch_size, @@ -158,9 +161,12 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) { void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr req) { using TransportException = apache::thrift::transport::TTransportException; + auto beforeRpcUs = time::WallClock::fastNowInMicroSec(); sendAppendLogRequest(eb, req) .via(eb) - .thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) { + .thenValue([eb, beforeRpcUs, self = shared_from_this()](cpp2::AppendLogResponse&& resp) { + 
stats::StatsManager::addValue(kAppendLogLatencyUs, + time::WallClock::fastNowInMicroSec() - beforeRpcUs); VLOG_IF(1, FLAGS_trace_raft) << self->idStr_ << "AppendLogResponse " << "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm " @@ -333,6 +339,7 @@ nebula::cpp2::ErrorCode Host::startSendSnapshot() { << ", firstLogId in wal = " << part_->wal()->firstLogId() << ", lastLogId in wal = " << part_->wal()->lastLogId(); sendingSnapshot_ = true; + stats::StatsManager::addValue(kNumSendSnapshot); part_->snapshot_->sendSnapshot(part_, addr_) .thenValue([self = shared_from_this()](auto&& status) { std::lock_guard g(self->lock_); diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 34d3c7e7f42..d2eb5ac2e51 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -12,11 +12,11 @@ #include "common/base/Base.h" #include "common/base/CollectNSucceeded.h" -#include "common/base/SlowOpTracker.h" #include "common/network/NetworkUtils.h" #include "common/stats/StatsManager.h" #include "common/thread/NamedThread.h" #include "common/thrift/ThriftClientManager.h" +#include "common/time/ScopedTimer.h" #include "common/time/WallClock.h" #include "common/utils/LogStrListIterator.h" #include "interface/gen-cpp2/RaftexServiceAsyncClient.h" @@ -741,18 +741,18 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) { prevLogTerm = lastLogTerm_; committed = committedLogId_; // Step 1: Write WAL - SlowOpTracker tracker; - if (!wal_->appendLogs(iter)) { - VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL"; - res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL; - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - break; + { + SCOPED_TIMER(&execTime_); + if (!wal_->appendLogs(iter)) { + VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL"; + res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL; + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = 
wal_->lastLogTerm(); + break; + } } + stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_); lastId = wal_->lastLogId(); - if (tracker.slow()) { - tracker.output(idStr_, folly::stringPrintf("Write WAL, total %ld", lastId - prevLogId + 1)); - } VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId << "] to WAL"; } while (false); @@ -797,7 +797,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb, << iter.firstLogId() << ", " << lastLogId << "] to all peer hosts"; lastMsgSentDur_.reset(); - SlowOpTracker tracker; + auto beforeAppendLogUs = time::WallClock::fastNowInMicroSec(); collectNSucceeded(gen::from(hosts) | gen::map([self = shared_from_this(), eb, @@ -830,13 +830,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb, prevLogId, prevLogTerm, pHosts = std::move(hosts), - tracker](folly::Try&& result) mutable { + beforeAppendLogUs](folly::Try&& result) mutable { VLOG(4) << self->idStr_ << "Received enough response"; CHECK(!result.hasException()); - if (tracker.slow()) { - tracker.output(self->idStr_, - folly::stringPrintf("Total send logs: %ld", lastLogId - prevLogId + 1)); - } + stats::StatsManager::addValue(kReplicateLogLatencyUs, + time::WallClock::fastNowInMicroSec() - beforeAppendLogUs); self->processAppendLogResponses(*result, eb, std::move(it), @@ -918,7 +916,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, { auto walIt = wal_->iterator(committedId + 1, lastLogId); - SlowOpTracker tracker; // Step 3: Commit the batch auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true); if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -938,10 +935,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, } else { LOG(FATAL) << idStr_ << "Failed to commit logs"; } - if (tracker.slow()) { - tracker.output(idStr_, - folly::stringPrintf("Total commit: %ld", committedLogId_ - committedId)); - } VLOG(4) << idStr_ << "Leader succeeded in committing the logs 
" << committedId + 1 << " to " << lastLogId; } @@ -1190,6 +1183,7 @@ folly::Future RaftPart::leaderElection(bool isPreVote) { auto proposedTerm = voteReq.get_term(); auto resps = ElectionResponses(); + stats::StatsManager::addValue(kNumStartElect); if (hosts.empty()) { auto ret = handleElectionResponses(resps, hosts, proposedTerm, isPreVote); inElection_ = false; @@ -1467,7 +1461,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req, // Reset the last message time lastMsgRecvDur_.reset(); isBlindFollower_ = false; - stats::StatsManager::addValue(kNumRaftVotes); + stats::StatsManager::addValue(kNumGrantVotes); return; } @@ -1590,7 +1584,13 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req, std::make_move_iterator(req.get_log_str_list().begin() + diffIndex), std::make_move_iterator(req.get_log_str_list().end())); RaftLogIterator logIter(firstId, std::move(logEntries)); - if (wal_->appendLogs(logIter)) { + bool result = false; + { + SCOPED_TIMER(&execTime_); + result = wal_->appendLogs(logIter); + } + stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_); + if (result) { CHECK_EQ(lastId, wal_->lastLogId()); lastLogId_ = wal_->lastLogId(); lastLogTerm_ = wal_->lastLogTerm(); diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp index 0182d46a3ad..576f76e3cc9 100644 --- a/src/kvstore/raftex/SnapshotManager.cpp +++ b/src/kvstore/raftex/SnapshotManager.cpp @@ -34,19 +34,13 @@ folly::Future>> SnapshotManager::sendSnapshot( auto spaceId = part->spaceId_; auto partId = part->partId_; auto termId = part->term_; - // TODO(heng): maybe the committedLogId is less than the real one in the - // snapshot. It will not loss the data, but maybe some record will be - // committed twice. 
- auto commitLogIdAndTerm = part->lastCommittedLogId(); const auto& localhost = part->address(); - std::vector> results; - VLOG(1) << part->idStr_ << "Begin to send the snapshot to the host " << dst - << ", commitLogId = " << commitLogIdAndTerm.first - << ", commitLogTerm = " << commitLogIdAndTerm.second; accessAllRowsInSnapshot( spaceId, partId, - [&, this, p = std::move(p)](const std::vector& data, + [&, this, p = std::move(p)](LogID commitLogId, + TermID commitLogTerm, + const std::vector& data, int64_t totalCount, int64_t totalSize, SnapshotStatus status) mutable -> bool { @@ -60,8 +54,8 @@ folly::Future>> SnapshotManager::sendSnapshot( auto f = send(spaceId, partId, termId, - commitLogIdAndTerm.first, - commitLogIdAndTerm.second, + commitLogId, + commitLogTerm, localhost, data, totalSize, @@ -77,7 +71,7 @@ folly::Future>> SnapshotManager::sendSnapshot( if (status == SnapshotStatus::DONE) { VLOG(1) << part->idStr_ << "Finished, totalCount " << totalCount << ", totalSize " << totalSize; - p.setValue(commitLogIdAndTerm); + p.setValue(std::make_pair(commitLogId, commitLogTerm)); } return true; } else { diff --git a/src/kvstore/raftex/SnapshotManager.h b/src/kvstore/raftex/SnapshotManager.h index fc082708c46..ff15e1de07e 100644 --- a/src/kvstore/raftex/SnapshotManager.h +++ b/src/kvstore/raftex/SnapshotManager.h @@ -26,7 +26,9 @@ enum SnapshotStatus { }; // Return false if send snapshot failed, will not send the rest of it. 
-using SnapshotCallback = folly::Function& rows, +using SnapshotCallback = folly::Function& rows, int64_t totalCount, int64_t totalSize, SnapshotStatus status)>; diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index 518ccf67b55..868f44aed7e 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -160,13 +160,19 @@ class NebulaSnapshotManager : public SnapshotManager { int64_t totalCount = 0; int64_t totalSize = 0; std::vector data; + auto commitLogIdAndTerm = part->lastCommittedLogId(); folly::RWSpinLock::ReadHolder rh(&part->lock_); for (auto& row : part->data_) { if (data.size() > 100) { LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() << ", total count sended " << totalCount << ", total size sended " << totalSize << ", finished false"; - cb(data, totalCount, totalSize, SnapshotStatus::IN_PROGRESS); + cb(commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + data, + totalCount, + totalSize, + SnapshotStatus::IN_PROGRESS); data.clear(); } auto encoded = encodeSnapshotRow(row.first, row.second); @@ -177,7 +183,12 @@ class NebulaSnapshotManager : public SnapshotManager { LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() << ", total count sended " << totalCount << ", total size sended " << totalSize << ", finished true"; - cb(data, totalCount, totalSize, SnapshotStatus::DONE); + cb(commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + data, + totalCount, + totalSize, + SnapshotStatus::DONE); } RaftexService* service_; diff --git a/src/kvstore/stats/KVStats.cpp b/src/kvstore/stats/KVStats.cpp index 6b8996f9b58..a6a152e404e 100644 --- a/src/kvstore/stats/KVStats.cpp +++ b/src/kvstore/stats/KVStats.cpp @@ -12,17 +12,30 @@ namespace nebula { stats::CounterId kCommitLogLatencyUs; stats::CounterId kCommitSnapshotLatencyUs; +stats::CounterId kAppendWalLatencyUs; +stats::CounterId kReplicateLogLatencyUs; +stats::CounterId kAppendLogLatencyUs; 
stats::CounterId kTransferLeaderLatencyUs; -stats::CounterId kNumRaftVotes; +stats::CounterId kNumStartElect; +stats::CounterId kNumGrantVotes; +stats::CounterId kNumSendSnapshot; void initKVStats() { kCommitLogLatencyUs = stats::StatsManager::registerHisto( "commit_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); kCommitSnapshotLatencyUs = stats::StatsManager::registerHisto( "commit_snapshot_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); + kAppendWalLatencyUs = stats::StatsManager::registerHisto( + "append_wal_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); + kReplicateLogLatencyUs = stats::StatsManager::registerHisto( + "replicate_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); + kAppendLogLatencyUs = stats::StatsManager::registerHisto( + "append_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); kTransferLeaderLatencyUs = stats::StatsManager::registerHisto( "transfer_leader_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999"); - kNumRaftVotes = stats::StatsManager::registerStats("num_raft_votes", "rate, sum"); + kNumStartElect = stats::StatsManager::registerStats("num_start_elect", "rate, sum"); + kNumGrantVotes = stats::StatsManager::registerStats("num_grant_votes", "rate, sum"); + kNumSendSnapshot = stats::StatsManager::registerStats("num_send_snapshot", "rate, sum"); } } // namespace nebula diff --git a/src/kvstore/stats/KVStats.h b/src/kvstore/stats/KVStats.h index 0cda92a301f..b1bcd283f94 100644 --- a/src/kvstore/stats/KVStats.h +++ b/src/kvstore/stats/KVStats.h @@ -14,8 +14,13 @@ namespace nebula { // Raft related stats extern stats::CounterId kCommitLogLatencyUs; extern stats::CounterId kCommitSnapshotLatencyUs; +extern stats::CounterId kAppendWalLatencyUs; +extern stats::CounterId kReplicateLogLatencyUs; +extern stats::CounterId kAppendLogLatencyUs; extern stats::CounterId kTransferLeaderLatencyUs; -extern stats::CounterId kNumRaftVotes; +extern stats::CounterId kNumStartElect; +extern 
stats::CounterId kNumGrantVotes; +extern stats::CounterId kNumSendSnapshot; void initKVStats(); diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h index 0b0fa6e10b8..59839cd0e16 100644 --- a/src/storage/test/IndexTestUtil.h +++ b/src/storage/test/IndexTestUtil.h @@ -76,10 +76,14 @@ class MockKVStore : public ::nebula::kvstore::KVStore { void ReleaseSnapshot(GraphSpaceID, PartitionID, const void*) override {} // Read a single key nebula::cpp2::ErrorCode get(GraphSpaceID spaceId, - PartitionID, + PartitionID partId, const std::string& key, std::string* value, - bool) override { + bool canReadFromFollower = false, + const void* snapshot = nullptr) override { + UNUSED(canReadFromFollower); + UNUSED(partId); + UNUSED(snapshot); CHECK_EQ(spaceId, spaceId_); auto iter = kv_.lower_bound(key); if (iter != kv_.end() && iter->first == key) { @@ -129,6 +133,7 @@ class MockKVStore : public ::nebula::kvstore::KVStore { (*iter) = std::move(mockIter); return ::nebula::cpp2::ErrorCode::SUCCEEDED; } + nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, diff --git a/tests/admin/test_configs.py b/tests/admin/test_configs.py index 600d4f0eee2..b9c1e4129f6 100644 --- a/tests/admin/test_configs.py +++ b/tests/admin/test_configs.py @@ -66,7 +66,6 @@ def test_configs(self): expected_result = [ ['GRAPH', 'v', 'int', 'MUTABLE', v], ['GRAPH', 'minloglevel', 'int', 'MUTABLE', 0], - ['GRAPH', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100], ['GRAPH', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['GRAPH', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['GRAPH', 'accept_partial_success', 'bool', 'MUTABLE', False], @@ -86,7 +85,6 @@ def test_configs(self): ['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400], ['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0], ['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400], - ['STORAGE', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100], ['STORAGE', 
'heartbeat_interval_secs', 'int', 'MUTABLE', 1], ['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3], ['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],