diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 3327d4bb86b..e1f59677423 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -42,7 +42,7 @@ //! make sure that the blocks are imported in the correct order. use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider}; -use sc_consensus::import_queue::{ImportQueue, IncomingBlock}; +use sc_consensus::import_queue::{ImportQueueService, IncomingBlock}; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_runtime::{ generic::BlockId, @@ -103,7 +103,7 @@ impl RecoveryDelay { } /// Encapsulates the logic of the pov recovery. -pub struct PoVRecovery { +pub struct PoVRecovery { /// All the pending candidates that we are waiting for to be imported or that need to be /// recovered when `next_candidate_to_recover` tells us to do so. pending_candidates: HashMap>, @@ -119,23 +119,22 @@ pub struct PoVRecovery { waiting_for_parent: HashMap>, recovery_delay: RecoveryDelay, parachain_client: Arc, - parachain_import_queue: IQ, + parachain_import_queue: Box>, relay_chain_interface: RC, para_id: ParaId, } -impl PoVRecovery +impl PoVRecovery where PC: BlockBackend + BlockchainEvents + UsageProvider, RCInterface: RelayChainInterface + Clone, - IQ: ImportQueue, { /// Create a new instance. pub fn new( overseer_handle: OverseerHandle, recovery_delay: RecoveryDelay, parachain_client: Arc, - parachain_import_queue: IQ, + parachain_import_queue: Box>, relay_chain_interface: RCInterface, para_id: ParaId, ) -> Self { diff --git a/client/relay-chain-minimal-node/src/network.rs b/client/relay-chain-minimal-node/src/network.rs index a85fb8d7377..23e214aa5b8 100644 --- a/client/relay-chain-minimal-node/src/network.rs +++ b/client/relay-chain-minimal-node/src/network.rs @@ -21,6 +21,7 @@ use polkadot_node_network_protocol::PeerId; use sc_network::{NetworkService, SyncState}; use sc_client_api::HeaderBackend; +use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link}; use sc_network_common::{ config::{ NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, @@ -29,14 +30,12 @@ use sc_network_common::{ service::NetworkSyncForkRequest, sync::{ message::{BlockAnnouncesHandshake, BlockRequest}, - Metrics, SyncStatus, + BadPeer, Metrics, OnBlockData, PollBlockAnnounceValidation, SyncStatus, }, }; use sc_network_light::light_client_requests; use sc_network_sync::{block_request_handler, state_request_handler}; use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle}; -use sp_consensus::BlockOrigin; -use sp_runtime::Justifications; use std::{iter, sync::Arc}; @@ -92,7 +91,6 @@ pub(crate) fn build_collator_network( chain_sync: Box::new(chain_sync), network_config: config.network.clone(), chain: client.clone(), - import_queue: Box::new(DummyImportQueue), protocol_id, metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, @@ -324,12 +322,7 @@ impl sc_network_common::sync::ChainSync for DummyChainSync { std::task::Poll::Pending } - fn peer_disconnected( - &mut self, - _who: &PeerId, - ) -> Option> { - None - } + fn peer_disconnected(&mut self, _who: &PeerId) {} fn metrics(&self) -> sc_network_common::sync::Metrics { Metrics { @@ -355,7 +348,7 @@ impl sc_network_common::sync::ChainSync for DummyChainSync { fn poll( &mut self, _cx: &mut std::task::Context, - ) -> std::task::Poll> { + ) -> std::task::Poll> { std::task::Poll::Pending } @@ -366,37 +359,39 @@ impl sc_network_common::sync::ChainSync for DummyChainSync { fn num_active_peers(&self) -> usize { 0 } + + fn process_block_response_data(&mut self, _blocks_to_import: Result, BadPeer>) {} } -struct DummyImportQueue; +struct DummyChainSyncService(std::marker::PhantomData); -impl sc_service::ImportQueue for DummyImportQueue { - fn import_blocks( - &mut self, - _origin: BlockOrigin, - _blocks: Vec>, - ) { - } +impl NetworkSyncForkRequest> for DummyChainSyncService { + fn set_sync_fork_request(&self, _peers: Vec, _hash: B::Hash, _number: NumberFor) {} +} + +impl JustificationSyncLink for DummyChainSyncService { + fn request_justification(&self, _hash: &B::Hash, _number: NumberFor) {} - fn import_justifications( + fn clear_justification_requests(&self) {} +} + +impl Link for DummyChainSyncService { + fn blocks_processed( &mut self, - _who: PeerId, - _hash: Hash, - _number: NumberFor, - _justifications: Justifications, + _imported: usize, + _count: usize, + _results: Vec<(Result>, BlockImportError>, B::Hash)>, ) { } - fn poll_actions( + fn justification_imported( &mut self, - _cx: &mut futures::task::Context, - _link: &mut dyn sc_consensus::import_queue::Link, + _who: PeerId, + _hash: &B::Hash, + _number: NumberFor, + _success: bool, ) { } -} - -struct DummyChainSyncService(std::marker::PhantomData); -impl NetworkSyncForkRequest> for DummyChainSyncService { - fn set_sync_fork_request(&self, _peers: Vec, _hash: B::Hash, _number: NumberFor) {} + fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor) {} } diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index e705364a1c8..3090ba2a80c 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] edition = "2021" [dependencies] +async-trait = "0.1.59" parking_lot = "0.12.1" # Substrate diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 067ca1c83f3..53277ca5817 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -25,23 +25,16 @@ use polkadot_primitives::v2::CollatorPair; use sc_client_api::{ Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, UsageProvider, }; -use sc_consensus::{ - import_queue::{ImportQueue, IncomingBlock, Link, RuntimeOrigin}, - BlockImport, -}; +use sc_consensus::{import_queue::ImportQueueService, BlockImport}; use sc_service::{Configuration, TaskManager}; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; -use sp_consensus::BlockOrigin; use sp_core::traits::SpawnNamed; -use sp_runtime::{ - traits::{Block as BlockT, NumberFor}, - Justifications, -}; +use sp_runtime::traits::Block as BlockT; use std::{sync::Arc, time::Duration}; /// Parameters given to [`start_collator`]. -pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner, IQ> { +pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> { pub block_status: Arc, pub client: Arc, pub announce_block: Arc>) + Send + Sync>, @@ -50,7 +43,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn pub relay_chain_interface: RCInterface, pub task_manager: &'a mut TaskManager, pub parachain_consensus: Box>, - pub import_queue: IQ, + pub import_queue: Box>, pub collator_key: CollatorPair, pub relay_chain_slot_duration: Duration, } @@ -60,7 +53,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn /// A collator is similar to a validator in a normal blockchain. /// It is responsible for producing blocks and sending the blocks to a /// parachain validator for validation and inclusion into the relay chain. -pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner, IQ>( +pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner>( StartCollatorParams { block_status, client, @@ -73,7 +66,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner import_queue, collator_key, relay_chain_slot_duration, - }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner, IQ>, + }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>, ) -> sc_service::error::Result<()> where Block: BlockT, @@ -92,7 +85,6 @@ where Spawner: SpawnNamed + Clone + Send + Sync + 'static, RCInterface: RelayChainInterface + Clone + 'static, Backend: BackendT + 'static, - IQ: ImportQueue + 'static, { let consensus = cumulus_client_consensus_common::run_parachain_consensus( para_id, @@ -139,21 +131,21 @@ where } /// Parameters given to [`start_full_node`]. -pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface, IQ> { +pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> { pub para_id: ParaId, pub client: Arc, pub relay_chain_interface: RCInterface, pub task_manager: &'a mut TaskManager, pub announce_block: Arc>) + Send + Sync>, pub relay_chain_slot_duration: Duration, - pub import_queue: IQ, + pub import_queue: Box>, } /// Start a full node for a parachain. /// /// A full node will only sync the given parachain and will follow the /// tip of the chain. -pub fn start_full_node( +pub fn start_full_node( StartFullNodeParams { client, announce_block, @@ -162,7 +154,7 @@ pub fn start_full_node( para_id, relay_chain_slot_duration, import_queue, - }: StartFullNodeParams, + }: StartFullNodeParams, ) -> sc_service::error::Result<()> where Block: BlockT, @@ -176,7 +168,6 @@ where for<'a> &'a Client: BlockImport, Backend: BackendT + 'static, RCInterface: RelayChainInterface + Clone + 'static, - IQ: ImportQueue + 'static, { let consensus = cumulus_client_consensus_common::run_parachain_consensus( para_id, @@ -226,36 +217,3 @@ pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration parachain_config } - -/// A shared import queue -/// -/// This is basically a hack until the Substrate side is implemented properly. -#[derive(Clone)] -pub struct SharedImportQueue(Arc>>); - -impl SharedImportQueue { - /// Create a new instance of the shared import queue. - pub fn new + 'static>(import_queue: IQ) -> Self { - Self(Arc::new(parking_lot::Mutex::new(import_queue))) - } -} - -impl ImportQueue for SharedImportQueue { - fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - self.0.lock().import_blocks(origin, blocks) - } - - fn import_justifications( - &mut self, - who: RuntimeOrigin, - hash: Block::Hash, - number: NumberFor, - justifications: Justifications, - ) { - self.0.lock().import_justifications(who, hash, number, justifications) - } - - fn poll_actions(&mut self, cx: &mut std::task::Context, link: &mut dyn Link) { - self.0.lock().poll_actions(cx, link) - } -} diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 0a0d1327e0e..36ccfe2f75c 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -22,6 +22,7 @@ use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayC use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; // Substrate Imports +use sc_consensus::ImportQueue; use sc_executor::NativeElseWasmExecutor; use sc_network::NetworkService; use sc_network_common::service::NetworkBlock; @@ -196,14 +197,15 @@ async fn start_node_impl( let validator = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); - let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let import_queue_service = params.import_queue.service(); + let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, client: client.clone(), transaction_pool: transaction_pool.clone(), spawn_handle: task_manager.spawn_handle(), - import_queue: import_queue.clone(), + import_queue: params.import_queue, block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), @@ -293,7 +295,7 @@ async fn start_node_impl( relay_chain_interface, spawner, parachain_consensus, - import_queue, + import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -307,7 +309,7 @@ async fn start_node_impl( para_id: id, relay_chain_interface, relay_chain_slot_duration, - import_queue, + import_queue: import_queue_service, }; start_full_node(params)?; diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index a239f195a47..aa10daa457f 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -43,7 +43,7 @@ use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; use futures::lock::Mutex; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImportParams, + BlockImportParams, ImportQueue, }; use sc_executor::WasmExecutor; use sc_network::NetworkService; @@ -374,14 +374,15 @@ where let validator = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); - let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let import_queue_service = params.import_queue.service(); + let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, client: client.clone(), transaction_pool: transaction_pool.clone(), spawn_handle: task_manager.spawn_handle(), - import_queue: import_queue.clone(), + import_queue: params.import_queue, block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), @@ -450,7 +451,7 @@ where relay_chain_interface, spawner, parachain_consensus, - import_queue, + import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -464,7 +465,7 @@ where para_id, relay_chain_interface, relay_chain_slot_duration, - import_queue, + import_queue: import_queue_service, }; start_full_node(params)?; @@ -557,14 +558,15 @@ where let validator = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); - let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let import_queue_service = params.import_queue.service(); + let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, client: client.clone(), transaction_pool: transaction_pool.clone(), spawn_handle: task_manager.spawn_handle(), - import_queue: import_queue.clone(), + import_queue: params.import_queue, block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), @@ -646,7 +648,7 @@ where relay_chain_interface: relay_chain_interface.clone(), spawner, parachain_consensus, - import_queue, + import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -660,7 +662,7 @@ where para_id, relay_chain_interface, relay_chain_slot_duration, - import_queue, + import_queue: import_queue_service, }; start_full_node(params)?; @@ -1326,14 +1328,15 @@ where let validator = parachain_config.role.is_authority(); let prometheus_registry = parachain_config.prometheus_registry().cloned(); let transaction_pool = params.transaction_pool.clone(); - let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let import_queue_service = params.import_queue.service(); + let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, client: client.clone(), transaction_pool: transaction_pool.clone(), spawn_handle: task_manager.spawn_handle(), - import_queue: import_queue.clone(), + import_queue: params.import_queue, block_announce_validator_builder: Some(Box::new(|_| { Box::new(block_announce_validator) })), @@ -1414,7 +1417,7 @@ where relay_chain_interface, spawner, parachain_consensus, - import_queue, + import_queue: import_queue_service, collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -1428,7 +1431,7 @@ where para_id, relay_chain_interface, relay_chain_slot_duration, - import_queue, + import_queue: import_queue_service, }; start_full_node(params)?; diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 29f37806ccf..486184ff6b7 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -48,6 +48,7 @@ use frame_system_rpc_runtime_api::AccountNonceApi; use polkadot_primitives::v2::{CollatorPair, Hash as PHash, PersistedValidationData}; use polkadot_service::ProvideRuntimeApi; use sc_client_api::execution_extensions::ExecutionStrategies; +use sc_consensus::ImportQueue; use sc_network::{multiaddr, NetworkBlock, NetworkService}; use sc_network_common::{config::TransportConfig, service::NetworkStateInfo}; use sc_service::{ @@ -269,14 +270,15 @@ where let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>; let prometheus_registry = parachain_config.prometheus_registry().cloned(); - let import_queue = cumulus_client_service::SharedImportQueue::new(params.import_queue); + let import_queue_service = params.import_queue.service(); + let (network, system_rpc_tx, tx_handler_controller, start_network) = sc_service::build_network(sc_service::BuildNetworkParams { config: ¶chain_config, client: client.clone(), transaction_pool: transaction_pool.clone(), spawn_handle: task_manager.spawn_handle(), - import_queue: import_queue.clone(), + import_queue: params.import_queue, block_announce_validator_builder: Some(Box::new(block_announce_validator_builder)), warp_sync: None, })?; @@ -362,7 +364,7 @@ where parachain_consensus, relay_chain_interface, collator_key, - import_queue, + import_queue: import_queue_service, relay_chain_slot_duration: Duration::from_secs(6), }; @@ -374,7 +376,7 @@ where task_manager: &mut task_manager, para_id, relay_chain_interface, - import_queue, + import_queue: import_queue_service, // The slot duration is currently used internally only to configure // the recovery delay of pov-recovery. We don't want to wait for too // long on the full node to recover, so we reduce this time here.