Skip to content

Commit

Permalink
fix(block-sync): use avg latency to determine slow sync peer for bloc…
Browse files Browse the repository at this point in the history
…k sync (#3912)

Description
---
Uses the average blocks-received-per-second rate to determine whether a sync peer is slow.
Displays the average block receive time on the status line.

Motivation and Context
---
Especially for large blocks, a single large delay could previously cause the next peer to be attempted immediately. By using the average blocks received per second, a "blip" in the latency will not cause the sync peer to switch.

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi authored Mar 11, 2022
1 parent 407160c commit f091c25
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl BlockSyncInfo {

pub fn sync_progress_string(&self) -> String {
format!(
"({}) {}/{} ({:.0}%){} Latency: {:.2?}",
"({}) {}/{} ({:.0}%){}{}",
self.sync_peer.node_id().short_str(),
self.local_height,
self.tip_height,
Expand All @@ -278,7 +278,10 @@ impl BlockSyncInfo {
.items_per_second()
.map(|bps| format!(" {:.2?} blks/s", bps))
.unwrap_or_default(),
self.sync_peer.latency().unwrap_or_default()
self.sync_peer
.calc_avg_latency()
.map(|avg| format!(", latency: {:.2?}", avg))
.unwrap_or_default(),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/block_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum BlockSyncError {
FailedToConstructChainBlock,
#[error("Peer violated the block sync protocol: {0}")]
ProtocolViolation(String),
#[error("Peer {peer} exceeded maximum permitted sync latency. latency: {latency:.2?}s, max: {max_latency:.2?}s")]
#[error("Peer {peer} exceeded maximum permitted sync latency. latency: {latency:.2?}, max: {max_latency:.2?}")]
MaxLatencyExceeded {
peer: NodeId,
latency: Duration,
Expand Down
14 changes: 11 additions & 3 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::{
},
blocks::{Block, BlockValidationError, ChainBlock},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
common::rolling_avg::RollingAverageTime,
proto::base_node::SyncBlocksRequest,
transactions::aggregated_body::AggregateBody,
validation::{BlockSyncBodyValidation, ValidationError},
Expand Down Expand Up @@ -93,7 +94,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
Ok(_) => return Ok(()),
Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
warn!(target: LOG_TARGET, "{}", err);
if self.sync_peers.len() == 1 {
if self.sync_peers.len() <= 2 {
warn!(
target: LOG_TARGET,
"Insufficient sync peers to continue with block sync"
Expand Down Expand Up @@ -224,8 +225,10 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
let mut prev_hash = best_full_block_hash;
let mut current_block = None;
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);
while let Some(block) = block_stream.next().await {
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let block = block?;

let header = self
Expand Down Expand Up @@ -316,7 +319,12 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
.commit()
.await?;

sync_peer.set_latency(latency);
// Average time between receiving blocks from the peer - used to detect a slow sync peer
let last_avg_latency = avg_latency.calculate_average();
if let Some(latency) = last_avg_latency {
sync_peer.set_latency(latency);
}
// Includes time to add block to database, used to show blocks/s on status line
sync_peer.add_sample(last_sync_timer.elapsed());
self.hooks
.call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);
Expand All @@ -334,7 +342,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
block.accumulated_data().accumulated_sha_difficulty,
latency
);
if latency > max_latency {
if last_avg_latency.map(|avg| avg > max_latency).unwrap_or(false) {
return Err(BlockSyncError::MaxLatencyExceeded {
peer: sync_peer.node_id().clone(),
latency,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct BlockchainSyncConfig {
impl Default for BlockchainSyncConfig {
fn default() -> Self {
Self {
initial_max_sync_latency: Duration::from_secs(3),
initial_max_sync_latency: Duration::from_secs(10),
max_latency_increase: Duration::from_secs(2),
ban_period: Duration::from_secs(30 * 60),
short_ban_period: Duration::from_secs(60),
Expand Down
19 changes: 9 additions & 10 deletions base_layer/core/src/base_node/sync/sync_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ use std::{
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

use crate::{base_node::chain_metadata_service::PeerChainMetadata, common::rolling_vec::RollingVec};
use crate::{base_node::chain_metadata_service::PeerChainMetadata, common::rolling_avg::RollingAverageTime};

#[derive(Debug, Clone)]
pub struct SyncPeer {
peer_metadata: PeerChainMetadata,
samples: RollingVec<Duration>,
avg_latency: RollingAverageTime,
}

impl SyncPeer {
Expand All @@ -55,25 +55,24 @@ impl SyncPeer {
}

pub fn items_per_second(&self) -> Option<f64> {
if self.samples.is_empty() {
return None;
}

let total_time = self.samples.iter().sum::<Duration>();
Some((self.samples.len() as f64 / total_time.as_micros() as f64) * 1_000_000.0)
self.avg_latency.calc_samples_per_second()
}

pub(super) fn add_sample(&mut self, time: Duration) -> &mut Self {
self.samples.push(time);
self.avg_latency.add_sample(time);
self
}

pub fn calc_avg_latency(&self) -> Option<Duration> {
self.avg_latency.calculate_average()
}
}

impl From<PeerChainMetadata> for SyncPeer {
fn from(peer_metadata: PeerChainMetadata) -> Self {
Self {
peer_metadata,
samples: RollingVec::new(20),
avg_latency: RollingAverageTime::new(20),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
pub mod byte_counter;
pub mod hash_writer;
pub mod limited_reader;
pub mod rolling_avg;
#[cfg(feature = "base_node")]
pub mod rolling_vec;
62 changes: 62 additions & 0 deletions base_layer/core/src/common/rolling_avg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{convert::TryFrom, time::Duration};

use crate::common::rolling_vec::RollingVec;

/// A fixed-size rolling window of `Duration` samples, used to smooth out
/// one-off timing spikes (e.g. a single slow block) when estimating a peer's
/// average latency or throughput.
#[derive(Debug, Clone)]
pub struct RollingAverageTime {
    // Bounded buffer of the most recent samples; presumably pushing into a
    // full `RollingVec` evicts the oldest entry — TODO confirm against
    // `common::rolling_vec::RollingVec`.
    samples: RollingVec<Duration>,
}

impl RollingAverageTime {
    /// Creates a new rolling window that retains at most `num_samples` of the
    /// most recently added samples.
    pub fn new(num_samples: usize) -> Self {
        Self {
            samples: RollingVec::new(num_samples),
        }
    }

    /// Records a new duration sample in the window.
    pub fn add_sample(&mut self, sample: Duration) {
        self.samples.push(sample);
    }

    /// Returns the average number of samples per second over the current
    /// window, or `None` if no samples have been recorded or no measurable
    /// time has elapsed.
    pub fn calc_samples_per_second(&self) -> Option<f64> {
        if self.samples.is_empty() {
            return None;
        }

        let total_time = self.samples.iter().sum::<Duration>();
        if total_time.is_zero() {
            // No elapsed time: a rate is undefined. (The previous
            // implementation truncated the total to whole microseconds before
            // dividing, so any window totalling < 1µs divided by zero and
            // reported `inf` samples/s.)
            return None;
        }
        // Use `as_secs_f64` so that sub-microsecond totals still produce a
        // finite, meaningful rate instead of being truncated away.
        Some(self.samples.len() as f64 / total_time.as_secs_f64())
    }

    /// Returns the mean of the recorded samples, or `None` if the window is
    /// empty.
    pub fn calculate_average(&self) -> Option<Duration> {
        if self.samples.is_empty() {
            return None;
        }

        let total_time = self.samples.iter().sum::<Duration>();
        Some(Duration::from_nanos(
            // `as_nanos()` is u128; saturate rather than panic in the
            // (practically unreachable) case the total exceeds u64 nanos.
            u64::try_from(total_time.as_nanos()).unwrap_or(u64::MAX) / self.samples.len() as u64,
        ))
    }
}

0 comments on commit f091c25

Please sign in to comment.