-
Notifications
You must be signed in to change notification settings - Fork 153
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor: Send heartbeat with dedicated workers
Heavy AppendEntries traffic can block heartbeat messages. For example, a future AppendEntries sent over a stream RPC may not receive a response indicating that a follower is alive. In such cases, the leader might fail to extend its lease before it times out, and be considered partitioned from the cluster. This commit moves heartbeat broadcasting to separate tasks that won't be blocked by AppendEntries. This ensures the leader is always informed of the liveness of followers. Separate log progress notification and clock progress notification: When ReplicationCore successfully finishes an RPC to a Follower/Learner, it informs RaftCore to update the log progress and the clock (heartbeat) progress. This commit splits these two pieces of information into two `Notification` variants, in order to make progress handling clearer. Another improvement is to ignore a heartbeat progress if it was sent with an older cluster membership config. Because a follower can be removed and re-added, the obsolete heartbeat progress is invalid. This check is done by remembering the membership log id in the `HeartbeatEvent`. `HigherVote` can be sent directly to the Notification channel, so replication::Response does not need the `HigherVote` variant anymore. `Response` is also renamed to `Progress`.
- Loading branch information
1 parent
77caec9
commit 2f87402
Showing
20 changed files
with
550 additions
and
342 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
use std::fmt; | ||
|
||
use crate::display_ext::DisplayInstantExt; | ||
use crate::display_ext::DisplayOptionExt; | ||
use crate::replication::ReplicationSessionId; | ||
use crate::type_config::alias::InstantOf; | ||
use crate::LogId; | ||
use crate::RaftTypeConfig; | ||
|
||
/// The information for broadcasting a heartbeat. | ||
#[derive(Debug, Clone, Copy)] | ||
#[derive(PartialEq, Eq)] | ||
pub struct HeartbeatEvent<C> | ||
where C: RaftTypeConfig | ||
{ | ||
/// The timestamp when this heartbeat is sent. | ||
/// | ||
/// The Leader use this sending time to calculate the quorum acknowledge time, but not the | ||
/// receiving timestamp. | ||
pub(crate) time: InstantOf<C>, | ||
|
||
/// The vote of the Leader that submit this heartbeat and the log id of the cluster config. | ||
/// | ||
/// The response that matches this session id is considered as a valid response. | ||
/// Otherwise, it is considered as an outdated response from older leader or older cluster | ||
/// membership config and will be ignored. | ||
pub(crate) session_id: ReplicationSessionId<C>, | ||
|
||
/// The last known committed log id of the Leader. | ||
/// | ||
/// When there are no new logs to replicate, the Leader sends a heartbeat to replicate committed | ||
/// log id to followers to update their committed log id. | ||
pub(crate) committed: Option<LogId<C::NodeId>>, | ||
} | ||
|
||
impl<C> HeartbeatEvent<C> | ||
where C: RaftTypeConfig | ||
{ | ||
pub(crate) fn new( | ||
time: InstantOf<C>, | ||
session_id: ReplicationSessionId<C>, | ||
committed: Option<LogId<C::NodeId>>, | ||
) -> Self { | ||
Self { | ||
time, | ||
session_id, | ||
committed, | ||
} | ||
} | ||
} | ||
|
||
impl<C> fmt::Display for HeartbeatEvent<C> | ||
where C: RaftTypeConfig | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
write!( | ||
f, | ||
"(time={}, leader_vote: {}, committed: {})", | ||
self.time.display(), | ||
self.session_id, | ||
self.committed.display() | ||
) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
use std::collections::BTreeMap; | ||
use std::sync::Arc; | ||
|
||
use tracing::Instrument; | ||
use tracing::Level; | ||
use tracing::Span; | ||
|
||
use crate::async_runtime::watch::WatchSender; | ||
use crate::core::heartbeat::event::HeartbeatEvent; | ||
use crate::core::heartbeat::worker::HeartbeatWorker; | ||
use crate::core::notification::Notification; | ||
use crate::type_config::alias::JoinHandleOf; | ||
use crate::type_config::alias::MpscUnboundedSenderOf; | ||
use crate::type_config::alias::OneshotSenderOf; | ||
use crate::type_config::alias::WatchReceiverOf; | ||
use crate::type_config::alias::WatchSenderOf; | ||
use crate::type_config::TypeConfigExt; | ||
use crate::Config; | ||
use crate::RaftNetworkFactory; | ||
use crate::RaftTypeConfig; | ||
|
||
pub(crate) struct HeartbeatWorkersHandle<C> | ||
where C: RaftTypeConfig | ||
{ | ||
pub(crate) id: C::NodeId, | ||
|
||
pub(crate) config: Arc<Config>, | ||
|
||
/// Inform the heartbeat task to broadcast heartbeat message. | ||
/// | ||
/// A Leader will periodically update this value to trigger sending heartbeat messages. | ||
pub(crate) tx: WatchSenderOf<C, Option<HeartbeatEvent<C>>>, | ||
|
||
/// The receiving end of heartbeat command. | ||
/// | ||
/// A separate task will have a clone of this receiver to receive and execute heartbeat command. | ||
pub(crate) rx: WatchReceiverOf<C, Option<HeartbeatEvent<C>>>, | ||
|
||
pub(crate) workers: BTreeMap<C::NodeId, (OneshotSenderOf<C, ()>, JoinHandleOf<C, ()>)>, | ||
} | ||
|
||
impl<C> HeartbeatWorkersHandle<C> | ||
where C: RaftTypeConfig | ||
{ | ||
pub(crate) fn new(id: C::NodeId, config: Arc<Config>) -> Self { | ||
let (tx, rx) = C::watch_channel(None); | ||
|
||
Self { | ||
id, | ||
config, | ||
tx, | ||
rx, | ||
workers: Default::default(), | ||
} | ||
} | ||
|
||
pub(crate) fn broadcast(&self, event: HeartbeatEvent<C>) { | ||
tracing::debug!("id={} send_heartbeat {}", self.id, event); | ||
let _ = self.tx.send(Some(event)); | ||
} | ||
|
||
pub(crate) async fn spawn_workers<NF>( | ||
&mut self, | ||
network_factory: &mut NF, | ||
tx_notification: &MpscUnboundedSenderOf<C, Notification<C>>, | ||
targets: impl IntoIterator<Item = (C::NodeId, C::Node)>, | ||
) where | ||
NF: RaftNetworkFactory<C>, | ||
{ | ||
for (target, node) in targets { | ||
tracing::debug!("id={} spawn HeartbeatWorker target={}", self.id, target); | ||
let network = network_factory.new_client(target, &node).await; | ||
|
||
let worker = HeartbeatWorker { | ||
id: self.id, | ||
rx: self.rx.clone(), | ||
network, | ||
target, | ||
node, | ||
config: self.config.clone(), | ||
tx_notification: tx_notification.clone(), | ||
}; | ||
|
||
let span = tracing::span!(parent: &Span::current(), Level::DEBUG, "heartbeat", id=display(self.id), target=display(target)); | ||
|
||
let (tx_shutdown, rx_shutdown) = C::oneshot(); | ||
|
||
let worker_handle = C::spawn(worker.run(rx_shutdown).instrument(span)); | ||
self.workers.insert(target, (tx_shutdown, worker_handle)); | ||
} | ||
} | ||
|
||
pub(crate) fn shutdown(&mut self) { | ||
self.workers.clear(); | ||
tracing::info!("id={} HeartbeatWorker are shutdown", self.id); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
pub(crate) mod event; | ||
pub(crate) mod handle; | ||
pub(crate) mod worker; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
use std::fmt; | ||
use std::ops::Deref; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
||
use futures::FutureExt; | ||
|
||
use crate::async_runtime::watch::WatchReceiver; | ||
use crate::async_runtime::MpscUnboundedSender; | ||
use crate::core::heartbeat::event::HeartbeatEvent; | ||
use crate::core::notification::Notification; | ||
use crate::network::v2::RaftNetworkV2; | ||
use crate::network::RPCOption; | ||
use crate::raft::AppendEntriesRequest; | ||
use crate::type_config::alias::MpscUnboundedSenderOf; | ||
use crate::type_config::alias::OneshotReceiverOf; | ||
use crate::type_config::alias::WatchReceiverOf; | ||
use crate::type_config::TypeConfigExt; | ||
use crate::Config; | ||
use crate::RaftTypeConfig; | ||
|
||
/// A dedicate worker sending heartbeat to a specific follower. | ||
pub struct HeartbeatWorker<C, N> | ||
where | ||
C: RaftTypeConfig, | ||
N: RaftNetworkV2<C>, | ||
{ | ||
pub(crate) id: C::NodeId, | ||
|
||
/// The receiver will be changed when a new heartbeat is needed to be sent. | ||
pub(crate) rx: WatchReceiverOf<C, Option<HeartbeatEvent<C>>>, | ||
|
||
pub(crate) network: N, | ||
|
||
pub(crate) target: C::NodeId, | ||
|
||
#[allow(dead_code)] | ||
pub(crate) node: C::Node, | ||
|
||
pub(crate) config: Arc<Config>, | ||
|
||
/// For sending back result to the [`RaftCore`]. | ||
/// | ||
/// [`RaftCore`]: crate::core::RaftCore | ||
pub(crate) tx_notification: MpscUnboundedSenderOf<C, Notification<C>>, | ||
} | ||
|
||
impl<C, N> fmt::Display for HeartbeatWorker<C, N> | ||
where | ||
C: RaftTypeConfig, | ||
N: RaftNetworkV2<C>, | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
write!(f, "HeartbeatWorker(id={}, target={})", self.id, self.target) | ||
} | ||
} | ||
|
||
impl<C, N> HeartbeatWorker<C, N> | ||
where | ||
C: RaftTypeConfig, | ||
N: RaftNetworkV2<C>, | ||
{ | ||
pub(crate) async fn run(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) { | ||
loop { | ||
tracing::debug!("{} is waiting for a new heartbeat event.", self); | ||
|
||
futures::select! { | ||
_ = (&mut rx_shutdown).fuse() => { | ||
tracing::info!("{} is shutdown.", self); | ||
return; | ||
}, | ||
_ = self.rx.changed().fuse() => {}, | ||
} | ||
|
||
let heartbeat: Option<HeartbeatEvent<C>> = *self.rx.borrow_watched(); | ||
|
||
// None is the initial value of the WatchReceiver, ignore it. | ||
let Some(heartbeat) = heartbeat else { | ||
continue; | ||
}; | ||
|
||
let timeout = Duration::from_millis(self.config.heartbeat_interval); | ||
let option = RPCOption::new(timeout); | ||
|
||
let payload = AppendEntriesRequest { | ||
vote: *heartbeat.session_id.leader_vote.deref(), | ||
prev_log_id: None, | ||
leader_commit: heartbeat.committed, | ||
entries: vec![], | ||
}; | ||
|
||
let res = C::timeout(timeout, self.network.append_entries(payload, option)).await; | ||
tracing::debug!("{} sent a heartbeat: {}, result: {:?}", self, heartbeat, res); | ||
|
||
match res { | ||
Ok(Ok(_)) => { | ||
let res = self.tx_notification.send(Notification::HeartbeatProgress { | ||
session_id: heartbeat.session_id, | ||
sending_time: heartbeat.time, | ||
target: self.target, | ||
}); | ||
|
||
if res.is_err() { | ||
tracing::error!("{} failed to send a heartbeat progress to RaftCore. quit", self); | ||
return; | ||
} | ||
} | ||
_ => { | ||
tracing::warn!("{} failed to send a heartbeat: {:?}", self, res); | ||
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.