Skip to content

Commit

Permalink
Revert "Remove proposed term in raft election (#2297)" (#2321)
Browse files Browse the repository at this point in the history
This reverts commit b465646.

Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
  • Loading branch information
critical27 and dangleptr committed Aug 26, 2020
1 parent 87f8b0b commit 8fec368
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 40 deletions.
1 change: 0 additions & 1 deletion src/interface/raftex.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ struct AskForVoteRequest {
// Response message for the vote call
struct AskForVoteResponse {
1: ErrorCode error_code;
2: TermID current_term;
}


Expand Down
4 changes: 1 addition & 3 deletions src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,7 @@ Host::prepareAppendLogRequest() {
req->set_sending_snapshot(true);
if (!sendingSnapshot_) {
LOG(INFO) << idStr_ << "Can't find log " << lastLogIdSent_ + 1
<< " in wal, send the snapshot"
<< ", logIdToSend = " << logIdToSend_
<< ", lastLogId in wal = " << part_->wal()->lastLogId();
<< " in wal, send the snapshot";
sendingSnapshot_ = true;
part_->snapshot_->sendSnapshot(part_, addr_)
.thenValue([self = shared_from_this()] (Status&& status) {
Expand Down
75 changes: 39 additions & 36 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ RaftPart::RaftPart(ClusterID clusterId,
, ioThreadPool_{pool}
, bgWorkers_{workers}
, executor_(executor)
, snapshot_(snapshotMan) {
, snapshot_(snapshotMan)
, weight_(1) {
FileBasedWalPolicy policy;
policy.ttl = FLAGS_wal_ttl;
policy.fileSize = FLAGS_wal_file_size;
Expand Down Expand Up @@ -274,7 +275,7 @@ void RaftPart::start(std::vector<HostAddr>&& peers, bool asLearner) {

lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
term_ = lastLogTerm_;
term_ = proposedTerm_ = lastLogTerm_;

// Set the quorum number
quorum_ = (peers.size() + 1) / 2;
Expand Down Expand Up @@ -977,7 +978,7 @@ bool RaftPart::needToStartElection() {
std::lock_guard<std::mutex> g(raftLock_);
if (status_ == Status::RUNNING &&
role_ == Role::FOLLOWER &&
(lastMsgRecvDur_.elapsedInMSec() >= FLAGS_raft_heartbeat_interval_secs * 1000 ||
(lastMsgRecvDur_.elapsedInMSec() >= weight_ * FLAGS_raft_heartbeat_interval_secs * 1000 ||
term_ == 0)) {
LOG(INFO) << idStr_ << "Start leader election, reason: lastMsgDur "
<< lastMsgRecvDur_.elapsedInMSec()
Expand Down Expand Up @@ -1011,7 +1012,7 @@ bool RaftPart::prepareElectionRequest(
req.set_part(partId_);
req.set_candidate_ip(addr_.first);
req.set_candidate_port(addr_.second);
req.set_term(term_ + 1);
req.set_term(++proposedTerm_); // Bump up the proposed term
req.set_last_log_id(lastLogId_);
req.set_last_log_term(lastLogTerm_);

Expand Down Expand Up @@ -1052,18 +1053,13 @@ typename RaftPart::Role RaftPart::processElectionResponses(

size_t numSucceeded = 0;
for (auto& r : results) {
if (r.second.get_current_term() != proposedTerm) {
continue;
}
if (r.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
++numSucceeded;
} else if (r.second.get_error_code() == cpp2::ErrorCode::E_LOG_STALE ||
r.second.get_error_code() == cpp2::ErrorCode::E_TERM_OUT_OF_DATE) {
LOG(INFO) << idStr_ << "AskForVote is explicitly rejected by peer "
<< hosts[r.first]->address()
<< ", so step down as follower";
role_ = Role::FOLLOWER;
return role_;
} else if (r.second.get_error_code() == cpp2::ErrorCode::E_LOG_STALE) {
LOG(INFO) << idStr_ << "My last log id is less than " << hosts[r.first]->address()
<< ", double my election interval.";
uint64_t curWeight = weight_.load();
weight_.store(curWeight * 2);
} else {
LOG(ERROR) << idStr_ << "Receive response about askForVote from "
<< hosts[r.first]->address()
Expand All @@ -1079,11 +1075,6 @@ typename RaftPart::Role RaftPart::processElectionResponses(
<< proposedTerm;
term_ = proposedTerm;
role_ = Role::LEADER;
leader_ = addr_;
bgWorkers_->addTask([self = shared_from_this(), term = proposedTerm] {
self->onElected(term);
});
lastMsgAcceptedTime_ = 0;
}

return role_;
Expand Down Expand Up @@ -1127,7 +1118,6 @@ bool RaftPart::leaderElection() {
if (hosts.empty()) {
VLOG(2) << idStr_ << "No peer found, I will be the leader";
} else {
// Try to collect response from all peers, if timeout, then check quorum
auto eb = ioThreadPool_->getEventBase();
auto futures = collectNSucceeded(
gen::from(hosts)
Expand Down Expand Up @@ -1167,14 +1157,27 @@ bool RaftPart::leaderElection() {
switch (processElectionResponses(resps, std::move(hosts), proposedTerm)) {
case Role::LEADER: {
// Elected
LOG(INFO) << idStr_ << "The partition is elected as the leader";
LOG(INFO) << idStr_
<< "The partition is elected as the leader";
{
std::lock_guard<std::mutex> g(raftLock_);
if (status_ == Status::RUNNING) {
leader_ = addr_;
bgWorkers_->addTask([self = shared_from_this(),
term = voteReq.get_term()] {
self->onElected(term);
});
lastMsgAcceptedTime_ = 0;
}
}
weight_ = 1;
sendHeartbeat();
return true;
}
case Role::FOLLOWER: {
// Someone was elected or rejected
LOG(INFO) << idStr_ << "Someone else was elected, or is rejected by peers";
return false;
// Someone was elected
LOG(INFO) << idStr_ << "Someone else was elected";
return true;
}
case Role::CANDIDATE: {
// No one has been elected
Expand Down Expand Up @@ -1206,12 +1209,12 @@ void RaftPart::statusPolling(int64_t startTime) {
size_t delay = FLAGS_raft_heartbeat_interval_secs * 1000 / 3;
if (needToStartElection()) {
if (leaderElection()) {
// Elected as leader
VLOG(2) << idStr_ << "Stop the election";
} else {
// No leader has been elected, sleep for a while to continue
// No leader has been elected, need to continue
// (After sleeping a random period between [500ms, 2s])
VLOG(2) << idStr_ << "Wait for a while and continue the leader election";
delay = FLAGS_raft_heartbeat_interval_secs * 1000;
delay = (folly::Random::rand32(1500) + 500) * weight_;
}
} else if (needToSendHeartbeat()) {
VLOG(2) << idStr_ << "Need to send heartbeat";
Expand Down Expand Up @@ -1277,7 +1280,6 @@ void RaftPart::processAskForVoteRequest(

std::lock_guard<std::mutex> g(raftLock_);

resp.set_current_term(term_);
// Make sure the partition is running
if (UNLIKELY(status_ == Status::STOPPED)) {
LOG(INFO) << idStr_
Expand Down Expand Up @@ -1317,13 +1319,14 @@ void RaftPart::processAskForVoteRequest(
return;
}

// Check candidate's proposed term is greater than current term
if (req.get_term() <= term_) {
// Check term id
auto term = role_ == Role::CANDIDATE ? proposedTerm_ : term_;
if (req.get_term() <= term) {
LOG(INFO) << idStr_
<< (role_ == Role::CANDIDATE
? "The partition is currently proposing term "
: "The partition currently is on term ")
<< term_
<< term
<< ". The term proposed by the candidate is"
" no greater, so it will be rejected";
resp.set_error_code(cpp2::ErrorCode::E_TERM_OUT_OF_DATE);
Expand Down Expand Up @@ -1365,19 +1368,18 @@ void RaftPart::processAskForVoteRequest(
}
// Ok, no reason to refuse, we will vote for the candidate
LOG(INFO) << idStr_ << "The partition will vote for the candidate";
resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);

Role oldRole = role_;
TermID oldTerm = term_;
role_ = Role::FOLLOWER;
term_ = req.get_term();
term_ = proposedTerm_ = req.get_term();
leader_ = std::make_pair(req.get_candidate_ip(),
req.get_candidate_port());

resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);
resp.set_current_term(term_);

// Reset the last message time
lastMsgRecvDur_.reset();
weight_ = 1;

// If the partition used to be a leader, need to fire the callback
if (oldRole == Role::LEADER) {
Expand Down Expand Up @@ -1687,7 +1689,8 @@ cpp2::ErrorCode RaftPart::verifyLeader(
}
leader_ = std::make_pair(req.get_leader_ip(),
req.get_leader_port());
term_ = req.get_current_term();
term_ = proposedTerm_ = req.get_current_term();
weight_ = 1;
if (oldRole == Role::LEADER) {
VLOG(2) << idStr_ << "Was a leader, need to do some clean-up";
if (wal_->lastLogId() > lastLogId_) {
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
// When the partition voted for someone, termId will be set to
// the term id proposed by that candidate
TermID term_{0};
// During normal operation, proposedTerm_ is equal to term_.
// When the partition becomes a candidate, proposedTerm_ is
// bumped up by 1 each time an AskForVote request is sent out.
TermID proposedTerm_{0};

// The id and term of the last-sent log
LogID lastLogId_{0};
Expand Down

0 comments on commit 8fec368

Please sign in to comment.