diff --git a/src/interface/raftex.thrift b/src/interface/raftex.thrift index a4165e1c014..5087ab249e5 100644 --- a/src/interface/raftex.thrift +++ b/src/interface/raftex.thrift @@ -57,7 +57,6 @@ struct AskForVoteRequest { // Response message for the vote call struct AskForVoteResponse { 1: ErrorCode error_code; - 2: TermID current_term; } diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 36b52e64dc1..dfce2f2a25b 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -450,9 +450,7 @@ Host::prepareAppendLogRequest() { req->set_sending_snapshot(true); if (!sendingSnapshot_) { LOG(INFO) << idStr_ << "Can't find log " << lastLogIdSent_ + 1 - << " in wal, send the snapshot" - << ", logIdToSend = " << logIdToSend_ - << ", lastLogId in wal = " << part_->wal()->lastLogId(); + << " in wal, send the snapshot"; sendingSnapshot_ = true; part_->snapshot_->sendSnapshot(part_, addr_) .thenValue([self = shared_from_this()] (Status&& status) { diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index b8c8cc9691d..70918b5bc5e 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -219,7 +219,8 @@ RaftPart::RaftPart(ClusterID clusterId, , ioThreadPool_{pool} , bgWorkers_{workers} , executor_(executor) - , snapshot_(snapshotMan) { + , snapshot_(snapshotMan) + , weight_(1) { FileBasedWalPolicy policy; policy.ttl = FLAGS_wal_ttl; policy.fileSize = FLAGS_wal_file_size; @@ -274,7 +275,7 @@ void RaftPart::start(std::vector&& peers, bool asLearner) { lastLogId_ = wal_->lastLogId(); lastLogTerm_ = wal_->lastLogTerm(); - term_ = lastLogTerm_; + term_ = proposedTerm_ = lastLogTerm_; // Set the quorum number quorum_ = (peers.size() + 1) / 2; @@ -977,7 +978,7 @@ bool RaftPart::needToStartElection() { std::lock_guard g(raftLock_); if (status_ == Status::RUNNING && role_ == Role::FOLLOWER && - (lastMsgRecvDur_.elapsedInMSec() >= FLAGS_raft_heartbeat_interval_secs * 1000 || + 
(lastMsgRecvDur_.elapsedInMSec() >= weight_ * FLAGS_raft_heartbeat_interval_secs * 1000 || term_ == 0)) { LOG(INFO) << idStr_ << "Start leader election, reason: lastMsgDur " << lastMsgRecvDur_.elapsedInMSec() @@ -1011,7 +1012,7 @@ bool RaftPart::prepareElectionRequest( req.set_part(partId_); req.set_candidate_ip(addr_.first); req.set_candidate_port(addr_.second); - req.set_term(term_ + 1); + req.set_term(++proposedTerm_); // Bump up the proposed term req.set_last_log_id(lastLogId_); req.set_last_log_term(lastLogTerm_); @@ -1052,18 +1053,13 @@ typename RaftPart::Role RaftPart::processElectionResponses( size_t numSucceeded = 0; for (auto& r : results) { - if (r.second.get_current_term() != proposedTerm) { - continue; - } if (r.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { ++numSucceeded; - } else if (r.second.get_error_code() == cpp2::ErrorCode::E_LOG_STALE || - r.second.get_error_code() == cpp2::ErrorCode::E_TERM_OUT_OF_DATE) { - LOG(INFO) << idStr_ << "AskForVote is explicitly rejected by peer " - << hosts[r.first]->address() - << ", so step down as follower"; - role_ = Role::FOLLOWER; - return role_; + } else if (r.second.get_error_code() == cpp2::ErrorCode::E_LOG_STALE) { + LOG(INFO) << idStr_ << "My last log id is less than " << hosts[r.first]->address() + << ", double my election interval."; + uint64_t curWeight = weight_.load(); + weight_.store(curWeight * 2); } else { LOG(ERROR) << idStr_ << "Receive response about askForVote from " << hosts[r.first]->address() @@ -1079,11 +1075,6 @@ typename RaftPart::Role RaftPart::processElectionResponses( << proposedTerm; term_ = proposedTerm; role_ = Role::LEADER; - leader_ = addr_; - bgWorkers_->addTask([self = shared_from_this(), term = proposedTerm] { - self->onElected(term); - }); - lastMsgAcceptedTime_ = 0; } return role_; @@ -1127,7 +1118,6 @@ bool RaftPart::leaderElection() { if (hosts.empty()) { VLOG(2) << idStr_ << "No peer found, I will be the leader"; } else { - // Try to collect response from all 
peers, if timeout, then check quorum auto eb = ioThreadPool_->getEventBase(); auto futures = collectNSucceeded( gen::from(hosts) @@ -1167,14 +1157,27 @@ bool RaftPart::leaderElection() { switch (processElectionResponses(resps, std::move(hosts), proposedTerm)) { case Role::LEADER: { // Elected - LOG(INFO) << idStr_ << "The partition is elected as the leader"; + LOG(INFO) << idStr_ + << "The partition is elected as the leader"; + { + std::lock_guard g(raftLock_); + if (status_ == Status::RUNNING) { + leader_ = addr_; + bgWorkers_->addTask([self = shared_from_this(), + term = voteReq.get_term()] { + self->onElected(term); + }); + lastMsgAcceptedTime_ = 0; + } + } + weight_ = 1; sendHeartbeat(); return true; } case Role::FOLLOWER: { - // Someone was elected or rejected - LOG(INFO) << idStr_ << "Someone else was elected, or is rejected by peers"; - return false; + // Someone was elected + LOG(INFO) << idStr_ << "Someone else was elected"; + return true; } case Role::CANDIDATE: { // No one has been elected @@ -1206,12 +1209,12 @@ void RaftPart::statusPolling(int64_t startTime) { size_t delay = FLAGS_raft_heartbeat_interval_secs * 1000 / 3; if (needToStartElection()) { if (leaderElection()) { - // Elected as leader VLOG(2) << idStr_ << "Stop the election"; } else { - // No leader has been elected, sleep for a while to continue + // No leader has been elected, need to continue + // (After sleeping a random period between [500ms, 2s), multiplied by weight_) VLOG(2) << idStr_ << "Wait for a while and continue the leader election"; - delay = FLAGS_raft_heartbeat_interval_secs * 1000; + delay = (folly::Random::rand32(1500) + 500) * weight_; } } else if (needToSendHeartbeat()) { VLOG(2) << idStr_ << "Need to send heartbeat"; @@ -1277,7 +1280,6 @@ void RaftPart::processAskForVoteRequest( std::lock_guard g(raftLock_); - resp.set_current_term(term_); // Make sure the partition is running if (UNLIKELY(status_ == Status::STOPPED)) { LOG(INFO) << idStr_ @@ -1317,13 +1319,14 @@ void 
RaftPart::processAskForVoteRequest( return; } - // Check candidate's proposed term is greater than current term - if (req.get_term() <= term_) { + // Check term id + auto term = role_ == Role::CANDIDATE ? proposedTerm_ : term_; + if (req.get_term() <= term) { LOG(INFO) << idStr_ << (role_ == Role::CANDIDATE ? "The partition is currently proposing term " : "The partition currently is on term ") - << term_ + << term << ". The term proposed by the candidate is" " no greater, so it will be rejected"; resp.set_error_code(cpp2::ErrorCode::E_TERM_OUT_OF_DATE); @@ -1365,19 +1368,18 @@ void RaftPart::processAskForVoteRequest( } // Ok, no reason to refuse, we will vote for the candidate LOG(INFO) << idStr_ << "The partition will vote for the candidate"; + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); Role oldRole = role_; TermID oldTerm = term_; role_ = Role::FOLLOWER; - term_ = req.get_term(); + term_ = proposedTerm_ = req.get_term(); leader_ = std::make_pair(req.get_candidate_ip(), req.get_candidate_port()); - resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); - resp.set_current_term(term_); - // Reset the last message time lastMsgRecvDur_.reset(); + weight_ = 1; // If the partition used to be a leader, need to fire the callback if (oldRole == Role::LEADER) { @@ -1687,7 +1689,8 @@ cpp2::ErrorCode RaftPart::verifyLeader( } leader_ = std::make_pair(req.get_leader_ip(), req.get_leader_port()); - term_ = req.get_current_term(); + term_ = proposedTerm_ = req.get_current_term(); + weight_ = 1; if (oldRole == Role::LEADER) { VLOG(2) << idStr_ << "Was a leader, need to do some clean-up"; if (wal_->lastLogId() > lastLogId_) { diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 71ef533e950..2deb426ba95 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -497,6 +497,11 @@ class RaftPart : public std::enable_shared_from_this { // When the partition voted for someone, termId will be set to // the term id proposed by that 
candidate TermID term_{0}; + // During normal operation, proposedTerm_ is equal to term_, + // when the partition becomes a candidate, proposedTerm_ will be + // bumped up by 1 every time when sending out the AskForVote + // Request + TermID proposedTerm_{0}; // The id and term of the last-sent log LogID lastLogId_{0};