diff --git a/Cargo.lock b/Cargo.lock index 4a7655a30048..85c439355c3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2623,6 +2623,7 @@ dependencies = [ name = "libp2p-gossipsub" version = "0.46.1" dependencies = [ + "async-channel", "async-std", "asynchronous-codec", "base64 0.21.5", diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 37873de39f98..2a0510038f1f 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -37,6 +37,7 @@ sha2 = "0.10.8" smallvec = "1.11.2" tracing = "0.1.37" void = "1.0.2" +async-channel = "1.9.0" # Metrics dependencies prometheus-client = { workspace = true } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 24a32de4cc79..af8abd0725f5 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -45,8 +45,6 @@ use libp2p_swarm::{ THandlerOutEvent, ToSwarm, }; -use crate::backoff::BackoffStorage; -use crate::config::{Config, ValidationMode}; use crate::gossip_promises::GossipPromises; use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; @@ -62,6 +60,11 @@ use crate::types::{ SubscriptionAction, }; use crate::types::{PeerConnections, PeerKind, RpcOut}; +use crate::{backoff::BackoffStorage, types::RpcSender}; +use crate::{ + config::{Config, ValidationMode}, + types::rpc_channel, +}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use instant::SystemTime; @@ -332,6 +335,9 @@ pub struct Behaviour { /// Keep track of a set of internal metrics relating to gossipsub. metrics: Option, + + /// Connection handler message queue channels. + handler_send_queues: HashMap, } impl Behaviour @@ -471,6 +477,7 @@ where config, subscription_filter, data_transform, + handler_send_queues: Default::default(), }) } } @@ -537,7 +544,8 @@ where for peer in self.peer_topics.keys().copied().collect::>() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); let event = RpcOut::Subscribe(topic_hash.clone()); - self.send_message(peer, event); + self.send_message(peer, event) + .expect("Subscribe messages should be always sent"); } // call JOIN(topic) @@ -564,7 +572,8 @@ where for peer in self.peer_topics.keys().copied().collect::>() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); let event = RpcOut::Unsubscribe(topic_hash.clone()); - self.send_message(peer, event); + self.send_message(peer, event) + .expect("Subscribe messages should be always sent"); } // call LEAVE(topic) @@ -711,9 +720,18 @@ where } // Send to peers we know are subscribed to the topic. + let mut errors = 0; for peer_id in recipient_peers.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); - self.send_message(*peer_id, RpcOut::Publish(raw_message.clone())); + if self + .send_message(*peer_id, RpcOut::Publish(raw_message.clone())) + .is_err() + { + errors += 1; + } + } + if errors == recipient_peers.len() { + return Err(PublishError::InsufficientPeers); } tracing::debug!(message=%msg_id, "Published message"); @@ -1311,7 +1329,7 @@ where ); } else { tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - self.send_message(*peer_id, RpcOut::Forward(msg)); + let _ = self.send_message(*peer_id, RpcOut::Forward(msg)); } } } @@ -1469,7 +1487,8 @@ where .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) .collect::>() { - self.send_message(*peer_id, RpcOut::Control(action)); + self.send_message(*peer_id, RpcOut::Control(action)) + .expect("PRUNE messages should always be sent"); } // Send the prune messages to the peer tracing::debug!( @@ -1970,6 +1989,7 @@ where .collect::>() { self.send_message(*propagation_source, RpcOut::Control(action)) + .expect("GRAFT messages should always be sent"); } // Notify the application of the subscriptions @@ -2520,7 +2540,8 @@ where // send the control messages for msg in control_msgs.chain(prunes).collect::>() { - self.send_message(peer, RpcOut::Control(msg)); + self.send_message(peer, RpcOut::Control(msg)) + .expect("PRUNE messages should always be sent"); } } @@ -2534,7 +2555,8 @@ where self.config.do_px() && !no_px.contains(peer), false, ); - self.send_message(*peer, RpcOut::Control(prune)); + self.send_message(*peer, RpcOut::Control(prune)) + .expect("PRUNE messages should always be sent"); // inform the handler peer_removed_from_mesh( @@ -2606,7 +2628,7 @@ where for peer in recipient_peers.iter() { tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); - self.send_message(*peer, event.clone()); + let _ = self.send_message(*peer, event.clone()); } tracing::debug!("Completed forwarding message"); Ok(true) @@ -2720,7 +2742,7 @@ where fn flush_control_pool(&mut self) { for (peer, controls) in self.control_pool.drain().collect::>() { for msg in controls { - self.send_message(peer, RpcOut::Control(msg)); + let _ = self.send_message(peer, RpcOut::Control(msg)); } } @@ -2730,19 +2752,24 @@ where /// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it /// is not already an arc. - fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) { + fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> Result<(), ()> { + let sender = self + .handler_send_queues + .get_mut(&peer_id) + .expect("Peerid should exist"); + + if sender.try_send(rpc.clone()).is_err() { + tracing::debug!(peer=%peer_id, "Dropping message as peer is full"); + return Err(()); + } + if let Some(m) = self.metrics.as_mut() { if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc { // register bytes sent on the internal metrics. m.msg_sent(&message.topic, message.raw_protobuf_len()); } } - - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(rpc), - handler: NotifyHandler::Any, - }); + Ok(()) } fn on_connection_established( @@ -2811,7 +2838,8 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. for topic_hash in self.mesh.clone().into_keys() { - self.send_message(peer_id, RpcOut::Subscribe(topic_hash)); + self.send_message(peer_id, RpcOut::Subscribe(topic_hash)) + .expect("Subscribe messages should be always sent"); } } @@ -2939,6 +2967,7 @@ where } self.connected_peers.remove(&peer_id); + self.handler_send_queues.remove(&peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.remove_peer(&peer_id); @@ -2998,21 +3027,25 @@ where fn handle_established_inbound_connection( &mut self, _: ConnectionId, - _: PeerId, + peer_id: PeerId, _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + let (sender, receiver) = rpc_channel(self.config.connection_handler_queue_len()); + self.handler_send_queues.insert(peer_id, sender); + Ok(Handler::new(self.config.protocol_config(), receiver)) } fn handle_established_outbound_connection( &mut self, _: ConnectionId, - _: PeerId, + peer_id: PeerId, _: &Multiaddr, _: Endpoint, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + let (sender, receiver) = rpc_channel(self.config.connection_handler_queue_len()); + self.handler_send_queues.insert(peer_id, sender); + Ok(Handler::new(self.config.protocol_config(), receiver)) } fn on_connection_handler_event( diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 570cdf43f907..050b387a5e3a 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -23,6 +23,7 @@ use super::*; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; +use crate::types::RpcReceiver; use crate::ValidationError; use crate::{ config::Config, config::ConfigBuilder, types::Rpc, IdentTopic as Topic, TopicScoreParams, @@ -58,7 +59,14 @@ where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { - pub(crate) fn create_network(self) -> (Behaviour, Vec, Vec) { + pub(crate) fn create_network( + self, + ) -> ( + Behaviour, + Vec, + HashMap, + Vec, + ) { let keypair = libp2p_identity::Keypair::generate_ed25519(); // create a gossipsub struct let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( @@ -86,10 +94,11 @@ where // build and connect peer_no random peers let mut peers = vec![]; + let mut receiver_queues = HashMap::new(); let empty = vec![]; for i in 0..self.peer_no { - peers.push(add_peer( + let (peer, receiver) = add_peer( &mut gs, if self.to_subscribe { &topic_hashes @@ -98,10 +107,12 @@ where }, i < self.outbound, i < self.explicit, - )); + ); + peers.push(peer); + receiver_queues.insert(peer, receiver); } - (gs, peers, topic_hashes) + (gs, peers, receiver_queues, topic_hashes) } fn peer_no(mut self, peer_no: usize) -> Self { @@ -165,7 +176,7 @@ fn add_peer( topic_hashes: &Vec, outbound: bool, explicit: bool, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -179,7 +190,7 @@ fn add_peer_with_addr( outbound: bool, explicit: bool, address: Multiaddr, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -201,7 +212,7 @@ fn add_peer_with_addr_and_kind( explicit: bool, address: Multiaddr, kind: Option, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -219,6 +230,9 @@ where } }; + let (sender, receiver) = rpc_channel(100); + gs.handler_send_queues.insert(peer, sender); + gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: peer, connection_id: ConnectionId::new_unchecked(0), @@ -249,7 +263,7 @@ where &peer, ); } - peer + (peer, receiver) } fn disconnect_peer(gs: &mut Behaviour, peer_id: &PeerId) @@ -389,7 +403,7 @@ fn test_subscribe() { // - run JOIN(topic) let subscribe_topic = vec![String::from("test_subscribe")]; - let (gs, _, topic_hashes) = inject_nodes1() + let (gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(subscribe_topic) .to_subscribe(true) @@ -401,19 +415,16 @@ fn test_subscribe() { ); // collect all the subscriptions - let subscriptions = gs - .events - .iter() - .filter(|e| { - matches!( - e, - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(_)), - .. + let subscriptions = queues + .into_values() + .fold(0, |mut collected_subscriptions, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = c.priority.try_recv() { + collected_subscriptions = collected_subscriptions + 1 } - ) - }) - .count(); + } + collected_subscriptions + }); // we sent a subscribe to all known peers assert_eq!(subscriptions, 20); @@ -434,7 +445,7 @@ fn test_unsubscribe() { .collect::>(); // subscribe to topic_strings - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -462,15 +473,15 @@ fn test_unsubscribe() { ); // collect all the subscriptions - let subscriptions = gs - .events - .iter() - .fold(0, |collected_subscriptions, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(_)), - .. - } => collected_subscriptions + 1, - _ => collected_subscriptions, + let subscriptions = queues + .into_values() + .fold(0, |mut collected_subscriptions, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = c.priority.try_recv() { + collected_subscriptions = collected_subscriptions + 1 + } + } + collected_subscriptions }); // we sent a unsubscribe to all known peers, for two topics @@ -503,7 +514,7 @@ fn test_join() { .map(|t| Topic::new(t.clone())) .collect::>(); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, _receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -557,14 +568,27 @@ fn test_join() { gs.fanout .insert(topic_hashes[1].clone(), Default::default()); let mut new_peers: Vec = vec![]; + + let mut peers = vec![]; for _ in 0..3 { let random_peer = PeerId::random(); // inform the behaviour of a new peer + let address = "/ip4/127.0.0.1".parse::().unwrap(); + let peer = gs + .handle_established_inbound_connection( + ConnectionId::new_unchecked(0), + random_peer, + &address, + &address, + ) + .unwrap(); + peers.push(peer); + gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, connection_id: ConnectionId::new_unchecked(0), endpoint: &ConnectedPoint::Dialer { - address: "/ip4/127.0.0.1".parse::().unwrap(), + address, role_override: Endpoint::Dialer, }, failed_addresses: &[], @@ -616,7 +640,7 @@ fn test_publish_without_flood_publishing() { .unwrap(); let publish_topic = String::from("test_publish"); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![publish_topic.clone()]) .to_subscribe(true) @@ -640,18 +664,15 @@ fn test_publish_without_flood_publishing() { gs.publish(Topic::new(publish_topic), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Publish(message)), - .. - } => { - collected_publish.push(message); - collected_publish + let publishes = queues + .into_values() + .fold(vec![], |mut collected_publish, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + collected_publish.push(message); + } } - _ => collected_publish, + collected_publish }); // Transform the inbound message @@ -695,7 +716,7 @@ fn test_fanout() { .unwrap(); let fanout_topic = String::from("test_fanout"); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![fanout_topic.clone()]) .to_subscribe(true) @@ -727,18 +748,15 @@ fn test_fanout() { ); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Publish(message)), - .. - } => { - collected_publish.push(message); - collected_publish + let publishes = queues + .into_values() + .fold(vec![], |mut collected_publish, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + collected_publish.push(message); + } } - _ => collected_publish, + collected_publish }); // Transform the inbound message @@ -769,7 +787,7 @@ fn test_fanout() { #[test] /// Test the gossipsub NetworkBehaviour peer connection logic. fn test_inject_connected() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -777,26 +795,19 @@ fn test_inject_connected() { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let subscriptions = gs - .events - .into_iter() - .filter_map(|e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(topic)), - peer_id, - .. - } => Some((peer_id, topic)), - _ => None, - }) - .fold( - HashMap::>::new(), - |mut subs, (peer, sub)| { - let mut peer_subs = subs.remove(&peer).unwrap_or_default(); - peer_subs.push(sub.into_string()); - subs.insert(peer, peer_subs); - subs - }, - ); + let subscriptions = queues.into_iter().fold( + HashMap::>::new(), + |mut collected_subscriptions, (peer, c)| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Subscribe(topic)) = c.priority.try_recv() { + let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); + peer_subs.push(topic.into_string()); + collected_subscriptions.insert(peer, peer_subs); + } + } + collected_subscriptions + }, + ); // check that there are two subscriptions sent to each peer for peer_subs in subscriptions.values() { @@ -831,7 +842,7 @@ fn test_handle_received_subscriptions() { .iter() .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(false) @@ -991,7 +1002,7 @@ fn test_get_random_peers() { /// Tests that the correct message is sent when a peer asks for a message in our cache. #[test] fn test_handle_iwant_msg_cached() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1019,18 +1030,16 @@ fn test_handle_iwant_msg_cached() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs.events.into_iter().fold( - Vec::::new(), - |mut collected_messages, e| match e { - ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(RpcOut::Forward(message)) = event { - collected_messages.push(message); + let sent_messages = queues + .into_values() + .fold(vec![], |mut collected_messages, c| { + while !c.non_priority.is_empty() { + if let Ok(RpcOut::Forward(msg)) = c.non_priority.try_recv() { + collected_messages.push(msg) } - collected_messages } - _ => collected_messages, - }, - ); + collected_messages + }); assert!( sent_messages @@ -1044,7 +1053,7 @@ fn test_handle_iwant_msg_cached() { /// Tests that messages are sent correctly depending on the shifting of the message cache. #[test] fn test_handle_iwant_msg_cached_shifted() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1077,18 +1086,20 @@ fn test_handle_iwant_msg_cached_shifted() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // is the message is being sent? - let message_exists = gs.events.iter().any(|e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Forward(message)), - .. - } => { - gs.config.message_id( - &gs.data_transform - .inbound_transform(message.clone()) - .unwrap(), - ) == msg_id + let message_exists = queues.values().any(|c| { + let mut out = false; + while !c.non_priority.is_empty() { + if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(message)) if + gs.config.message_id( + &gs.data_transform + .inbound_transform(message.clone()) + .unwrap(), + ) == msg_id) + { + out = true; + } } - _ => false, + out }); // default history_length is 5, expect no messages after shift > 5 if shift < 5 { @@ -1108,7 +1119,7 @@ fn test_handle_iwant_msg_cached_shifted() { #[test] // tests that an event is not created when a peers asks for a message not in our cache fn test_handle_iwant_msg_not_cached() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1127,7 +1138,7 @@ fn test_handle_iwant_msg_not_cached() { #[test] // tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1159,7 +1170,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // tests that an event is not created when a peer shares that it has a message that // we already have fn test_handle_ihave_subscribed_and_msg_cached() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1181,7 +1192,7 @@ fn test_handle_ihave_subscribed_and_msg_cached() { // test that an event is not created when a peer shares that it has a message in // a topic that we are not subscribed to fn test_handle_ihave_not_subscribed() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) .topics(vec![]) .to_subscribe(true) @@ -1207,7 +1218,7 @@ fn test_handle_ihave_not_subscribed() { // tests that a peer is added to our mesh when we are both subscribed // to the same topic fn test_handle_graft_is_subscribed() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1225,7 +1236,7 @@ fn test_handle_graft_is_subscribed() { // tests that a peer is not added to our mesh when they are subscribed to // a topic that we are not fn test_handle_graft_is_not_subscribed() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1250,7 +1261,7 @@ fn test_handle_graft_multiple_topics() { .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(true) @@ -1280,7 +1291,7 @@ fn test_handle_graft_multiple_topics() { #[test] // tests that a peer is removed from our mesh fn test_handle_prune_peer_in_mesh() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1309,34 +1320,50 @@ fn test_handle_prune_peer_in_mesh() { fn count_control_msgs( gs: &Behaviour, + queues: &HashMap, mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, ) -> usize { gs.control_pool .iter() .map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count()) .sum::() - + gs.events + + queues .iter() - .filter(|e| match e { - ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(RpcOut::Control(action)), - .. - } => filter(peer_id, action), - _ => false, + .fold(0, |mut collected_messages, (peer_id, c)| { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + if let Ok(RpcOut::Control(action)) = c.priority.try_recv() { + if filter(peer_id, &action) { + collected_messages = collected_messages + 1; + } + } + if let Ok(RpcOut::Control(action)) = c.non_priority.try_recv() { + if filter(peer_id, &action) { + collected_messages = collected_messages + 1; + } + } + } + collected_messages }) - .count() } -fn flush_events(gs: &mut Behaviour) { +fn flush_events( + gs: &mut Behaviour, + receiver_queues: &mut HashMap, +) { gs.control_pool.clear(); gs.events.clear(); + for c in receiver_queues.values_mut() { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + let _ = c.priority.try_recv(); + let _ = c.non_priority.try_recv(); + } + } } #[test] // tests that a peer added as explicit peer gets connected to fn test_explicit_peer_gets_connected() { - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(Vec::new()) .to_subscribe(true) @@ -1369,7 +1396,7 @@ fn test_explicit_peer_reconnects() { .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, _) = inject_nodes1() + let (mut gs, others, mut queues, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1381,7 +1408,7 @@ fn test_explicit_peer_reconnects() { //add peer as explicit peer gs.add_explicit_peer(peer); - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); //disconnect peer disconnect_peer(&mut gs, peer); @@ -1419,7 +1446,7 @@ fn test_explicit_peer_reconnects() { #[test] fn test_handle_graft_explicit_peer() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1437,7 +1464,7 @@ fn test_handle_graft_explicit_peer() { //check prunes assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == peer + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer && match m { ControlAction::Prune { topic_hash, .. } => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], @@ -1450,7 +1477,7 @@ fn test_handle_graft_explicit_peer() { #[test] fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1466,7 +1493,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1474,7 +1501,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1483,7 +1510,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { #[test] fn do_not_graft_explicit_peer() { - let (mut gs, others, topic_hashes) = inject_nodes1() + let (mut gs, others, queues, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic")]) .to_subscribe(true) @@ -1498,7 +1525,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &others[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1507,7 +1534,7 @@ fn do_not_graft_explicit_peer() { #[test] fn do_forward_messages_to_explicit_peers() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1527,21 +1554,15 @@ fn do_forward_messages_to_explicit_peers() { validated: true, }; gs.handle_received_message(message.clone(), &local_id); - assert_eq!( - gs.events - .iter() - .filter(|e| match e { - ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(RpcOut::Forward(m)), - .. - } => { - peer_id == &peers[0] && m.data == message.data + queues.into_iter().fold(0, |mut fwds, (peer_id, c)| { + while !c.non_priority.is_empty() { + if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(m)) if peer_id == peers[0] && m.data == message.data) { + fwds = fwds +1; + } } - _ => false, - }) - .count(), + fwds + }), 1, "The message did not get forwarded to the explicit peer" ); @@ -1549,7 +1570,7 @@ fn do_forward_messages_to_explicit_peers() { #[test] fn explicit_peers_not_added_to_mesh_on_subscribe() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1578,7 +1599,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) > 0, "No graft message got created to non-explicit peer" @@ -1586,7 +1607,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1595,7 +1616,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { #[test] fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1627,7 +1648,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1635,7 +1656,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1644,7 +1665,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1691,7 +1712,7 @@ fn test_mesh_addition() { let config: Config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(config.mesh_n() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1727,7 +1748,7 @@ fn test_mesh_subtraction() { // Adds mesh_low peers and PRUNE 2 giving us a deficit. let n = config.mesh_n_high() + 10; //make all outbound connections so that we allow grafting to all - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1751,7 +1772,7 @@ fn test_mesh_subtraction() { fn test_connect_to_px_peers_on_handle_prune() { let config: Config = Config::default(); - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1807,7 +1828,7 @@ fn test_send_px_and_backoff_in_prune() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1824,7 +1845,7 @@ fn test_send_px_and_backoff_in_prune() { //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -1848,7 +1869,7 @@ fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1865,14 +1886,14 @@ fn test_prune_backoffed_peer_on_graft() { ); //ignore all messages until now - gs.events.clear(); + flush_events(&mut gs, &mut queues); //handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -1897,7 +1918,7 @@ fn test_do_not_graft_within_backoff_period() { .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1908,7 +1929,7 @@ fn test_do_not_graft_within_backoff_period() { gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); //forget all events until now - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); //call heartbeat gs.heartbeat(); @@ -1922,7 +1943,10 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )), 0, "Graft message created too early within backoff period" ); @@ -1933,7 +1957,10 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) > 0, "No graft message was created after backoff period" ); } @@ -1948,7 +1975,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1959,7 +1986,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); //call heartbeat gs.heartbeat(); @@ -1971,7 +1998,10 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )), 0, "Graft message created too early within backoff period" ); @@ -1982,7 +2012,10 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) > 0, "No graft message was created after backoff period" ); } @@ -2001,7 +2034,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2011,7 +2044,7 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &queues, |_, m| match m { ControlAction::Prune { backoff, .. } => backoff == &Some(1), _ => false, }), @@ -2022,7 +2055,7 @@ fn test_unsubscribe_backoff() { let _ = gs.subscribe(&Topic::new(topics[0].to_string())); // forget all events until now - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); // call heartbeat gs.heartbeat(); @@ -2036,7 +2069,10 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )), 0, "Graft message created too early within backoff period" ); @@ -2047,7 +2083,10 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) > 0, "No graft message was created after backoff period" ); } @@ -2058,7 +2097,7 @@ fn test_flood_publish() { let topic = "test"; // Adds more peers than mesh can hold to test flood publishing - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, queues, _) = inject_nodes1() .peer_no(config.mesh_n_high() + 10) .topics(vec![topic.into()]) .to_subscribe(true) @@ -2069,17 +2108,15 @@ fn test_flood_publish() { gs.publish(Topic::new(topic), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { + let publishes = queues + .into_values() + .fold(vec![], |mut collected_publish, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { collected_publish.push(message); } - collected_publish } - _ => collected_publish, + collected_publish }); // Transform the inbound message @@ -2114,7 +2151,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //add more peers than in mesh to test gossipping //by default only mesh_n_low peers will get added to mesh - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(config.mesh_n_low() + config.gossip_lazy() + 1) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2142,7 +2179,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, |_, action| match action { + count_control_msgs(&gs, &queues, |_, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2159,7 +2196,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //add a lot of peers let m = config.mesh_n_low() + config.gossip_lazy() * (2.0 / config.gossip_factor()) as usize; - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(m) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2186,7 +2223,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, |_, action| match action { + count_control_msgs(&gs, &queues, |_, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2202,7 +2239,7 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { let config: Config = Config::default(); //enough peers to fill the mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2217,8 +2254,8 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); //create an outbound and an inbound peer - let inbound = add_peer(&mut gs, &topics, false, false); - let outbound = add_peer(&mut gs, &topics, true, false); + let (inbound, _in_reciver) = add_peer(&mut gs, &topics, false, false); + let (outbound, _out_receiver) = add_peer(&mut gs, &topics, true, false); //send grafts gs.handle_graft(&inbound, vec![topics[0].clone()]); @@ -2248,7 +2285,7 @@ fn test_do_not_remove_too_many_outbound_peers() { .unwrap(); //fill the mesh with inbound connections - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2263,7 +2300,7 @@ fn test_do_not_remove_too_many_outbound_peers() { //create m outbound connections and graft (we will accept the graft) let mut outbound = HashSet::new(); for _ in 0..m { - let peer = add_peer(&mut gs, &topics, true, false); + let (peer, _) = add_peer(&mut gs, &topics, true, false); outbound.insert(peer); gs.handle_graft(&peer, topics.clone()); } @@ -2286,7 +2323,7 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { let config: Config = Config::default(); // Fill full mesh with inbound peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2298,8 +2335,9 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { } //create config.mesh_outbound_min() many outbound connections without grafting + let mut peers = vec![]; for _ in 0..config.mesh_outbound_min() { - add_peer(&mut gs, &topics, true, false); + peers.push(add_peer(&mut gs, &topics, true, false)); } // Nothing changed in the mesh yet @@ -2320,7 +2358,7 @@ fn test_prune_negative_scored_peers() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2344,7 +2382,7 @@ fn test_prune_negative_scored_peers() { //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -2365,7 +2403,7 @@ fn test_prune_negative_scored_peers() { fn test_dont_graft_to_negative_scored_peers() { let config = Config::default(); //init full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2377,8 +2415,8 @@ fn test_dont_graft_to_negative_scored_peers() { .create_network(); //add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 to negative gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 1); @@ -2404,7 +2442,7 @@ fn test_ignore_px_from_negative_scored_peer() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2451,7 +2489,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { .unwrap(); // Build mesh with three peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(3) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2478,7 +2516,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { // Check that px in prune message only contains third peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && match m { ControlAction::Prune { topic_hash, @@ -2504,7 +2542,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2518,8 +2556,8 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2551,7 +2589,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, |peer, action| match action { + count_control_msgs(&gs, &queues, |peer, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2579,7 +2617,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { }; // Build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2595,8 +2633,10 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { } // Add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2627,17 +2667,15 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.handle_iwant(&p2, vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs - .events + let sent_messages = queues .into_iter() - .fold(vec![], |mut collected_messages, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Forward(message)) = event { + .fold(vec![], |mut collected_messages, (peer_id, c)| { + while !c.non_priority.is_empty() { + if let Ok(RpcOut::Forward(message)) = c.non_priority.try_recv() { collected_messages.push((peer_id, message)); } - collected_messages } - _ => collected_messages, + collected_messages }); //the message got sent to p2 @@ -2667,7 +2705,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2683,8 +2721,8 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } //add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 below peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -2715,7 +2753,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( - count_control_msgs(&gs, |peer, c| match c { + count_control_msgs(&gs, &queues, |peer, c| match c { ControlAction::IWant { message_ids } => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); @@ -2743,7 +2781,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { }; //build mesh with no peers and no subscribed topics - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut queues, _) = inject_nodes1() .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); @@ -2753,8 +2791,10 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { let topics = vec![topic.hash()]; //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2772,17 +2812,15 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { gs.publish(topic, publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events + let publishes = queues .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { + .fold(vec![], |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { collected_publish.push((peer_id, message)); } - collected_publish } - _ => collected_publish, + collected_publish }); //assert only published to p2 @@ -2800,15 +2838,17 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { ..PeerScoreThresholds::default() }; //build mesh with no peers - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2826,17 +2866,15 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events + let publishes = queues .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push((peer_id, message)); + .fold(vec![], |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + collected_publish.push((peer_id, message)) } - collected_publish } - _ => collected_publish, + collected_publish }); //assert only published to p2 @@ -2856,15 +2894,15 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { }; //build mesh with no peers - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, _, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config.clone()) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 below peer_score_thresholds.graylist_threshold //note that penalties get squared so two penalties means a score of @@ -2986,7 +3024,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { ..PeerScoreThresholds::default() }; // Build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3057,7 +3095,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { //build mesh with more peers than mesh can hold let n = config.mesh_n_high() + 1; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3117,7 +3155,7 @@ fn test_scoring_p1() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3199,7 +3237,7 @@ fn test_scoring_p2() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3299,7 +3337,7 @@ fn test_scoring_p3() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3400,7 +3438,7 @@ fn test_scoring_p3b() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3492,7 +3530,7 @@ fn test_scoring_p4_valid_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3551,7 +3589,7 @@ fn test_scoring_p4_invalid_signature() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3609,7 +3647,7 @@ fn test_scoring_p4_message_from_self() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3659,7 +3697,7 @@ fn test_scoring_p4_ignored_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3718,7 +3756,7 @@ fn test_scoring_p4_application_invalidated_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3780,7 +3818,7 @@ fn test_scoring_p4_application_invalid_message_from_two_peers() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3850,7 +3888,7 @@ fn test_scoring_p4_three_application_invalid_messages() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3934,7 +3972,7 @@ fn test_scoring_p4_decay() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3988,7 +4026,7 @@ fn test_scoring_p5() { }; //build mesh with one peer - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4014,7 +4052,7 @@ fn test_scoring_p6() { ..Default::default() }; - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(vec![]) .to_subscribe(false) @@ -4027,20 +4065,20 @@ fn test_scoring_p6() { //create 5 peers with the same ip let addr = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 3)); let peers = vec![ - add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], true, true, addr.clone()), + add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, true, addr.clone()).0, ]; //create 4 other peers with other ip let addr2 = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 4)); let others = vec![ - add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()), - add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()), + add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()).0, ]; //no penalties yet @@ -4144,7 +4182,7 @@ fn test_scoring_p7_grafts_before_backoff() { ..Default::default() }; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4221,7 +4259,7 @@ fn test_opportunistic_grafting() { ..Default::default() }; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(5) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4250,7 +4288,7 @@ fn test_opportunistic_grafting() { } //set scores for peers in the mesh - for (i, peer) in others.iter().enumerate().take(5) { + for (i, (peer, _receiver)) in others.iter().enumerate().take(5) { gs.set_application_score(peer, 0.0 + i as f64); } @@ -4290,7 +4328,7 @@ fn test_opportunistic_grafting() { ); assert!( - gs.mesh[&topics[0]].is_disjoint(&others.iter().cloned().take(2).collect()), + gs.mesh[&topics[0]].is_disjoint(&others.iter().map(|(p, _)| p).cloned().take(2).collect()), "peers below or equal to median should not be added in opportunistic grafting" ); } @@ -4298,7 +4336,7 @@ fn test_opportunistic_grafting() { #[test] fn test_ignore_graft_from_unknown_topic() { //build gossipsub without subscribing to any topics - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, queues, _) = inject_nodes1() .peer_no(0) .topics(vec![]) .to_subscribe(false) @@ -4309,7 +4347,10 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, |_, a| matches!(a, ControlAction::Prune { .. })), + count_control_msgs(&gs, &queues, |_, a| matches!( + a, + ControlAction::Prune { .. } + )), 0, "we should not prune after graft in unknown topic" ); @@ -4319,14 +4360,15 @@ fn test_ignore_graft_from_unknown_topic() { fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { let config = Config::default(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, receiver); //receive a message let mut seq = 0; @@ -4340,7 +4382,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); //clear events - gs.events.clear(); + flush_events(&mut gs, &mut queues); //the first gossip_retransimission many iwants return the valid message, all others are // ignored. @@ -4349,16 +4391,14 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { } assert_eq!( - gs.events - .iter() - .filter(|e| matches!( - e, - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Forward(_)), - .. + queues.into_values().fold(0, |mut fwds, c| { + while !c.non_priority.is_empty() { + if let Ok(RpcOut::Forward(_)) = dbg!(c.non_priority.try_recv()) { + fwds = fwds + 1; } - )) - .count(), + } + fwds + }), config.gossip_retransimission() as usize, "not more then gossip_retransmission many messages get sent back" ); @@ -4371,7 +4411,7 @@ fn test_ignore_too_many_ihaves() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4379,7 +4419,8 @@ fn test_ignore_too_many_ihaves() { .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4408,7 +4449,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, |p, action| p == &peer + count_control_msgs(&gs, &queues, |p, action| p == &peer && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" @@ -4431,7 +4472,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( - count_control_msgs(&gs, |p, action| p == &peer + count_control_msgs(&gs, &queues, |p, action| p == &peer && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)), 20, "all 20 should get sent" @@ -4446,7 +4487,7 @@ fn test_ignore_too_many_messages_in_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4454,7 +4495,8 @@ fn test_ignore_too_many_messages_in_ihave() { .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4480,7 +4522,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &queues, |p, action| match action { ControlAction::IWant { message_ids } => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); @@ -4505,7 +4547,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we sent 20 iwant messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &queues, |p, action| match action { ControlAction::IWant { message_ids } => p == &peer && { sum += message_ids.len(); @@ -4526,7 +4568,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4539,8 +4581,8 @@ fn test_limit_number_of_message_ids_inside_ihave() { } //add two other peers not in the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _) = add_peer(&mut gs, &topics, false, false); + let (p2, _) = add_peer(&mut gs, &topics, false, false); //receive 200 messages from another peer let mut seq = 0; @@ -4559,7 +4601,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &queues, |p, action| match action { ControlAction::IHave { message_ids, .. } => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); @@ -4616,7 +4658,7 @@ fn test_iwant_penalties() { }; // fill the mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4640,7 +4682,7 @@ fn test_iwant_penalties() { let mut first_messages = Vec::new(); let mut second_messages = Vec::new(); let mut seq = 0; - for peer in &other_peers { + for (peer, _receiver) in &other_peers { let msg1 = random_message(&mut seq, &topics); let msg2 = random_message(&mut seq, &topics); @@ -4663,19 +4705,19 @@ fn test_iwant_penalties() { } // the peers send us all the first message ids in time - for (index, peer) in other_peers.iter().enumerate() { + for (index, (peer, _receiver)) in other_peers.iter().enumerate() { gs.handle_received_message(first_messages[index].clone(), peer); } // now we do a heartbeat no penalization should have been applied yet gs.heartbeat(); - for peer in &other_peers { + for (peer, _receiver) in &other_peers { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 0.0); } // receive the first twenty of the other peers then send their response - for (index, peer) in other_peers.iter().enumerate().take(20) { + for (index, (peer, _receiver)) in other_peers.iter().enumerate().take(20) { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4686,7 +4728,7 @@ fn test_iwant_penalties() { gs.heartbeat(); // now we get the second messages from the last 80 peers. - for (index, peer) in other_peers.iter().enumerate() { + for (index, (peer, _receiver)) in other_peers.iter().enumerate() { if index > 19 { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4700,7 +4742,7 @@ fn test_iwant_penalties() { let mut single_penalized = 0; let mut double_penalized = 0; - for (i, peer) in other_peers.iter().enumerate() { + for (i, (peer, _receiver)) in other_peers.iter().enumerate() { let score = gs.peer_score.as_ref().unwrap().0.score(peer); if score == 0.0 { not_penalized += 1; @@ -4728,7 +4770,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4736,7 +4778,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .create_network(); //add two floodsub peer, one explicit, one implicit - let p1 = add_peer_with_addr_and_kind( + let (p1, receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4744,7 +4786,11 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + queues.insert(p1, receiver1); + + let (p2, receiver2) = + add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + queues.insert(p2, receiver2); //p1 and p2 are not in the mesh assert!(!gs.mesh[&topics[0]].contains(&p1) && !gs.mesh[&topics[0]].contains(&p2)); @@ -4754,24 +4800,21 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = gs - .events + let publishes = queues .iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push(message); - } + .fold(0, |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if matches!(c.priority.try_recv(), + Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2) + { + collected_publish = collected_publish + 1; } - collected_publish } - _ => collected_publish, + collected_publish }); assert_eq!( - publishes.len(), - 2, + publishes, 2, "Should send a publish message to all floodsub peers" ); } @@ -4782,7 +4825,7 @@ fn test_do_not_use_floodsub_in_fanout() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut queues, _) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(Vec::new()) .to_subscribe(false) @@ -4793,7 +4836,7 @@ fn test_do_not_use_floodsub_in_fanout() { let topics = vec![topic.hash()]; //add two floodsub peer, one explicit, one implicit - let p1 = add_peer_with_addr_and_kind( + let (p1, receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4801,31 +4844,32 @@ fn test_do_not_use_floodsub_in_fanout() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + queues.insert(p1, receiver1); + let (p2, receiver2) = + add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + + queues.insert(p2, receiver2); //publish a message let publish_data = vec![0; 42]; gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = gs - .events + let publishes = queues .iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push(message); - } + .fold(0, |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if matches!(c.priority.try_recv(), + Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2) + { + collected_publish = collected_publish + 1; } - collected_publish } - _ => collected_publish, + collected_publish }); assert_eq!( - publishes.len(), - 2, + publishes, 2, "Should send a publish message to all floodsub peers" ); @@ -4837,7 +4881,7 @@ fn test_do_not_use_floodsub_in_fanout() { #[test] fn test_dont_add_floodsub_peers_to_mesh_on_join() { - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(Vec::new()) .to_subscribe(false) @@ -4867,14 +4911,14 @@ fn test_dont_add_floodsub_peers_to_mesh_on_join() { #[test] fn test_dont_send_px_to_old_gossipsub_peers() { - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, queues, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); //add an old gossipsub peer - let p1 = add_peer_with_addr_and_kind( + let (p1, _receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4892,7 +4936,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &queues, |_, m| match m { ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, }), @@ -4904,7 +4948,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { #[test] fn test_dont_send_floodsub_peers_in_px() { //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4930,7 +4974,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &queues, |_, m| match m { ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, }), @@ -4941,7 +4985,7 @@ fn test_dont_send_floodsub_peers_in_px() { #[test] fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, _, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4969,7 +5013,7 @@ fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { // Some very basic test of public api methods. #[test] fn test_public_api() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(4) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5001,7 +5045,7 @@ fn test_public_api() { fn test_subscribe_to_invalid_topic() { let t1 = Topic::new("t1"); let t2 = Topic::new("t2"); - let (mut gs, _, _) = inject_nodes::() + let (mut gs, _, _, _) = inject_nodes::() .subscription_filter(WhitelistSubscriptionFilter( vec![t1.hash()].into_iter().collect(), )) @@ -5015,7 +5059,7 @@ fn test_subscribe_to_invalid_topic() { #[test] fn test_subscribe_and_graft_with_negative_score() { //simulate a communication between two gossipsub instances - let (mut gs1, _, topic_hashes) = inject_nodes1() + let (mut gs1, _, _, topic_hashes) = inject_nodes1() .topics(vec!["test".into()]) .scoring(Some(( PeerScoreParams::default(), @@ -5023,14 +5067,14 @@ fn test_subscribe_and_graft_with_negative_score() { ))) .create_network(); - let (mut gs2, _, _) = inject_nodes1().create_network(); + let (mut gs2, _, queues, _) = inject_nodes1().create_network(); let connection_id = ConnectionId::new_unchecked(0); let topic = Topic::new("test"); - let p2 = add_peer(&mut gs1, &Vec::new(), true, false); - let p1 = add_peer(&mut gs2, &topic_hashes, false, false); + let (p2, _receiver1) = add_peer(&mut gs1, &Vec::new(), true, false); + let (p1, _receiver2) = add_peer(&mut gs2, &topic_hashes, false, false); //add penalty to peer p2 gs1.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); @@ -5040,22 +5084,16 @@ fn test_subscribe_and_graft_with_negative_score() { //subscribe to topic in gs2 gs2.subscribe(&topic).unwrap(); - let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| { + let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, _gs2: &mut Behaviour<_, _>| { //collect messages to p1 - let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == p1 { - if let HandlerIn::Message(m) = event { - Some(m) - } else { - None - } - } else { - None - } - } - _ => None, - }); + let messages_to_p1 = + queues + .iter() + .filter_map(|(peer_id, c)| match c.non_priority.try_recv() { + Ok(rpc) if peer_id == &p1 => Some(rpc), + _ => None, + }); + for message in messages_to_p1 { gs1.on_connection_handler_event( p2, @@ -5093,7 +5131,7 @@ fn test_graft_without_subscribe() { let topic = String::from("test_subscribe"); let subscribe_topic = vec![topic.clone()]; let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()]; - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(1) .topics(subscribe_topic) .to_subscribe(false) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 7e79912cc4a6..b99f8548067f 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -95,6 +95,7 @@ pub struct Config { max_ihave_messages: usize, iwant_followup_time: Duration, published_message_ids_cache_time: Duration, + connection_handler_queue_len: usize, } impl Config { @@ -350,6 +351,11 @@ impl Config { pub fn published_message_ids_cache_time(&self) -> Duration { self.published_message_ids_cache_time } + + /// The max number of messages a `ConnectionHandler` can buffer. + pub fn connection_handler_queue_len(&self) -> usize { + self.connection_handler_queue_len + } } impl Default for Config { @@ -417,6 +423,7 @@ impl Default for ConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), published_message_ids_cache_time: Duration::from_secs(10), + connection_handler_queue_len: 100, }, invalid_protocol: false, } @@ -782,6 +789,10 @@ impl ConfigBuilder { self } + pub fn connection_handler_queue_len(&mut self, len: usize) { + self.config.connection_handler_queue_len = len; + } + /// Constructs a [`Config`] from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index e91f81776e76..dd048c7e2fdf 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -20,7 +20,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; -use crate::types::{PeerKind, RawMessage, Rpc, RpcOut}; +use crate::types::{PeerKind, RawMessage, Rpc, RpcReceiver}; use crate::ValidationError; use asynchronous_codec::Framed; use futures::future::Either; @@ -33,7 +33,6 @@ use libp2p_swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::Stream; -use smallvec::SmallVec; use std::{ pin::Pin, task::{Context, Poll}, @@ -61,8 +60,6 @@ pub enum HandlerEvent { #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum HandlerIn { - /// A gossipsub message to send. - Message(RpcOut), /// The peer has joined the mesh. JoinedMesh, /// The peer has left the mesh. @@ -94,8 +91,8 @@ pub struct EnabledHandler { /// The single long-lived inbound substream. inbound_substream: Option, - /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[proto::RPC; 16]>, + /// Queue of values that we want to send to the remote + send_queue: RpcReceiver, /// Flag indicating that an outbound substream is being established to prevent duplicate /// requests. @@ -159,7 +156,7 @@ enum OutboundSubstreamState { impl Handler { /// Builds a new [`Handler`]. - pub fn new(protocol_config: ProtocolConfig) -> Self { + pub fn new(protocol_config: ProtocolConfig, message_queue: RpcReceiver) -> Self { Handler::Enabled(EnabledHandler { listen_protocol: protocol_config, inbound_substream: None, @@ -167,11 +164,11 @@ impl Handler { outbound_substream_establishing: false, outbound_substream_attempts: 0, inbound_substream_attempts: 0, - send_queue: SmallVec::new(), peer_kind: None, peer_kind_sent: false, last_io_activity: Instant::now(), in_mesh: false, + send_queue: message_queue, }) } } @@ -250,10 +247,11 @@ impl EnabledHandler { ) { // outbound idle state Some(OutboundSubstreamState::WaitingOutput(substream)) => { - if let Some(message) = self.send_queue.pop() { - self.send_queue.shrink_to_fit(); - self.outbound_substream = - Some(OutboundSubstreamState::PendingSend(substream, message)); + if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) { + self.outbound_substream = Some(OutboundSubstreamState::PendingSend( + substream, + message.into_protobuf(), + )); continue; } @@ -409,7 +407,6 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, message: HandlerIn) { match self { Handler::Enabled(handler) => match message { - HandlerIn::Message(m) => handler.send_queue.push(m.into_protobuf()), HandlerIn::JoinedMesh => { handler.in_mesh = true; } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d1b92ff0ba88..a1d9ff3fae22 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -20,12 +20,17 @@ //! A collection of types using the Gossipsub system. use crate::TopicHash; +use async_channel::{Receiver, Sender}; +use futures::Stream; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; -use std::fmt; use std::fmt::Debug; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::Poll; +use std::{fmt, pin::Pin}; use crate::rpc_proto::proto; #[cfg(feature = "serde")] @@ -512,3 +517,99 @@ impl fmt::Display for PeerKind { f.write_str(self.as_ref()) } } + +/// Create `RpcOut` channel that is priority aware. +pub(crate) fn rpc_channel(cap: usize) -> (RpcSender, RpcReceiver) { + let (priority_sender, priority_receiver) = async_channel::unbounded(); + let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2); + let len = Arc::new(AtomicUsize::new(0)); + ( + RpcSender { + cap: cap / 2, + len: len.clone(), + priority: priority_sender, + non_priority: non_priority_sender, + }, + RpcReceiver { + len, + priority: priority_receiver, + non_priority: non_priority_receiver, + }, + ) +} + +/// `RpcOut` sender that is priority aware. +pub(crate) struct RpcSender { + cap: usize, + len: Arc, + priority: Sender, + non_priority: Sender, +} + +impl RpcSender { + /// Send `RpcOut`s to the `ConnectionHandler` according to their priority. + pub(crate) fn try_send(&mut self, rpc: RpcOut) -> Result<(), ()> { + // Forward messages, IWANT and IHAVE control messages are regarded as low priority. + match rpc { + rpc @ RpcOut::Forward(_) + | rpc @ RpcOut::Control(ControlAction::IHave { .. }) + | rpc @ RpcOut::Control(ControlAction::IWant { .. }) => { + if let Err(err) = self.non_priority.try_send(rpc) { + let rpc = err.into_inner(); + tracing::trace!("{rpc:?} message dropped, queue is full"); + } + } + // GRAFT and PRUNE control messages, Subscription, and Publishes messages. + // Publish messages are limited to the capacity of the queue. + rpc @ RpcOut::Control(_) + | rpc @ RpcOut::Subscribe(_) + | rpc @ RpcOut::Unsubscribe(_) => { + self.priority.try_send(rpc).expect("Channel is unbounded"); + } + rpc @ RpcOut::Publish(_) => { + if self.len.load(Ordering::Relaxed) >= self.cap { + return Err(()); + } + self.priority.try_send(rpc).expect("Channel is unbounded"); + self.len.fetch_add(1, Ordering::Relaxed); + } + } + Ok(()) + } +} + +/// `RpcOut` sender that is priority aware. +pub struct RpcReceiver { + len: Arc, + pub(crate) priority: Receiver, + pub(crate) non_priority: Receiver, +} + +impl RpcReceiver { + /// Check if both queues are empty. + pub(crate) fn is_empty(&self) -> bool { + self.priority.is_empty() && self.non_priority.is_empty() + } +} + +impl Stream for RpcReceiver { + type Item = RpcOut; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // The control queue is polled first. + if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) { + if let Some(RpcOut::Publish(_)) = rpc { + self.len.fetch_sub(1, Ordering::Relaxed); + } + return Poll::Ready(rpc); + } + // The priority queue is then polled. + if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) { + return Poll::Ready(rpc); + } + Pin::new(&mut self.non_priority).poll_next(cx) + } +}