Skip to content

Commit

Permalink
fix: ban peer that advertises higher PoW than able to provide
Browse files Browse the repository at this point in the history
- Can only transition to `HeaderSync` if claimed chain metadata is
  advertised
- `HeaderSync` is now aware of the claimed `ChainMetadata`
- `HeaderSync` now assumes (invariant) that all sync peers have
  claimed a higher accumulated PoW and will ban them if the validated
  accumulated difficulty does not reach the claimed acc diff.
- Adds ban condition in `determine_sync_status` phase, if a peer is not
  able to improve on the local chain strength (because we know that in
  order to be selected for header sync it must have advertised a
  stronger chain)
- Adds ban condition if header sync completes but is less than the
  claimed PoW. This is not strictly necessary since they were still able
  to provide a stronger chain as per Nakamoto consensus, but could still
  indicate some malicious intent.
- If sync fails for all peers, the state machine continues rather than
  `WAITING`. This removes the disruption that false metadata can cause.
- fix `select_sync_peers` to include peers claim that provide a enough full
  blocks for _our_ pruning horison (fixes cucumber test)
  higher than the local pruned
  • Loading branch information
sdbondi committed Oct 20, 2021
1 parent 5d7fb20 commit d124e02
Show file tree
Hide file tree
Showing 23 changed files with 385 additions and 478 deletions.
17 changes: 7 additions & 10 deletions base_layer/common_types/src/chain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ChainMetadata {
/// (exclusive). If `pruned_height` is equal to the `height_of_longest_chain` no blocks can be
/// provided. Archival nodes wil always have an `pruned_height` of zero.
pruned_height: u64,
/// The geometric mean of the proof of work of the longest chain, none if the chain is empty
/// The total accumuated proof of work of the longest chain
accumulated_difficulty: u128,
}

Expand Down Expand Up @@ -122,18 +122,15 @@ impl ChainMetadata {
}

impl Display for ChainMetadata {
fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), Error> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
let height = self.height_of_longest_chain;
let best_block = self.best_block.to_hex();
let accumulated_difficulty = self.accumulated_difficulty;
fmt.write_str(&format!("Height of longest chain : {}\n", height))?;
fmt.write_str(&format!(
"Geometric mean of longest chain : {}\n",
accumulated_difficulty
))?;
fmt.write_str(&format!("Best block : {}\n", best_block))?;
fmt.write_str(&format!("Pruning horizon : {}\n", self.pruning_horizon))?;
fmt.write_str(&format!("Effective pruned height : {}\n", self.pruned_height))?;
writeln!(f, "Height of longest chain : {}", height)?;
writeln!(f, "Total accumulated difficulty: {}", accumulated_difficulty)?;
writeln!(f, "Best block : {}", best_block)?;
writeln!(f, "Pruning horizon : {}", self.pruning_horizon)?;
writeln!(f, "Effective pruned height : {}", self.pruned_height)?;
Ok(())
}
}
Expand Down
12 changes: 10 additions & 2 deletions base_layer/core/src/base_node/chain_metadata_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use tokio::sync::broadcast;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerChainMetadata {
pub node_id: NodeId,
pub chain_metadata: ChainMetadata,
node_id: NodeId,
chain_metadata: ChainMetadata,
}

impl PeerChainMetadata {
Expand All @@ -41,6 +41,14 @@ impl PeerChainMetadata {
chain_metadata,
}
}

pub fn node_id(&self) -> &NodeId {
&self.node_id
}

pub fn claimed_chain_metadata(&self) -> &ChainMetadata {
&self.chain_metadata
}
}

impl Display for PeerChainMetadata {
Expand Down
14 changes: 7 additions & 7 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl ChainMetadataService {
use ConnectivityEvent::*;
match event {
PeerDisconnected(node_id) | PeerBanned(node_id) => {
if let Some(pos) = self.peer_chain_metadata.iter().position(|p| p.node_id == node_id) {
if let Some(pos) = self.peer_chain_metadata.iter().position(|p| *p.node_id() == node_id) {
debug!(
target: LOG_TARGET,
"Removing disconnected/banned peer `{}` from chain metadata list ", node_id
Expand Down Expand Up @@ -251,7 +251,7 @@ impl ChainMetadataService {
if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| &peer_chainstate.node_id == node_id)
.position(|peer_chainstate| peer_chainstate.node_id() == node_id)
{
self.peer_chain_metadata.remove(pos);
}
Expand Down Expand Up @@ -284,7 +284,7 @@ impl ChainMetadataService {
if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| &peer_chainstate.node_id == node_id)
.position(|peer_chainstate| peer_chainstate.node_id() == node_id)
{
self.peer_chain_metadata.remove(pos);
}
Expand Down Expand Up @@ -411,9 +411,9 @@ mod test {
service.handle_liveness_event(&sample_event).await.unwrap();
assert_eq!(service.peer_chain_metadata.len(), 1);
let metadata = service.peer_chain_metadata.remove(0);
assert_eq!(metadata.node_id, node_id);
assert_eq!(*metadata.node_id(), node_id);
assert_eq!(
metadata.chain_metadata.height_of_longest_chain(),
metadata.claimed_chain_metadata().height_of_longest_chain(),
proto_chain_metadata.height_of_longest_chain.unwrap()
);
}
Expand Down Expand Up @@ -443,13 +443,13 @@ mod test {
assert!(service
.peer_chain_metadata
.iter()
.any(|p| &p.node_id == nodes[0].node_id()));
.any(|p| p.node_id() == nodes[0].node_id()));
service.handle_connectivity_event(ConnectivityEvent::PeerBanned(nodes[0].node_id().clone()));
// Check that banned peer was removed
assert!(service
.peer_chain_metadata
.iter()
.all(|p| &p.node_id != nodes[0].node_id()));
.all(|p| p.node_id() != nodes[0].node_id()));
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
use self::{BaseNodeState::*, StateEvent::*, SyncStatus::*};
match (state, event) {
(Starting(s), Initialized) => Listening(s.into()),
(Listening(s), InitialSync) => HeaderSync(s.into()),
(HeaderSync(_), HeadersSynchronized(conn)) => {
if self.config.pruning_horizon > 0 {
HorizonStateSync(states::HorizonStateSync::with_peer(conn))
Expand All @@ -147,6 +146,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
}
},
(HeaderSync(s), HeaderSyncFailed) => Waiting(s.into()),
(HeaderSync(s), Continue) => Listening(s.into()),
(HeaderSync(s), NetworkSilence) => Listening(s.into()),
(HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
(HorizonStateSync(s), HorizonStateSyncFailure) => Waiting(s.into()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl BlockSync {
let local_nci = shared.local_node_interface.clone();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
synchronizer.on_progress(move |block, remote_tip_height, sync_peers| {
synchronizer.on_progress(move |block, remote_tip_height, sync_peer| {
let local_height = block.height();
local_nci.publish_block_event(BlockEvent::ValidBlockAdded(
block.block().clone().into(),
Expand All @@ -90,7 +90,7 @@ impl BlockSync {
state_info: StateInfo::BlockSync(BlockSyncInfo {
tip_height: remote_tip_height,
local_height,
sync_peers: sync_peers.to_vec(),
sync_peers: vec![sync_peer.clone()],
}),
randomx_vm_cnt,
randomx_vm_flags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::base_node::{
Starting,
Waiting,
},
sync::SyncPeers,
sync::SyncPeer,
};
use randomx_rs::RandomXFlag;
use std::fmt::{Display, Error, Formatter};
Expand All @@ -54,7 +54,6 @@ pub enum BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
InitialSync,
HeadersSynchronized(PeerConnection),
HeaderSyncFailed,
HorizonStateSynchronized,
Expand All @@ -81,9 +80,9 @@ impl<E: std::error::Error> From<E> for StateEvent {
#[derive(Debug, Clone, PartialEq)]
pub enum SyncStatus {
// We are behind the chain tip.
Lagging(ChainMetadata, SyncPeers),
Lagging(ChainMetadata, Vec<SyncPeer>),
// We are behind the pruning horizon.
LaggingBehindHorizon(ChainMetadata, SyncPeers),
LaggingBehindHorizon(ChainMetadata, Vec<SyncPeer>),
UpToDate,
}

Expand Down Expand Up @@ -125,7 +124,6 @@ impl Display for StateEvent {
use StateEvent::*;
match self {
Initialized => f.write_str("Initialized"),
InitialSync => f.write_str("InitialSync"),
BlocksSynchronized => f.write_str("Synchronised Blocks"),
HeadersSynchronized(conn) => write!(f, "Headers Synchronized from peer `{}`", conn.peer_node_id()),
HeaderSyncFailed => f.write_str("Header Synchronization Failed"),
Expand Down Expand Up @@ -191,14 +189,7 @@ impl StateInfo {
),
HorizonSyncStatus::Finalizing => "Finalizing horizon sync".to_string(),
},
BlockSync(info) => format!(
"Syncing blocks: ({}) {}",
info.sync_peers
.first()
.map(|n| n.short_str())
.unwrap_or_else(|| "".to_string()),
info.sync_progress_string()
),
BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string()),
Listening(_) => "Listening".to_string(),
BlockSyncStarting => "Starting block sync".to_string(),
}
Expand Down Expand Up @@ -276,7 +267,7 @@ pub struct BlockSyncInfo {
}

impl BlockSyncInfo {
/// Creates a new blockSyncInfo
/// Creates a new BlockSyncInfo
pub fn new(tip_height: u64, local_height: u64, sync_peers: Vec<NodeId>) -> BlockSyncInfo {
BlockSyncInfo {
tip_height,
Expand All @@ -287,7 +278,12 @@ impl BlockSyncInfo {

pub fn sync_progress_string(&self) -> String {
format!(
"{}/{} ({:.0}%)",
"({}) {}/{} ({:.0}%)",
self.sync_peers
.iter()
.map(|n| n.short_str())
.collect::<Vec<_>>()
.join(", "),
self.local_height,
self.tip_height,
(self.local_height as f64 / self.tip_height as f64 * 100.0)
Expand All @@ -297,10 +293,6 @@ impl BlockSyncInfo {

impl Display for BlockSyncInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
writeln!(f, "Syncing from the following peers:")?;
for peer in &self.sync_peers {
writeln!(f, "{}", peer)?;
}
writeln!(f, "Syncing {}", self.sync_progress_string())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,24 @@ use crate::{
base_node::{
comms_interface::BlockEvent,
state_machine_service::states::{BlockSyncInfo, Listening, StateEvent, StateInfo, StatusInfo},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeers},
sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
BaseNodeStateMachine,
},
chain_storage::BlockchainBackend,
};
use log::*;
use std::time::Instant;
use tari_comms::peer_manager::NodeId;

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug, Default)]
pub struct HeaderSync {
sync_peers: Vec<NodeId>,
sync_peers: Vec<SyncPeer>,
is_synced: bool,
}

impl HeaderSync {
pub fn new(sync_peers: Vec<NodeId>) -> Self {
pub fn new(sync_peers: Vec<SyncPeer>) -> Self {
Self {
sync_peers,
is_synced: false,
Expand All @@ -57,34 +56,38 @@ impl HeaderSync {
&mut self,
shared: &mut BaseNodeStateMachine<B>,
) -> StateEvent {
let sync_peers = if self.sync_peers.is_empty() {
&shared.config.block_sync_config.sync_peers
} else {
&self.sync_peers
};

let mut synchronizer = HeaderSynchronizer::new(
shared.config.block_sync_config.clone(),
shared.db.clone(),
shared.consensus_rules.clone(),
shared.connectivity.clone(),
sync_peers,
&self.sync_peers,
shared.randomx_factory.clone(),
);

let status_event_sender = shared.status_event_sender.clone();
let bootstrapped = shared.is_bootstrapped();
let randomx_vm_cnt = shared.get_randomx_vm_cnt();
let randomx_vm_flags = shared.get_randomx_vm_flags();
synchronizer.on_progress(move |details, sync_peers| {
let details = details.map(|(current_height, remote_tip_height)| BlockSyncInfo {
synchronizer.on_starting(move || {
let _ = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::HeaderSync(None),
randomx_vm_cnt,
randomx_vm_flags,
});
});

let status_event_sender = shared.status_event_sender.clone();
synchronizer.on_progress(move |current_height, remote_tip_height, sync_peer| {
let details = BlockSyncInfo {
tip_height: remote_tip_height,
local_height: current_height,
sync_peers: sync_peers.to_vec(),
});
sync_peers: vec![sync_peer.clone()],
};
let _ = status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::HeaderSync(details),
state_info: StateInfo::HeaderSync(Some(details)),
randomx_vm_cnt,
randomx_vm_flags,
});
Expand All @@ -102,6 +105,10 @@ impl HeaderSync {
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
},
Err(err @ BlockHeaderSyncError::SyncFailedAllPeers) => {
warn!(target: LOG_TARGET, "{}. Continuing...", err);
StateEvent::Continue
},
Err(err @ BlockHeaderSyncError::NetworkSilence) => {
warn!(target: LOG_TARGET, "{}", err);
self.is_synced = true;
Expand All @@ -120,8 +127,8 @@ impl From<Listening> for HeaderSync {
Default::default()
}
}
impl From<SyncPeers> for HeaderSync {
fn from(peers: SyncPeers) -> Self {
Self::new(peers.into_iter().map(|p| p.node_id).collect())
impl From<Vec<SyncPeer>> for HeaderSync {
fn from(peers: Vec<SyncPeer>) -> Self {
Self::new(peers)
}
}
Loading

0 comments on commit d124e02

Please sign in to comment.