From dc8433e3fc4cf2da49d664b627248b63497fcf4a Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Wed, 9 Feb 2022 10:08:28 -0500 Subject: [PATCH] swarm/src/behaviour: Merge inject_* paired methods (#2445) Co-authored-by: Max Inden --- protocols/autonat/CHANGELOG.md | 6 +- protocols/autonat/src/behaviour.rs | 30 +- protocols/dcutr/src/behaviour.rs | 6 +- protocols/floodsub/CHANGELOG.md | 4 + protocols/floodsub/src/layer.rs | 29 +- protocols/gossipsub/CHANGELOG.md | 3 + protocols/gossipsub/src/behaviour.rs | 310 ++++++++++----------- protocols/gossipsub/src/behaviour/tests.rs | 38 ++- protocols/identify/CHANGELOG.md | 4 + protocols/identify/src/identify.rs | 12 +- protocols/kad/CHANGELOG.md | 3 + protocols/kad/src/behaviour.rs | 59 ++-- protocols/kad/src/behaviour/test.rs | 3 +- protocols/mdns/CHANGELOG.md | 4 + protocols/mdns/src/behaviour.rs | 13 +- protocols/ping/CHANGELOG.md | 4 + protocols/relay/CHANGELOG.md | 4 + protocols/relay/src/v1/behaviour.rs | 154 +++++----- protocols/relay/src/v2/client.rs | 2 + protocols/relay/src/v2/relay.rs | 1 + protocols/rendezvous/CHANGELOG.md | 4 + protocols/request-response/CHANGELOG.md | 4 + protocols/request-response/src/lib.rs | 25 +- swarm-derive/src/lib.rs | 54 +--- swarm/CHANGELOG.md | 3 + swarm/src/behaviour.rs | 19 +- swarm/src/behaviour/either.rs | 57 ++-- swarm/src/behaviour/toggle.rs | 30 +- swarm/src/lib.rs | 49 ++-- swarm/src/test.rs | 132 ++++++--- 30 files changed, 584 insertions(+), 482 deletions(-) diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index c93d77b8c06..5df9d722fa0 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -6,6 +6,10 @@ - Update to `libp2p-request-response` `v0.16.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.1.0 [2022-01-27] -- Initial release. \ No newline at end of file +- Initial release. diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index a1a8bb6ff21..b2e13b11dde 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -303,9 +303,15 @@ impl NetworkBehaviour for Behaviour { conn: &ConnectionId, endpoint: &ConnectedPoint, failed_addresses: Option<&Vec>, + other_established: usize, ) { - self.inner - .inject_connection_established(peer, conn, endpoint, failed_addresses); + self.inner.inject_connection_established( + peer, + conn, + endpoint, + failed_addresses, + other_established, + ); let connections = self.connected.entry(*peer).or_default(); let addr = if endpoint.is_relayed() { None @@ -342,11 +348,16 @@ impl NetworkBehaviour for Behaviour { conn: &ConnectionId, endpoint: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { self.inner - .inject_connection_closed(peer, conn, endpoint, handler); - let connections = self.connected.get_mut(peer).expect("Peer is connected."); - connections.remove(conn); + .inject_connection_closed(peer, conn, endpoint, handler, remaining_established); + if remaining_established == 0 { + self.connected.remove(peer); + } else { + let connections = self.connected.get_mut(peer).expect("Peer is connected."); + connections.remove(conn); + } } fn inject_dial_failure( @@ -362,11 +373,6 @@ impl NetworkBehaviour for Behaviour { } } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.inner.inject_disconnected(peer); - self.connected.remove(peer); - } - fn inject_address_change( &mut self, peer: &PeerId, @@ -461,10 +467,6 @@ impl NetworkBehaviour for Behaviour { self.inner.addresses_of_peer(peer) } - fn inject_connected(&mut self, peer: &PeerId) { - self.inner.inject_connected(peer) - } - fn inject_event( &mut self, peer_id: PeerId, diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index fe8bbe42b2f..f1966abf45a 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -100,6 +100,7 @@ impl NetworkBehaviour for Behaviour { connection_id: &ConnectionId, connected_point: &ConnectedPoint, _failed_addresses: Option<&Vec>, + _other_established: usize, ) { if connected_point.is_relayed() { if connected_point.is_listener() && !self.direct_connections.contains_key(peer_id) { @@ -181,16 +182,13 @@ impl NetworkBehaviour for Behaviour { } } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - assert!(!self.direct_connections.contains_key(peer_id)); - } - fn inject_connection_closed( &mut self, peer_id: &PeerId, connection_id: &ConnectionId, connected_point: &ConnectedPoint, _handler: <::ProtocolsHandler as IntoProtocolsHandler>::Handler, + _remaining_established: usize, ) { if !connected_point.is_relayed() { let connections = self diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index d1ec2814973..631fbafb37a 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.33.0 [2022-01-27] - Update dependencies. diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index b2093c763d6..6f22e804012 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -27,6 +27,7 @@ use crate::FloodsubConfig; use cuckoofilter::{CuckooError, CuckooFilter}; use fnv::FnvHashSet; use libp2p_core::{connection::ConnectionId, PeerId}; +use libp2p_core::{ConnectedPoint, Multiaddr}; use libp2p_swarm::{ dial_opts::{self, DialOpts}, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, PollParameters, @@ -287,7 +288,19 @@ impl NetworkBehaviour for Floodsub { Default::default() } - fn inject_connected(&mut self, id: &PeerId) { + fn inject_connection_established( + &mut self, + id: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: Option<&Vec>, + other_established: usize, + ) { + if other_established > 0 { + // We only care about the first time a peer connects. + return; + } + // We need to send our subscriptions to the newly-connected node. if self.target_peers.contains(id) { for topic in self.subscribed_topics.iter().cloned() { @@ -309,7 +322,19 @@ impl NetworkBehaviour for Floodsub { self.connected_peers.insert(*id, SmallVec::new()); } - fn inject_disconnected(&mut self, id: &PeerId) { + fn inject_connection_closed( + &mut self, + id: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: Self::ProtocolsHandler, + remaining_established: usize, + ) { + if remaining_established > 0 { + // we only care about peer disconnections + return; + } + let was_in = self.connected_peers.remove(id); debug_assert!(was_in.is_some()); diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 36bb23bd4bc..857697c42db 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -8,8 +8,11 @@ - Emit gossip of all non empty topics (see [PR 2481]). +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + [PR 2442]: https://github.com/libp2p/rust-libp2p/pull/2442 [PR 2481]: https://github.com/libp2p/rust-libp2p/pull/2481 +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 # 0.35.0 [2022-01-27] diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 489f916910c..c354936ef2f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3055,147 +3055,21 @@ where ) } - fn inject_connected(&mut self, peer_id: &PeerId) { - // Ignore connections from blacklisted peers. - if self.blacklisted_peers.contains(peer_id) { - debug!("Ignoring connection from blacklisted peer: {}", peer_id); - } else { - debug!("New peer connected: {}", peer_id); - // We need to send our subscriptions to the newly-connected node. - let mut subscriptions = vec![]; - for topic_hash in self.mesh.keys() { - subscriptions.push(GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }); - } - - if !subscriptions.is_empty() { - // send our subscriptions to the peer - if self - .send_message( - *peer_id, - GossipsubRpc { - messages: Vec::new(), - subscriptions, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { - error!("Failed to send subscriptions, message too large"); - } - } - } - - // Insert an empty set of the topics of this peer until known. - self.peer_topics.insert(*peer_id, Default::default()); - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.add_peer(*peer_id); - } - } - - fn inject_disconnected(&mut self, peer_id: &PeerId) { - // remove from mesh, topic_peers, peer_topic and the fanout - debug!("Peer disconnected: {}", peer_id); - { - let topics = match self.peer_topics.get(peer_id) { - Some(topics) => (topics), - None => { - debug_assert!( - self.blacklisted_peers.contains(peer_id), - "Disconnected node not in connected list" - ); - return; - } - }; - - // remove peer from all mappings - for topic in topics { - // check the mesh for the topic - if let Some(mesh_peers) = self.mesh.get_mut(topic) { - // check if the peer is in the mesh and remove it - if mesh_peers.remove(peer_id) { - if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic, Churn::Dc, 1); - m.set_mesh_peers(topic, mesh_peers.len()); - } - }; - } - - // remove from topic_peers - if let Some(peer_list) = self.topic_peers.get_mut(topic) { - if !peer_list.remove(peer_id) { - // debugging purposes - warn!( - "Disconnected node: {} not in topic_peers peer list", - peer_id - ); - } - if let Some(m) = self.metrics.as_mut() { - m.set_topic_peers(topic, peer_list.len()) - } - } else { - warn!( - "Disconnected node: {} with topic: {:?} not in topic_peers", - &peer_id, &topic - ); - } - - // remove from fanout - self.fanout - .get_mut(topic) - .map(|peers| peers.remove(peer_id)); - } - } - - // Forget px and outbound status for this peer - self.px_peers.remove(peer_id); - self.outbound_peers.remove(peer_id); - - // Remove peer from peer_topics and connected_peers - // NOTE: It is possible the peer has already been removed from all mappings if it does not - // support the protocol. - self.peer_topics.remove(peer_id); - - // If metrics are enabled, register the disconnection of a peer based on its protocol. - if let Some(metrics) = self.metrics.as_mut() { - let peer_kind = &self - .connected_peers - .get(peer_id) - .expect("Connected peer must be registered") - .kind; - metrics.peer_protocol_disconnected(peer_kind.clone()); - } - - self.connected_peers.remove(peer_id); - - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.remove_peer(peer_id); - } - } - fn inject_connection_established( &mut self, peer_id: &PeerId, connection_id: &ConnectionId, endpoint: &ConnectedPoint, _: Option<&Vec>, + other_established: usize, ) { - // Check if the peer is an outbound peer - if let ConnectedPoint::Dialer { .. } = endpoint { - // Diverging from the go implementation we only want to consider a peer as outbound peer - // if its first connection is outbound. To check if this connection is the first we - // check if the peer isn't connected yet. This only works because the - // `inject_connection_established` event for the first connection gets called immediately - // before `inject_connected` gets called. - if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) { - // The first connection is outbound and it is not a peer from peer exchange => mark - // it as outbound peer - self.outbound_peers.insert(*peer_id); - } + // Diverging from the go implementation we only want to consider a peer as outbound peer + // if its first connection is outbound. + + if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(peer_id) { + // The first connection is outbound and it is not a peer from peer exchange => mark + // it as outbound peer + self.outbound_peers.insert(*peer_id); } // Add the IP to the peer scoring system @@ -3224,6 +3098,48 @@ where }) .connections .push(*connection_id); + + if other_established == 0 { + // Ignore connections from blacklisted peers. + if self.blacklisted_peers.contains(peer_id) { + debug!("Ignoring connection from blacklisted peer: {}", peer_id); + } else { + debug!("New peer connected: {}", peer_id); + // We need to send our subscriptions to the newly-connected node. + let mut subscriptions = vec![]; + for topic_hash in self.mesh.keys() { + subscriptions.push(GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }); + } + + if !subscriptions.is_empty() { + // send our subscriptions to the peer + if self + .send_message( + *peer_id, + GossipsubRpc { + messages: Vec::new(), + subscriptions, + control_msgs: Vec::new(), + } + .into_protobuf(), + ) + .is_err() + { + error!("Failed to send subscriptions, message too large"); + } + } + } + + // Insert an empty set of the topics of this peer until known. + self.peer_topics.insert(*peer_id, Default::default()); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.add_peer(*peer_id); + } + } } fn inject_connection_closed( @@ -3232,6 +3148,7 @@ where connection_id: &ConnectionId, endpoint: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { @@ -3246,35 +3163,114 @@ where } } - // Remove the connection from the list - // If there are no connections left, inject_disconnected will remove the mapping entirely. - if let Some(connections) = self.connected_peers.get_mut(peer_id) { - let index = connections - .connections - .iter() - .position(|v| v == connection_id) - .expect("Previously established connection to peer must be present"); - connections.connections.remove(index); + if remaining_established != 0 { + // Remove the connection from the list + if let Some(connections) = self.connected_peers.get_mut(peer_id) { + let index = connections + .connections + .iter() + .position(|v| v == connection_id) + .expect("Previously established connection to peer must be present"); + connections.connections.remove(index); + + // If there are more connections and this peer is in a mesh, inform the first connection + // handler. + if !connections.connections.is_empty() { + if let Some(topics) = self.peer_topics.get(peer_id) { + for topic in topics { + if let Some(mesh_peers) = self.mesh.get(topic) { + if mesh_peers.contains(peer_id) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: *peer_id, + event: Arc::new(GossipsubHandlerIn::JoinedMesh), + handler: NotifyHandler::One(connections.connections[0]), + }); + break; + } + } + } + } + } + } + } else { + // remove from mesh, topic_peers, peer_topic and the fanout + debug!("Peer disconnected: {}", peer_id); + { + let topics = match self.peer_topics.get(peer_id) { + Some(topics) => (topics), + None => { + debug_assert!( + self.blacklisted_peers.contains(peer_id), + "Disconnected node not in connected list" + ); + return; + } + }; - // If there are more connections and this peer is in a mesh, inform the first connection - // handler. - if !connections.connections.is_empty() { - if let Some(topics) = self.peer_topics.get(peer_id) { - for topic in topics { - if let Some(mesh_peers) = self.mesh.get(topic) { - if mesh_peers.contains(peer_id) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - event: Arc::new(GossipsubHandlerIn::JoinedMesh), - handler: NotifyHandler::One(connections.connections[0]), - }); - break; + // remove peer from all mappings + for topic in topics { + // check the mesh for the topic + if let Some(mesh_peers) = self.mesh.get_mut(topic) { + // check if the peer is in the mesh and remove it + if mesh_peers.remove(peer_id) { + if let Some(m) = self.metrics.as_mut() { + m.peers_removed(topic, Churn::Dc, 1); + m.set_mesh_peers(topic, mesh_peers.len()); } + }; + } + + // remove from topic_peers + if let Some(peer_list) = self.topic_peers.get_mut(topic) { + if !peer_list.remove(peer_id) { + // debugging purposes + warn!( + "Disconnected node: {} not in topic_peers peer list", + peer_id + ); + } + if let Some(m) = self.metrics.as_mut() { + m.set_topic_peers(topic, peer_list.len()) } + } else { + warn!( + "Disconnected node: {} with topic: {:?} not in topic_peers", + &peer_id, &topic + ); } + + // remove from fanout + self.fanout + .get_mut(topic) + .map(|peers| peers.remove(peer_id)); } } + + // Forget px and outbound status for this peer + self.px_peers.remove(peer_id); + self.outbound_peers.remove(peer_id); + + // Remove peer from peer_topics and connected_peers + // NOTE: It is possible the peer has already been removed from all mappings if it does not + // support the protocol. + self.peer_topics.remove(peer_id); + + // If metrics are enabled, register the disconnection of a peer based on its protocol. + if let Some(metrics) = self.metrics.as_mut() { + let peer_kind = &self + .connected_peers + .get(peer_id) + .expect("Connected peer must be registered") + .kind; + metrics.peer_protocol_disconnected(peer_kind.clone()); + } + + self.connected_peers.remove(peer_id); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.remove_peer(peer_id); + } } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 1183f83b83c..b4760b7208b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -185,7 +185,6 @@ mod tests { F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { let peer = PeerId::random(); - //peers.push(peer.clone()); gs.inject_connection_established( &peer, &ConnectionId::new(0), @@ -201,8 +200,8 @@ mod tests { } }, None, + 0, // first connection ); - as NetworkBehaviour>::inject_connected(gs, &peer); if let Some(kind) = kind { gs.inject_event( peer.clone(), @@ -229,6 +228,32 @@ mod tests { peer } + fn disconnect_peer(gs: &mut Gossipsub, peer_id: &PeerId) + where + D: DataTransform + Default + Clone + Send + 'static, + F: TopicSubscriptionFilter + Clone + Default + Send + 'static, + { + if let Some(peer_connections) = gs.connected_peers.get(peer_id) { + let fake_endpoint = ConnectedPoint::Dialer { + address: Multiaddr::empty(), + role_override: Endpoint::Dialer, + }; // this is not relevant + // peer_connections.connections should never be empty. + let mut active_connections = peer_connections.connections.len(); + for conn_id in peer_connections.connections.clone() { + let handler = gs.new_handler(); + active_connections = active_connections.checked_sub(1).unwrap(); + gs.inject_connection_closed( + peer_id, + &conn_id, + &fake_endpoint, + handler, + active_connections, + ); + } + } + } + // Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue. fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> GossipsubRpc { // Store valid messages. @@ -541,8 +566,8 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 0, ); - gs.inject_connected(&random_peer); // add the new peer to the fanout let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); @@ -1383,7 +1408,7 @@ mod tests { flush_events(&mut gs); //disconnect peer - gs.inject_disconnected(peer); + disconnect_peer(&mut gs, peer); gs.heartbeat(); @@ -4154,6 +4179,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 0, ); } @@ -4174,6 +4200,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 1, ); } @@ -4203,6 +4230,7 @@ mod tests { role_override: Endpoint::Dialer, }, None, + 2, ); //nothing changed @@ -5333,7 +5361,7 @@ mod tests { gs.handle_graft(&peers[0], subscribe_topic_hash); // The node disconnects - gs.inject_disconnected(&peers[0]); + disconnect_peer(&mut gs, &peers[0]); // We unsubscribe from the topic. let _ = gs.unsubscribe(&Topic::new(topic)); diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index f4bb78075b5..05206ea78a5 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.33.0 [2022-01-27] - Update dependencies. diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index 2e644502014..45530404a66 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -226,6 +226,7 @@ impl NetworkBehaviour for Identify { conn: &ConnectionId, endpoint: &ConnectedPoint, failed_addresses: Option<&Vec>, + _other_established: usize, ) { let addr = match endpoint { ConnectedPoint::Dialer { address, .. } => address.clone(), @@ -254,8 +255,12 @@ impl NetworkBehaviour for Identify { conn: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { - if let Some(addrs) = self.connected.get_mut(peer_id) { + if remaining_established == 0 { + self.connected.remove(peer_id); + self.pending_push.remove(peer_id); + } else if let Some(addrs) = self.connected.get_mut(peer_id) { addrs.remove(conn); } } @@ -281,11 +286,6 @@ impl NetworkBehaviour for Identify { } } - fn inject_disconnected(&mut self, peer_id: &PeerId) { - self.connected.remove(peer_id); - self.pending_push.remove(peer_id); - } - fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) { if self.config.push_listen_addr_updates { self.pending_push.extend(self.connected.keys()); diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 37cde3548ff..f3d794cec50 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -6,7 +6,10 @@ - Require owned key in `get_record()` method (see [PR 2477]). +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + [PR 2477]: https://github.com/libp2p/rust-libp2p/pull/2477 +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 # 0.34.0 [2022-01-27] diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 196a86f5918..be45798dc12 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1823,6 +1823,7 @@ where _: &ConnectionId, _: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { for addr in errors.map(|a| a.into_iter()).into_iter().flatten() { self.address_failed(*peer_id, addr); @@ -1832,27 +1833,28 @@ where // remote supports the configured protocol name. Only once a connection // handler reports [`KademliaHandlerEvent::ProtocolConfirmed`] do we // update the local routing table. - } - fn inject_connected(&mut self, peer: &PeerId) { - // Queue events for sending pending RPCs to the connected peer. - // There can be only one pending RPC for a particular peer and query per definition. - for (peer_id, event) in self.queries.iter_mut().filter_map(|q| { - q.inner - .pending_rpcs - .iter() - .position(|(p, _)| p == peer) - .map(|p| q.inner.pending_rpcs.remove(p)) - }) { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); - } + // Peer's first connection. + if other_established == 0 { + // Queue events for sending pending RPCs to the connected peer. + // There can be only one pending RPC for a particular peer and query per definition. + for (peer_id, event) in self.queries.iter_mut().filter_map(|q| { + q.inner + .pending_rpcs + .iter() + .position(|(p, _)| p == peer_id) + .map(|p| q.inner.pending_rpcs.remove(p)) + }) { + self.queued_events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); + } - self.connected_peers.insert(*peer); + self.connected_peers.insert(*peer_id); + } } fn inject_address_change( @@ -1955,12 +1957,21 @@ where } } - fn inject_disconnected(&mut self, id: &PeerId) { - for query in self.queries.iter_mut() { - query.on_failure(id); + fn inject_connection_closed( + &mut self, + id: &PeerId, + _: &ConnectionId, + _: &ConnectedPoint, + _: ::Handler, + remaining_established: usize, + ) { + if remaining_established == 0 { + for query in self.queries.iter_mut() { + query.on_failure(id); + } + self.connection_updated(*id, None, NodeStatus::Disconnected); + self.connected_peers.remove(id); } - self.connection_updated(*id, None, NodeStatus::Disconnected); - self.connected_peers.remove(id); } fn inject_event( diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index aa462202561..1f67be5a19d 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1293,8 +1293,7 @@ fn network_behaviour_inject_address_change() { }; // Mimick a connection being established. - kademlia.inject_connection_established(&remote_peer_id, &connection_id, &endpoint, None); - kademlia.inject_connected(&remote_peer_id); + kademlia.inject_connection_established(&remote_peer_id, &connection_id, &endpoint, None, 0); // At this point the remote is not yet known to support the // configured protocol name, so the peer is not yet in the diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 278dc9abff6..9e5276615d7 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.34.0 [2022-01-27] - Update dependencies. diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index fdc26839c11..5e58f8bc66d 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -127,8 +127,17 @@ impl NetworkBehaviour for Mdns { } } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.expire_node(peer); + fn inject_connection_closed( + &mut self, + peer: &PeerId, + _: &libp2p_core::connection::ConnectionId, + _: &libp2p_core::ConnectedPoint, + _: Self::ProtocolsHandler, + remaining_established: usize, + ) { + if remaining_established == 0 { + self.expire_node(peer); + } } fn poll( diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index 9ff7196e21a..9d9341d5607 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.33.0 [2022-01-27] - Update dependencies. diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 7449def4a06..644bb1c1cb0 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.6.1 [2022-02-02] - Remove empty peer entries in `reservations` `HashMap`. See [PR 2464]. diff --git a/protocols/relay/src/v1/behaviour.rs b/protocols/relay/src/v1/behaviour.rs index 953be32d6bd..aaddb0bbe61 100644 --- a/protocols/relay/src/v1/behaviour.rs +++ b/protocols/relay/src/v1/behaviour.rs @@ -205,6 +205,7 @@ impl NetworkBehaviour for Relay { connection_id: &ConnectionId, _: &ConnectedPoint, _: Option<&Vec>, + other_established: usize, ) { let is_first = self .connected_peers @@ -238,69 +239,61 @@ impl NetworkBehaviour for Relay { }, ); } - } - - fn inject_connected(&mut self, peer_id: &PeerId) { - assert!( - self.connected_peers - .get(peer_id) - .map(|cs| !cs.is_empty()) - .unwrap_or(false), - "Expect to be connected to peer with at least one connection." - ); - if let Some(reqs) = self.outgoing_relay_reqs.dialing.remove(peer_id) { - for req in reqs { - let OutgoingDialingRelayReq { - request_id, - src_peer_id, - relay_addr: _, - dst_addr, - dst_peer_id, - send_back, - } = req; - self.outbox_to_swarm - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event: RelayHandlerIn::OutgoingRelayReq { - src_peer_id, - request_id, - dst_peer_id, - dst_addr: dst_addr.clone(), - }, - }); + if other_established == 0 { + if let Some(reqs) = self.outgoing_relay_reqs.dialing.remove(peer) { + for req in reqs { + let OutgoingDialingRelayReq { + request_id, + src_peer_id, + relay_addr: _, + dst_addr, + dst_peer_id, + send_back, + } = req; + self.outbox_to_swarm + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: RelayHandlerIn::OutgoingRelayReq { + src_peer_id, + request_id, + dst_peer_id, + dst_addr: dst_addr.clone(), + }, + }); - self.outgoing_relay_reqs - .upgrading - .insert(request_id, OutgoingUpgradingRelayReq { send_back }); + self.outgoing_relay_reqs + .upgrading + .insert(request_id, OutgoingUpgradingRelayReq { send_back }); + } } - } - // Ask the newly-opened connection to be used as destination if relevant. - if let Some(reqs) = self.incoming_relay_reqs.remove(peer_id) { - for req in reqs { - let IncomingRelayReq::DialingDst { - src_peer_id, - src_addr, - src_connection_id, - request_id, - incoming_relay_req, - } = req; - let event = RelayHandlerIn::OutgoingDstReq { - src_peer_id, - src_addr, - src_connection_id, - request_id, - incoming_relay_req, - }; + // Ask the newly-opened connection to be used as destination if relevant. + if let Some(reqs) = self.incoming_relay_reqs.remove(peer) { + for req in reqs { + let IncomingRelayReq::DialingDst { + src_peer_id, + src_addr, + src_connection_id, + request_id, + incoming_relay_req, + } = req; + let event = RelayHandlerIn::OutgoingDstReq { + src_peer_id, + src_addr, + src_connection_id, + request_id, + incoming_relay_req, + }; - self.outbox_to_swarm - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event, - }); + self.outbox_to_swarm + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event, + }); + } } } } @@ -360,6 +353,7 @@ impl NetworkBehaviour for Relay { connection: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { // Remove connection from the set of connections for the given peer. In case the set is // empty it will be removed in `inject_disconnected`. @@ -414,6 +408,28 @@ impl NetworkBehaviour for Relay { } } } + + if remaining_established == 0 { + self.connected_peers.remove(peer); + + if let Some(reqs) = self.incoming_relay_reqs.remove(peer) { + for req in reqs { + let IncomingRelayReq::DialingDst { + src_peer_id, + incoming_relay_req, + .. + } = req; + self.outbox_to_swarm + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: src_peer_id, + handler: NotifyHandler::Any, + event: RelayHandlerIn::DenyIncomingRelayReq( + incoming_relay_req.deny(circuit_relay::Status::HopCantDialDst), + ), + }) + } + } + } } fn inject_listener_error(&mut self, _id: ListenerId, _err: &(dyn std::error::Error + 'static)) { @@ -421,28 +437,6 @@ impl NetworkBehaviour for Relay { fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {} - fn inject_disconnected(&mut self, id: &PeerId) { - self.connected_peers.remove(id); - - if let Some(reqs) = self.incoming_relay_reqs.remove(id) { - for req in reqs { - let IncomingRelayReq::DialingDst { - src_peer_id, - incoming_relay_req, - .. - } = req; - self.outbox_to_swarm - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: src_peer_id, - handler: NotifyHandler::Any, - event: RelayHandlerIn::DenyIncomingRelayReq( - incoming_relay_req.deny(circuit_relay::Status::HopCantDialDst), - ), - }) - } - } - } - fn inject_event( &mut self, event_source: PeerId, diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index e0459768879..fa4447df830 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -129,6 +129,7 @@ impl NetworkBehaviour for Client { connection_id: &ConnectionId, endpoint: &ConnectedPoint, _failed_addresses: Option<&Vec>, + _other_established: usize, ) { if !endpoint.is_relayed() { self.directly_connected_peers @@ -144,6 +145,7 @@ impl NetworkBehaviour for Client { connection_id: &ConnectionId, endpoint: &ConnectedPoint, _handler: Either, + _remaining_established: usize, ) { if !endpoint.is_relayed() { match self.directly_connected_peers.entry(*peer_id) { diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index ae00ff22fdd..4cca6b3519e 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -213,6 +213,7 @@ impl NetworkBehaviour for Relay { connection: &ConnectionId, _: &ConnectedPoint, _handler: Either, + _remaining_established: usize, ) { if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(*peer) { peer.get_mut().remove(&connection); diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index 35c198b09cc..7aaad13dc03 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.3.0 [2022-01-27] - Update dependencies. diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 7758a1f6450..e8ab11c3eea 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -4,6 +4,10 @@ - Update to `libp2p-swarm` `v0.34.0`. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 + # 0.15.0 [2022-01-27] - Update dependencies. diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index af6b601eae0..27041b153c5 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -614,21 +614,13 @@ where connection.address = new_address; } - fn inject_connected(&mut self, peer: &PeerId) { - if let Some(pending) = self.pending_outbound_requests.remove(peer) { - for request in pending { - let request = self.try_send_request(peer, request); - assert!(request.is_none()); - } - } - } - fn inject_connection_established( &mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint, _errors: Option<&Vec>, + other_established: usize, ) { let address = match endpoint { ConnectedPoint::Dialer { address, .. } => Some(address.clone()), @@ -638,6 +630,15 @@ where .entry(*peer) .or_default() .push(Connection::new(*conn, address)); + + if other_established == 0 { + if let Some(pending) = self.pending_outbound_requests.remove(peer) { + for request in pending { + let request = self.try_send_request(peer, request); + assert!(request.is_none()); + } + } + } } fn inject_connection_closed( @@ -646,6 +647,7 @@ where conn: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + remaining_established: usize, ) { let connections = self .connected @@ -658,6 +660,7 @@ where .map(|p: usize| connections.remove(p)) .expect("Expected connection to be established before closing."); + debug_assert_eq!(connections.is_empty(), remaining_established == 0); if connections.is_empty() { self.connected.remove(peer_id); } @@ -685,10 +688,6 @@ where } } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.connected.remove(peer); - } - fn inject_dial_failure( &mut self, peer: Option, diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index e221a349f38..8cb35c9cfe9 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -163,40 +163,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }; - // Build the list of statements to put in the body of `inject_connected()`. - let inject_connected_stmts = { - data_struct - .fields - .iter() - .enumerate() - .filter_map(move |(field_n, field)| { - if is_ignored(field) { - return None; - } - Some(match field.ident { - Some(ref i) => quote! { self.#i.inject_connected(peer_id); }, - None => quote! { self.#field_n.inject_connected(peer_id); }, - }) - }) - }; - - // Build the list of statements to put in the body of `inject_disconnected()`. - let inject_disconnected_stmts = { - data_struct - .fields - .iter() - .enumerate() - .filter_map(move |(field_n, field)| { - if is_ignored(field) { - return None; - } - Some(match field.ident { - Some(ref i) => quote! { self.#i.inject_disconnected(peer_id); }, - None => quote! { self.#field_n.inject_disconnected(peer_id); }, - }) - }) - }; - // Build the list of statements to put in the body of `inject_connection_established()`. let inject_connection_established_stmts = { data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { @@ -204,8 +170,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { return None; } Some(match field.ident { - Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint, errors); }, - None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint, errors); }, + Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint, errors, other_established); }, + None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint, errors, other_established); }, }) }) }; @@ -243,8 +209,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } }; let inject = match field.ident { - Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint, handler) }, - None => quote!{ self.#enum_n.inject_connection_closed(peer_id, connection_id, endpoint, handler) }, + Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint, handler, remaining_established) }, + None => quote!{ self.#enum_n.inject_connection_closed(peer_id, connection_id, endpoint, handler, remaining_established) }, }; quote! { @@ -649,15 +615,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } - fn inject_connected(&mut self, peer_id: &#peer_id) { - #(#inject_connected_stmts);* - } - - fn inject_disconnected(&mut self, peer_id: &#peer_id) { - #(#inject_disconnected_stmts);* - } - - fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors) { + fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, errors: #dial_errors, other_established: usize) { #(#inject_connection_established_stmts);* } @@ -665,7 +623,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_address_change_stmts);* } - fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, handlers: ::Handler) { + fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point, handlers: ::Handler, remaining_established: usize) { #(#inject_connection_closed_stmts);* } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e938160684a..a5714678e5e 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -28,6 +28,8 @@ `DialOpts`. This option is needed for NAT and firewall hole punching. See [PR 2363]. +- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). + [PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 [PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350 [PR 2362]: https://github.com/libp2p/rust-libp2p/pull/2362 @@ -37,6 +39,7 @@ [PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404 [PR 2428]: https://github.com/libp2p/rust-libp2p/pull/2428 [PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363 +[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 # 0.32.0 [2021-11-16] diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 269bddf3bad..48ccf1faf84 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -190,23 +190,6 @@ pub trait NetworkBehaviour: Send + 'static { vec![] } - /// Indicate to the behaviour that we connected to the node with the given peer id. - /// - /// This node now has a handler (as spawned by `new_handler`) running in the background. - /// - /// This method is only called when the first connection to the peer is established, preceded by - /// [`inject_connection_established`](NetworkBehaviour::inject_connection_established). - fn inject_connected(&mut self, _: &PeerId) {} - - /// Indicates to the behaviour that we disconnected from the node with the given peer id. - /// - /// There is no handler running anymore for this node. Any event that has been sent to it may - /// or may not have been processed by the handler. - /// - /// This method is only called when the last established connection to the peer is closed, - /// preceded by [`inject_connection_closed`](NetworkBehaviour::inject_connection_closed). - fn inject_disconnected(&mut self, _: &PeerId) {} - /// Informs the behaviour about a newly established connection to a peer. fn inject_connection_established( &mut self, @@ -214,6 +197,7 @@ pub trait NetworkBehaviour: Send + 'static { _connection_id: &ConnectionId, _endpoint: &ConnectedPoint, _failed_addresses: Option<&Vec>, + _other_established: usize, ) { } @@ -228,6 +212,7 @@ pub trait NetworkBehaviour: Send + 'static { _: &ConnectionId, _: &ConnectedPoint, _: ::Handler, + _remaining_established: usize, ) { } diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 3dd6d28a3d2..c48b2beaaef 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -53,34 +53,29 @@ where } } - fn inject_connected(&mut self, peer_id: &PeerId) { - match self { - Either::Left(a) => a.inject_connected(peer_id), - Either::Right(b) => b.inject_connected(peer_id), - }; - } - - fn inject_disconnected(&mut self, peer_id: &PeerId) { - match self { - Either::Left(a) => a.inject_disconnected(peer_id), - Either::Right(b) => b.inject_disconnected(peer_id), - } - } - fn inject_connection_established( &mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { match self { - Either::Left(a) => { - a.inject_connection_established(peer_id, connection, endpoint, errors) - } - Either::Right(b) => { - b.inject_connection_established(peer_id, connection, endpoint, errors) - } + Either::Left(a) => a.inject_connection_established( + peer_id, + connection, + endpoint, + errors, + other_established, + ), + Either::Right(b) => b.inject_connection_established( + peer_id, + connection, + endpoint, + errors, + other_established, + ), } } @@ -90,14 +85,24 @@ where connection: &ConnectionId, endpoint: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { match (self, handler) { - (Either::Left(behaviour), Either::Left(handler)) => { - behaviour.inject_connection_closed(peer_id, connection, endpoint, handler) - } - (Either::Right(behaviour), Either::Right(handler)) => { - behaviour.inject_connection_closed(peer_id, connection, endpoint, handler) - } + (Either::Left(behaviour), Either::Left(handler)) => behaviour.inject_connection_closed( + peer_id, + connection, + endpoint, + handler, + remaining_established, + ), + (Either::Right(behaviour), Either::Right(handler)) => behaviour + .inject_connection_closed( + peer_id, + connection, + endpoint, + handler, + remaining_established, + ), _ => unreachable!(), } } diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index b6af0e38237..7aeee3cde4a 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -86,27 +86,22 @@ where .unwrap_or_else(Vec::new) } - fn inject_connected(&mut self, peer_id: &PeerId) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_connected(peer_id) - } - } - - fn inject_disconnected(&mut self, peer_id: &PeerId) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_disconnected(peer_id) - } - } - fn inject_connection_established( &mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { if let Some(inner) = self.inner.as_mut() { - inner.inject_connection_established(peer_id, connection, endpoint, errors) + inner.inject_connection_established( + peer_id, + connection, + endpoint, + errors, + other_established, + ) } } @@ -116,10 +111,17 @@ where connection: &ConnectionId, endpoint: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { if let Some(inner) = self.inner.as_mut() { if let Some(handler) = handler.inner { - inner.inject_connection_closed(peer_id, connection, endpoint, handler) + inner.inject_connection_closed( + peer_id, + connection, + endpoint, + handler, + remaining_established, + ) } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 38ed1143d86..533e15e6a24 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -662,12 +662,17 @@ where u32::try_from(other_established_connection_ids.len() + 1).unwrap(), ) .expect("n + 1 is always non-zero; qed"); + let non_banned_established = other_established_connection_ids + .into_iter() + .filter(|conn_id| !this.banned_peer_connections.contains(&conn_id)) + .count(); log::debug!( - "Connection established: {:?} {:?}; Total (peer): {}.", + "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", connection.peer_id(), connection.endpoint(), - num_established + num_established, + non_banned_established + 1, ); let endpoint = connection.endpoint().clone(); let failed_addresses = concurrent_dial_errors @@ -678,16 +683,8 @@ where &connection.id(), &endpoint, failed_addresses.as_ref(), + non_banned_established, ); - // The peer is not banned, but there could be previous banned connections - // if the peer was just unbanned. Check if this is the first non-banned - // connection. - let first_non_banned = other_established_connection_ids - .into_iter() - .all(|conn_id| this.banned_peer_connections.contains(&conn_id)); - if first_non_banned { - this.behaviour.inject_connected(&peer_id); - } return Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, num_established, @@ -723,22 +720,17 @@ where u32::try_from(remaining_established_connection_ids.len()).unwrap(); let conn_was_reported = !this.banned_peer_connections.remove(&id); if conn_was_reported { + let remaining_non_banned = remaining_established_connection_ids + .into_iter() + .filter(|conn_id| !this.banned_peer_connections.contains(&conn_id)) + .count(); this.behaviour.inject_connection_closed( &peer_id, &id, &endpoint, handler.into_protocols_handler(), + remaining_non_banned, ); - - // This connection was reported as open to the behaviour. Check if this is - // the last non-banned connection for the peer. - let last_non_banned = remaining_established_connection_ids - .into_iter() - .all(|conn_id| this.banned_peer_connections.contains(&conn_id)); - - if last_non_banned { - this.behaviour.inject_disconnected(&peer_id) - } } return Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, @@ -1534,9 +1526,10 @@ mod tests { [swarm1, swarm2] .iter() .all(|s| s.behaviour.inject_connection_closed.len() == num_connections) - && [swarm1, swarm2] - .iter() - .all(|s| s.behaviour.inject_disconnected.len() == 1) + && [swarm1, swarm2].iter().all(|s| { + let (.., last_remaining) = s.behaviour.inject_connection_closed.last().unwrap(); + *last_remaining == 0 + }) } /// Establishes multiple connections between two peers, @@ -1862,12 +1855,16 @@ mod tests { } State::Disconnecting => { for s in &[&swarm1, &swarm2] { - assert_eq!(s.behaviour.inject_disconnected.len(), 0); + assert!(s + .behaviour + .inject_connection_closed + .iter() + .all(|(.., remaining_conns)| *remaining_conns > 0)); assert_eq!( s.behaviour.inject_connection_established.len(), num_connections ); - assert_eq!(s.behaviour.inject_connected.len(), 1); + s.behaviour.assert_connected(num_connections, 1); } if [&swarm1, &swarm2] .iter() diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 22f7a51fa9b..25f6b47ac23 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -99,10 +99,8 @@ where inner: TInner, pub addresses_of_peer: Vec, - pub inject_connected: Vec, - pub inject_disconnected: Vec, - pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>, - pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, + pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub inject_event: Vec<( PeerId, ConnectionId, @@ -127,8 +125,6 @@ where Self { inner, addresses_of_peer: Vec::new(), - inject_connected: Vec::new(), - inject_disconnected: Vec::new(), inject_connection_established: Vec::new(), inject_connection_closed: Vec::new(), inject_event: Vec::new(), @@ -147,8 +143,6 @@ where #[allow(dead_code)] pub fn reset(&mut self) { self.addresses_of_peer = Vec::new(); - self.inject_connected = Vec::new(); - self.inject_disconnected = Vec::new(); self.inject_connection_established = Vec::new(); self.inject_connection_closed = Vec::new(); self.inject_event = Vec::new(); @@ -175,7 +169,13 @@ where expected_disconnections: usize, ) -> bool { if self.inject_connection_closed.len() == expected_closed_connections { - assert_eq!(self.inject_disconnected.len(), expected_disconnections); + assert_eq!( + self.inject_connection_closed + .iter() + .filter(|(.., remaining_established)| { *remaining_established == 0 }) + .count(), + expected_disconnections + ); return true; } @@ -192,7 +192,15 @@ where expected_connections: usize, ) -> bool { if self.inject_connection_established.len() == expected_established_connections { - assert_eq!(self.inject_connected.len(), expected_connections); + assert_eq!( + self.inject_connection_established + .iter() + .filter(|(.., reported_aditional_connections)| { + *reported_aditional_connections == 0 + }) + .count(), + expected_connections + ); return true; } @@ -218,38 +226,49 @@ where self.inner.addresses_of_peer(p) } - fn inject_connected(&mut self, peer: &PeerId) { - assert!( - self.inject_connection_established - .iter() - .any(|(peer_id, _, _)| peer_id == peer), - "`inject_connected` is called after at least one `inject_connection_established`." - ); - self.inject_connected.push(peer.clone()); - self.inner.inject_connected(peer); - } - fn inject_connection_established( &mut self, p: &PeerId, c: &ConnectionId, e: &ConnectedPoint, errors: Option<&Vec>, + other_established: usize, ) { - self.inject_connection_established - .push((p.clone(), c.clone(), e.clone())); - self.inner.inject_connection_established(p, c, e, errors); - } - - fn inject_disconnected(&mut self, peer: &PeerId) { - assert!( - self.inject_connection_closed - .iter() - .any(|(peer_id, _, _)| peer_id == peer), - "`inject_disconnected` is called after at least one `inject_connection_closed`." - ); - self.inject_disconnected.push(*peer); - self.inner.inject_disconnected(peer); + let mut other_peer_connections = self + .inject_connection_established + .iter() + .rev() // take last to first + .filter_map(|(peer, .., other_established)| { + if p == peer { + Some(other_established) + } else { + None + } + }) + .take(other_established); + + // We are informed that there are `other_established` additional connections. Ensure that the + // number of previous connections is consistent with this + if let Some(&prev) = other_peer_connections.next() { + if prev < other_established { + assert_eq!( + prev, + other_established - 1, + "Inconsistent connection reporting" + ) + } + assert_eq!(other_peer_connections.count(), other_established - 1); + } else { + assert_eq!(other_established, 0) + } + self.inject_connection_established.push(( + p.clone(), + c.clone(), + e.clone(), + other_established, + )); + self.inner + .inject_connection_established(p, c, e, errors, other_established); } fn inject_connection_closed( @@ -258,15 +277,46 @@ where c: &ConnectionId, e: &ConnectedPoint, handler: ::Handler, + remaining_established: usize, ) { - let connection = (p.clone(), c.clone(), e.clone()); + let mut other_closed_connections = self + .inject_connection_established + .iter() + .rev() // take last to first + .filter_map(|(peer, .., remaining_established)| { + if p == peer { + Some(remaining_established) + } else { + None + } + }) + .take(remaining_established); + + // We are informed that there are `other_established` additional connections. Ensure that the + // number of previous connections is consistent with this + if let Some(&prev) = other_closed_connections.next() { + if prev < remaining_established { + assert_eq!( + prev, + remaining_established - 1, + "Inconsistent closed connection reporting" + ) + } + assert_eq!(other_closed_connections.count(), remaining_established - 1); + } else { + assert_eq!(remaining_established, 0) + } assert!( - self.inject_connection_established.contains(&connection), + self.inject_connection_established + .iter() + .any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) == (p, c, e)), "`inject_connection_closed` is called only for connections for \ which `inject_connection_established` was called first." ); - self.inject_connection_closed.push(connection); - self.inner.inject_connection_closed(p, c, e, handler); + self.inject_connection_closed + .push((*p, *c, e.clone(), remaining_established)); + self.inner + .inject_connection_closed(p, c, e, handler, remaining_established); } fn inject_event( @@ -278,14 +328,14 @@ where assert!( self.inject_connection_established .iter() - .any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id), + .any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id), "`inject_event` is called for reported connections." ); assert!( !self .inject_connection_closed .iter() - .any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id), + .any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id), "`inject_event` is never called for closed connections." );