Skip to content

Commit

Permalink
fix updatePart (#4139)
Browse files Browse the repository at this point in the history
* fix updatePart

* fix format check

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
critical27 and Sophie-Xie committed Apr 13, 2022
1 parent f416f99 commit d2ac7de
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
3 changes: 2 additions & 1 deletion src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ class KVEngine {
*
* @param partId
* @param raftPeer
* @return nebula::cpp2::ErrorCode
*/
virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0;
virtual nebula::cpp2::ErrorCode updatePart(PartitionID partId, const Peer& raftPeer) = 0;

/**
* @brief Remove a partition from kv engine
Expand Down
15 changes: 12 additions & 3 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,10 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
VLOG(1) << idStr_ << "preprocess add learner " << learner;
addLearner(learner, false);
// persist the part learner info in case of storaged restarting
engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner));
auto ret = engine_->updatePart(partId_, Peer(learner, Peer::Status::kLearner));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return false;
}
} else {
VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
Expand All @@ -443,7 +446,10 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer));
auto ret = engine_->updatePart(partId_, Peer(peer, Peer::Status::kPromotedPeer));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return false;
}
} else {
VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
Expand All @@ -457,7 +463,10 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
VLOG(1) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
// remove peer in the persist info
engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted));
auto ret = engine_->updatePart(partId_, Peer(peer, Peer::Status::kDeleted));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return false;
}
} else {
VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
Expand Down
16 changes: 12 additions & 4 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,18 +339,25 @@ void RocksEngine::addPart(PartitionID partId, const Peers& raftPeers) {
}
}

void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) {
nebula::cpp2::ErrorCode RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) {
std::string val;
auto ret = get(balanceKey(partId), &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {

Peers peers;
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
peers = Peers::fromString(val);
} else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
// do nothing
} else {
LOG(INFO) << "Update part failed when get, partId=" << partId;
return;
return ret;
}

auto peers = Peers::fromString(val);
peers.addOrUpdate(raftPeer);

if (peers.allNormalPeers()) {
// When all replica become normal peers, delete this temp key. (when learner is promoted to
// normal replica)
ret = remove(balanceKey(partId));
} else {
ret = put(balanceKey(partId), peers.toString());
Expand All @@ -359,6 +366,7 @@ void RocksEngine::updatePart(PartitionID partId, const Peer& raftPeer) {
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Update part failed when put back, partId=" << partId;
}
return ret;
}

void RocksEngine::removePart(PartitionID partId) {
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ class RocksEngine : public KVEngine {
*
* @param partId
* @param raftPeer
*
* @return nebula::cpp2::ErrorCode
*/
void updatePart(PartitionID partId, const Peer& raftPeer) override;
nebula::cpp2::ErrorCode updatePart(PartitionID partId, const Peer& raftPeer) override;

/**
* @brief Remove the part key from rocksdb
Expand Down

0 comments on commit d2ac7de

Please sign in to comment.