From 0b2eb757c45b198332e4c77678b46cb72a49bba6 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Tue, 12 Apr 2022 15:40:46 +0800 Subject: [PATCH] fix updatePart --- src/kvstore/KVEngine.h | 3 ++- src/kvstore/Part.cpp | 15 ++++++++++++--- src/kvstore/RocksEngine.cpp | 16 ++++++++++++---- src/kvstore/RocksEngine.h | 4 ++-- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index d3a98d044cc..a91659991d6 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -232,8 +232,9 @@ class KVEngine { * * @param partId * @param raftPeer + * @return nebula::cpp2::ErrorCode */ - virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0; + virtual nebula::cpp2::ErrorCode updatePart(PartitionID partId, const Peer& raftPeer) = 0; /** * @brief Remove a partition from kv engine diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 92fe487b6c8..288f2641bad 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -417,7 +417,10 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const VLOG(1) << idStr_ << "preprocess add learner " << learner; addLearner(learner, false); // persist the part learner info in case of storaged restarting - engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner)); + auto ret = engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner)); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return false; + } } else { VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " << ts; @@ -443,7 +446,10 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const if (ts > startTimeMs_) { VLOG(1) << idStr_ << "preprocess add peer " << peer; addPeer(peer); - engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer)); + auto ret = engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer)); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return false; + } } else { VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " << ts; @@ -457,7 +463,10 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const VLOG(1) << idStr_ << "preprocess remove peer " << peer; preProcessRemovePeer(peer); // remove peer in the persist info - engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted)); + auto ret = engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted)); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return false; + } } else { VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at " << startTimeMs_ << ", but the log timestamp is " << ts; diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index 7f45f4bb409..acd740c3dd5 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -339,18 +339,25 @@ void RocksEngine::addPart(PartitionID partId, const Peers& raftPeers) { } } -void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) { +nebula::cpp2::ErrorCode RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) { std::string val; auto ret = get(balanceKey(partId), &val); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + + Peers peers; + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + peers = Peers::fromString(val); + } else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + // do nothing + } else { LOG(INFO) << "Update part failed when get, partId=" << partId; - return; + return ret; } - auto peers = Peers::fromString(val); peers.addOrUpdate(raftPeer); if (peers.allNormalPeers()) { + // When all replica become normal peers, delete this temp key. (when learner is promoted to + // normal replica) ret = remove(balanceKey(partId)); } else { ret = put(balanceKey(partId), peers.toString()); @@ -359,6 +366,7 @@ void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) { if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Update part failed when put back, partId=" << partId; } + return ret; } void RocksEngine::removePart(PartitionID partId) { diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 9cc9b057301..2de38333efe 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -405,9 +405,9 @@ class RocksEngine : public KVEngine { * * @param partId * @param raftPeer - * + * @return nebula::cpp2::ErrorCode */ - void updatePart(PartitionID partId, const Peer& raftPeer) override; + nebula::cpp2::ErrorCode updatePart(PartitionID partId, const Peer& raftPeer) override; /** * @brief Remove the part key from rocksdb