diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
index aaa683a352..f6bfac05c3 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
@@ -269,7 +269,7 @@ impl BlockSyncInfo {
 
     pub fn sync_progress_string(&self) -> String {
         format!(
-            "({}) {}/{} ({:.0}%){} Latency: {:.2?}",
+            "({}) {}/{} ({:.0}%){}{}",
             self.sync_peer.node_id().short_str(),
             self.local_height,
             self.tip_height,
@@ -278,7 +278,10 @@ impl BlockSyncInfo {
                 .items_per_second()
                 .map(|bps| format!(" {:.2?} blks/s", bps))
                 .unwrap_or_default(),
-            self.sync_peer.latency().unwrap_or_default()
+            self.sync_peer
+                .calc_avg_latency()
+                .map(|avg| format!(", latency: {:.2?}", avg))
+                .unwrap_or_default(),
         )
     }
 }
diff --git a/base_layer/core/src/base_node/sync/block_sync/error.rs b/base_layer/core/src/base_node/sync/block_sync/error.rs
index ec3eee8ded..d0ecd305b3 100644
--- a/base_layer/core/src/base_node/sync/block_sync/error.rs
+++ b/base_layer/core/src/base_node/sync/block_sync/error.rs
@@ -50,7 +50,7 @@ pub enum BlockSyncError {
     FailedToConstructChainBlock,
     #[error("Peer violated the block sync protocol: {0}")]
     ProtocolViolation(String),
-    #[error("Peer {peer} exceeded maximum permitted sync latency. latency: {latency:.2?}s, max: {max_latency:.2?}s")]
+    #[error("Peer {peer} exceeded maximum permitted sync latency. latency: {latency:.2?}, max: {max_latency:.2?}")]
     MaxLatencyExceeded {
         peer: NodeId,
         latency: Duration,
diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
index cc029fce21..50489b768a 100644
--- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
@@ -41,6 +41,7 @@ use crate::{
     },
     blocks::{Block, BlockValidationError, ChainBlock},
     chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
+    common::rolling_avg::RollingAverageTime,
    proto::base_node::SyncBlocksRequest,
     transactions::aggregated_body::AggregateBody,
     validation::{BlockSyncBodyValidation, ValidationError},
@@ -93,7 +94,7 @@ impl BlockSynchronizer {
             Ok(_) => return Ok(()),
             Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
                 warn!(target: LOG_TARGET, "{}", err);
-                if self.sync_peers.len() == 1 {
+                if self.sync_peers.len() <= 2 {
                     warn!(
                         target: LOG_TARGET,
                         "Insufficient sync peers to continue with block sync"
@@ -224,8 +225,10 @@ impl BlockSynchronizer {
         let mut prev_hash = best_full_block_hash;
         let mut current_block = None;
         let mut last_sync_timer = Instant::now();
+        let mut avg_latency = RollingAverageTime::new(20);
         while let Some(block) = block_stream.next().await {
             let latency = last_sync_timer.elapsed();
+            avg_latency.add_sample(latency);
             let block = block?;
 
             let header = self
@@ -316,7 +319,12 @@ impl BlockSynchronizer {
                 .commit()
                 .await?;
 
-            sync_peer.set_latency(latency);
+            // Average time between receiving blocks from the peer - used to detect a slow sync peer
+            let last_avg_latency = avg_latency.calculate_average();
+            if let Some(latency) = last_avg_latency {
+                sync_peer.set_latency(latency);
+            }
+            // Includes time to add block to database, used to show blocks/s on status line
             sync_peer.add_sample(last_sync_timer.elapsed());
             self.hooks
                 .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);
@@ -334,7 +342,7 @@ impl BlockSynchronizer {
                 block.accumulated_data().accumulated_sha_difficulty,
                 latency
             );
-            if latency > max_latency {
+            if last_avg_latency.map(|avg| avg > max_latency).unwrap_or(false) {
                 return Err(BlockSyncError::MaxLatencyExceeded {
                     peer: sync_peer.node_id().clone(),
                     latency,
diff --git a/base_layer/core/src/base_node/sync/config.rs b/base_layer/core/src/base_node/sync/config.rs
index e37903aeac..b064340cfa 100644
--- a/base_layer/core/src/base_node/sync/config.rs
+++ b/base_layer/core/src/base_node/sync/config.rs
@@ -46,7 +46,7 @@ pub struct BlockchainSyncConfig {
 impl Default for BlockchainSyncConfig {
     fn default() -> Self {
         Self {
-            initial_max_sync_latency: Duration::from_secs(3),
+            initial_max_sync_latency: Duration::from_secs(10),
             max_latency_increase: Duration::from_secs(2),
             ban_period: Duration::from_secs(30 * 60),
             short_ban_period: Duration::from_secs(60),
diff --git a/base_layer/core/src/base_node/sync/sync_peer.rs b/base_layer/core/src/base_node/sync/sync_peer.rs
index 4cbb7330d5..44d11932a5 100644
--- a/base_layer/core/src/base_node/sync/sync_peer.rs
+++ b/base_layer/core/src/base_node/sync/sync_peer.rs
@@ -28,12 +28,12 @@ use std::{
 };
 
 use tari_common_types::chain_metadata::ChainMetadata;
 use tari_comms::peer_manager::NodeId;
 
-use crate::{base_node::chain_metadata_service::PeerChainMetadata, common::rolling_vec::RollingVec};
+use crate::{base_node::chain_metadata_service::PeerChainMetadata, common::rolling_avg::RollingAverageTime};
 
 #[derive(Debug, Clone)]
 pub struct SyncPeer {
     peer_metadata: PeerChainMetadata,
-    samples: RollingVec<Duration>,
+    avg_latency: RollingAverageTime,
 }
@@ -55,25 +55,24 @@ impl SyncPeer {
     }
 
     pub fn items_per_second(&self) -> Option<f64> {
-        if self.samples.is_empty() {
-            return None;
-        }
-
-        let total_time = self.samples.iter().sum::<Duration>();
-        Some((self.samples.len() as f64 / total_time.as_micros() as f64) * 1_000_000.0)
+        self.avg_latency.calc_samples_per_second()
     }
 
     pub(super) fn add_sample(&mut self, time: Duration) -> &mut Self {
-        self.samples.push(time);
+        self.avg_latency.add_sample(time);
         self
     }
+
+    pub fn calc_avg_latency(&self) -> Option<Duration> {
+        self.avg_latency.calculate_average()
+    }
 }
 
 impl From<PeerChainMetadata> for SyncPeer {
     fn from(peer_metadata: PeerChainMetadata) -> Self {
         Self {
             peer_metadata,
-            samples: RollingVec::new(20),
+            avg_latency: RollingAverageTime::new(20),
         }
     }
 }
diff --git a/base_layer/core/src/common/mod.rs b/base_layer/core/src/common/mod.rs
index c6d79d23dd..efc4d874a6 100644
--- a/base_layer/core/src/common/mod.rs
+++ b/base_layer/core/src/common/mod.rs
@@ -23,5 +23,6 @@
 pub mod byte_counter;
 pub mod hash_writer;
 pub mod limited_reader;
+pub mod rolling_avg;
 #[cfg(feature = "base_node")]
 pub mod rolling_vec;
diff --git a/base_layer/core/src/common/rolling_avg.rs b/base_layer/core/src/common/rolling_avg.rs
new file mode 100644
index 0000000000..9230eb9667
--- /dev/null
+++ b/base_layer/core/src/common/rolling_avg.rs
@@ -0,0 +1,62 @@
+// Copyright 2022, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::{convert::TryFrom, time::Duration};
+
+use crate::common::rolling_vec::RollingVec;
+
+#[derive(Debug, Clone)]
+pub struct RollingAverageTime {
+    samples: RollingVec<Duration>,
+}
+
+impl RollingAverageTime {
+    pub fn new(num_samples: usize) -> Self {
+        Self {
+            samples: RollingVec::new(num_samples),
+        }
+    }
+
+    pub fn add_sample(&mut self, sample: Duration) {
+        self.samples.push(sample);
+    }
+
+    pub fn calc_samples_per_second(&self) -> Option<f64> {
+        if self.samples.is_empty() {
+            return None;
+        }
+
+        let total_time = self.samples.iter().sum::<Duration>();
+        Some((self.samples.len() as f64 / total_time.as_micros() as f64) * 1_000_000.0)
+    }
+
+    pub fn calculate_average(&self) -> Option<Duration> {
+        if self.samples.is_empty() {
+            return None;
+        }
+
+        let total_time = self.samples.iter().sum::<Duration>();
+        Some(Duration::from_nanos(
+            u64::try_from(total_time.as_nanos()).unwrap_or(u64::MAX) / self.samples.len() as u64,
+        ))
+    }
+}