Skip to content

Commit

Permalink
split onElected and onLeaderReady (#3003)
Browse files Browse the repository at this point in the history
* split onElected and onLeaderReady

* fix test compile error

* format
  • Loading branch information
liuyu85cn committed Oct 6, 2021
1 parent 3c7d00e commit 9462d35
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 16 deletions.
2 changes: 2 additions & 0 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class Listener : public raftex::RaftPart {

// Not expected to be invoked on a Listener: the term argument is ignored and
// reaching this override is reported via LOG(FATAL).
void onElected(TermID) override { LOG(FATAL) << "Should not reach here"; }

// Not expected to be invoked on a Listener: the term argument is ignored and
// reaching this override is reported via LOG(FATAL).
void onLeaderReady(TermID) override { LOG(FATAL) << "Should not reach here"; }

// Logs the newly discovered leader's address at INFO level, prefixed with
// idStr_. No other action is taken here.
void onDiscoverNewLeader(HostAddr nLeader) override {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
}
Expand Down
13 changes: 6 additions & 7 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,22 @@ void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for t

void Part::onElected(TermID term) {
VLOG(1) << "Being elected as the leader for the term: " << term;
if (onElectedCallBacks_.empty()) {
return;
}
}

void Part::onLeaderReady(TermID term) {
VLOG(1) << "leader ready to server for the term: " << term;

CallbackOptions opt;
opt.spaceId = spaceId_;
opt.partId = partId_;
opt.term = term_;

for (auto& cb : onElectedCallBacks_) {
for (auto& cb : leaderReadyCB_) {
cb(opt);
}
}

void Part::registerOnElected(OnElectedCallBack cb) {
onElectedCallBacks_.emplace_back(std::move(cb));
}
void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }

void Part::onDiscoverNewLeader(HostAddr nLeader) {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
Expand Down
8 changes: 5 additions & 3 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class Part : public raftex::RaftPart {

void onElected(TermID term) override;

void onLeaderReady(TermID term) override;

void onDiscoverNewLeader(HostAddr nLeader) override;

cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator> iter, bool wait) override;
Expand Down Expand Up @@ -114,15 +116,15 @@ class Part : public raftex::RaftPart {
TermID term;
};

using OnElectedCallBack = std::function<void(const CallbackOptions& opt)>;
void registerOnElected(OnElectedCallBack cb);
using LeaderReadyCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderReadyCB cb);

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
std::string walPath_;
NewLeaderCallback newLeaderCb_ = nullptr;
std::vector<OnElectedCallBack> onElectedCallBacks_;
std::vector<LeaderReadyCB> leaderReadyCB_;

private:
KVEngine* engine_ = nullptr;
Expand Down
6 changes: 5 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,11 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
firstLogId = lastLogId_ + 1;
lastMsgAcceptedCostMs_ = lastMsgSentDur_.elapsedInMSec();
lastMsgAcceptedTime_ = time::WallClock::fastNowInMilliSec();
commitInThisTerm_ = true;
if (!commitInThisTerm_) {
commitInThisTerm_ = true;
bgWorkers_->addTask(
[self = shared_from_this(), term = term_] { self->onLeaderReady(term); });
}
} else {
LOG(FATAL) << idStr_ << "Failed to commit logs";
}
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
// a new leader
virtual void onElected(TermID term) = 0;

// Called after the leader has committed its first log
// (a little later than onElected).
// The leader needs to set some internal status after being elected.
virtual void onLeaderReady(TermID term) = 0;

virtual void onDiscoverNewLeader(HostAddr nLeader) = 0;

// Check if we can accept candidate's message
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/raftex/test/TestShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ void TestShard::onElected(TermID term) {
}
}

// No-op for the test shard: the term argument is intentionally unused.
void TestShard::onLeaderReady(TermID term) { UNUSED(term); }

nebula::cpp2::ErrorCode TestShard::commitLogs(std::unique_ptr<LogIterator> iter, bool) {
LogID firstId = -1;
LogID lastId = -1;
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TestShard : public RaftPart {

void onLostLeadership(TermID term) override;
void onElected(TermID term) override;
void onLeaderReady(TermID term) override;
void onDiscoverNewLeader(HostAddr) override {}

nebula::cpp2::ErrorCode commitLogs(std::unique_ptr<LogIterator> iter, bool wait) override;
Expand Down
11 changes: 9 additions & 2 deletions src/storage/transaction/ChainUpdateEdgeProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ folly::SemiFuture<Code> ChainUpdateEdgeProcessorLocal::prepareLocal() {
std::vector<kvstore::KV> data{{key, val}};
auto c = folly::makePromiseContract<Code>();
env_->kvstore_->asyncMultiPut(
spaceId_, partId_, std::move(data), [p = std::move(c.first)](auto rc) mutable {
spaceId_, partId_, std::move(data), [p = std::move(c.first), this](auto rc) mutable {
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
primeInserted_ = true;
} else {
VLOG(1) << "kvstore err: " << apache::thrift::util::enumNameSafe(rc);
}
p.setValue(rc);
});
return std::move(c.second);
Expand Down Expand Up @@ -100,7 +105,9 @@ folly::SemiFuture<Code> ChainUpdateEdgeProcessorLocal::processLocal(Code code) {
erasePrime();
forwardToDelegateProcessor();
} else {
abort();
if (primeInserted_) {
abort();
}
}

return code_;
Expand Down
4 changes: 4 additions & 0 deletions src/storage/transaction/ChainUpdateEdgeProcessorLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class ChainUpdateEdgeProcessorLocal
PartitionID partId_;
int retryLimit_{10};
TermID termOfPrepare_{-1};

// Set to true when the prime insert succeeds.
// processLocal() checks this to decide whether abort() needs to be called.
bool primeInserted_{false};
std::vector<std::string> kvErased_;
std::vector<kvstore::KV> kvAppend_;
folly::Optional<int64_t> ver_{folly::none};
Expand Down
13 changes: 10 additions & 3 deletions src/storage/transaction/TransactionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void TransactionManager::scanAll() {
void TransactionManager::onNewPartAdded(std::shared_ptr<kvstore::Part>& part) {
LOG(INFO) << folly::sformat("space={}, part={} added", part->spaceId(), part->partitionId());
auto fn = std::bind(&TransactionManager::onLeaderElectedWrapper, this, std::placeholders::_1);
part->registerOnElected(fn);
part->registerOnLeaderReady(fn);
}

void TransactionManager::onLeaderElectedWrapper(
Expand All @@ -169,8 +169,12 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) {
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
for (; iter->valid(); iter->next()) {
auto edgeKey = ConsistUtil::edgeKeyFromPrime(iter->key());
VLOG(1) << "scanned edgekey: " << folly::hexlify(edgeKey);
auto lockKey = makeLockKey(spaceId, edgeKey.str());
reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_CHAIN));
auto insSucceed = reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_CHAIN));
if (!insSucceed.second) {
LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey);
}
auto* lk = getLockCore(spaceId, partId, false);
auto succeed = lk->try_lock(edgeKey.str());
if (!succeed) {
Expand All @@ -184,7 +188,10 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) {
for (; iter->valid(); iter->next()) {
auto edgeKey = ConsistUtil::edgeKeyFromDoublePrime(iter->key());
auto lockKey = makeLockKey(spaceId, edgeKey.str());
reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_REMOTE));
auto insSucceed = reserveTable_.insert(std::make_pair(lockKey, ResumeType::RESUME_REMOTE));
if (!insSucceed.second) {
LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey);
}
auto* lk = getLockCore(spaceId, partId);
auto succeed = lk->try_lock(edgeKey.str());
if (!succeed) {
Expand Down

0 comments on commit 9462d35

Please sign in to comment.