Skip to content

Commit

Permalink
Runtime diagnostics for leaked messages in unbounded channels (parity…
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin authored and ark0f committed Feb 27, 2023
1 parent 1a3a29e commit b7309ab
Show file tree
Hide file tree
Showing 37 changed files with 257 additions and 134 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ impl<Block: BlockT> StorageNotifications<Block> {
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
let receiver = self.0.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys });
let receiver = self
.0
.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);

StorageEventStream(receiver)
}
Expand Down
4 changes: 2 additions & 2 deletions client/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
) -> Result<Self, Error> {
let beefy_best_block = Arc::new(RwLock::new(None));

let stream = best_block_stream.subscribe();
let stream = best_block_stream.subscribe(100_000);
let closure_clone = beefy_best_block.clone();
let future = stream.for_each(move |best_beefy| {
let async_clone = closure_clone.clone();
Expand All @@ -141,7 +141,7 @@ where
fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self
.finality_proof_stream
.subscribe()
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block>(vfp));

let fut = async move {
Expand Down
2 changes: 1 addition & 1 deletion client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ where
// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state =
Expand Down
10 changes: 5 additions & 5 deletions client/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ pub(crate) fn get_beefy_streams(
let beefy_rpc_links = net.peer(index).data.beefy_rpc_links.lock().clone().unwrap();
let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } =
beefy_rpc_links;
best_block_streams.push(from_voter_best_beefy_stream.subscribe());
versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe());
best_block_streams.push(from_voter_best_beefy_stream.subscribe(100_000));
versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe(100_000));
});
(best_block_streams, versioned_finality_proof_streams)
}
Expand Down Expand Up @@ -736,7 +736,7 @@ async fn beefy_importing_blocks() {
let hashof1 = block.header.hash();

// Import without justifications.
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import
.import_block(params(block.clone(), None), HashMap::new())
Expand Down Expand Up @@ -779,7 +779,7 @@ async fn beefy_importing_blocks() {
let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap();
let block = builder.build().unwrap().block;
let hashof2 = block.header.hash();
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(),
ImportResult::Imported(ImportedAux {
Expand Down Expand Up @@ -823,7 +823,7 @@ async fn beefy_importing_blocks() {
let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap();
let block = builder.build().unwrap().block;
let hashof3 = block.header.hash();
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(),
ImportResult::Imported(ImportedAux {
Expand Down
8 changes: 4 additions & 4 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
spawner: &impl sp_core::traits::SpawnEssentialNamed,
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let (result_sender, result_port) = buffered_link::buffered_link(100_000);

let metrics = prometheus_registry.and_then(|r| {
Metrics::register(r)
Expand Down Expand Up @@ -276,10 +276,10 @@ impl<B: BlockT> BlockImportWorker<B> {
use worker_messages::*;

let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification");
tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);

let (block_import_sender, block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");
tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);

let mut worker = BlockImportWorker { result_sender, justification_import, metrics };

Expand Down Expand Up @@ -595,7 +595,7 @@ mod tests {

#[test]
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link();
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

let (worker, mut finality_sender, mut block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
Expand Down
12 changes: 7 additions & 5 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! # use sp_test_primitives::Block;
//! # struct DummyLink; impl Link<Block> for DummyLink {}
//! # let mut my_link = DummyLink;
//! let (mut tx, mut rx) = buffered_link::<Block>();
//! let (mut tx, mut rx) = buffered_link::<Block>(100_000);
//! tx.blocks_processed(0, 0, vec![]);
//!
//! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled.
Expand All @@ -51,9 +51,11 @@ use super::BlockImportResult;

/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
/// them to another link.
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
pub fn buffered_link<B: BlockT>(
queue_size_warning: i64,
) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
let tx = BufferedLinkSender { tx };
let rx = BufferedLinkReceiver { rx: rx.fuse() };
(tx, rx)
Expand Down Expand Up @@ -175,7 +177,7 @@ mod tests {

#[test]
fn is_closed() {
let (tx, rx) = super::buffered_link::<Block>();
let (tx, rx) = super::buffered_link::<Block>(1);
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ where
}

fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self.justification_stream.subscribe().map(
let stream = self.justification_stream.subscribe(100_000).map(
|x: sc_finality_grandpa::GrandpaJustification<Block>| {
JustificationNotification::from(x)
},
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ impl<Block: BlockT> GossipValidator<Block> {
None => None,
};

let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator");
let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000);
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
set_state,
Expand Down
1 change: 1 addition & 0 deletions client/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
"mpsc_grandpa_neighbor_packet_worker",
100_000,
);
let delay = Delay::new(rebroadcast_period);

Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl NetworkEventStream for TestNetwork {
&self,
_name: &'static str,
) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::pin(rx)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ fn voter_set_state() -> SharedVoterSetState<Block> {

// needs to run in a tokio runtime.
pub(crate) fn make_test_network() -> (impl Future<Output = Tester>, TestNetwork) {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let net = TestNetwork { sender: tx };

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ where
}
})?;

let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command");
let (voter_commands_tx, voter_commands_rx) =
tracing_unbounded("mpsc_grandpa_voter_command", 100_000);

let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();

Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ mod tests {
aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters))
.unwrap();

let (_tx, voter_command_rx) = tracing_unbounded("");
let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000);

let observer = ObserverWork::new(
client,
Expand Down
8 changes: 4 additions & 4 deletions client/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {

impl TestChainState {
fn new() -> (Self, ImportNotifications<Block>) {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let state =
TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) };

Expand Down Expand Up @@ -680,7 +680,7 @@ mod tests {
// enact all dependencies before importing the message
enact_dependencies(&chain_state);

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
Expand Down Expand Up @@ -708,7 +708,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
Expand Down Expand Up @@ -896,7 +896,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let block_sync_requester = TestBlockSyncRequester::default();

Expand Down
4 changes: 2 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ where
&params.network_config.transport,
)?;

let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker");
let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);

if let Some(path) = &params.network_config.net_config_path {
fs::create_dir_all(path)?;
Expand Down Expand Up @@ -1003,7 +1003,7 @@ where
H: ExHashT,
{
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
let (tx, rx) = out_events::channel(name);
let (tx, rx) = out_events::channel(name, 100_000);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
Box::pin(rx)
}
Expand Down
Loading

0 comments on commit b7309ab

Please sign in to comment.