Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ban peer that advertises higher PoW than able to provide #3478

Merged
17 changes: 7 additions & 10 deletions base_layer/common_types/src/chain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ChainMetadata {
/// (exclusive). If `pruned_height` is equal to the `height_of_longest_chain` no blocks can be
/// provided. Archival nodes wil always have an `pruned_height` of zero.
pruned_height: u64,
/// The geometric mean of the proof of work of the longest chain, none if the chain is empty
/// The total accumuated proof of work of the longest chain
accumulated_difficulty: u128,
}

Expand Down Expand Up @@ -122,18 +122,15 @@ impl ChainMetadata {
}

impl Display for ChainMetadata {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), Error> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
let height = self.height_of_longest_chain;
let best_block = self.best_block.to_hex();
let accumulated_difficulty = self.accumulated_difficulty;
fmt.write_str(&format!("Height of longest chain : {}\n", height))?;
fmt.write_str(&format!(
"Geometric mean of longest chain : {}\n",
accumulated_difficulty
))?;
fmt.write_str(&format!("Best block : {}\n", best_block))?;
fmt.write_str(&format!("Pruning horizon : {}\n", self.pruning_horizon))?;
fmt.write_str(&format!("Effective pruned height : {}\n", self.pruned_height))?;
writeln!(f, "Height of longest chain : {}", height)?;
writeln!(f, "Total accumulated difficulty: {}", accumulated_difficulty)?;
writeln!(f, "Best block : {}", best_block)?;
writeln!(f, "Pruning horizon : {}", self.pruning_horizon)?;
writeln!(f, "Effective pruned height : {}", self.pruned_height)?;
Ok(())
}
}
Expand Down
12 changes: 10 additions & 2 deletions base_layer/core/src/base_node/chain_metadata_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use tokio::sync::broadcast;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerChainMetadata {
pub node_id: NodeId,
pub chain_metadata: ChainMetadata,
node_id: NodeId,
chain_metadata: ChainMetadata,
}

impl PeerChainMetadata {
Expand All @@ -41,6 +41,14 @@ impl PeerChainMetadata {
chain_metadata,
}
}

pub fn node_id(&self) -> &NodeId {
&self.node_id
}

pub fn claimed_chain_metadata(&self) -> &ChainMetadata {
&self.chain_metadata
}
}

impl Display for PeerChainMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl ChainMetadataService {
use ConnectivityEvent::*;
match event {
PeerDisconnected(node_id) | PeerBanned(node_id) => {
if let Some(pos) = self.peer_chain_metadata.iter().position(|p| p.node_id == node_id) {
if let Some(pos) = self.peer_chain_metadata.iter().position(|p| *p.node_id() == node_id) {
debug!(
target: LOG_TARGET,
"Removing disconnected/banned peer `{}` from chain metadata list ", node_id
Expand Down Expand Up @@ -251,7 +251,7 @@ impl ChainMetadataService {
if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| &peer_chainstate.node_id == node_id)
.position(|peer_chainstate| peer_chainstate.node_id() == node_id)
{
self.peer_chain_metadata.remove(pos);
}
Expand Down Expand Up @@ -284,7 +284,7 @@ impl ChainMetadataService {
if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| &peer_chainstate.node_id == node_id)
.position(|peer_chainstate| peer_chainstate.node_id() == node_id)
{
self.peer_chain_metadata.remove(pos);
}
Expand Down Expand Up @@ -411,9 +411,9 @@ mod test {
service.handle_liveness_event(&sample_event).await.unwrap();
assert_eq!(service.peer_chain_metadata.len(), 1);
let metadata = service.peer_chain_metadata.remove(0);
assert_eq!(metadata.node_id, node_id);
assert_eq!(*metadata.node_id(), node_id);
assert_eq!(
metadata.chain_metadata.height_of_longest_chain(),
metadata.claimed_chain_metadata().height_of_longest_chain(),
proto_chain_metadata.height_of_longest_chain.unwrap()
);
}
Expand Down Expand Up @@ -443,13 +443,13 @@ mod test {
assert!(service
.peer_chain_metadata
.iter()
.any(|p| &p.node_id == nodes[0].node_id()));
.any(|p| p.node_id() == nodes[0].node_id()));
service.handle_connectivity_event(ConnectivityEvent::PeerBanned(nodes[0].node_id().clone()));
// Check that banned peer was removed
assert!(service
.peer_chain_metadata
.iter()
.all(|p| &p.node_id != nodes[0].node_id()));
.all(|p| p.node_id() != nodes[0].node_id()));
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(Listening(s), InitialSync) => HeaderSync(s.into()),
(HeaderSync(_), HeadersSynchronized(conn)) => {
if self.config.pruning_horizon > 0 {
HorizonStateSync(states::HorizonStateSync::with_peer(conn))
Expand All @@ -147,6 +146,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
}
},
(HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()),
(HeaderSync(s), Continue) => Listening(s.into()),
(HeaderSync(s), NetworkSilence) => Listening(s.into()),
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl BlockSync {
let local_nci = shared.local_node_interface.clone();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
synchronizer.on_progress(move |block, remote_tip_height, sync_peers| {
synchronizer.on_progress(move |block, remote_tip_height, sync_peer| {
let local_height = block.height();
local_nci.publish_block_event(BlockEvent::ValidBlockAdded(
block.block().clone().into(),
Expand All @@ -90,7 +90,7 @@ impl BlockSync {
state_info: StateInfo::BlockSync(BlockSyncInfo {
tip_height: remote_tip_height,
local_height,
sync_peers: sync_peers.to_vec(),
sync_peers: vec![sync_peer.clone()],
}),
randomx_vm_cnt,
randomx_vm_flags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::base_node::{
Starting,
Waiting,
},
sync::SyncPeers,
sync::SyncPeer,
};
use randomx_rs::RandomXFlag;
use std::fmt::{Display, Error, Formatter};
Expand All @@ -54,7 +54,6 @@ pub enum BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
InitialSync,
HeadersSynchronized(PeerConnection),
HeaderSyncFailed,
HorizonStateSynchronized,
Expand All @@ -81,9 +80,9 @@ impl<E: std::error::Error> From<E> for StateEvent {
#[derive(Debug, Clone, PartialEq)]
pub enum SyncStatus {
// We are behind the chain tip.
Lagging(ChainMetadata, SyncPeers),
Lagging(ChainMetadata, Vec<SyncPeer>),
// We are behind the pruning horizon.
LaggingBehindHorizon(ChainMetadata, SyncPeers),
LaggingBehindHorizon(ChainMetadata, Vec<SyncPeer>),
UpToDate,
}

Expand Down Expand Up @@ -125,7 +124,6 @@ impl Display for StateEvent {
use StateEvent::*;
match self {
Initialized => f.write_str("Initialized"),
InitialSync => f.write_str("InitialSync"),
BlocksSynchronized => f.write_str("Synchronised Blocks"),
HeadersSynchronized(conn) => write!(f, "Headers Synchronized from peer `{}`", conn.peer_node_id()),
HeaderSyncFailed => f.write_str("Header Synchronization Failed"),
Expand Down Expand Up @@ -191,14 +189,7 @@ impl StateInfo {
),
HorizonSyncStatus::Finalizing => "Finalizing horizon sync".to_string(),
},
BlockSync(info) => format!(
"Syncing blocks: ({}) {}",
info.sync_peers
.first()
.map(|n| n.short_str())
.unwrap_or_else(|| "".to_string()),
info.sync_progress_string()
),
BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string()),
Listening(_) => "Listening".to_string(),
BlockSyncStarting => "Starting block sync".to_string(),
}
Expand Down Expand Up @@ -276,7 +267,7 @@ pub struct BlockSyncInfo {
}

impl BlockSyncInfo {
/// Creates a new blockSyncInfo
/// Creates a new BlockSyncInfo
pub fn new(tip_height: u64, local_height: u64, sync_peers: Vec<NodeId>) -> BlockSyncInfo {
BlockSyncInfo {
tip_height,
Expand All @@ -287,7 +278,12 @@ impl BlockSyncInfo {

pub fn sync_progress_string(&self) -> String {
format!(
"{}/{} ({:.0}%)",
"({}) {}/{} ({:.0}%)",
self.sync_peers
.iter()
.map(|n| n.short_str())
.collect::<Vec<_>>()
.join(", "),
self.local_height,
self.tip_height,
(self.local_height as f64 / self.tip_height as f64 * 100.0)
Expand All @@ -297,10 +293,6 @@ impl BlockSyncInfo {

impl Display for BlockSyncInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
writeln!(f, "Syncing from the following peers:")?;
for peer in &self.sync_peers {
writeln!(f, "{}", peer)?;
}
writeln!(f, "Syncing {}", self.sync_progress_string())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,24 @@ use crate::{
base_node::{
comms_interface::BlockEvent,
state_machine_service::states::{BlockSyncInfo, Listening, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeers},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
},
chain_storage::BlockchainBackend,
};
use log::*;
use std::time::Instant;
use tari_comms::peer_manager::NodeId;

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug, Default)]
pub struct HeaderSync {
sync_peers: Vec<NodeId>,
sync_peers: Vec<SyncPeer>,
is_synced: bool,
}

impl HeaderSync {
pub fn new(sync_peers: Vec<NodeId>) -> Self {
pub fn new(sync_peers: Vec<SyncPeer>) -> Self {
Self {
sync_peers,
is_synced: false,
Expand All @@ -57,34 +56,38 @@ impl HeaderSync {
&mut self,
shared: &mut BaseNodeStateMachine<B>,
) -> StateEvent {
let sync_peers = if self.sync_peers.is_empty() {
&shared.config.block_sync_config.sync_peers
} else {
&self.sync_peers
};

let mut synchronizer = HeaderSynchronizer::new(
shared.config.block_sync_config.clone(),
shared.db.clone(),
shared.consensus_rules.clone(),
shared.connectivity.clone(),
sync_peers,
&self.sync_peers,
shared.randomx_factory.clone(),
);

let status_event_sender = shared.status_event_sender.clone();
let bootstrapped = shared.is_bootstrapped();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
synchronizer.on_progress(move |details, sync_peers| {
let details = details.map(|(current_height, remote_tip_height)| BlockSyncInfo {
synchronizer.on_starting(move || {
let _ = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::HeaderSync(None),
randomx_vm_cnt,
randomx_vm_flags,
});
});

let status_event_sender = shared.status_event_sender.clone();
synchronizer.on_progress(move |current_height, remote_tip_height, sync_peer| {
let details = BlockSyncInfo {
tip_height: remote_tip_height,
local_height: current_height,
sync_peers: sync_peers.to_vec(),
});
sync_peers: vec![sync_peer.clone()],
};
let _ = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::HeaderSync(details),
state_info: StateInfo::HeaderSync(Some(details)),
randomx_vm_cnt,
randomx_vm_flags,
});
Expand All @@ -102,6 +105,10 @@ impl HeaderSync {
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
},
Err(err @ BlockHeaderSyncError::SyncFailedAllPeers) => {
warn!(target: LOG_TARGET, "{}. Continuing...", err);
StateEvent::Continue
},
Err(err @ BlockHeaderSyncError::NetworkSilence) => {
warn!(target: LOG_TARGET, "{}", err);
self.is_synced = true;
Expand All @@ -120,8 +127,8 @@ impl From<Listening> for HeaderSync {
Default::default()
}
}
impl From<SyncPeers> for HeaderSync {
fn from(peers: SyncPeers) -> Self {
Self::new(peers.into_iter().map(|p| p.node_id).collect())
impl From<Vec<SyncPeer>> for HeaderSync {
fn from(peers: Vec<SyncPeer>) -> Self {
Self::new(peers)
}
}
Loading