Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

overseer: add a test to ensure all subsystem receive msgs #1590

Merged
merged 2 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async-trait = "0.1"

[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../network/protocol" }
futures = { version = "0.3.5", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.0.1"
Expand Down
219 changes: 217 additions & 2 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
Expand All @@ -733,7 +737,7 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
if let Some(ref mut s) = self.availability_store_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

Expand Down Expand Up @@ -817,7 +821,7 @@ where
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default();

if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) {
if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in practice we won't import a block with number 0, but it caused a panic in a test

update.deactivated.push(parent.0);
}

Expand Down Expand Up @@ -879,6 +883,10 @@ where
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}

if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}

if let Some(ref mut s) = self.provisioner_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
Expand Down Expand Up @@ -1021,10 +1029,15 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(

#[cfg(test)]
mod tests {
use std::sync::atomic;
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};

use polkadot_primitives::v1::{BlockData, PoV};
use polkadot_subsystem::DummySubsystem;
use polkadot_subsystem::messages::RuntimeApiRequest;

use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};

use super::*;


Expand Down Expand Up @@ -1492,4 +1505,206 @@ mod tests {
}
});
}

#[derive(Clone)]
struct CounterSubsystem {
stop_signals_received: Arc<atomic::AtomicUsize>,
signals_received: Arc<atomic::AtomicUsize>,
msgs_received: Arc<atomic::AtomicUsize>,
}

impl CounterSubsystem {
fn new(
stop_signals_received: Arc<atomic::AtomicUsize>,
signals_received: Arc<atomic::AtomicUsize>,
msgs_received: Arc<atomic::AtomicUsize>,
) -> Self {
Self {
stop_signals_received,
signals_received,
msgs_received,
}
}
}

impl<C, M> Subsystem<C> for CounterSubsystem
where
C: SubsystemContext<Message=M>,
M: Send,
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "counter-subsystem",
future: Box::pin(async move {
loop {
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst);
break;
},
Ok(Some(FromOverseer::Signal(_))) => {
self.signals_received.fetch_add(1, atomic::Ordering::SeqCst);
continue;
},
Ok(Some(FromOverseer::Communication { .. })) => {
self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
continue;
},
Err(_) => (),
_ => (),
}
pending!();
}
}),
}
}
}

fn test_candidate_validation_msg() -> CandidateValidationMessage {
let (sender, _) = oneshot::channel();
let pov = Arc::new(PoV { block_data: BlockData(Vec::new()) });
CandidateValidationMessage::ValidateFromChainState(Default::default(), pov, sender)
}

fn test_candidate_backing_msg() -> CandidateBackingMessage {
let (sender, _) = oneshot::channel();
CandidateBackingMessage::GetBackedCandidates(Default::default(), sender)
}

fn test_candidate_selection_msg() -> CandidateSelectionMessage {
CandidateSelectionMessage::default()
}

fn test_chain_api_msg() -> ChainApiMessage {
let (sender, _) = oneshot::channel();
ChainApiMessage::FinalizedBlockNumber(sender)
}

fn test_collator_protocol_msg() -> CollatorProtocolMessage {
CollatorProtocolMessage::CollateOn(Default::default())
}

fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
NetworkBridgeEvent::PeerDisconnected(PeerId::random())
}

fn test_statement_distribution_msg() -> StatementDistributionMessage {
StatementDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_availability_distribution_msg() -> AvailabilityDistributionMessage {
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_provisioner_msg() -> ProvisionerMessage {
let (sender, _) = oneshot::channel();
ProvisionerMessage::RequestInherentData(Default::default(), sender)
}

fn test_pov_distribution_msg() -> PoVDistributionMessage {
PoVDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_runtime_api_msg() -> RuntimeApiMessage {
let (sender, _) = oneshot::channel();
RuntimeApiMessage::Request(Default::default(), RuntimeApiRequest::Validators(sender))
}

fn test_availability_store_msg() -> AvailabilityStoreMessage {
let (sender, _) = oneshot::channel();
AvailabilityStoreMessage::QueryAvailableData(Default::default(), sender)
}

fn test_network_bridge_msg() -> NetworkBridgeMessage {
NetworkBridgeMessage::ReportPeer(PeerId::random(), ReputationChange::new(42, ""))
}

// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
#[test]
fn overseer_all_subsystems_receive_signals_and_messages() {
let spawner = sp_core::testing::TaskExecutor::new();

executor::block_on(async move {
let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0));
let signals_received = Arc::new(atomic::AtomicUsize::new(0));
let msgs_received = Arc::new(atomic::AtomicUsize::new(0));

let subsystem = CounterSubsystem::new(
stop_signals_received.clone(),
signals_received.clone(),
msgs_received.clone(),
);

let all_subsystems = AllSubsystems {
candidate_validation: subsystem.clone(),
candidate_backing: subsystem.clone(),
candidate_selection: subsystem.clone(),
collator_protocol: subsystem.clone(),
statement_distribution: subsystem.clone(),
availability_distribution: subsystem.clone(),
bitfield_signing: subsystem.clone(),
bitfield_distribution: subsystem.clone(),
provisioner: subsystem.clone(),
pov_distribution: subsystem.clone(),
runtime_api: subsystem.clone(),
availability_store: subsystem.clone(),
network_bridge: subsystem.clone(),
chain_api: subsystem.clone(),
};
let (overseer, mut handler) = Overseer::new(
vec![],
all_subsystems,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();

pin_mut!(overseer_fut);

// send a signal to each subsystem
handler.block_imported(BlockInfo {
hash: Default::default(),
parent_hash: Default::default(),
number: Default::default(),
}).await.unwrap();

// send a msg to each subsystem
// except for BitfieldSigning as the message is not instantiable
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await.unwrap();
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await.unwrap();
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await.unwrap();
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await.unwrap();
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await.unwrap();
handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await.unwrap();
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await.unwrap();
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await.unwrap();
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await.unwrap();

// send a stop signal to each subsystems
handler.stop().await.unwrap();

select! {
res = overseer_fut => {
const NUM_SUBSYSTEMS: usize = 14;

assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
// x2 because of broadcast_signal on startup
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
// -1 for BitfieldSigning
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);

assert!(res.is_ok());
},
complete => (),
}
});
}
}