From 179f9571b4fec2bee7f492026ce0b30966db7e24 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 10 Sep 2024 17:23:27 +0300 Subject: [PATCH] Syncing strategy refactoring (#5469) This is a step towards https://github.com/paritytech/polkadot-sdk/issues/5333 The commits with code moving (but no other changes) and actual changes are separated for easier review. Essentially this results in `SyncingStrategy` trait replacing struct (which is renamed to `PolkadotSyncingStrategy`, open for better name suggestions). Technically it is already possible to replace `PolkadotSyncingStrategy` with `Box<dyn SyncingStrategy>` in syncing engine, but I decided to postpone such change until we actually have an ability to customize it. It should also be possible to swap `PolkadotSyncingStrategy` with just `ChainSync` that only supports regular full sync from genesis (it also implements `SyncingStrategy` trait). While extracted trait still has a lot of non-generic stuff in it like exposed knowledge of warp sync and `StrategyKey` with hardcoded set of options, I believe this is a step in the right direction and will address those in upcoming PRs. With https://github.com/paritytech/polkadot-sdk/pull/5431 that landed earlier warp sync configuration is more straightforward, but there are still numerous things interleaved that will take some time to abstract away nicely and expose in network config for developers. For now this is an internal change even though data structures are technically public and require major version bump. 
--- prdoc/pr_5469.prdoc | 11 + substrate/client/network/sync/src/engine.rs | 9 +- substrate/client/network/sync/src/strategy.rs | 294 ++-- .../network/sync/src/strategy/chain_sync.rs | 1456 +++++++++-------- .../sync/src/strategy/chain_sync/test.rs | 30 +- 5 files changed, 941 insertions(+), 859 deletions(-) create mode 100644 prdoc/pr_5469.prdoc diff --git a/prdoc/pr_5469.prdoc b/prdoc/pr_5469.prdoc new file mode 100644 index 0000000000000..1e6aa3c0c0726 --- /dev/null +++ b/prdoc/pr_5469.prdoc @@ -0,0 +1,11 @@ +title: Syncing strategy refactoring + +doc: + - audience: Node Dev + description: | + Mostly internal changes to syncing strategies that is a step towards making them configurable/extensible in the + future. It is unlikely that external developers will need to change their code. + +crates: + - name: sc-network-sync + bump: major diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 4b6ccb085834f..86c1a7abf7446 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -33,7 +33,7 @@ use crate::{ }, strategy::{ warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, - StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, + PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -189,7 +189,7 @@ pub struct Peer { pub struct SyncingEngine { /// Syncing strategy. - strategy: SyncingStrategy, + strategy: PolkadotSyncingStrategy, /// Blockchain client. client: Arc, @@ -389,7 +389,8 @@ where ); // Initialize syncing strategy. 
- let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; + let strategy = + PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -697,7 +698,7 @@ where number, ) }, - // Nothing to do, this is handled internally by `SyncingStrategy`. + // Nothing to do, this is handled internally by `PolkadotSyncingStrategy`. SyncingAction::Finished => {}, } } diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index ad3a9461c93b8..f8d6976bbaa0d 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! [`SyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`] +//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`] //! and specific syncing algorithms. pub mod chain_sync; @@ -29,7 +29,7 @@ use crate::{ types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus}, LOG_TARGET, }; -use chain_sync::{ChainSync, ChainSyncAction, ChainSyncMode}; +use chain_sync::{ChainSync, ChainSyncMode}; use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; @@ -59,6 +59,108 @@ fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode { } } +/// Syncing strategy for syncing engine to use +pub trait SyncingStrategy: Send +where + B: BlockT, +{ + /// Notify syncing state machine that a new sync peer has connected. + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor); + + /// Notify that a sync peer has disconnected. + fn remove_peer(&mut self, peer_id: &PeerId); + + /// Submit a validated block announcement. 
+ /// + /// Returns new best hash & best number of the peer if they are updated. + #[must_use] + fn on_validated_block_announce( + &mut self, + is_best: bool, + peer_id: PeerId, + announce: &BlockAnnounce, + ) -> Option<(B::Hash, NumberFor)>; + + /// Configure an explicit fork sync request in case external code has detected that there is a + /// stale fork missing. + /// + /// Note that this function should not be used for recent blocks. + /// Sync should be able to download all the recent forks normally. + /// + /// Passing empty `peers` set effectively removes the sync request. + fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor); + + /// Request extra justification. + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor); + + /// Clear extra justification requests. + fn clear_justification_requests(&mut self); + + /// Report a justification import (successful or not). + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool); + + /// Process block response. + fn on_block_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: BlockRequest, + blocks: Vec>, + ); + + /// Process state response. + fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ); + + /// Process warp proof response. + fn on_warp_proof_response( + &mut self, + peer_id: &PeerId, + key: StrategyKey, + response: EncodedProof, + ); + + /// A batch of blocks that have been processed, with or without errors. + /// + /// Call this when a batch of blocks that have been processed by the import queue, with or + /// without errors. + fn on_blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ); + + /// Notify a syncing strategy that a block has been finalized. + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor); + + /// Inform sync about a new best imported block. 
+ fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor); + + // Are we in major sync mode? + fn is_major_syncing(&self) -> bool; + + /// Get the number of peers known to the syncing strategy. + fn num_peers(&self) -> usize; + + /// Returns the current sync status. + fn status(&self) -> SyncStatus; + + /// Get the total number of downloaded blocks. + fn num_downloaded_blocks(&self) -> usize; + + /// Get an estimate of the number of parallel sync requests. + fn num_sync_requests(&self) -> usize; + + /// Get actions that should be performed by the owner on the strategy's behalf + #[must_use] + fn actions(&mut self) -> Result>, ClientError>; +} + /// Syncing configuration containing data for all strategies. #[derive(Clone, Debug)] pub struct SyncingConfig { @@ -104,7 +206,7 @@ pub enum SyncingAction { number: NumberFor, justifications: Justifications, }, - /// Strategy finished. Nothing to do, this is handled by `SyncingStrategy`. + /// Strategy finished. Nothing to do, this is handled by `PolkadotSyncingStrategy`. 
Finished, } @@ -140,26 +242,8 @@ impl From> for SyncingAction { } } -impl From> for SyncingAction { - fn from(action: ChainSyncAction) -> Self { - match action { - ChainSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request }, - ChainSyncAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request }, - ChainSyncAction::CancelRequest { peer_id } => - SyncingAction::CancelRequest { peer_id, key: StrategyKey::ChainSync }, - ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - ChainSyncAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => - SyncingAction::ImportJustifications { peer_id, hash, number, justifications }, - } - } -} - -/// Proxy to specific syncing strategies. -pub struct SyncingStrategy { +/// Proxy to specific syncing strategies used in Polkadot. +pub struct PolkadotSyncingStrategy { /// Initial syncing configuration. config: SyncingConfig, /// Client used by syncing strategies. @@ -171,11 +255,11 @@ pub struct SyncingStrategy { /// `ChainSync` strategy.` chain_sync: Option>, /// Connected peers and their best blocks used to seed a new strategy when switching to it in - /// [`SyncingStrategy::proceed_to_next`]. + /// `PolkadotSyncingStrategy::proceed_to_next`. peer_best_blocks: HashMap)>, } -impl SyncingStrategy +impl SyncingStrategy for PolkadotSyncingStrategy where B: BlockT, Client: HeaderBackend @@ -186,46 +270,7 @@ where + Sync + 'static, { - /// Initialize a new syncing strategy. 
- pub fn new( - config: SyncingConfig, - client: Arc, - warp_sync_config: Option>, - ) -> Result { - if let SyncMode::Warp = config.mode { - let warp_sync_config = warp_sync_config - .expect("Warp sync configuration must be supplied in warp sync mode."); - let warp_sync = WarpSync::new(client.clone(), warp_sync_config); - Ok(Self { - config, - client, - warp: Some(warp_sync), - state: None, - chain_sync: None, - peer_best_blocks: Default::default(), - }) - } else { - let chain_sync = ChainSync::new( - chain_sync_mode(config.mode), - client.clone(), - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry.as_ref(), - std::iter::empty(), - )?; - Ok(Self { - config, - client, - warp: None, - state: None, - chain_sync: Some(chain_sync), - peer_best_blocks: Default::default(), - }) - } - } - - /// Notify that a new peer has connected. - pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { self.peer_best_blocks.insert(peer_id, (best_hash, best_number)); self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); @@ -233,8 +278,7 @@ where self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); } - /// Notify that a peer has disconnected. - pub fn remove_peer(&mut self, peer_id: &PeerId) { + fn remove_peer(&mut self, peer_id: &PeerId) { self.warp.as_mut().map(|s| s.remove_peer(peer_id)); self.state.as_mut().map(|s| s.remove_peer(peer_id)); self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id)); @@ -242,10 +286,7 @@ where self.peer_best_blocks.remove(peer_id); } - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. 
- pub fn on_validated_block_announce( + fn on_validated_block_announce( &mut self, is_best: bool, peer_id: PeerId, @@ -278,46 +319,35 @@ where new_best } - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - pub fn set_sync_fork_request( - &mut self, - peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { + fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor) { // Fork requests are only handled by `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.set_sync_fork_request(peers.clone(), hash, number); } } - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { // Justifications can only be requested via `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.request_justification(hash, number); } } - /// Clear extra justification requests. - pub fn clear_justification_requests(&mut self) { + fn clear_justification_requests(&mut self) { // Justification requests can only be cleared by `ChainSync`. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.clear_justification_requests(); } } - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { // Only `ChainSync` is interested in justification import. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.on_justification_import(hash, number, success); } } - /// Process block response. 
- pub fn on_block_response( + fn on_block_response( &mut self, peer_id: PeerId, key: StrategyKey, @@ -329,7 +359,7 @@ where } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = (key, &mut self.chain_sync) { - chain_sync.on_block_response(peer_id, request, blocks); + chain_sync.on_block_response(peer_id, key, request, blocks); } else { error!( target: LOG_TARGET, @@ -340,8 +370,7 @@ where } } - /// Process state response. - pub fn on_state_response( + fn on_state_response( &mut self, peer_id: PeerId, key: StrategyKey, @@ -352,7 +381,7 @@ where } else if let (StrategyKey::ChainSync, Some(ref mut chain_sync)) = (key, &mut self.chain_sync) { - chain_sync.on_state_response(peer_id, response); + chain_sync.on_state_response(peer_id, key, response); } else { error!( target: LOG_TARGET, @@ -363,8 +392,7 @@ where } } - /// Process warp proof response. - pub fn on_warp_proof_response( + fn on_warp_proof_response( &mut self, peer_id: &PeerId, key: StrategyKey, @@ -382,8 +410,7 @@ where } } - /// A batch of blocks have been processed, with or without errors. - pub fn on_blocks_processed( + fn on_blocks_processed( &mut self, imported: usize, count: usize, @@ -397,24 +424,21 @@ where } } - /// Notify a syncing strategy that a block has been finalized. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { // Only `ChainSync` is interested in block finalization notifications. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.on_block_finalized(hash, number); } } - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { // This is relevant to `ChainSync` only. if let Some(ref mut chain_sync) = self.chain_sync { chain_sync.update_chain_info(best_hash, best_number); } } - // Are we in major sync mode? 
- pub fn is_major_syncing(&self) -> bool { + fn is_major_syncing(&self) -> bool { self.warp.is_some() || self.state.is_some() || match self.chain_sync { @@ -423,13 +447,11 @@ where } } - /// Get the number of peers known to the syncing strategy. - pub fn num_peers(&self) -> usize { + fn num_peers(&self) -> usize { self.peer_best_blocks.len() } - /// Returns the current sync status. - pub fn status(&self) -> SyncStatus { + fn status(&self) -> SyncStatus { // This function presumes that strategies are executed serially and must be refactored // once we have parallel strategies. if let Some(ref warp) = self.warp { @@ -443,21 +465,17 @@ where } } - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { + fn num_downloaded_blocks(&self) -> usize { self.chain_sync .as_ref() .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks()) } - /// Get an estimate of the number of parallel sync requests. - pub fn num_sync_requests(&self) -> usize { + fn num_sync_requests(&self) -> usize { self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests()) } - /// Get actions that should be performed by the owner on the strategy's behalf - #[must_use] - pub fn actions(&mut self) -> Result>, ClientError> { + fn actions(&mut self) -> Result>, ClientError> { // This function presumes that strategies are executed serially and must be refactored once // we have parallel strategies. let actions: Vec<_> = if let Some(ref mut warp) = self.warp { @@ -465,7 +483,7 @@ where } else if let Some(ref mut state) = self.state { state.actions().map(Into::into).collect() } else if let Some(ref mut chain_sync) = self.chain_sync { - chain_sync.actions().map(Into::into).collect() + chain_sync.actions()? 
} else { unreachable!("At least one syncing strategy is always active; qed") }; @@ -476,6 +494,56 @@ where Ok(actions) } +} + +impl PolkadotSyncingStrategy +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ + /// Initialize a new syncing strategy. + pub fn new( + config: SyncingConfig, + client: Arc, + warp_sync_config: Option>, + ) -> Result { + if let SyncMode::Warp = config.mode { + let warp_sync_config = warp_sync_config + .expect("Warp sync configuration must be supplied in warp sync mode."); + let warp_sync = WarpSync::new(client.clone(), warp_sync_config); + Ok(Self { + config, + client, + warp: Some(warp_sync), + state: None, + chain_sync: None, + peer_best_blocks: Default::default(), + }) + } else { + let chain_sync = ChainSync::new( + chain_sync_mode(config.mode), + client.clone(), + config.max_parallel_downloads, + config.max_blocks_per_request, + config.metrics_registry.as_ref(), + std::iter::empty(), + )?; + Ok(Self { + config, + client, + warp: None, + state: None, + chain_sync: Some(chain_sync), + peer_best_blocks: Default::default(), + }) + } + } /// Proceed with the next strategy if the active one finished. 
pub fn proceed_to_next(&mut self) -> Result<(), ClientError> { diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index cca83a5055cb2..a8ba5558d1bcb 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -35,7 +35,8 @@ use crate::{ strategy::{ disconnected_peers::DisconnectedPeers, state_sync::{ImportResult, StateSync, StateSyncProvider}, - warp::{WarpSyncPhase, WarpSyncProgress}, + warp::{EncodedProof, WarpSyncPhase, WarpSyncProgress}, + StrategyKey, SyncingAction, SyncingStrategy, }, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus}, LOG_TARGET, @@ -197,28 +198,6 @@ struct GapSync { target: NumberFor, } -/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. -#[derive(Debug)] -pub enum ChainSyncAction { - /// Send block request to peer. Always implies dropping a stale block request to the same peer. - SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Send state request to peer. - SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, - /// Drop stale request. - CancelRequest { peer_id: PeerId }, - /// Peer misbehaved. Disconnect, report it and cancel the block request to it. - DropPeer(BadPeer), - /// Import blocks. - ImportBlocks { origin: BlockOrigin, blocks: Vec> }, - /// Import justifications. - ImportJustifications { - peer_id: PeerId, - hash: B::Hash, - number: NumberFor, - justifications: Justifications, - }, -} - /// Sync operation mode. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum ChainSyncMode { @@ -233,6 +212,75 @@ pub enum ChainSyncMode { }, } +/// All the data we have about a Peer that we are trying to sync with +#[derive(Debug, Clone)] +pub(crate) struct PeerSync { + /// Peer id of this peer. 
+ pub peer_id: PeerId, + /// The common number is the block number that is a common point of + /// ancestry for both our chains (as far as we know). + pub common_number: NumberFor, + /// The hash of the best block that we've seen for this peer. + pub best_hash: B::Hash, + /// The number of the best block that we've seen for this peer. + pub best_number: NumberFor, + /// The state of syncing this peer is in for us, generally categories + /// into `Available` or "busy" with something as defined by `PeerSyncState`. + pub state: PeerSyncState, +} + +impl PeerSync { + /// Update the `common_number` iff `new_common > common_number`. + fn update_common_number(&mut self, new_common: NumberFor) { + if self.common_number < new_common { + trace!( + target: LOG_TARGET, + "Updating peer {} common number from={} => to={}.", + self.peer_id, + self.common_number, + new_common, + ); + self.common_number = new_common; + } + } +} + +struct ForkTarget { + number: NumberFor, + parent_hash: Option, + peers: HashSet, +} + +/// The state of syncing between a Peer and ourselves. +/// +/// Generally two categories, "busy" or `Available`. If busy, the enum +/// defines what we are busy with. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub(crate) enum PeerSyncState { + /// Available for sync requests. + Available, + /// Searching for ancestors the Peer has in common with us. + AncestorSearch { start: NumberFor, current: NumberFor, state: AncestorSearchState }, + /// Actively downloading new blocks, starting from the given Number. + DownloadingNew(NumberFor), + /// Downloading a stale block with given Hash. Stale means that it is a + /// block with a number that is lower than our best number. It might be + /// from a fork and not necessarily already imported. + DownloadingStale(B::Hash), + /// Downloading justification for given block hash. + DownloadingJustification(B::Hash), + /// Downloading state. + DownloadingState, + /// Actively downloading block history after warp sync. 
+ DownloadingGap(NumberFor), +} + +impl PeerSyncState { + pub fn is_available(&self) -> bool { + matches!(self, Self::Available) + } +} + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -280,77 +328,563 @@ pub struct ChainSync { /// Gap download process. gap_sync: Option>, /// Pending actions. - actions: Vec>, + actions: Vec>, /// Prometheus metrics. metrics: Option, } -/// All the data we have about a Peer that we are trying to sync with -#[derive(Debug, Clone)] -pub(crate) struct PeerSync { - /// Peer id of this peer. - pub peer_id: PeerId, - /// The common number is the block number that is a common point of - /// ancestry for both our chains (as far as we know). - pub common_number: NumberFor, - /// The hash of the best block that we've seen for this peer. - pub best_hash: B::Hash, - /// The number of the best block that we've seen for this peer. - pub best_number: NumberFor, - /// The state of syncing this peer is in for us, generally categories - /// into `Available` or "busy" with something as defined by `PeerSyncState`. 
- pub state: PeerSyncState, -} +impl SyncingStrategy for ChainSync +where + B: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, +{ + fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + match self.add_peer_inner(peer_id, best_hash, best_number) { + Ok(Some(request)) => self.actions.push(SyncingAction::SendBlockRequest { + peer_id, + key: StrategyKey::ChainSync, + request, + }), + Ok(None) => {}, + Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)), + } + } + + fn remove_peer(&mut self, peer_id: &PeerId) { + self.blocks.clear_peer_download(peer_id); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_peer_download(peer_id) + } + + if let Some(state) = self.peers.remove(peer_id) { + if !state.state.is_available() { + if let Some(bad_peer) = + self.disconnected_peers.on_disconnect_during_request(*peer_id) + { + self.actions.push(SyncingAction::DropPeer(bad_peer)); + } + } + } + + self.extra_justifications.peer_disconnected(peer_id); + self.allowed_requests.set_all(); + self.fork_targets.retain(|_, target| { + target.peers.remove(peer_id); + !target.peers.is_empty() + }); + if let Some(metrics) = &self.metrics { + metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX)); + } + + let blocks = self.ready_blocks(); + + if !blocks.is_empty() { + self.validate_and_queue_blocks(blocks, false); + } + } + + fn on_validated_block_announce( + &mut self, + is_best: bool, + peer_id: PeerId, + announce: &BlockAnnounce, + ) -> Option<(B::Hash, NumberFor)> { + let number = *announce.header.number(); + let hash = announce.header.hash(); + let parent_status = + self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); + let known_parent = parent_status != BlockStatus::Unknown; + let ancient_parent = parent_status == BlockStatus::InChainPruned; + + let known = self.is_known(&hash); + let peer = if let 
Some(peer) = self.peers.get_mut(&peer_id) { + peer + } else { + error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); + return Some((hash, number)) + }; + + if let PeerSyncState::AncestorSearch { .. } = peer.state { + trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); + return None + } + + let peer_info = is_best.then(|| { + // update their best block + peer.best_number = number; + peer.best_hash = hash; + + (hash, number) + }); + + // If the announced block is the best they have and is not ahead of us, our common number + // is either one further ahead or it's the one they just announced, if we know about it. + if is_best { + if known && self.best_queued_number >= number { + self.update_peer_common_number(&peer_id, number); + } else if announce.header.parent_hash() == &self.best_queued_hash || + known_parent && self.best_queued_number >= number + { + self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); + } + } + self.allowed_requests.add(&peer_id); + + // known block case + if known || self.is_already_downloading(&hash) { + trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash); + if let Some(target) = self.fork_targets.get_mut(&hash) { + target.peers.insert(peer_id); + } + return peer_info + } + + if ancient_parent { + trace!( + target: LOG_TARGET, + "Ignored ancient block announced from {}: {} {:?}", + peer_id, + hash, + announce.header, + ); + return peer_info + } + + if self.status().state == SyncState::Idle { + trace!( + target: LOG_TARGET, + "Added sync target for block announced from {}: {} {:?}", + peer_id, + hash, + announce.summary(), + ); + self.fork_targets + .entry(hash) + .or_insert_with(|| { + if let Some(metrics) = &self.metrics { + metrics.fork_targets.inc(); + } + + ForkTarget { + number, + parent_hash: Some(*announce.header.parent_hash()), + peers: Default::default(), + } + }) + .peers + .insert(peer_id); + } + + peer_info + 
} + + // The implementation is similar to `on_validated_block_announce` with unknown parent hash. + fn set_sync_fork_request( + &mut self, + mut peers: Vec, + hash: &B::Hash, + number: NumberFor, + ) { + if peers.is_empty() { + peers = self + .peers + .iter() + // Only request blocks from peers who are ahead or on a par. + .filter(|(_, peer)| peer.best_number >= number) + .map(|(id, _)| *id) + .collect(); + + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with no peers specified. \ + Syncing from these peers {peers:?} instead.", + ); + } else { + debug!( + target: LOG_TARGET, + "Explicit sync request for block {hash:?} with {peers:?}", + ); + } + + if self.is_known(hash) { + debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); + return + } + + trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); + for peer_id in &peers { + if let Some(peer) = self.peers.get_mut(peer_id) { + if let PeerSyncState::AncestorSearch { .. } = peer.state { + continue + } + + if number > peer.best_number { + peer.best_number = number; + peer.best_hash = *hash; + } + self.allowed_requests.add(peer_id); + } + } + + self.fork_targets + .entry(*hash) + .or_insert_with(|| { + if let Some(metrics) = &self.metrics { + metrics.fork_targets.inc(); + } + + ForkTarget { number, peers: Default::default(), parent_hash: None } + }) + .peers + .extend(peers); + } + + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + self.extra_justifications + .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) + } + + fn clear_justification_requests(&mut self) { + self.extra_justifications.reset(); + } + + fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; + self.extra_justifications + .try_finalize_root((hash, number), finalization_result, true); + 
self.allowed_requests.set_all(); + } + + fn on_block_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + request: BlockRequest, + blocks: Vec>, + ) { + if key != StrategyKey::ChainSync { + error!( + target: LOG_TARGET, + "`on_block_response()` called with unexpected key {key:?} for chain sync", + ); + debug_assert!(false); + } + let block_response = BlockResponse:: { id: request.id, blocks }; + + let blocks_range = || match ( + block_response + .blocks + .first() + .and_then(|b| b.header.as_ref().map(|h| h.number())), + block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + trace!( + target: LOG_TARGET, + "BlockResponse {} from {} with {} blocks {}", + block_response.id, + peer_id, + block_response.blocks.len(), + blocks_range(), + ); + + let res = if request.fields == BlockAttributes::JUSTIFICATION { + self.on_block_justification(peer_id, block_response) + } else { + self.on_block_data(&peer_id, Some(request), block_response) + }; + + if let Err(bad_peer) = res { + self.actions.push(SyncingAction::DropPeer(bad_peer)); + } + } + + fn on_state_response( + &mut self, + peer_id: PeerId, + key: StrategyKey, + response: OpaqueStateResponse, + ) { + if key != StrategyKey::ChainSync { + error!( + target: LOG_TARGET, + "`on_state_response()` called with unexpected key {key:?} for chain sync", + ); + debug_assert!(false); + } + if let Err(bad_peer) = self.on_state_data(&peer_id, response) { + self.actions.push(SyncingAction::DropPeer(bad_peer)); + } + } + + fn on_warp_proof_response( + &mut self, + _peer_id: &PeerId, + _key: StrategyKey, + _response: EncodedProof, + ) { + error!( + target: LOG_TARGET, + "`on_warp_proof_response()` called for chain sync strategy", + ); + debug_assert!(false); + } + + fn on_blocks_processed( + &mut self, + imported: usize, + count: 
usize, + results: Vec<(Result>, BlockImportError>, B::Hash)>, + ) { + trace!(target: LOG_TARGET, "Imported {imported} of {count}"); + + let mut has_error = false; + for (_, hash) in &results { + if self.queue_blocks.remove(hash) { + if let Some(metrics) = &self.metrics { + metrics.queued_blocks.dec(); + } + } + self.blocks.clear_queued(hash); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_queued(hash); + } + } + for (result, hash) in results { + if has_error { + break + } + + has_error |= result.is_err(); + + match result { + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + if let Some(peer) = peer_id { + self.update_peer_common_number(&peer, number); + }, + Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { + if aux.clear_justification_requests { + trace!( + target: LOG_TARGET, + "Block imported clears all pending justification requests {number}: {hash:?}", + ); + self.clear_justification_requests(); + } + + if aux.needs_justification { + trace!( + target: LOG_TARGET, + "Block imported but requires justification {number}: {hash:?}", + ); + self.request_justification(&hash, number); + } + + if aux.bad_justification { + if let Some(ref peer) = peer_id { + warn!("💔 Sent block with bad justification to import"); + self.actions.push(SyncingAction::DropPeer(BadPeer( + *peer, + rep::BAD_JUSTIFICATION, + ))); + } + } + + if let Some(peer) = peer_id { + self.update_peer_common_number(&peer, number); + } + let state_sync_complete = + self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash); + if state_sync_complete { + info!( + target: LOG_TARGET, + "State sync is complete ({} MiB), restarting block sync.", + self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), + ); + self.state_sync = None; + self.mode = ChainSyncMode::Full; + self.restart(); + } + let gap_sync_complete = + self.gap_sync.as_ref().map_or(false, |s| s.target == number); + if gap_sync_complete { + info!( + target: LOG_TARGET, + 
"Block history download is complete." + ); + self.gap_sync = None; + } + }, + Err(BlockImportError::IncompleteHeader(peer_id)) => + if let Some(peer) = peer_id { + warn!( + target: LOG_TARGET, + "💔 Peer sent block with incomplete header to import", + ); + self.actions + .push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); + self.restart(); + }, + Err(BlockImportError::VerificationFailed(peer_id, e)) => { + let extra_message = peer_id + .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); + + warn!( + target: LOG_TARGET, + "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", + ); + + if let Some(peer) = peer_id { + self.actions + .push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); + } + + self.restart(); + }, + Err(BlockImportError::BadBlock(peer_id)) => + if let Some(peer) = peer_id { + warn!( + target: LOG_TARGET, + "💔 Block {hash:?} received from peer {peer} has been blacklisted", + ); + self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); + }, + Err(BlockImportError::MissingState) => { + // This may happen if the chain we were requesting upon has been discarded + // in the meantime because other chain has been finalized. + // Don't mark it as bad as it still may be synced if explicitly requested. + trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); + }, + e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { + warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); + self.state_sync = None; + self.restart(); + }, + Err(BlockImportError::Cancelled) => {}, + }; + } + + self.allowed_requests.set_all(); + } + + fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { + is_descendent_of(&**client, base, block) + }); + + if let ChainSyncMode::LightState { skip_proofs, .. 
} = &self.mode { + if self.state_sync.is_none() { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + self.attempt_state_sync(*hash, number, *skip_proofs); + } else { + self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs)); + } + } + } + + if let Err(err) = r { + warn!( + target: LOG_TARGET, + "💔 Error cleaning up pending extra justification data requests: {err}", + ); + } + } + + fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { + self.on_block_queued(best_hash, best_number); + } + + fn is_major_syncing(&self) -> bool { + self.status().state.is_major_syncing() + } + + fn num_peers(&self) -> usize { + self.peers.len() + } + + fn status(&self) -> SyncStatus { + let median_seen = self.median_seen(); + let best_seen_block = + median_seen.and_then(|median| (median > self.best_queued_number).then_some(median)); + let sync_state = if let Some(target) = median_seen { + // A chain is classified as downloading if the provided best block is + // more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing + // if the same can be said about queued blocks. + let best_block = self.client.info().best_number; + if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() { + // If target is not queued, we're downloading, otherwise importing. + if target > self.best_queued_number { + SyncState::Downloading { target } + } else { + SyncState::Importing { target } + } + } else { + SyncState::Idle + } + } else { + SyncState::Idle + }; + + let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress { + phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), + total_bytes: 0, + }); -impl PeerSync { - /// Update the `common_number` iff `new_common > common_number`. 
- fn update_common_number(&mut self, new_common: NumberFor) { - if self.common_number < new_common { - trace!( - target: LOG_TARGET, - "Updating peer {} common number from={} => to={}.", - self.peer_id, - self.common_number, - new_common, - ); - self.common_number = new_common; + SyncStatus { + state: sync_state, + best_seen_block, + num_peers: self.peers.len() as u32, + queued_blocks: self.queue_blocks.len() as u32, + state_sync: self.state_sync.as_ref().map(|s| s.progress()), + warp_sync: warp_sync_progress, } } -} -struct ForkTarget { - number: NumberFor, - parent_hash: Option, - peers: HashSet, -} + fn num_downloaded_blocks(&self) -> usize { + self.downloaded_blocks + } -/// The state of syncing between a Peer and ourselves. -/// -/// Generally two categories, "busy" or `Available`. If busy, the enum -/// defines what we are busy with. -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub(crate) enum PeerSyncState { - /// Available for sync requests. - Available, - /// Searching for ancestors the Peer has in common with us. - AncestorSearch { start: NumberFor, current: NumberFor, state: AncestorSearchState }, - /// Actively downloading new blocks, starting from the given Number. - DownloadingNew(NumberFor), - /// Downloading a stale block with given Hash. Stale means that it is a - /// block with a number that is lower than our best number. It might be - /// from a fork and not necessarily already imported. - DownloadingStale(B::Hash), - /// Downloading justification for given block hash. - DownloadingJustification(B::Hash), - /// Downloading state. - DownloadingState, - /// Actively downloading block history after warp sync. 
- DownloadingGap(NumberFor), -} + fn num_sync_requests(&self) -> usize { + self.fork_targets + .values() + .filter(|f| f.number <= self.best_queued_number) + .count() + } -impl PeerSyncState { - pub fn is_available(&self) -> bool { - matches!(self, Self::Available) + fn actions(&mut self) -> Result>, ClientError> { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() { + self.attempt_state_sync(hash, number, skip_proofs); + } + } + + let block_requests = self.block_requests().into_iter().map(|(peer_id, request)| { + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request } + }); + self.actions.extend(block_requests); + + let justification_requests = + self.justification_requests().into_iter().map(|(peer_id, request)| { + SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::ChainSync, request } + }); + self.actions.extend(justification_requests); + + let state_request = self.state_request().into_iter().map(|(peer_id, request)| { + SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request } + }); + self.actions.extend(state_request); + + Ok(std::mem::take(&mut self.actions)) } } @@ -414,73 +948,6 @@ where Ok(sync) } - /// Returns the current sync status. - pub fn status(&self) -> SyncStatus { - let median_seen = self.median_seen(); - let best_seen_block = - median_seen.and_then(|median| (median > self.best_queued_number).then_some(median)); - let sync_state = if let Some(target) = median_seen { - // A chain is classified as downloading if the provided best block is - // more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing - // if the same can be said about queued blocks. - let best_block = self.client.info().best_number; - if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() { - // If target is not queued, we're downloading, otherwise importing. 
- if target > self.best_queued_number { - SyncState::Downloading { target } - } else { - SyncState::Importing { target } - } - } else { - SyncState::Idle - } - } else { - SyncState::Idle - }; - - let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress { - phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), - total_bytes: 0, - }); - - SyncStatus { - state: sync_state, - best_seen_block, - num_peers: self.peers.len() as u32, - queued_blocks: self.queue_blocks.len() as u32, - state_sync: self.state_sync.as_ref().map(|s| s.progress()), - warp_sync: warp_sync_progress, - } - } - - /// Get an estimate of the number of parallel sync requests. - pub fn num_sync_requests(&self) -> usize { - self.fork_targets - .values() - .filter(|f| f.number <= self.best_queued_number) - .count() - } - - /// Get the total number of downloaded blocks. - pub fn num_downloaded_blocks(&self) -> usize { - self.downloaded_blocks - } - - /// Get the number of peers known to the syncing state machine. - pub fn num_peers(&self) -> usize { - self.peers.len() - } - - /// Notify syncing state machine that a new sync peer has connected. 
- pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { - match self.add_peer_inner(peer_id, best_hash, best_number) { - Ok(Some(request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), - Ok(None) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), - } - } - #[must_use] fn add_peer_inner( &mut self, @@ -550,138 +1017,53 @@ where peer_id, best_hash, best_number - ); - - ( - PeerSyncState::AncestorSearch { - current: common_best, - start: self.best_queued_number, - state: AncestorSearchState::ExponentialBackoff(One::one()), - }, - Some(ancestry_request::(common_best)), - ) - }; - - self.allowed_requests.add(&peer_id); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: Zero::zero(), - best_hash, - best_number, - state, - }, - ); - - Ok(req) - }, - Ok(BlockStatus::Queued) | - Ok(BlockStatus::InChainWithState) | - Ok(BlockStatus::InChainPruned) => { - debug!( - target: LOG_TARGET, - "New peer {peer_id} with known best hash {best_hash} ({best_number}).", - ); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: std::cmp::min(self.best_queued_number, best_number), - best_hash, - best_number, - state: PeerSyncState::Available, - }, - ); - self.allowed_requests.add(&peer_id); - Ok(None) - }, - } - } - - /// Inform sync about a new best imported block. - pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { - self.on_block_queued(best_hash, best_number); - } - - /// Request extra justification. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let client = &self.client; - self.extra_justifications - .schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block)) - } - - /// Clear extra justification requests. 
- pub fn clear_justification_requests(&mut self) { - self.extra_justifications.reset(); - } - - /// Configure an explicit fork sync request in case external code has detected that there is a - /// stale fork missing. - /// - /// Note that this function should not be used for recent blocks. - /// Sync should be able to download all the recent forks normally. - /// - /// Passing empty `peers` set effectively removes the sync request. - // The implementation is similar to `on_validated_block_announce` with unknown parent hash. - pub fn set_sync_fork_request( - &mut self, - mut peers: Vec, - hash: &B::Hash, - number: NumberFor, - ) { - if peers.is_empty() { - peers = self - .peers - .iter() - // Only request blocks from peers who are ahead or on a par. - .filter(|(_, peer)| peer.best_number >= number) - .map(|(id, _)| *id) - .collect(); - - debug!( - target: LOG_TARGET, - "Explicit sync request for block {hash:?} with no peers specified. \ - Syncing from these peers {peers:?} instead.", - ); - } else { - debug!( - target: LOG_TARGET, - "Explicit sync request for block {hash:?} with {peers:?}", - ); - } - - if self.is_known(hash) { - debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); - return; - } - - trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); - for peer_id in &peers { - if let Some(peer) = self.peers.get_mut(peer_id) { - if let PeerSyncState::AncestorSearch { .. 
} = peer.state { - continue; - } - - if number > peer.best_number { - peer.best_number = number; - peer.best_hash = *hash; - } - self.allowed_requests.add(peer_id); - } - } + ); - self.fork_targets - .entry(*hash) - .or_insert_with(|| { - if let Some(metrics) = &self.metrics { - metrics.fork_targets.inc(); - } + ( + PeerSyncState::AncestorSearch { + current: common_best, + start: self.best_queued_number, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }, + Some(ancestry_request::(common_best)), + ) + }; - ForkTarget { number, peers: Default::default(), parent_hash: None } - }) - .peers - .extend(peers); + self.allowed_requests.add(&peer_id); + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: Zero::zero(), + best_hash, + best_number, + state, + }, + ); + + Ok(req) + }, + Ok(BlockStatus::Queued) | + Ok(BlockStatus::InChainWithState) | + Ok(BlockStatus::InChainPruned) => { + debug!( + target: LOG_TARGET, + "New peer {peer_id} with known best hash {best_hash} ({best_number}).", + ); + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: std::cmp::min(self.best_queued_number, best_number), + best_hash, + best_number, + state: PeerSyncState::Available, + }, + ); + self.allowed_requests.add(&peer_id); + Ok(None) + }, + } } /// Submit a block response for processing. 
@@ -857,8 +1239,9 @@ where state: next_state, }; let request = ancestry_request::(next_num); - self.actions.push(ChainSyncAction::SendBlockRequest { + self.actions.push(SyncingAction::SendBlockRequest { peer_id: *peer_id, + key: StrategyKey::ChainSync, request, }); return Ok(()); @@ -965,240 +1348,45 @@ where // We only request one justification at a time let justification = if let Some(block) = response.blocks.into_iter().next() { - if hash != block.hash { - warn!( - target: LOG_TARGET, - "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", - peer_id, - hash, - block.hash, - ); - return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); - } - - block - .justifications - .or_else(|| legacy_justification_mapping(block.justification)) - } else { - // we might have asked the peer for a justification on a block that we assumed it - // had but didn't (regardless of whether it had a justification for it or not). - trace!( - target: LOG_TARGET, - "Peer {peer_id:?} provided empty response for justification request {hash:?}", - ); - - None - }; - - if let Some((peer_id, hash, number, justifications)) = - self.extra_justifications.on_response(peer_id, justification) - { - self.actions.push(ChainSyncAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - }); - return Ok(()); - } - } - - Ok(()) - } - - /// Report a justification import (successful or not). - pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; - self.extra_justifications - .try_finalize_root((hash, number), finalization_result, true); - self.allowed_requests.set_all(); - } - - /// Notify sync that a block has been finalized. 
- pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { - let client = &self.client; - let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { - is_descendent_of(&**client, base, block) - }); - - if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { - if self.state_sync.is_none() { - if !self.peers.is_empty() && self.queue_blocks.is_empty() { - self.attempt_state_sync(*hash, number, *skip_proofs); - } else { - self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs)); - } - } - } - - if let Err(err) = r { - warn!( - target: LOG_TARGET, - "💔 Error cleaning up pending extra justification data requests: {err}", - ); - } - } - - fn attempt_state_sync( - &mut self, - finalized_hash: B::Hash, - finalized_number: NumberFor, - skip_proofs: bool, - ) { - let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); - heads.sort(); - let median = heads[heads.len() / 2]; - if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { - if let Ok(Some(header)) = self.client.header(finalized_hash) { - log::debug!( - target: LOG_TARGET, - "Starting state sync for #{finalized_number} ({finalized_hash})", - ); - self.state_sync = - Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); - self.allowed_requests.set_all(); - } else { - log::error!( - target: LOG_TARGET, - "Failed to start state sync: header for finalized block \ - #{finalized_number} ({finalized_hash}) is not available", - ); - debug_assert!(false); - } - } - } - - /// Submit a validated block announcement. - /// - /// Returns new best hash & best number of the peer if they are updated. 
- #[must_use] - pub fn on_validated_block_announce( - &mut self, - is_best: bool, - peer_id: PeerId, - announce: &BlockAnnounce, - ) -> Option<(B::Hash, NumberFor)> { - let number = *announce.header.number(); - let hash = announce.header.hash(); - let parent_status = - self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown); - let known_parent = parent_status != BlockStatus::Unknown; - let ancient_parent = parent_status == BlockStatus::InChainPruned; - - let known = self.is_known(&hash); - let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { - peer - } else { - error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); - return Some((hash, number)); - }; - - if let PeerSyncState::AncestorSearch { .. } = peer.state { - trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); - return None; - } - - let peer_info = is_best.then(|| { - // update their best block - peer.best_number = number; - peer.best_hash = hash; - - (hash, number) - }); - - // If the announced block is the best they have and is not ahead of us, our common number - // is either one further ahead or it's the one they just announced, if we know about it. 
- if is_best { - if known && self.best_queued_number >= number { - self.update_peer_common_number(&peer_id, number); - } else if announce.header.parent_hash() == &self.best_queued_hash || - known_parent && self.best_queued_number >= number - { - self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); - } - } - self.allowed_requests.add(&peer_id); - - // known block case - if known || self.is_already_downloading(&hash) { - trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash); - if let Some(target) = self.fork_targets.get_mut(&hash) { - target.peers.insert(peer_id); - } - return peer_info; - } - - if ancient_parent { - trace!( - target: LOG_TARGET, - "Ignored ancient block announced from {}: {} {:?}", - peer_id, - hash, - announce.header, - ); - return peer_info; - } - - if self.status().state == SyncState::Idle { - trace!( - target: LOG_TARGET, - "Added sync target for block announced from {}: {} {:?}", - peer_id, - hash, - announce.summary(), - ); - self.fork_targets - .entry(hash) - .or_insert_with(|| { - if let Some(metrics) = &self.metrics { - metrics.fork_targets.inc(); - } - - ForkTarget { - number, - parent_hash: Some(*announce.header.parent_hash()), - peers: Default::default(), - } - }) - .peers - .insert(peer_id); - } - - peer_info - } - - /// Notify that a sync peer has disconnected. 
- pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.blocks.clear_peer_download(peer_id); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_peer_download(peer_id) - } - - if let Some(state) = self.peers.remove(peer_id) { - if !state.state.is_available() { - if let Some(bad_peer) = - self.disconnected_peers.on_disconnect_during_request(*peer_id) - { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + if hash != block.hash { + warn!( + target: LOG_TARGET, + "💔 Invalid block justification provided by {}: requested: {:?} got: {:?}", + peer_id, + hash, + block.hash, + ); + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); } - } - } - self.extra_justifications.peer_disconnected(peer_id); - self.allowed_requests.set_all(); - self.fork_targets.retain(|_, target| { - target.peers.remove(peer_id); - !target.peers.is_empty() - }); - if let Some(metrics) = &self.metrics { - metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX)); - } + block + .justifications + .or_else(|| legacy_justification_mapping(block.justification)) + } else { + // we might have asked the peer for a justification on a block that we assumed it + // had but didn't (regardless of whether it had a justification for it or not). + trace!( + target: LOG_TARGET, + "Peer {peer_id:?} provided empty response for justification request {hash:?}", + ); - let blocks = self.ready_blocks(); + None + }; - if !blocks.is_empty() { - self.validate_and_queue_blocks(blocks, false); + if let Some((peer_id, hash, number, justifications)) = + self.extra_justifications.on_response(peer_id, justification) + { + self.actions.push(SyncingAction::ImportJustifications { + peer_id, + hash, + number, + justifications, + }); + return Ok(()); + } } + + Ok(()) } /// Returns the median seen block number. 
@@ -1272,7 +1460,7 @@ where .set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX)); } - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks }) + self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks }) } fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { @@ -1351,7 +1539,10 @@ where PeerSyncState::DownloadingGap(_) | PeerSyncState::DownloadingState => { // Cancel a request first, as `add_peer` may generate a new request. - self.actions.push(ChainSyncAction::CancelRequest { peer_id }); + self.actions.push(SyncingAction::CancelRequest { + peer_id, + key: StrategyKey::ChainSync, + }); self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); }, PeerSyncState::DownloadingJustification(_) => { @@ -1467,53 +1658,6 @@ where .collect() } - /// Submit blocks received in a response. - pub fn on_block_response( - &mut self, - peer_id: PeerId, - request: BlockRequest, - blocks: Vec>, - ) { - let block_response = BlockResponse:: { id: request.id, blocks }; - - let blocks_range = || match ( - block_response - .blocks - .first() - .and_then(|b| b.header.as_ref().map(|h| h.number())), - block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - trace!( - target: LOG_TARGET, - "BlockResponse {} from {} with {} blocks {}", - block_response.id, - peer_id, - block_response.blocks.len(), - blocks_range(), - ); - - let res = if request.fields == BlockAttributes::JUSTIFICATION { - self.on_block_justification(peer_id, block_response) - } else { - self.on_block_data(&peer_id, Some(request), block_response) - }; - - if let Err(bad_peer) = res { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); - } - } - - /// Submit a state received in a response. 
- pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { - if let Err(bad_peer) = self.on_state_data(&peer_id, response) { - self.actions.push(ChainSyncAction::DropPeer(bad_peer)); - } - } - /// Get justification requests scheduled by sync to be sent out. fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; @@ -1751,7 +1895,7 @@ where state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); - self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }); + self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] }); Ok(()) }, ImportResult::Continue => Ok(()), @@ -1762,181 +1906,39 @@ where } } - /// A batch of blocks have been processed, with or without errors. - /// - /// Call this when a batch of blocks have been processed by the import - /// queue, with or without errors. - pub fn on_blocks_processed( + fn attempt_state_sync( &mut self, - imported: usize, - count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)>, + finalized_hash: B::Hash, + finalized_number: NumberFor, + skip_proofs: bool, ) { - trace!(target: LOG_TARGET, "Imported {imported} of {count}"); - - let mut has_error = false; - for (_, hash) in &results { - if self.queue_blocks.remove(hash) { - if let Some(metrics) = &self.metrics { - metrics.queued_blocks.dec(); - } - } - self.blocks.clear_queued(hash); - if let Some(gap_sync) = &mut self.gap_sync { - gap_sync.blocks.clear_queued(hash); - } - } - for (result, hash) in results { - if has_error { - break; - } - - has_error |= result.is_err(); - - match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => { - if let Some(peer) = peer_id { - self.update_peer_common_number(&peer, number); - } - }, - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { - if aux.clear_justification_requests { - trace!( - target: LOG_TARGET, - "Block imported clears all pending 
justification requests {number}: {hash:?}", - ); - self.clear_justification_requests(); - } - - if aux.needs_justification { - trace!( - target: LOG_TARGET, - "Block imported but requires justification {number}: {hash:?}", - ); - self.request_justification(&hash, number); - } - - if aux.bad_justification { - if let Some(ref peer) = peer_id { - warn!("💔 Sent block with bad justification to import"); - self.actions.push(ChainSyncAction::DropPeer(BadPeer( - *peer, - rep::BAD_JUSTIFICATION, - ))); - } - } - - if let Some(peer) = peer_id { - self.update_peer_common_number(&peer, number); - } - let state_sync_complete = - self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash); - if state_sync_complete { - info!( - target: LOG_TARGET, - "State sync is complete ({} MiB), restarting block sync.", - self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), - ); - self.state_sync = None; - self.mode = ChainSyncMode::Full; - self.restart(); - } - let gap_sync_complete = - self.gap_sync.as_ref().map_or(false, |s| s.target == number); - if gap_sync_complete { - info!( - target: LOG_TARGET, - "Block history download is complete." 
- ); - self.gap_sync = None; - } - }, - Err(BlockImportError::IncompleteHeader(peer_id)) => - if let Some(peer) = peer_id { - warn!( - target: LOG_TARGET, - "💔 Peer sent block with incomplete header to import", - ); - self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); - self.restart(); - }, - Err(BlockImportError::VerificationFailed(peer_id, e)) => { - let extra_message = peer_id - .map_or_else(|| "".into(), |peer| format!(" received from ({peer})")); - - warn!( - target: LOG_TARGET, - "💔 Verification failed for block {hash:?}{extra_message}: {e:?}", - ); - - if let Some(peer) = peer_id { - self.actions - .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); - } - - self.restart(); - }, - Err(BlockImportError::BadBlock(peer_id)) => - if let Some(peer) = peer_id { - warn!( - target: LOG_TARGET, - "💔 Block {hash:?} received from peer {peer} has been blacklisted", - ); - self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); - }, - Err(BlockImportError::MissingState) => { - // This may happen if the chain we were requesting upon has been discarded - // in the meantime because other chain has been finalized. - // Don't mark it as bad as it still may be synced if explicitly requested. - trace!(target: LOG_TARGET, "Obsolete block {hash:?}"); - }, - e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { - warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); - self.state_sync = None; - self.restart(); - }, - Err(BlockImportError::Cancelled) => {}, - }; - } - - self.allowed_requests.set_all(); - } - - /// Get pending actions to perform. 
- #[must_use] - pub fn actions(&mut self) -> impl Iterator> { - if !self.peers.is_empty() && self.queue_blocks.is_empty() { - if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() { - self.attempt_state_sync(hash, number, skip_proofs); + let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); + heads.sort(); + let median = heads[heads.len() / 2]; + if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { + if let Ok(Some(header)) = self.client.header(finalized_hash) { + log::debug!( + target: LOG_TARGET, + "Starting state sync for #{finalized_number} ({finalized_hash})", + ); + self.state_sync = + Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); + self.allowed_requests.set_all(); + } else { + log::error!( + target: LOG_TARGET, + "Failed to start state sync: header for finalized block \ + #{finalized_number} ({finalized_hash}) is not available", + ); + debug_assert!(false); } } - - let block_requests = self - .block_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); - self.actions.extend(block_requests); - - let justification_requests = self - .justification_requests() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); - self.actions.extend(justification_requests); - - let state_request = self - .state_request() - .into_iter() - .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); - self.actions.extend(state_request); - - std::mem::take(&mut self.actions).into_iter() } /// A version of `actions()` that doesn't schedule extra requests. For testing only. 
#[cfg(test)] #[must_use] - fn take_actions(&mut self) -> impl Iterator> { + fn take_actions(&mut self) -> impl Iterator> { std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index 39d0c8f8d4d63..59436f387db6a 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -128,10 +128,10 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // we wil send block requests to these peers // for these blocks we don't know about - let actions = sync.actions().collect::>(); + let actions = sync.actions().unwrap(); assert_eq!(actions.len(), 2); assert!(actions.iter().all(|action| match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => + SyncingAction::SendBlockRequest { peer_id, .. } => peer_id == &peer_id1 || peer_id == &peer_id2, _ => false, })); @@ -162,15 +162,15 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { sync.restart(); // which should make us cancel and send out again block requests to the first two peers - let actions = sync.actions().collect::>(); + let actions = sync.actions().unwrap(); assert_eq!(actions.len(), 4); let mut cancelled_first = HashSet::new(); assert!(actions.iter().all(|action| match action { - ChainSyncAction::CancelRequest { peer_id, .. } => { + SyncingAction::CancelRequest { peer_id, .. } => { cancelled_first.insert(peer_id); peer_id == &peer_id1 || peer_id == &peer_id2 }, - ChainSyncAction::SendBlockRequest { peer_id, .. } => { + SyncingAction::SendBlockRequest { peer_id, .. 
} => { assert!(cancelled_first.remove(peer_id)); peer_id == &peer_id1 || peer_id == &peer_id2 }, @@ -329,7 +329,7 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, )); best_block_num += max_blocks_to_request as u32; @@ -476,7 +476,7 @@ fn can_sync_huge_fork() { } else { assert_eq!(actions.len(), 1); match &actions[0] { - ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + SyncingAction::SendBlockRequest { peer_id: _, request, key: _ } => request.clone(), action @ _ => panic!("Unexpected action: {action:?}"), } }; @@ -508,7 +508,7 @@ fn can_sync_huge_fork() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize )); best_block_num += sync.max_blocks_per_request as u32; @@ -610,7 +610,7 @@ fn syncs_fork_without_duplicate_requests() { } else { assert_eq!(actions.len(), 1); match &actions[0] { - ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + SyncingAction::SendBlockRequest { peer_id: _, request, key: _ } => request.clone(), action @ _ => panic!("Unexpected action: {action:?}"), } }; @@ -646,7 +646,7 @@ fn syncs_fork_without_duplicate_requests() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize )); best_block_num += max_blocks_to_request as u32; @@ -839,10 +839,10 @@ fn 
sync_restart_removes_block_but_not_justification_requests() { let actions = sync.take_actions().collect::>(); for action in actions.iter() { match action { - ChainSyncAction::CancelRequest { peer_id } => { + SyncingAction::CancelRequest { peer_id, key: _ } => { pending_responses.remove(&peer_id); }, - ChainSyncAction::SendBlockRequest { peer_id, .. } => { + SyncingAction::SendBlockRequest { peer_id, .. } => { // we drop obsolete response, but don't register a new request, it's checked in // the `assert!` below pending_responses.remove(&peer_id); @@ -852,7 +852,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { } assert!(actions.iter().any(|action| { match action { - ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], + SyncingAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], _ => false, } })); @@ -943,7 +943,7 @@ fn request_across_forks() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize )); assert_eq!(sync.best_queued_number, 107); assert_eq!(sync.best_queued_hash, block.hash()); @@ -988,7 +988,7 @@ fn request_across_forks() { assert_eq!(actions.len(), 1); assert!(matches!( &actions[0], - ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize + SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize )); assert!(sync.is_known(&block.header.parent_hash())); }