diff --git a/Cargo.lock b/Cargo.lock index 513a18f56c2..fa07d1578ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5710,6 +5710,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tower", "tracing", "uint", "zeroize", diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index acebad629a2..4578ee60860 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -153,7 +153,7 @@ impl BaseNodeStateMachine { db.set_disable_add_block_flag(); HeaderSync(HeaderSyncState::new(sync_peers, local_metadata)) }, - (HeaderSync(s), HeaderSyncFailed) => { + (HeaderSync(s), HeaderSyncFailed(_err)) => { db.clear_disable_add_block_flag(); Waiting(s.into()) }, @@ -161,7 +161,7 @@ impl BaseNodeStateMachine { db.clear_disable_add_block_flag(); Listening(s.into()) }, - (HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()), + (HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()), (DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()), (DecideNextSync(s), Continue) => { diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index 2fea428f714..5f0ef9c9f55 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -37,7 +37,7 @@ use crate::base_node::{ Starting, Waiting, }, - sync::{HorizonSyncInfo, SyncPeer}, + sync::{AttemptSyncResult, HorizonSyncInfo, SyncPeer}, }; #[derive(Debug)] @@ -57,8 +57,8 @@ pub enum BaseNodeState { #[derive(Debug, Clone, PartialEq)] pub enum StateEvent { Initialized, - HeadersSynchronized(SyncPeer), - HeaderSyncFailed, + HeadersSynchronized(SyncPeer, AttemptSyncResult), + HeaderSyncFailed(String), ProceedToHorizonSync(Vec), ProceedToBlockSync(Vec), HorizonStateSynchronized, @@ -145,8 +145,8 @@ impl Display for StateEvent { match self { Initialized => write!(f, "Initialized"), BlocksSynchronized => write!(f, "Synchronised Blocks"), - HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer), - HeaderSyncFailed => write!(f, "Header Synchronization Failed"), + HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result), + HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err), ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"), ProceedToBlockSync(_) => write!(f, "Proceed to block sync"), HorizonStateSynchronized => write!(f, "Horizon State Synchronized"), diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs index 1c51066ea19..d13b19198e6 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs @@ -24,6 +24,7 @@ use std::{cmp::Ordering, time::Instant}; use log::*; use tari_common_types::chain_metadata::ChainMetadata; +use tari_comms::peer_manager::NodeId; use crate::{ base_node::{ @@ -35,7 +36,6 @@ use crate::{ }, chain_storage::BlockchainBackend, }; - const LOG_TARGET: &str = "c::bn::header_sync"; #[derive(Clone, Debug)] @@ -70,12 +70,42 @@ impl HeaderSyncState { self.sync_peers } + fn remove_sync_peer(&mut self, node_id: &NodeId) { + if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) { + self.sync_peers.remove(pos); + } + } + // converting u64 to i64 is okay as the future time limit is the hundreds so way below u32 even + #[allow(clippy::too_many_lines)] #[allow(clippy::cast_possible_wrap)] pub async fn next_event( &mut self, shared: &mut BaseNodeStateMachine, ) -> StateEvent { + // Only sync to peers with better claimed accumulated difficulty than the local chain: this may be possible + // at this stage due to read-write lock race conditions in the database + match shared.db.get_chain_metadata().await { + Ok(best_block_metadata) => { + let mut remove = Vec::new(); + for sync_peer in &self.sync_peers { + if sync_peer.claimed_chain_metadata().accumulated_difficulty() <= + best_block_metadata.accumulated_difficulty() + { + remove.push(sync_peer.node_id().clone()); + } + } + for node_id in remove { + self.remove_sync_peer(&node_id); + } + if self.sync_peers.is_empty() { + // Go back to Listening state + return StateEvent::Continue; + } + }, + Err(e) => return StateEvent::FatalError(format!("{}", e)), + } + let mut synchronizer = HeaderSynchronizer::new( shared.config.blockchain_sync_config.clone(), shared.db.clone(), @@ -128,7 +158,7 @@ impl HeaderSyncState { let mut mdc = vec![]; log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned()))); match synchronizer.synchronize().await { - Ok(sync_peer) => { + Ok((sync_peer, sync_result)) => { log_mdc::extend(mdc); info!( target: LOG_TARGET, @@ -144,9 +174,10 @@ impl HeaderSyncState { } } self.is_synced = true; - StateEvent::HeadersSynchronized(sync_peer) + StateEvent::HeadersSynchronized(sync_peer, sync_result) }, Err(err) => { + println!("HeaderSyncState::next_event - {}", err); let _ignore = shared.status_event_sender.send(StatusInfo { bootstrapped, state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()), @@ -163,7 +194,7 @@ impl HeaderSyncState { _ => { log_mdc::extend(mdc); debug!(target: LOG_TARGET, "Header sync failed: {}", err); - StateEvent::HeaderSyncFailed + StateEvent::HeaderSyncFailed(err.to_string()) }, } }, diff --git a/base_layer/core/src/base_node/sync/header_sync/mod.rs b/base_layer/core/src/base_node/sync/header_sync/mod.rs index d71c092a525..da4f5676645 100644 --- a/base_layer/core/src/base_node/sync/header_sync/mod.rs +++ b/base_layer/core/src/base_node/sync/header_sync/mod.rs @@ -26,4 +26,4 @@ pub use error::BlockHeaderSyncError; mod validator; mod synchronizer; -pub use synchronizer::HeaderSynchronizer; +pub use synchronizer::{AttemptSyncResult, HeaderSyncStatus, HeaderSynchronizer}; diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index c29abc3ff2a..c4f395be946 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -40,13 +40,14 @@ use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError}; use crate::{ base_node::sync::{ban::PeerBanManager, hooks::Hooks, rpc, BlockchainSyncConfig, SyncPeer}, blocks::{BlockHeader, ChainBlock, ChainHeader}, - chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, + chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError}, common::rolling_avg::RollingAverageTime, consensus::ConsensusManager, proof_of_work::randomx_factory::RandomXFactory, proto::{ base_node as proto, base_node::{FindChainSplitRequest, SyncHeadersRequest}, + core::BlockHeader as ProtoBlockHeader, }, }; @@ -63,7 +64,7 @@ pub struct HeaderSynchronizer<'a, B> { connectivity: ConnectivityRequester, sync_peers: &'a mut Vec, hooks: Hooks, - local_metadata: &'a ChainMetadata, + local_cached_metadata: &'a ChainMetadata, peer_ban_manager: PeerBanManager, } @@ -85,7 +86,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { connectivity, sync_peers, hooks: Default::default(), - local_metadata, + local_cached_metadata: local_metadata, peer_ban_manager, } } @@ -105,7 +106,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { self.hooks.add_on_rewind_hook(hook); } - pub async fn synchronize(&mut self) -> Result { + pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> { debug!(target: LOG_TARGET, "Starting header sync.",); info!( @@ -117,7 +118,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { let mut latency_increases_counter = 0; loop { match self.try_sync_from_all_peers(max_latency).await { - Ok(sync_peer) => break Ok(sync_peer), + Ok((peer, sync_result)) => break Ok((peer, sync_result)), Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => { // If we have few sync peers, throw this out to be retried later if self.sync_peers.len() < 2 { @@ -135,7 +136,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } #[allow(clippy::too_many_lines)] - pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result { + pub async fn try_sync_from_all_peers( + &mut self, + max_latency: Duration, + ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> { let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::>(); info!( target: LOG_TARGET, @@ -145,7 +149,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { let mut latency_counter = 0usize; for node_id in sync_peer_node_ids { match self.connect_and_attempt_sync(&node_id, max_latency).await { - Ok(peer) => return Ok(peer), + Ok((peer, sync_result)) => return Ok((peer, sync_result)), Err(err) => { let ban_reason = BlockHeaderSyncError::get_ban_reason( &err, @@ -180,7 +184,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { &mut self, node_id: &NodeId, max_latency: Duration, - ) -> Result { + ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> { let peer_index = self .get_sync_peer_index(node_id) .ok_or(BlockHeaderSyncError::PeerNotFound)?; @@ -214,8 +218,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency); let sync_peer = self.sync_peers[peer_index].clone(); - self.attempt_sync(&sync_peer, client, max_latency).await?; - Ok(sync_peer) + let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?; + Ok((sync_peer, sync_result)) } async fn dial_sync_peer(&self, node_id: &NodeId) -> Result { @@ -236,7 +240,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { sync_peer: &SyncPeer, mut client: rpc::BaseNodeSyncRpcClient, max_latency: Duration, - ) -> Result<(), BlockHeaderSyncError> { + ) -> Result { let latency = client.get_last_request_latency(); debug!( target: LOG_TARGET, @@ -245,59 +249,69 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { latency.unwrap_or_default().as_millis() ); - // Fetch the local tip header at the beginning of the sync process - let local_tip_header = self.db.fetch_last_chain_header().await?; - let local_total_accumulated_difficulty = local_tip_header.accumulated_data().total_accumulated_difficulty; - let header_tip_height = local_tip_header.height(); - let sync_status = self - .determine_sync_status(sync_peer, local_tip_header, &mut client) + // Fetch best local data at the beginning of the sync process + let best_block_metadata = self.db.get_chain_metadata().await?; + let best_header = self.db.fetch_last_chain_header().await?; + let best_block = self + .db + .fetch_chain_header(best_block_metadata.height_of_longest_chain()) + .await?; + let best_header_height = best_header.height(); + let best_block_height = best_block.height(); + + if best_header_height < best_block_height || + best_block_height < self.local_cached_metadata.height_of_longest_chain() + { + return Err(BlockHeaderSyncError::ChainStorageError( + ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()), + )); + } + + let peer_response = self + .find_chain_split(sync_peer.node_id(), &mut client, NUM_INITIAL_HEADERS_TO_REQUEST as u64) .await?; - match sync_status { - SyncStatus::InSync | SyncStatus::WereAhead => { - let metadata = self.db.get_chain_metadata().await?; - if metadata.height_of_longest_chain() < header_tip_height { + let header_sync_status = self + .determine_sync_status(&sync_peer.to_string(), best_header, best_block, peer_response.clone()) + .await?; + + match header_sync_status.clone() { + HeaderSyncStatus::InSyncOrAhead => { + if best_block_height < best_header_height { debug!( target: LOG_TARGET, "Headers are in sync at height {} but tip is {}. Proceeding to archival/pruned block sync", - header_tip_height, - metadata.height_of_longest_chain() + best_header_height, + best_block_height ); - Ok(()) + + Ok(AttemptSyncResult { + headers_returned: peer_response.headers.len() as u64, + fork_hash_index: peer_response.fork_hash_index, + header_sync_status, + }) } else { - // Check if the metadata that we had when we decided to enter header sync is behind the peer's - // claimed one. If so, our chain has updated in the meantime and the sync peer - // is behaving. - if self.local_metadata.accumulated_difficulty() <= - sync_peer.claimed_chain_metadata().accumulated_difficulty() - { - debug!( - target: LOG_TARGET, - "Local blockchain received a better block through propagation at height {} (was: {}). \ - Proceeding to archival/pruned block sync", - metadata.height_of_longest_chain(), - self.local_metadata.height_of_longest_chain() - ); - return Ok(()); - } - debug!( + // We will only attempt sync if the our accumulated difficulty is less than the peer's claimed + // accumulated difficulty, thus this is adverse behaviour form the peer. + warn!( target: LOG_TARGET, "Headers and block state are already in-sync (Header Tip: {}, Block tip: {}, Peer's height: \ - {})", - header_tip_height, - metadata.height_of_longest_chain(), + {}), peer has lied about chain metadata or did not want to provide headers", + best_header_height, + best_block_height, sync_peer.claimed_chain_metadata().height_of_longest_chain(), ); + Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata { claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(), actual: None, - local: local_total_accumulated_difficulty, + local: self.local_cached_metadata.accumulated_difficulty(), }) } }, - SyncStatus::Lagging(split_info) => { + HeaderSyncStatus::Lagging(split_info) => { self.hooks.call_on_progress_header_hooks( split_info - .local_tip_header + .best_block .height() .checked_sub(split_info.reorg_steps_back) .unwrap_or_default(), @@ -306,7 +320,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { ); self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency) .await?; - Ok(()) + Ok(AttemptSyncResult { + headers_returned: peer_response.headers.len() as u64, + fork_hash_index: peer_response.fork_hash_index, + header_sync_status, + }) }, } } @@ -316,7 +334,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { peer: &NodeId, client: &mut rpc::BaseNodeSyncRpcClient, header_count: u64, - ) -> Result<(proto::FindChainSplitResponse, Vec, u64), BlockHeaderSyncError> { + ) -> Result { const NUM_CHAIN_SPLIT_HEADERS: usize = 500; // Limit how far back we're willing to go. A peer might just say it does not have a chain split // and keep us busy going back until the genesis. @@ -380,69 +398,76 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { )); } - let steps_back = resp.fork_hash_index.saturating_add(offset as u64); - return Ok((resp, block_hashes, steps_back)); + let reorg_steps_back = resp.fork_hash_index.saturating_add(offset as u64); + let proto::FindChainSplitResponse { + headers, + fork_hash_index, + tip_height: remote_tip_height, + } = resp; + return Ok(PeerChainSplitResponse { + block_hashes, + reorg_steps_back, + headers, + fork_hash_index, + remote_tip_height, + }); } } /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information - /// of the chain split (see [SyncStatus]). + /// of the chain split (see [HeaderSyncStatus]). /// - /// If the local node is behind the remote chain (i.e. `SyncStatus::Lagging`), the appropriate `ChainSplitInfo` is - /// returned, the header validator is initialized and the preliminary headers are validated. + /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate + /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated. async fn determine_sync_status( &mut self, - sync_peer: &SyncPeer, - local_tip_header: ChainHeader, - client: &mut rpc::BaseNodeSyncRpcClient, - ) -> Result { - let (resp, block_hashes, steps_back) = self - .find_chain_split(sync_peer.node_id(), client, NUM_INITIAL_HEADERS_TO_REQUEST as u64) - .await?; - let proto::FindChainSplitResponse { - headers, - fork_hash_index, - tip_height: remote_tip_height, - } = resp; - - if steps_back > 0 { + sync_peer: &str, + best_header: ChainHeader, + best_block: ChainHeader, + peer_response: PeerChainSplitResponse, + ) -> Result { + if peer_response.reorg_steps_back > 0 { debug!( target: LOG_TARGET, "Found chain split {} blocks back, received {} headers from peer `{}`", - steps_back, - headers.len(), + peer_response.reorg_steps_back, + peer_response.headers.len(), sync_peer ); } - // If the peer returned no new headers, this means header sync is done. - if headers.is_empty() { - if fork_hash_index > 0 { + // If the peer returned no new headers, they have no headers, but may still have more blocks than we have, + // thus have a higher accumulated difficulty. + if peer_response.headers.is_empty() { + if peer_response.fork_hash_index > 0 { debug!( target: LOG_TARGET, - "Peer `{}` has sent no headers but forked_hash_index is {}. The peer is behind our chain.", + "Peer `{}` has sent no headers with forked_hash_index {}. The peer has less headers than we have.", sync_peer, - fork_hash_index + peer_response.fork_hash_index ); - - return Ok(SyncStatus::WereAhead); + } else { + debug!(target: LOG_TARGET, "Headers already in sync with peer `{}`.", sync_peer); } - debug!(target: LOG_TARGET, "Already in sync with peer `{}`.", sync_peer); - return Ok(SyncStatus::InSync); + return Ok(HeaderSyncStatus::InSyncOrAhead); } - let headers = headers + let headers = peer_response + .headers .into_iter() .map(BlockHeader::try_from) .collect::, _>>() .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?; - let num_new_headers = headers.len(); + let num_new_headers = headers.len(); // Required fro later use, no 'Copy' trait on 'BlockHeader' // NOTE: We can trust that the header associated with this hash exists because `block_hashes` was supplied by // this node. Bounds checking for fork_hash_index has been done above. #[allow(clippy::cast_possible_truncation)] - let chain_split_hash = block_hashes.get(fork_hash_index as usize).unwrap(); + let chain_split_hash = peer_response + .block_hashes + .get(peer_response.fork_hash_index as usize) + .unwrap(); self.header_validator.initialize_state(chain_split_hash).await?; for header in headers { @@ -462,8 +487,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { ); // Basic sanity check that the peer sent tip height greater than the split. - let split_height = local_tip_header.height().saturating_sub(steps_back); - if remote_tip_height < split_height { + let split_height = best_header.height().saturating_sub(peer_response.reorg_steps_back); + if peer_response.remote_tip_height < split_height { return Err(BlockHeaderSyncError::InvalidProtocolResponse(format!( "Peer {} sent invalid remote tip height", sync_peer @@ -471,12 +496,12 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } let chain_split_info = ChainSplitInfo { - local_tip_header, - remote_tip_height, - reorg_steps_back: steps_back, + best_block, + remote_tip_height: peer_response.remote_tip_height, + reorg_steps_back: peer_response.reorg_steps_back, chain_split_hash: *chain_split_hash, }; - Ok(SyncStatus::Lagging(Box::new(chain_split_info))) + Ok(HeaderSyncStatus::Lagging(Box::new(chain_split_info))) } async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result>, BlockHeaderSyncError> { @@ -520,7 +545,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { // If we already have a stronger chain at this point, switch over to it. // just in case we happen to be exactly NUM_INITIAL_HEADERS_TO_REQUEST headers behind. - let has_better_pow = self.pending_chain_has_higher_pow(&split_info.local_tip_header); + let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block); if has_better_pow { debug!( @@ -541,10 +566,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata { claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(), actual: Some(total_accumulated_difficulty), - local: split_info - .local_tip_header - .accumulated_data() - .total_accumulated_difficulty, + local: split_info.best_block.accumulated_data().total_accumulated_difficulty, }); } // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on @@ -625,7 +647,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { // The remote chain has not (yet) been accepted. // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is // achieved. - if self.pending_chain_has_higher_pow(&split_info.local_tip_header) { + if self.pending_chain_has_higher_pow(&split_info.best_block) { self.switch_to_pending_chain(&split_info).await?; has_switched_to_new_chain = true; } @@ -665,10 +687,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { .header_validator .current_valid_chain_tip_header() .map(|h| h.accumulated_data().total_accumulated_difficulty), - local: split_info - .local_tip_header - .accumulated_data() - .total_accumulated_difficulty, + local: split_info.best_block.accumulated_data().total_accumulated_difficulty, }); } else { warn!( @@ -677,7 +696,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { swapped. Ignoring", sync_peer.claimed_chain_metadata().accumulated_difficulty(), split_info - .local_tip_header + .best_block .accumulated_data() .total_accumulated_difficulty ); @@ -697,10 +716,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata { claimed: claimed_total_accumulated_diff, actual: Some(last_total_accumulated_difficulty), - local: split_info - .local_tip_header - .accumulated_data() - .total_accumulated_difficulty, + local: split_info.best_block.accumulated_data().total_accumulated_difficulty, }); } @@ -781,18 +797,43 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { } } -struct ChainSplitInfo { - local_tip_header: ChainHeader, - remote_tip_height: u64, +#[derive(Debug, Clone)] +struct PeerChainSplitResponse { + block_hashes: Vec, reorg_steps_back: u64, - chain_split_hash: HashOutput, + headers: Vec, + fork_hash_index: u64, + remote_tip_height: u64, +} + +/// Information about the chain split from the remote node. +#[derive(Debug, Clone, PartialEq)] +pub struct ChainSplitInfo { + /// The best block on the local chain. + pub best_block: ChainHeader, + /// The height of the remote node's tip. + pub remote_tip_height: u64, + /// The number of blocks to reorg back to the fork. + pub reorg_steps_back: u64, + /// The hash of the block at the fork. + pub chain_split_hash: HashOutput, +} + +/// The result of an attempt to synchronize headers with a peer. +#[derive(Debug, Clone, PartialEq)] +pub struct AttemptSyncResult { + /// The number of headers that were returned. + pub headers_returned: u64, + /// The fork hash index of the remote peer. + pub fork_hash_index: u64, + /// The header sync status. + pub header_sync_status: HeaderSyncStatus, } -enum SyncStatus { - /// Local and remote node are in sync - InSync, - /// Local node is ahead of the remote node - WereAhead, +#[derive(Debug, Clone, PartialEq)] +pub enum HeaderSyncStatus { + /// Local and remote node are in sync or ahead + InSyncOrAhead, /// Local node is lagging behind remote node Lagging(Box), } diff --git a/base_layer/core/src/base_node/sync/mod.rs b/base_layer/core/src/base_node/sync/mod.rs index 2193bb6482f..1c5ef806910 100644 --- a/base_layer/core/src/base_node/sync/mod.rs +++ b/base_layer/core/src/base_node/sync/mod.rs @@ -36,7 +36,7 @@ pub use block_sync::{BlockSyncError, BlockSynchronizer}; #[cfg(feature = "base_node")] mod header_sync; #[cfg(feature = "base_node")] -pub use header_sync::{BlockHeaderSyncError, HeaderSynchronizer}; +pub use header_sync::{AttemptSyncResult, BlockHeaderSyncError, HeaderSyncStatus, HeaderSynchronizer}; #[cfg(feature = "base_node")] mod horizon_state_sync; diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 908852faa20..c17f45d3769 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -26,12 +26,14 @@ use rand::rngs::OsRng; use tari_common::configuration::Network; use tari_comms::{ peer_manager::{NodeIdentity, PeerFeatures}, - protocol::messaging::MessagingEventSender, + protocol::{messaging::MessagingEventSender, rpc::RpcServer}, transports::MemoryTransport, CommsNode, + UnspawnedCommsNode, }; use tari_comms_dht::{outbound::OutboundMessageRequester, Dht}; use tari_core::{ + base_node, base_node::{ chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer}, comms_interface::OutboundNodeCommsInterface, @@ -63,6 +65,7 @@ use tari_p2p::{ comms_connector::{pubsub_connector, InboundDomainConnector}, initialization::initialize_local_test_comms, services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer}, + P2pConfig, }; use tari_service_framework::{RegisterHandle, StackBuilder}; use tari_shutdown::Shutdown; @@ -105,6 +108,7 @@ pub struct BaseNodeBuilder { mempool_config: Option, mempool_service_config: Option, liveness_service_config: Option, + p2p_config: Option, validators: Option>, consensus_manager: Option, network: NetworkConsensus, @@ -120,6 +124,7 @@ impl BaseNodeBuilder { mempool_config: None, mempool_service_config: None, liveness_service_config: None, + p2p_config: None, validators: None, consensus_manager: None, network, @@ -156,6 +161,12 @@ impl BaseNodeBuilder { self } + /// Set the p2p configuration + pub fn with_p2p_config(mut self, config: P2pConfig) -> Self { + self.p2p_config = Some(config); + self + } + pub fn with_validators( mut self, block: impl CandidateBlockValidator + 'static, @@ -203,6 +214,7 @@ impl BaseNodeBuilder { mempool, consensus_manager.clone(), self.liveness_service_config.unwrap_or_default(), + self.p2p_config.unwrap_or_default(), data_path, ) .await; @@ -251,6 +263,7 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface pub async fn create_network_with_2_base_nodes_with_config>( mempool_service_config: MempoolServiceConfig, liveness_service_config: LivenessConfig, + p2p_config: P2pConfig, consensus_manager: ConsensusManager, data_path: P, ) -> (NodeInterfaces, NodeInterfaces, ConsensusManager) { @@ -261,6 +274,7 @@ pub async fn create_network_with_2_base_nodes_with_config>( .with_node_identity(alice_node_identity.clone()) .with_mempool_service_config(mempool_service_config.clone()) .with_liveness_service_config(liveness_service_config.clone()) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) .start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap()) .await; @@ -269,6 +283,7 @@ pub async fn create_network_with_2_base_nodes_with_config>( .with_peers(vec![alice_node_identity]) .with_mempool_service_config(mempool_service_config) .with_liveness_service_config(liveness_service_config) + .with_p2p_config(p2p_config.clone()) .with_consensus_manager(consensus_manager) .start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap()) .await; @@ -360,9 +375,9 @@ async fn setup_comms_services( peers: Vec>, publisher: InboundDomainConnector, data_path: &str, -) -> (CommsNode, Dht, MessagingEventSender, Shutdown) { + shutdown: &Shutdown, +) -> (UnspawnedCommsNode, Dht, MessagingEventSender) { let peers = peers.into_iter().map(|p| p.to_peer()).collect(); - let shutdown = Shutdown::new(); let (comms, dht, messaging_events) = initialize_local_test_comms( node_identity, @@ -375,7 +390,7 @@ async fn setup_comms_services( .await .unwrap(); - (comms, dht, messaging_events, shutdown) + (comms, dht, messaging_events) } // Helper function for starting the services of the Base node. @@ -386,12 +401,15 @@ async fn setup_base_node_services( mempool: Mempool, consensus_manager: ConsensusManager, liveness_service_config: LivenessConfig, + p2p_config: P2pConfig, data_path: &str, ) -> NodeInterfaces { let (publisher, subscription_factory) = pubsub_connector(100); let subscription_factory = Arc::new(subscription_factory); - let (comms, dht, messaging_events, shutdown) = - setup_comms_services(node_identity.clone(), peers, publisher, data_path).await; + let shutdown = Shutdown::new(); + + let (comms, dht, messaging_events) = + setup_comms_services(node_identity.clone(), peers, publisher.clone(), data_path, &shutdown).await; let mock_state_machine = MockBaseNodeStateMachine::new(); let randomx_factory = RandomXFactory::new(2); @@ -417,6 +435,25 @@ async fn setup_base_node_services( .await .unwrap(); + let base_node_service = handles.expect_handle::(); + let rpc_server = RpcServer::builder() + .with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions) + .with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer) + .finish(); + let rpc_server = rpc_server.add_service(base_node::create_base_node_sync_rpc_service( + blockchain_db.clone().into(), + base_node_service, + )); + let comms = comms + .add_protocol_extension(rpc_server) + .spawn_with_transport(MemoryTransport) + .await + .unwrap(); + // Set the public address for tests + comms + .node_identity() + .add_public_address(comms.listening_address().clone()); + let outbound_nci = handles.expect_handle::(); let local_nci = handles.expect_handle::(); let outbound_mp_interface = handles.expect_handle::(); diff --git a/base_layer/core/tests/tests/header_sync.rs b/base_layer/core/tests/tests/header_sync.rs new file mode 100644 index 00000000000..b4ec1aafeb2 --- /dev/null +++ b/base_layer/core/tests/tests/header_sync.rs @@ -0,0 +1,294 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::time::Duration; + +use tari_common::configuration::Network; +use tari_core::{ + base_node::{ + chain_metadata_service::PeerChainMetadata, + state_machine_service::{ + states::{HeaderSyncState, StateEvent, StatusInfo}, + BaseNodeStateMachine, + BaseNodeStateMachineConfig, + }, + sync::{HeaderSyncStatus, SyncPeer}, + SyncValidators, + }, + consensus::{ConsensusConstantsBuilder, ConsensusManagerBuilder}, + mempool::MempoolServiceConfig, + proof_of_work::{randomx_factory::RandomXFactory, Difficulty}, + test_helpers::blockchain::TempDatabase, + transactions::test_helpers::create_test_core_key_manager_with_memory_db, + validation::mocks::MockValidator, +}; +use tari_p2p::{services::liveness::config::LivenessConfig, P2pConfig}; +use tari_shutdown::Shutdown; +use tempfile::tempdir; +use tokio::sync::{broadcast, watch}; + +use crate::helpers::{ + block_builders::{append_block, create_genesis_block}, + nodes::{create_network_with_2_base_nodes_with_config, NodeInterfaces}, +}; + +static EMISSION: [u64; 2] = [10, 10]; + +#[allow(clippy::too_many_lines)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_header_sync() { + // Create the network with alice node and bob node + let network = Network::LocalNet; + let temp_dir = tempdir().unwrap(); + let key_manager = create_test_core_key_manager_with_memory_db(); + let consensus_constants = ConsensusConstantsBuilder::new(network) + .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) + .build(); + let (initial_block, _) = create_genesis_block(&consensus_constants, &key_manager).await; + let consensus_manager = ConsensusManagerBuilder::new(network) + .add_consensus_constants(consensus_constants) + .with_block(initial_block.clone()) + .build() + .unwrap(); + let (alice_node, bob_node, consensus_manager) = create_network_with_2_base_nodes_with_config( + MempoolServiceConfig::default(), + LivenessConfig { + auto_ping_interval: Some(Duration::from_millis(100)), + ..Default::default() + }, + P2pConfig::default(), + consensus_manager, + temp_dir.path().to_str().unwrap(), + ) + .await; + let shutdown = Shutdown::new(); + let (state_change_event_publisher, _) = broadcast::channel(10); + let (status_event_sender, _status_event_receiver) = watch::channel(StatusInfo::new()); + + // Alice needs a state machine for header sync + let mut alice_state_machine = BaseNodeStateMachine::new( + alice_node.blockchain_db.clone().into(), + alice_node.local_nci.clone(), + alice_node.comms.connectivity(), + alice_node.comms.peer_manager(), + alice_node.chain_metadata_handle.get_event_stream(), + BaseNodeStateMachineConfig::default(), + SyncValidators::new(MockValidator::new(true), MockValidator::new(true)), + status_event_sender, + state_change_event_publisher, + RandomXFactory::default(), + consensus_manager.clone(), + shutdown.to_signal(), + ); + + // Add 1 block to bob's chain + let block_1_bob = append_block( + &bob_node.blockchain_db, + &initial_block, + vec![], + &consensus_manager, + Difficulty::from_u64(3).unwrap(), + &key_manager, + ) + .await + .unwrap(); + assert_eq!(block_1_bob.height(), 1); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 1); + + // Alice attempts header sync, still on the genesys block, headers will be lagging + let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await; + // "Lagging" + match event.clone() { + StateEvent::HeadersSynchronized(val, sync_result) => { + assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 1); + assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 4); + assert_eq!(sync_result.headers_returned, 1); + assert_eq!(sync_result.fork_hash_index, 0); + if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status { + assert_eq!(val.remote_tip_height, 1); + assert_eq!(val.best_block.height(), 0); + assert_eq!(val.reorg_steps_back, 0); + } else { + panic!("Should be 'Lagging'"); + } + }, + _ => panic!("Expected HeadersSynchronized event"), + } + + // Alice attempts header sync again, still on the genesys block, headers will be in sync + let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await; + // "InSyncOrAhead" + match event.clone() { + StateEvent::HeadersSynchronized(val, sync_result) => { + assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 1); + assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 4); + assert_eq!(sync_result.headers_returned, 0); + assert_eq!(sync_result.fork_hash_index, 0); + if let HeaderSyncStatus::InSyncOrAhead = sync_result.header_sync_status { + // Good, headers were in sync + } else { + panic!("Should be 'InSyncOrAhead'"); + } + }, + _ => panic!("Expected StateEvent::HeadersSynchronized event"), + } + + // Bob adds another block + let block_2_bob = append_block( + &bob_node.blockchain_db, + &block_1_bob, + vec![], + &consensus_manager, + Difficulty::from_u64(3).unwrap(), + &key_manager, + ) + .await + .unwrap(); + assert_eq!(block_2_bob.height(), 2); + + // Alice attempts header sync, still on the genesys block, headers will be lagging + let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await; + // "Lagging" + match event.clone() { + StateEvent::HeadersSynchronized(val, sync_result) => { + assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 2); + assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 7); + assert_eq!(sync_result.headers_returned, 1); + assert_eq!(sync_result.fork_hash_index, 0); + if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status { + assert_eq!(val.remote_tip_height, 2); + assert_eq!(val.best_block.height(), 0); + assert_eq!(val.reorg_steps_back, 0); + } else { + panic!("Should be 'Lagging'"); + } + }, + _ => panic!("Expected StateEvent::HeadersSynchronized event"), + } + + // Alice adds 3 (different) blocks, with POW on par with bob's chain, but with greater height + let block_1_alice = append_block( + &alice_node.blockchain_db, + &initial_block, + vec![], + &consensus_manager, + Difficulty::from_u64(3).unwrap(), + &key_manager, + ) + .await + .unwrap(); + let block_2_alice = append_block( + &alice_node.blockchain_db, + &block_1_alice, + vec![], + &consensus_manager, + Difficulty::from_u64(2).unwrap(), + &key_manager, + ) + .await + .unwrap(); + // Alice adds another block, with POW on par with bob's chain, but with greater height + let block_3_alice = append_block( + &alice_node.blockchain_db, + &block_2_alice, + vec![], + &consensus_manager, + Difficulty::from_u64(1).unwrap(), + &key_manager, + ) + .await + .unwrap(); + assert_eq!(block_3_alice.height(), 3); + assert_eq!(block_3_alice.accumulated_data().total_accumulated_difficulty, 7); + assert_eq!( + block_3_alice.accumulated_data().total_accumulated_difficulty, + block_2_bob.accumulated_data().total_accumulated_difficulty + ); + + // Alice attempts header sync, but POW is on par + let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await; + match event.clone() { + StateEvent::Continue => { + // Good - Header sync not attempted, sync peer does not have better POW + }, + _ => panic!("Expected StateEvent::Continue event"), + } + + // Bob adds more blocks and draws ahead of Alice + let block_3_bob = append_block( + &bob_node.blockchain_db, + &block_2_bob, + vec![], + &consensus_manager, + Difficulty::from_u64(3).unwrap(), + &key_manager, + ) + .await + .unwrap(); + let block_4_bob = append_block( + &bob_node.blockchain_db, + &block_3_bob, + vec![], + &consensus_manager, + Difficulty::from_u64(3).unwrap(), + &key_manager, + ) + .await + .unwrap(); + assert_eq!(block_4_bob.height(), 4); + + // Alice attempts header sync, on a higher chain with less POW, headers will be lagging with reorg steps + let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await; + // "Lagging" + match event { + StateEvent::HeadersSynchronized(val, sync_result) => { + assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 4); + assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 13); + assert_eq!(sync_result.headers_returned, 4); + assert_eq!(sync_result.fork_hash_index, 3); + if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status { + assert_eq!(val.remote_tip_height, 4); + assert_eq!(val.best_block.height(), 3); + assert_eq!(val.reorg_steps_back, 3); + } else { + panic!("Should be 'Lagging'"); + } + }, + _ => panic!("Expected StateEvent::HeadersSynchronized event"), + } +} + +async fn sync_headers( + alice_state_machine: &mut BaseNodeStateMachine, + alice_node: &NodeInterfaces, + bob_node: &NodeInterfaces, +) -> StateEvent { + let mut header_sync = HeaderSyncState::new( + vec![SyncPeer::from(PeerChainMetadata::new( + bob_node.node_identity.node_id().clone(), + bob_node.blockchain_db.get_chain_metadata().unwrap(), + None, + ))], + alice_node.blockchain_db.get_chain_metadata().unwrap(), + ); + header_sync.next_event(alice_state_machine).await +} diff --git a/base_layer/core/tests/tests/mempool.rs b/base_layer/core/tests/tests/mempool.rs index 026ee2a0219..04d9133e290 100644 --- a/base_layer/core/tests/tests/mempool.rs +++ b/base_layer/core/tests/tests/mempool.rs @@ -69,7 +69,7 @@ use tari_core::{ }, }; use tari_key_manager::key_manager_service::KeyManagerInterface; -use tari_p2p::{services::liveness::LivenessConfig, tari_message::TariMessageType}; +use tari_p2p::{services::liveness::LivenessConfig, tari_message::TariMessageType, P2pConfig}; use tari_script::script; use tari_test_utils::async_assert_eventually; use tempfile::tempdir; @@ -1721,6 +1721,7 @@ async fn block_event_and_reorg_event_handling() { let (mut alice, mut bob, consensus_manager) = create_network_with_2_base_nodes_with_config( MempoolServiceConfig::default(), LivenessConfig::default(), + P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), ) diff --git a/base_layer/core/tests/tests/mod.rs b/base_layer/core/tests/tests/mod.rs index 7e69440d1fa..42f811db0f8 100644 --- a/base_layer/core/tests/tests/mod.rs +++ b/base_layer/core/tests/tests/mod.rs @@ -25,6 +25,7 @@ use tari_core::{blocks::ChainBlock, chain_storage::BlockAddResult}; mod async_db; mod base_node_rpc; mod block_validation; +mod header_sync; mod mempool; mod node_comms_interface; mod node_service; diff --git a/base_layer/core/tests/tests/node_state_machine.rs b/base_layer/core/tests/tests/node_state_machine.rs index ed4fd2d9508..08b42961844 100644 --- a/base_layer/core/tests/tests/node_state_machine.rs +++ b/base_layer/core/tests/tests/node_state_machine.rs @@ -43,7 +43,7 @@ use tari_core::{ transactions::test_helpers::create_test_core_key_manager_with_memory_db, validation::mocks::MockValidator, }; -use tari_p2p::services::liveness::config::LivenessConfig; +use tari_p2p::{services::liveness::config::LivenessConfig, P2pConfig}; use tari_shutdown::Shutdown; use tari_test_utils::unpack_enum; use tari_utilities::ByteArray; @@ -81,6 +81,7 @@ async fn test_listening_lagging() { auto_ping_interval: Some(Duration::from_millis(100)), ..Default::default() }, + P2pConfig::default(), consensus_manager, temp_dir.path().to_str().unwrap(), ) diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index f87eed31b0d..ac9ab9b6536 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -135,7 +135,7 @@ pub async fn initialize_local_test_comms>( discovery_request_timeout: Duration, seed_peers: Vec, shutdown_signal: ShutdownSignal, -) -> Result<(CommsNode, Dht, MessagingEventSender), CommsInitializationError> { +) -> Result<(UnspawnedCommsNode, Dht, MessagingEventSender), CommsInitializationError> { let peer_database_name = { let mut rng = thread_rng(); iter::repeat(()) @@ -201,17 +201,10 @@ pub async fn initialize_local_test_comms>( ) .build(); - let comms = comms - .add_protocol_extension( - MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline) - .enable_message_received_event(), - ) - .spawn_with_transport(MemoryTransport) - .await?; - - comms - .node_identity() - .add_public_address(comms.listening_address().clone()); + let comms = comms.add_protocol_extension( + MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline) + .enable_message_received_event(), + ); Ok((comms, dht, event_sender)) } @@ -424,7 +417,7 @@ fn acquire_exclusive_file_lock(db_path: &Path) -> Result, diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs index 490fcaece46..4bd2dca73f8 100644 --- a/base_layer/p2p/tests/support/comms_and_services.rs +++ b/base_layer/p2p/tests/support/comms_and_services.rs @@ -22,7 +22,12 @@ use std::{sync::Arc, time::Duration}; -use tari_comms::{peer_manager::NodeIdentity, protocol::messaging::MessagingEventSender, CommsNode}; +use tari_comms::{ + peer_manager::NodeIdentity, + protocol::messaging::MessagingEventSender, + transports::MemoryTransport, + CommsNode, +}; use tari_comms_dht::Dht; use tari_p2p::{comms_connector::InboundDomainConnector, initialization::initialize_local_test_comms}; use tari_shutdown::ShutdownSignal; @@ -46,5 +51,11 @@ pub async fn setup_comms_services( .await .unwrap(); + let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + // Set the public address for tests + comms + .node_identity() + .add_public_address(comms.listening_address().clone()); + (comms, dht, messaging_events) } diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs index 296d1175074..b6c7344f0e0 100644 --- a/base_layer/wallet/tests/support/comms_and_services.rs +++ b/base_layer/wallet/tests/support/comms_and_services.rs @@ -26,6 +26,7 @@ use tari_comms::{ message::MessageTag, net_address::MultiaddressesWithStats, peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags}, + transports::MemoryTransport, types::CommsPublicKey, CommsNode, }; @@ -57,6 +58,12 @@ pub async fn setup_comms_services( .await .unwrap(); + let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + // Set the public address for tests + comms + .node_identity() + .add_public_address(comms.listening_address().clone()); + (comms, dht) }