Commit
Fixed halting of the node in rare conditions
xgreenx committed May 29, 2024
1 parent 5ba7821 commit cfc3029
Showing 2 changed files with 49 additions and 24 deletions.
58 changes: 35 additions & 23 deletions crates/services/consensus_module/poa/src/sync.rs
@@ -4,21 +4,23 @@ use std::{
};

use fuel_core_services::{
stream::BoxStream,
stream::{
BoxFuture,
BoxStream,
},
RunnableService,
RunnableTask,
StateWatcher,
};
use fuel_core_types::services::block_importer::BlockImportInfo;

use fuel_core_types::blockchain::header::BlockHeader;
use tokio::sync::watch;
use tokio_stream::StreamExt;

use crate::deadline_clock::{
DeadlineClock,
OnConflict,
use fuel_core_types::{
blockchain::header::BlockHeader,
services::block_importer::BlockImportInfo,
};
use tokio::{
sync::watch,
time::MissedTickBehavior,
};
use tokio_stream::StreamExt;

#[derive(Debug, Clone, PartialEq)]
pub enum SyncState {
@@ -42,14 +44,13 @@ impl SyncState {

pub struct SyncTask {
min_connected_reserved_peers: usize,
time_until_synced: Duration,
peer_connections_stream: BoxStream<usize>,
block_stream: BoxStream<BlockImportInfo>,
state_sender: watch::Sender<SyncState>,
// shared with `MainTask` via SyncTask::SharedState
state_receiver: watch::Receiver<SyncState>,
inner_state: InnerSyncState,
timer: DeadlineClock,
timer: Option<tokio::time::Interval>,
}

impl SyncTask {
@@ -65,7 +66,13 @@ impl SyncTask {
time_until_synced,
block_header.clone(),
);
let timer = DeadlineClock::new();
let timer = if time_until_synced == Duration::ZERO {
None
} else {
let mut timer = tokio::time::interval(time_until_synced);
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Some(timer)
};

let initial_sync_state = SyncState::from_config(
min_connected_reserved_peers,
@@ -79,7 +86,6 @@
Self {
peer_connections_stream,
min_connected_reserved_peers,
time_until_synced,
block_stream,
state_sender,
state_receiver,
@@ -100,10 +106,10 @@ });
});
}

async fn restart_timer(&mut self) {
self.timer
.set_timeout(self.time_until_synced, OnConflict::Overwrite)
.await;
fn restart_timer(&mut self) {
if let Some(timer) = &mut self.timer {
timer.reset();
}
}
}

@@ -135,6 +141,12 @@ impl RunnableTask for SyncTask {
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let mut should_continue = true;

let tick: BoxFuture<tokio::time::Instant> = if let Some(timer) = &mut self.timer {
Box::pin(timer.tick())
} else {
let future = core::future::pending();
Box::pin(future)
};
tokio::select! {
biased;
_ = watcher.while_started() => {
@@ -146,11 +158,11 @@
match &self.inner_state {
InnerSyncState::InsufficientPeers(block_header) if sufficient_peers => {
self.inner_state = InnerSyncState::SufficientPeers(block_header.clone());
self.restart_timer().await;
self.restart_timer();
}
InnerSyncState::SufficientPeers(block_header) if !sufficient_peers => {
self.inner_state = InnerSyncState::InsufficientPeers(block_header.clone());
self.timer.clear().await;
self.restart_timer();
}
InnerSyncState::Synced { block_header, .. } => {
self.inner_state = InnerSyncState::Synced {
@@ -170,7 +182,7 @@
}
InnerSyncState::SufficientPeers(block_header) if new_block_height > block_header.height() => {
self.inner_state = InnerSyncState::SufficientPeers(block_info.block_header);
self.restart_timer().await;
self.restart_timer();
}
InnerSyncState::Synced { block_header, has_sufficient_peers } if new_block_height > block_header.height() => {
if block_info.is_locally_produced() {
@@ -183,7 +195,7 @@
// we considered to be synced but we're obviously not!
if *has_sufficient_peers {
self.inner_state = InnerSyncState::SufficientPeers(block_info.block_header);
self.restart_timer().await;
self.restart_timer();
} else {
self.inner_state = InnerSyncState::InsufficientPeers(block_info.block_header);
}
@@ -194,7 +206,7 @@
_ => {}
}
}
_ = self.timer.wait() => {
_ = tick => {
if let InnerSyncState::SufficientPeers(block_header) = &self.inner_state {
let block_header = block_header.clone();
self.inner_state = InnerSyncState::Synced {
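
For context, the sync.rs change above swaps the custom DeadlineClock for an optional tokio::time::Interval. Below is a minimal, self-contained sketch of that pattern; the names SyncTimer, run_once, and the shutdown channel are illustrative and not part of fuel-core — only the interval/Skip/reset/pending combination mirrors the diff.

use std::{future::Future, pin::Pin, time::Duration};
use tokio::{sync::watch, time::MissedTickBehavior};

struct SyncTimer {
    timer: Option<tokio::time::Interval>,
}

impl SyncTimer {
    fn new(time_until_synced: Duration) -> Self {
        // `tokio::time::interval` panics on a zero period, so a zero config
        // disables the timer instead of constructing one.
        let timer = if time_until_synced == Duration::ZERO {
            None
        } else {
            let mut timer = tokio::time::interval(time_until_synced);
            // Skip missed ticks rather than bursting to catch up.
            timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
            Some(timer)
        };
        Self { timer }
    }

    fn restart(&mut self) {
        // Restarting is now a synchronous reset; with no timer it is a no-op.
        // A fresh `Interval` ticks immediately; `reset` pushes the next tick
        // out by one full period.
        if let Some(timer) = &mut self.timer {
            timer.reset();
        }
    }

    /// Waits for the next tick or a shutdown signal, whichever comes first.
    async fn run_once(&mut self, shutdown: &mut watch::Receiver<bool>) -> bool {
        // When the timer is disabled, substitute a future that never resolves
        // so the `select!` branch simply never fires.
        let tick: Pin<Box<dyn Future<Output = tokio::time::Instant> + Send + '_>> =
            if let Some(timer) = &mut self.timer {
                Box::pin(timer.tick())
            } else {
                Box::pin(core::future::pending())
            };

        tokio::select! {
            _ = shutdown.changed() => false,
            _ = tick => {
                // The real task would transition to `Synced` here.
                true
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (_tx, mut shutdown) = watch::channel(false);
    let mut timer = SyncTimer::new(Duration::from_millis(10));
    timer.restart();
    assert!(timer.run_once(&mut shutdown).await);
}
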
15 changes: 14 additions & 1 deletion crates/services/importer/src/importer.rs
@@ -91,6 +91,8 @@ pub enum Error {
ExecuteGenesis,
#[display(fmt = "The database already contains the data at the height {_0}.")]
NotUnique(BlockHeight),
#[display(fmt = "The previous block processing is not finished yet.")]
PreviousBlockProcessingNotFinished,
#[from]
StorageError(StorageError),
UnsupportedConsensusVariant(String),
@@ -455,7 +457,18 @@

// Await until all receivers of the notification process the result.
if let Some(channel) = previous_block_result {
let _ = channel.await;
const TIMEOUT: u64 = 20;
let result =
tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel)
.await;

if result.is_err() {
tracing::error!(
"The previous block processing \
was not finished for {TIMEOUT} seconds."
);
return Err(Error::PreviousBlockProcessingNotFinished)
}
}

let start = Instant::now();
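
For context, the importer.rs hunk above bounds the wait on the previous block's completion channel. Below is a stripped-down sketch of the same timeout pattern, assuming a tokio oneshot receiver and an illustrative Error enum; the real code logs via tracing::error!, replaced here with eprintln! to keep the sketch dependency-free.

use tokio::{sync::oneshot, time::Duration};

#[derive(Debug)]
enum Error {
    PreviousBlockProcessingNotFinished,
}

/// Waits at most `TIMEOUT` seconds for the previous block's result instead of
/// blocking the importer forever if that notification never arrives.
async fn await_previous_block(
    previous_block_result: Option<oneshot::Receiver<()>>,
) -> Result<(), Error> {
    if let Some(channel) = previous_block_result {
        const TIMEOUT: u64 = 20;
        let result =
            tokio::time::timeout(Duration::from_secs(TIMEOUT), channel).await;

        if result.is_err() {
            // An elapsed timeout means the previous block is still being
            // processed; fail this import instead of halting the node.
            eprintln!("previous block processing not finished for {TIMEOUT} seconds");
            return Err(Error::PreviousBlockProcessingNotFinished);
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    // The previous block's result arrives promptly, so the timeout is not hit.
    let (tx, rx) = oneshot::channel::<()>();
    tx.send(()).expect("receiver is alive");
    assert!(await_previous_block(Some(rx)).await.is_ok());
}
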
