diff --git a/.circleci/config.yml b/.circleci/config.yml index 1d3bf134d81..f2c31c409c6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,6 +7,7 @@ workflows: - test - test-wasm - check-rustdoc-links + - integration-test jobs: test: @@ -90,3 +91,24 @@ jobs: - ./target - /usr/local/cargo - /root/.cache/sccache + + integration-test: + docker: + - image: rust + - image: ipfs/go-ipfs + steps: + - checkout + - restore_cache: + key: integration-test-cache-{{ epoch }} + - run: + name: Print Rust version + command: | + rustc --version + - run: + command: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad + - save_cache: + key: integration-test-cache-{{ epoch }} + paths: + - "~/.cargo" + - "./target" + diff --git a/core/src/connection/error.rs b/core/src/connection/error.rs index 6704dfa587d..0d291c370cc 100644 --- a/core/src/connection/error.rs +++ b/core/src/connection/error.rs @@ -29,6 +29,10 @@ pub enum ConnectionError { // TODO: Eventually this should also be a custom error? IO(io::Error), + /// The connection was dropped because the connection limit + /// for a peer has been reached. + ConnectionLimit(ConnectionLimit), + /// The connection handler produced an error. Handler(THandlerErr), } @@ -44,6 +48,8 @@ where write!(f, "Connection error: I/O error: {}", err), ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), + ConnectionError::ConnectionLimit(l) => + write!(f, "Connection error: Connection limit: {}.", l) } } } @@ -57,6 +63,7 @@ where match self { ConnectionError::IO(err) => Some(err), ConnectionError::Handler(err) => Some(err), + ConnectionError::ConnectionLimit(..) => None, } } } @@ -71,10 +78,6 @@ pub enum PendingConnectionError { /// match the one that was expected or is otherwise invalid. InvalidPeerId, - /// The pending connection was successfully negotiated but dropped - /// because the connection limit for a peer has been reached. - ConnectionLimit(ConnectionLimit), - /// An I/O error occurred on the connection. // TODO: Eventually this should also be a custom error? IO(io::Error), @@ -93,8 +96,6 @@ where write!(f, "Pending connection: Transport error: {}", err), PendingConnectionError::InvalidPeerId => write!(f, "Pending connection: Invalid peer ID."), - PendingConnectionError::ConnectionLimit(l) => - write!(f, "Pending connection: Connection limit: {}.", l) } } } @@ -109,7 +110,6 @@ where PendingConnectionError::IO(err) => Some(err), PendingConnectionError::Transport(err) => Some(err), PendingConnectionError::InvalidPeerId => None, - PendingConnectionError::ConnectionLimit(..) => None, } } } diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 8046755f7d6..025deb538a7 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -112,7 +112,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC error: PendingConnectionError, /// The handler that was supposed to handle the connection, /// if the connection failed before the handler was consumed. - handler: Option, + handler: THandler, /// The (expected) peer of the failed connection. peer: Option, /// A reference to the pool that managed the connection. @@ -222,6 +222,7 @@ where TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, + TPeerId: Clone + Send + 'static, { let endpoint = info.to_connected_point(); if let Some(limit) = self.limits.max_pending_incoming { @@ -263,7 +264,7 @@ where TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, - TPeerId: Clone, + TPeerId: Clone + Send + 'static, { self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?; let endpoint = info.to_connected_point(); @@ -298,14 +299,32 @@ where TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, + TPeerId: Clone + Send + 'static, { + // Validate the received peer ID as the last step of the pending connection + // future, so that these errors can be raised before the `handler` is consumed + // by the background task, which happens when this future resolves to an + // "established" connection. let future = future.and_then({ let endpoint = endpoint.clone(); + let expected_peer = peer.clone(); + let local_id = self.local_id.clone(); move |(info, muxer)| { + if let Some(peer) = expected_peer { + if &peer != info.peer_id() { + return future::err(PendingConnectionError::InvalidPeerId) + } + } + + if &local_id == info.peer_id() { + return future::err(PendingConnectionError::InvalidPeerId) + } + let connected = Connected { info, endpoint }; future::ready(Ok((connected, muxer))) } }); + let id = self.manager.add_pending(future, handler); self.pending.insert(id, (endpoint, peer)); id @@ -536,7 +555,7 @@ where PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> > where TConnInfo: ConnectionInfo + Clone, - TPeerId: Clone, + TPeerId: Clone + fmt::Debug, { loop { let item = match self.manager.poll(cx) { @@ -551,7 +570,7 @@ where id, endpoint, error, - handler: Some(handler), + handler, peer, pool: self }) @@ -581,40 +600,22 @@ where .map_or(0, |conns| conns.len()); if let Err(e) = self.limits.check_established(current) { let connected = entry.close(); - return Poll::Ready(PoolEvent::PendingConnectionError { + let num_established = e.current; + return Poll::Ready(PoolEvent::ConnectionError { id, - endpoint: connected.endpoint, - peer: Some(connected.info.peer_id().clone()), - error: PendingConnectionError::ConnectionLimit(e), + connected, + error: ConnectionError::ConnectionLimit(e), + num_established, pool: self, - handler: None, }) } - // Check peer ID. - if let Some(peer) = peer { - if &peer != entry.connected().peer_id() { - let connected = entry.close(); - return Poll::Ready(PoolEvent::PendingConnectionError { - id, - endpoint: connected.endpoint, - peer: Some(connected.info.peer_id().clone()), - error: PendingConnectionError::InvalidPeerId, - pool: self, - handler: None, - }) + // Peer ID checks must already have happened. See `add_pending`. + if cfg!(debug_assertions) { + assert_ne!(&self.local_id, entry.connected().peer_id()); + if let Some(peer) = peer { + assert_eq!(&peer, entry.connected().peer_id()); } } - if &self.local_id == entry.connected().peer_id() { - let connected = entry.close(); - return Poll::Ready(PoolEvent::PendingConnectionError { - id, - endpoint: connected.endpoint, - peer: Some(connected.info.peer_id().clone()), - error: PendingConnectionError::InvalidPeerId, - pool: self, - handler: None, - }) - } // Add the connection to the pool. let peer = entry.connected().peer_id().clone(); let conns = self.established.entry(peer).or_default(); diff --git a/core/src/network.rs b/core/src/network.rs index 49911505d75..7b244d82014 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -55,7 +55,6 @@ use std::{ error, fmt, hash::Hash, - num::NonZeroUsize, pin::Pin, task::{Context, Poll}, }; @@ -331,7 +330,7 @@ where THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, ::Error: error::Error + Send + 'static, TConnInfo: Clone, - TPeerId: AsRef<[u8]> + Send + 'static, + TPeerId: fmt::Debug + Send + 'static, { // Poll the listener(s) for new connections. match ListenersStream::poll(Pin::new(&mut self.listeners), cx) { @@ -383,7 +382,7 @@ where } Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => { let dialing = &mut self.dialing; - let (next, event) = on_connection_failed(pool, dialing, id, endpoint, error, handler); + let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler); if let Some(dial) = next { let transport = self.listeners.transport().clone(); if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) { @@ -496,13 +495,11 @@ where /// If the failed connection attempt was a dialing attempt and there /// are more addresses to try, new `DialingOpts` are returned. fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>( - pool: &Pool::Error, TConnInfo, TPeerId>, dialing: &mut FnvHashMap, id: ConnectionId, endpoint: ConnectedPoint, error: PendingConnectionError, - handler: Option, + handler: THandler, ) -> (Option>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>) where TTrans: Transport, @@ -518,41 +515,27 @@ where if let Some(peer_id) = dialing_peer { // A pending outgoing connection to a known peer failed. - let attempt = dialing.remove(&peer_id).expect("by (1)"); + let mut attempt = dialing.remove(&peer_id).expect("by (1)"); let num_remain = attempt.next.len(); let failed_addr = attempt.current.clone(); - let new_state = if pool.is_connected(&peer_id) { - peer::PeerState::Connected - } else if num_remain == 0 { // (2) - peer::PeerState::Disconnected - } else { - peer::PeerState::Dialing { - num_pending_addresses: NonZeroUsize::new(num_remain).expect("by (2)"), - } - }; - let opts = - if let Some(handler) = handler { - if !attempt.next.is_empty() { - let mut attempt = attempt; - let next_attempt = attempt.next.remove(0); - Some(DialingOpts { - peer: peer_id.clone(), - handler, - address: next_attempt, - remaining: attempt.next - }) - } else { - None - } + if num_remain > 0 { + let next_attempt = attempt.next.remove(0); + let opts = DialingOpts { + peer: peer_id.clone(), + handler, + address: next_attempt, + remaining: attempt.next + }; + Some(opts) } else { None }; (opts, NetworkEvent::DialError { - new_state, + attempts_remaining: num_remain, peer_id, multiaddr: failed_addr, error, diff --git a/core/src/network/event.rs b/core/src/network/event.rs index ef28fd0a37d..692ba6ea595 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -39,7 +39,6 @@ use crate::{ pool::Pool, }, muxing::StreamMuxer, - network::peer::PeerState, transport::{Transport, TransportError}, }; use futures::prelude::*; @@ -122,8 +121,8 @@ where /// A dialing attempt to an address of a peer failed. DialError { - /// New state of a peer. - new_state: PeerState, + /// The number of remaining dialing attempts. + attempts_remaining: usize, /// Id of the peer we were trying to dial. peer_id: TPeerId, @@ -145,7 +144,7 @@ where /// The handler that was passed to `dial()`, if the /// connection failed before the handler was consumed. - handler: Option, + handler: THandler, }, /// An established connection produced an event. @@ -219,9 +218,9 @@ where .field("error", error) .finish() } - NetworkEvent::DialError { new_state, peer_id, multiaddr, error } => { + NetworkEvent::DialError { attempts_remaining, peer_id, multiaddr, error } => { f.debug_struct("DialError") - .field("new_state", new_state) + .field("attempts_remaining", attempts_remaining) .field("peer_id", peer_id) .field("multiaddr", multiaddr) .field("error", error) diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index fcf2b863044..d9f1bda4850 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -42,27 +42,9 @@ use std::{ error, fmt, hash::Hash, - num::NonZeroUsize, }; use super::{Network, DialingOpts}; -/// The state of a (remote) peer as seen by the local peer -/// through a [`Network`]. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum PeerState { - /// The [`Network`] is connected to the peer, i.e. has at least one - /// established connection. - Connected, - /// We are currently trying to reach this peer. - Dialing { - /// Number of addresses we are trying to dial. - num_pending_addresses: NonZeroUsize, - }, - /// The [`Network`] is disconnected from the peer, i.e. has no - /// established connection and no pending, outgoing connection. - Disconnected, -} - /// The possible representations of a peer in a [`Network`], as /// seen by the local node. pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 68f1386ea3a..10e7f4b9967 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -29,7 +29,7 @@ use libp2p_core::{ Transport, connection::PendingConnectionError, muxing::StreamMuxerBox, - network::{NetworkEvent, peer::PeerState}, + network::NetworkEvent, upgrade, }; use libp2p_swarm::{ @@ -137,7 +137,7 @@ fn deny_incoming_connec() { match swarm2.poll(cx) { Poll::Ready(NetworkEvent::DialError { - new_state: PeerState::Disconnected, + attempts_remaining: 0, peer_id, multiaddr, error: PendingConnectionError::Transport(_) @@ -294,7 +294,7 @@ fn multiple_addresses_err() { loop { match swarm.poll(cx) { Poll::Ready(NetworkEvent::DialError { - new_state, + attempts_remaining, peer_id, multiaddr, error: PendingConnectionError::Transport(_) @@ -303,15 +303,10 @@ fn multiple_addresses_err() { let expected = addresses.remove(0); assert_eq!(multiaddr, expected); if addresses.is_empty() { - assert_eq!(new_state, PeerState::Disconnected); + assert_eq!(attempts_remaining, 0); return Poll::Ready(Ok(())); } else { - match new_state { - PeerState::Dialing { num_pending_addresses } => { - assert_eq!(num_pending_addresses.get(), addresses.len()); - }, - _ => panic!() - } + assert_eq!(attempts_remaining, addresses.len()); } }, Poll::Ready(_) => unreachable!(), diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 01bffcce074..8ef9eecbd32 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -108,7 +108,7 @@ use libp2p_core::{ NetworkEvent, NetworkConfig, Peer, - peer::{ConnectedPeer, PeerState}, + peer::ConnectedPeer, }, upgrade::ProtocolName, }; @@ -454,11 +454,12 @@ where TBehaviour: NetworkBehaviour, Poll::Ready(NetworkEvent::IncomingConnectionError { error, .. }) => { log::debug!("Incoming connection failed: {:?}", error); }, - Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, new_state }) => { - log::debug!("Connection attempt to peer {:?} at address {:?} failed with {:?}", - peer_id, multiaddr, error); + Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => { + log::debug!( + "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.", + peer_id, multiaddr, error, attempts_remaining); this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error); - if let PeerState::Disconnected = new_state { + if attempts_remaining == 0 { this.behaviour.inject_dial_failure(&peer_id); } return Poll::Ready(SwarmEvent::UnreachableAddr {