diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index 17b53dfd590..49298eac9d3 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -134,6 +134,8 @@ class Listener : public raftex::RaftPart { void onElected(TermID) override { LOG(FATAL) << "Should not reach here"; } + void onLeaderReady(TermID) override { LOG(FATAL) << "Should not reach here"; } + void onDiscoverNewLeader(HostAddr nLeader) override { LOG(INFO) << idStr_ << "Find the new leader " << nLeader; } diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 1998caec4ac..2fc49382be6 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -176,23 +176,22 @@ void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for t void Part::onElected(TermID term) { VLOG(1) << "Being elected as the leader for the term: " << term; - if (onElectedCallBacks_.empty()) { - return; - } +} + +void Part::onLeaderReady(TermID term) { + VLOG(1) << "leader ready to server for the term: " << term; CallbackOptions opt; opt.spaceId = spaceId_; opt.partId = partId_; opt.term = term_; - for (auto& cb : onElectedCallBacks_) { + for (auto& cb : leaderReadyCB_) { cb(opt); } } -void Part::registerOnElected(OnElectedCallBack cb) { - onElectedCallBacks_.emplace_back(std::move(cb)); -} +void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); } void Part::onDiscoverNewLeader(HostAddr nLeader) { LOG(INFO) << idStr_ << "Find the new leader " << nLeader; diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 65bd47f2dc7..7b03ae79e9a 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -85,6 +85,8 @@ class Part : public raftex::RaftPart { void onElected(TermID term) override; + void onLeaderReady(TermID term) override; + void onDiscoverNewLeader(HostAddr nLeader) override; cpp2::ErrorCode commitLogs(std::unique_ptr iter, bool wait) override; @@ -114,15 +116,15 @@ class Part : public raftex::RaftPart { TermID term; }; - using OnElectedCallBack = std::function; - void registerOnElected(OnElectedCallBack cb); + using LeaderReadyCB = std::function; + void registerOnLeaderReady(LeaderReadyCB cb); protected: GraphSpaceID spaceId_; PartitionID partId_; std::string walPath_; NewLeaderCallback newLeaderCb_ = nullptr; - std::vector onElectedCallBacks_; + std::vector leaderReadyCB_; private: KVEngine* engine_ = nullptr; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index d610a8f1579..bdb289363d1 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -870,7 +870,11 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, firstLogId = lastLogId_ + 1; lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec(); lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec(); - commitInThisTerm_ = true; + if (!commitInThisTerm_) { + commitInThisTerm_ = true; + bgWorkers_->addTask( + [self = shared_from_this(), term = term_] { self->onLeaderReady(term); }); + } } else { LOG(FATAL) << idStr_ << "Failed to commit logs"; } diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 7e09f6eb493..9d3505626f0 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -269,6 +269,11 @@ class RaftPart : public std::enable_shared_from_this { // a new leader virtual void onElected(TermID term) = 0; + // called after leader committed first log + // (a little bit later onElected) + // leader need to set some internal status after elected. + virtual void onLeaderReady(TermID term) = 0; + virtual void onDiscoverNewLeader(HostAddr nLeader) = 0; // Check if we can accept candidate's message diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index d7c4683e1b0..ec24b716b03 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -162,6 +162,8 @@ void TestShard::onElected(TermID term) { } } +void TestShard::onLeaderReady(TermID term) { UNUSED(term); } + nebula::cpp2::ErrorCode TestShard::commitLogs(std::unique_ptr iter, bool) { LogID firstId = -1; LogID lastId = -1; diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index f1eff48bdfc..13bc27fd619 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -72,6 +72,7 @@ class TestShard : public RaftPart { void onLostLeadership(TermID term) override; void onElected(TermID term) override; + void onLeaderReady(TermID term) override; void onDiscoverNewLeader(HostAddr) override {} nebula::cpp2::ErrorCode commitLogs(std::unique_ptr iter, bool wait) override; diff --git a/src/storage/transaction/ChainUpdateEdgeProcessorLocal.cpp b/src/storage/transaction/ChainUpdateEdgeProcessorLocal.cpp index 8f7a8fb38db..c505817ee9c 100644 --- a/src/storage/transaction/ChainUpdateEdgeProcessorLocal.cpp +++ b/src/storage/transaction/ChainUpdateEdgeProcessorLocal.cpp @@ -64,7 +64,12 @@ folly::SemiFuture ChainUpdateEdgeProcessorLocal::prepareLocal() { std::vector data{{key, val}}; auto c = folly::makePromiseContract(); env_->kvstore_->asyncMultiPut( - spaceId_, partId_, std::move(data), [p = std::move(c.first)](auto rc) mutable { + spaceId_, partId_, std::move(data), [p = std::move(c.first), this](auto rc) mutable { + if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { + primeInserted_ = true; + } else { + VLOG(1) << "kvstore err: " << apache::thrift::util::enumNameSafe(rc); + } p.setValue(rc); }); return std::move(c.second); @@ -100,7 +105,9 @@ folly::SemiFuture ChainUpdateEdgeProcessorLocal::processLocal(Code code) { erasePrime(); forwardToDelegateProcessor(); } else { - abort(); + if (primeInserted_) { + abort(); + } } return code_; diff --git a/src/storage/transaction/ChainUpdateEdgeProcessorLocal.h b/src/storage/transaction/ChainUpdateEdgeProcessorLocal.h index 4299efb7647..c31e395bd3a 100644 --- a/src/storage/transaction/ChainUpdateEdgeProcessorLocal.h +++ b/src/storage/transaction/ChainUpdateEdgeProcessorLocal.h @@ -86,6 +86,10 @@ class ChainUpdateEdgeProcessorLocal PartitionID partId_; int retryLimit_{10}; TermID termOfPrepare_{-1}; + + // set to true when prime insert succeed + // in processLocal(), we check this to determine if need to do abort() + bool primeInserted_{false}; std::vector kvErased_; std::vector kvAppend_; folly::Optional ver_{folly::none}; diff --git a/src/storage/transaction/TransactionManager.cpp b/src/storage/transaction/TransactionManager.cpp index 2079f62af6f..a1c716df319 100644 --- a/src/storage/transaction/TransactionManager.cpp +++ b/src/storage/transaction/TransactionManager.cpp @@ -152,7 +152,7 @@ void TransactionManager::scanAll() { void TransactionManager::onNewPartAdded(std::shared_ptr& part) { LOG(INFO) << folly::sformat("space={}, part={} added", part->spaceId(), part->partitionId()); auto fn = std::bind(&TransactionManager::onLeaderElectedWrapper, this, std::placeholders::_1); - part->registerOnElected(fn); + part->registerOnLeaderReady(fn); } void TransactionManager::onLeaderElectedWrapper( @@ -169,8 +169,12 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { for (; iter->valid(); iter->next()) { auto edgeKey = ConsistUtil::edgeKeyFromPrime(iter->key()); + VLOG(1) << "scanned edgekey: " << folly::hexlify(edgeKey); auto lockKey = makeLockKey(spaceId, edgeKey.str()); - reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_CHAIN)); + auto insSucceed = reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_CHAIN)); + if (!insSucceed.second) { + LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey); + } auto* lk = getLockCore(spaceId, partId, false); auto succeed = lk->try_lock(edgeKey.str()); if (!succeed) { @@ -184,7 +188,10 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { for (; iter->valid(); iter->next()) { auto edgeKey = ConsistUtil::edgeKeyFromDoublePrime(iter->key()); auto lockKey = makeLockKey(spaceId, edgeKey.str()); - reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_REMOTE)); + auto insSucceed = reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_REMOTE)); + if (!insSucceed.second) { + LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey); + } auto* lk = getLockCore(spaceId, partId); auto succeed = lk->try_lock(edgeKey.str()); if (!succeed) {