Skip to content

Commit

Permalink
Move import queue out of sc-network (paritytech#12764)
Browse files Browse the repository at this point in the history
* Move import queue out of `sc-network`

Add supplementary asynchronous API for the import queue which means
it can be run as an independent task and communicated with through
the `ImportQueueService`.

This commit removes removes block and justification imports from
`sc-network` and provides `ChainSync` with a handle to import queue so
it can import blocks and justifications. Polling of the import queue is
moved complete out of `sc-network` and `sc_consensus::Link` is
implemented for `ChainSyncInterfaceHandled` so the import queue
can still influence the syncing process.

* Fix tests

* Apply review comments

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
2 people authored and ltfschoen committed Feb 22, 2023
1 parent 3c47758 commit 5f344f4
Show file tree
Hide file tree
Showing 24 changed files with 716 additions and 490 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ futures = { version = "0.3.21", features = ["thread-pool"] }
futures-timer = "3.0.1"
libp2p = { version = "0.49.0", default-features = false }
log = "0.4.17"
mockall = "0.11.2"
parking_lot = "0.12.1"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0.30"
Expand Down
23 changes: 19 additions & 4 deletions client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub type DefaultImportQueue<Block, Client> =

mod basic_queue;
pub mod buffered_link;
pub mod mock;

/// Shared block import struct used by the queue.
pub type BoxBlockImport<B, Transaction> =
Expand Down Expand Up @@ -105,10 +106,10 @@ pub trait Verifier<B: BlockT>: Send + Sync {
/// Blocks import queue API.
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
/// Afterwards, call `poll_actions` to determine how to respond to these elements.
pub trait ImportQueue<B: BlockT>: Send {
pub trait ImportQueueService<B: BlockT>: Send {
/// Import bunch of blocks.
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

/// Import block justifications.
fn import_justifications(
&mut self,
Expand All @@ -117,12 +118,26 @@ pub trait ImportQueue<B: BlockT>: Send {
number: NumberFor<B>,
justifications: Justifications,
);
/// Polls for actions to perform on the network.
///
}

#[async_trait::async_trait]
pub trait ImportQueue<B: BlockT>: Send {
/// Get a copy of the handle to [`ImportQueueService`].
fn service(&self) -> Box<dyn ImportQueueService<B>>;

/// Get a reference to the handle to [`ImportQueueService`].
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;

/// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`.
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influece the synchronization process.
async fn run(self, link: Box<dyn Link<B>>);
}

/// Hooks that the verification queue can use to influence the synchronization
Expand Down
69 changes: 60 additions & 9 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,17 @@ use crate::{
import_queue::{
buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport,
BoxJustificationImport, ImportQueue, IncomingBlock, Link, RuntimeOrigin, Verifier,
BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link,
RuntimeOrigin, Verifier,
},
metrics::Metrics,
};

/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// task, with plugable verification.
pub struct BasicQueue<B: BlockT, Transaction> {
/// Channel to send justification import messages to the background task.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
/// Handle for sending justification and block import messages to the background task.
handle: BasicQueueHandle<B>,
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
_phantom: PhantomData<Transaction>,
Expand All @@ -54,8 +53,7 @@ pub struct BasicQueue<B: BlockT, Transaction> {
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
// Flush the queue and close the receiver to terminate the future.
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
self.handle.close();
self.result_port.close();
}
}
Expand Down Expand Up @@ -95,11 +93,37 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
future.boxed(),
);

Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
Self {
handle: BasicQueueHandle::new(justification_sender, block_import_sender),
result_port,
_phantom: PhantomData,
}
}
}

impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
#[derive(Clone)]
struct BasicQueueHandle<B: BlockT> {
/// Channel to send justification import messages to the background task.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
}

impl<B: BlockT> BasicQueueHandle<B> {
pub fn new(
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) -> Self {
Self { justification_sender, block_import_sender }
}

pub fn close(&mut self) {
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
}
}

impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if blocks.is_empty() {
return
Expand Down Expand Up @@ -138,12 +162,39 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}
}
}
}

#[async_trait::async_trait]
impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
/// Get handle to [`ImportQueueService`].
fn service(&self) -> Box<dyn ImportQueueService<B>> {
Box::new(self.handle.clone())
}

/// Get a reference to the handle to [`ImportQueueService`].
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
&mut self.handle
}

/// Poll actions from network.
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
if self.result_port.poll_actions(cx, link).is_err() {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
}
}

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influece the synchronization process.
async fn run(mut self, mut link: Box<dyn Link<B>>) {
loop {
if let Err(_) = self.result_port.next_action(&mut *link).await {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
return
}
}
}
}

/// Messages destinated to the background worker.
Expand Down
32 changes: 23 additions & 9 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<B: BlockT> Clone for BufferedLinkSender<B> {
}

/// Internal buffered message.
enum BlockImportWorkerMsg<B: BlockT> {
pub enum BlockImportWorkerMsg<B: BlockT> {
BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, bool),
RequestJustification(B::Hash, NumberFor<B>),
Expand Down Expand Up @@ -122,6 +122,18 @@ pub struct BufferedLinkReceiver<B: BlockT> {
}

impl<B: BlockT> BufferedLinkReceiver<B> {
/// Send action for the synchronization to perform.
pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
match msg {
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
}
}

/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
/// passed as parameter.
///
Expand All @@ -138,15 +150,17 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
Poll::Pending => break Ok(()),
};

match msg {
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
}
self.send_actions(msg, &mut *link);
}
}

/// Poll next element from import queue and send the corresponding action command over the link.
pub async fn next_action(&mut self, link: &mut dyn Link<B>) -> Result<(), ()> {
if let Some(msg) = self.rx.next().await {
self.send_actions(msg, link);
return Ok(())
}
Err(())
}

/// Close the channel.
Expand Down
46 changes: 46 additions & 0 deletions client/consensus/common/src/import_queue/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This file is part of Substrate.

// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::*;

mockall::mock! {
pub ImportQueueHandle<B: BlockT> {}

impl<B: BlockT> ImportQueueService<B> for ImportQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
fn import_justifications(
&mut self,
who: RuntimeOrigin,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
);
}
}

mockall::mock! {
pub ImportQueue<B: BlockT> {}

#[async_trait::async_trait]
impl<B: BlockT> ImportQueue<B> for ImportQueue<B> {
fn service(&self) -> Box<dyn ImportQueueService<B>>;
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
fn poll_actions<'a>(&mut self, cx: &mut futures::task::Context<'a>, link: &mut dyn Link<B>);
async fn run(self, link: Box<dyn Link<B>>);
}
}
28 changes: 12 additions & 16 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ pub mod warp;

use libp2p::PeerId;
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use sc_consensus::{
import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand Down Expand Up @@ -317,6 +315,12 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Procss received block data.
fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<Block>, BadPeer>,
);

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand All @@ -326,17 +330,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockJustification<Block>, BadPeer>;

/// A batch of blocks have been processed, with or without errors.
///
/// Call this when a batch of blocks have been processed by the import
/// queue, with or without errors.
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>, Block::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<Block>), BadPeer>>>;

/// Call this when a justification has been processed by the import queue,
/// with or without errors.
fn on_justification_import(
Expand Down Expand Up @@ -378,7 +371,7 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>;
fn peer_disconnected(&mut self, who: &PeerId);

/// Return some key metrics.
fn metrics(&self) -> Metrics;
Expand All @@ -395,7 +388,10 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<Block>>;
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;

/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
Expand Down
Loading

0 comments on commit 5f344f4

Please sign in to comment.