Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
overseer: add a test to ensure all subsystem receive msgs (#1590)
Browse files Browse the repository at this point in the history
* overseer: add a test to ensure all subsystem receive msgs

* lol
  • Loading branch information
ordian committed Aug 14, 2020
1 parent 73de27e commit 8a50b23
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async-trait = "0.1"

[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../network/protocol" }
futures = { version = "0.3.5", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.0.1"
Expand Down
219 changes: 217 additions & 2 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
Expand All @@ -733,7 +737,7 @@ where
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

if let Some(ref mut s) = self.availability_distribution_subsystem.instance {
if let Some(ref mut s) = self.availability_store_subsystem.instance {
let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}

Expand Down Expand Up @@ -817,7 +821,7 @@ where
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default();

if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) {
if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) {
update.deactivated.push(parent.0);
}

Expand Down Expand Up @@ -879,6 +883,10 @@ where
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}

if let Some(ref mut s) = self.bitfield_signing_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}

if let Some(ref mut s) = self.provisioner_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
}
Expand Down Expand Up @@ -1021,10 +1029,15 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(

#[cfg(test)]
mod tests {
use std::sync::atomic;
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt};

use polkadot_primitives::v1::{BlockData, PoV};
use polkadot_subsystem::DummySubsystem;
use polkadot_subsystem::messages::RuntimeApiRequest;

use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};

use super::*;


Expand Down Expand Up @@ -1492,4 +1505,206 @@ mod tests {
}
});
}

#[derive(Clone)]
struct CounterSubsystem {
stop_signals_received: Arc<atomic::AtomicUsize>,
signals_received: Arc<atomic::AtomicUsize>,
msgs_received: Arc<atomic::AtomicUsize>,
}

impl CounterSubsystem {
fn new(
stop_signals_received: Arc<atomic::AtomicUsize>,
signals_received: Arc<atomic::AtomicUsize>,
msgs_received: Arc<atomic::AtomicUsize>,
) -> Self {
Self {
stop_signals_received,
signals_received,
msgs_received,
}
}
}

impl<C, M> Subsystem<C> for CounterSubsystem
where
C: SubsystemContext<Message=M>,
M: Send,
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "counter-subsystem",
future: Box::pin(async move {
loop {
match ctx.try_recv().await {
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
self.stop_signals_received.fetch_add(1, atomic::Ordering::SeqCst);
break;
},
Ok(Some(FromOverseer::Signal(_))) => {
self.signals_received.fetch_add(1, atomic::Ordering::SeqCst);
continue;
},
Ok(Some(FromOverseer::Communication { .. })) => {
self.msgs_received.fetch_add(1, atomic::Ordering::SeqCst);
continue;
},
Err(_) => (),
_ => (),
}
pending!();
}
}),
}
}
}

fn test_candidate_validation_msg() -> CandidateValidationMessage {
let (sender, _) = oneshot::channel();
let pov = Arc::new(PoV { block_data: BlockData(Vec::new()) });
CandidateValidationMessage::ValidateFromChainState(Default::default(), pov, sender)
}

fn test_candidate_backing_msg() -> CandidateBackingMessage {
let (sender, _) = oneshot::channel();
CandidateBackingMessage::GetBackedCandidates(Default::default(), sender)
}

fn test_candidate_selection_msg() -> CandidateSelectionMessage {
CandidateSelectionMessage::default()
}

fn test_chain_api_msg() -> ChainApiMessage {
let (sender, _) = oneshot::channel();
ChainApiMessage::FinalizedBlockNumber(sender)
}

fn test_collator_protocol_msg() -> CollatorProtocolMessage {
CollatorProtocolMessage::CollateOn(Default::default())
}

fn test_network_bridge_event<M>() -> NetworkBridgeEvent<M> {
NetworkBridgeEvent::PeerDisconnected(PeerId::random())
}

fn test_statement_distribution_msg() -> StatementDistributionMessage {
StatementDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_availability_distribution_msg() -> AvailabilityDistributionMessage {
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_bitfield_distribution_msg() -> BitfieldDistributionMessage {
BitfieldDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_provisioner_msg() -> ProvisionerMessage {
let (sender, _) = oneshot::channel();
ProvisionerMessage::RequestInherentData(Default::default(), sender)
}

fn test_pov_distribution_msg() -> PoVDistributionMessage {
PoVDistributionMessage::NetworkBridgeUpdateV1(test_network_bridge_event())
}

fn test_runtime_api_msg() -> RuntimeApiMessage {
let (sender, _) = oneshot::channel();
RuntimeApiMessage::Request(Default::default(), RuntimeApiRequest::Validators(sender))
}

fn test_availability_store_msg() -> AvailabilityStoreMessage {
let (sender, _) = oneshot::channel();
AvailabilityStoreMessage::QueryAvailableData(Default::default(), sender)
}

fn test_network_bridge_msg() -> NetworkBridgeMessage {
NetworkBridgeMessage::ReportPeer(PeerId::random(), ReputationChange::new(42, ""))
}

// Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly.
#[test]
fn overseer_all_subsystems_receive_signals_and_messages() {
let spawner = sp_core::testing::TaskExecutor::new();

executor::block_on(async move {
let stop_signals_received = Arc::new(atomic::AtomicUsize::new(0));
let signals_received = Arc::new(atomic::AtomicUsize::new(0));
let msgs_received = Arc::new(atomic::AtomicUsize::new(0));

let subsystem = CounterSubsystem::new(
stop_signals_received.clone(),
signals_received.clone(),
msgs_received.clone(),
);

let all_subsystems = AllSubsystems {
candidate_validation: subsystem.clone(),
candidate_backing: subsystem.clone(),
candidate_selection: subsystem.clone(),
collator_protocol: subsystem.clone(),
statement_distribution: subsystem.clone(),
availability_distribution: subsystem.clone(),
bitfield_signing: subsystem.clone(),
bitfield_distribution: subsystem.clone(),
provisioner: subsystem.clone(),
pov_distribution: subsystem.clone(),
runtime_api: subsystem.clone(),
availability_store: subsystem.clone(),
network_bridge: subsystem.clone(),
chain_api: subsystem.clone(),
};
let (overseer, mut handler) = Overseer::new(
vec![],
all_subsystems,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();

pin_mut!(overseer_fut);

// send a signal to each subsystem
handler.block_imported(BlockInfo {
hash: Default::default(),
parent_hash: Default::default(),
number: Default::default(),
}).await.unwrap();

// send a msg to each subsystem
// except for BitfieldSigning as the message is not instantiable
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await.unwrap();
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await.unwrap();
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await.unwrap();
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await.unwrap();
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await.unwrap();
handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await.unwrap();
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await.unwrap();
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await.unwrap();
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await.unwrap();

// send a stop signal to each subsystems
handler.stop().await.unwrap();

select! {
res = overseer_fut => {
const NUM_SUBSYSTEMS: usize = 14;

assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
// x2 because of broadcast_signal on startup
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
// -1 for BitfieldSigning
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);

assert!(res.is_ok());
},
complete => (),
}
});
}
}

0 comments on commit 8a50b23

Please sign in to comment.