Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the write performance when host is down #5571

Merged
merged 5 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ DEFINE_uint32(max_appendlog_batch_size,
"The max number of logs in each appendLog request batch");
DEFINE_uint32(max_outstanding_requests, 1024, "The max number of outstanding appendLog requests");
DEFINE_int32(raft_rpc_timeout_ms, 1000, "rpc timeout for raft client");
DEFINE_int32(pause_host_time_factor,
4,
"The factor of pause host time based on raft heartbeat interval");

DECLARE_bool(trace_raft);
DECLARE_uint32(raft_heartbeat_interval_secs);
Expand Down Expand Up @@ -60,11 +63,22 @@ nebula::cpp2::ErrorCode Host::canAppendLog() const {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode Host::canSendHBOrVote() const {
CHECK(!lock_.try_lock());
if (stopped_) {
VLOG(2) << idStr_ << "The host is stopped, just return";
return nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED;
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

folly::Future<cpp2::AskForVoteResponse> Host::askForVote(const cpp2::AskForVoteRequest& req,
folly::EventBase* eb) {
{
std::lock_guard<std::mutex> g(lock_);
if (stopped_) {
auto res = canSendHBOrVote();
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(3) << idStr_ << "The Host is not in a proper status, do not send";
cpp2::AskForVoteResponse resp;
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED;
Expand Down Expand Up @@ -410,11 +424,39 @@ folly::Future<cpp2::HeartbeatResponse> Host::sendHeartbeat(
pro = std::move(promise)](folly::Try<cpp2::HeartbeatResponse>&& t) mutable {
VLOG(4) << self->idStr_ << "heartbeat call got response";
if (t.hasException()) {
using TransportException = apache::thrift::transport::TTransportException;
auto exWrapper = std::move(t).exception();
auto exception = exWrapper.get_exception<TransportException>();
VLOG(2) << self->idStr_ << "Heartbeat: " << exception->what();
// If we keeps receiving NOT_OPEN exception after some HB intervals,
// we can assume that the peer is down so we mark paused_ as true
if (exception && exception->getType() == TransportException::NOT_OPEN) {
if (!self->paused_) {
auto now = time::WallClock::fastNowInMilliSec();
if (now - self->lastHeartbeatTime_ >=
FLAGS_pause_host_time_factor * FLAGS_raft_heartbeat_interval_secs * 1000) {
LOG(WARNING) << self->idStr_
<< "Pasue this host because long time no heartbeat response";
std::lock_guard<std::mutex> g(self->lock_);
self->paused_ = true;
}
}
}
cpp2::HeartbeatResponse resp;
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_RPC_EXCEPTION;
pro.setValue(std::move(resp));
return;
} else {
auto& resp = t.value();
if (resp.error_code_ref() == nebula::cpp2::ErrorCode::SUCCEEDED) {
std::lock_guard<std::mutex> g(self->lock_);
// If the peer is back online and ready, we set paused_ as false,
// the leader can then resume sending appendLog request to this peer
if (self->paused_) {
self->paused_ = false;
}
}
self->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec());
pro.setValue(std::move(t.value()));
}
});
Expand All @@ -427,7 +469,7 @@ folly::Future<cpp2::HeartbeatResponse> Host::sendHeartbeatRequest(

{
std::lock_guard<std::mutex> g(lock_);
auto res = canAppendLog();
auto res = canSendHBOrVote();
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(3) << idStr_ << "The Host is not in a proper status, do not send";
cpp2::HeartbeatResponse resp;
Expand Down Expand Up @@ -459,8 +501,8 @@ std::shared_ptr<cpp2::AppendLogRequest> Host::getPendingReqIfAny(std::shared_ptr

// Check if there are any pending request to send
if (self->noRequest()) {
self->noMoreRequestCV_.notify_all();
self->requestOnGoing_ = false;
self->noMoreRequestCV_.notify_all();
return nullptr;
}

Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/raftex/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ class Host final : public std::enable_shared_from_this<Host> {
*/
nebula::cpp2::ErrorCode canAppendLog() const;

/**
* @brief Whether Host can send HB or AskForVote request to the peer
*
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode canSendHBOrVote() const;

/**
* @brief Send append log rpc
*
Expand Down Expand Up @@ -244,6 +251,12 @@ class Host final : public std::enable_shared_from_this<Host> {

mutable std::mutex lock_;

// If stopped_ is true, we will not send any request to the peer;
// If stopped_ is false:
// 1. no mater whether paused_ is true or not, we can send HB request or AskForVote request;
// 2. Only if paused_ is false, we can send appendlog request, of course, including HB
// request and AskForRequest request
// See canAppendLog() and canSendHBOrVote()
luyade marked this conversation as resolved.
Show resolved Hide resolved
bool paused_{false};
bool stopped_{false};

Expand Down
4 changes: 0 additions & 4 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2097,10 +2097,6 @@ void RaftPart::sendHeartbeat() {
if (!hosts[resp.first]->isLearner() &&
resp.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
++numSucceeded;
// only metad 0 space 0 part need this state now.
if (spaceId_ == kDefaultSpaceId) {
hosts[resp.first]->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec());
}
}
highestTerm = std::max(highestTerm, resp.second.get_current_term());
}
Expand Down
Loading