diff --git a/Cargo.lock b/Cargo.lock index 89d3b70ba8c..92f8a3ea1b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2617,6 +2617,7 @@ dependencies = [ "either", "env_logger 0.10.0", "futures", + "futures-bounded", "futures-timer", "libp2p-core", "libp2p-identity", diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 4b56e6c1237..557f5a18736 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"] asynchronous-codec = "0.6" futures = "0.3.28" futures-timer = "3.0.2" +futures-bounded = { workspace = true } libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 162a1e8fb06..50b9882f2c5 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -18,14 +18,13 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError}; -use crate::protocol::{Info, PushInfo}; +use crate::protocol::{Info, PushInfo, UpgradeError}; +use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; use either::Either; -use futures::future::BoxFuture; use futures::prelude::*; -use futures::stream::FuturesUnordered; +use futures_bounded::Timeout; use futures_timer::Delay; -use libp2p_core::upgrade::SelectUpgrade; +use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade}; use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_identity::PublicKey; @@ -42,6 +41,9 @@ use smallvec::SmallVec; use std::collections::HashSet; use std::{io, task::Context, task::Poll, time::Duration}; +const STREAM_TIMEOUT: Duration = Duration::from_secs(60); +const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10; + /// Protocol handler for sending and receiving identification requests. /// /// Outbound requests are sent periodically. The handler performs expects @@ -49,14 +51,17 @@ use std::{io, task::Context, task::Poll, time::Duration}; /// permitting the underlying connection to be closed. pub struct Handler { remote_peer_id: PeerId, - inbound_identify_push: Option>>, /// Pending events to yield. events: SmallVec< - [ConnectionHandlerEvent>, (), Event, io::Error>; 4], + [ConnectionHandlerEvent< + Either, ReadyUpgrade>, + (), + Event, + io::Error, + >; 4], >, - /// Pending identification replies, awaiting being sent. - pending_replies: FuturesUnordered>>, + active_streams: futures_bounded::FuturesSet>, /// Future that fires when we need to identify the node again. trigger_next_identify: Delay, @@ -125,9 +130,11 @@ impl Handler { ) -> Self { Self { remote_peer_id, - inbound_identify_push: Default::default(), events: SmallVec::new(), - pending_replies: FuturesUnordered::new(), + active_streams: futures_bounded::FuturesSet::new( + STREAM_TIMEOUT, + MAX_CONCURRENT_STREAMS_PER_CONNECTION, + ), trigger_next_identify: Delay::new(initial_delay), exchanged_one_periodic_identify: false, interval, @@ -152,19 +159,28 @@ impl Handler { >, ) { match output { - future::Either::Left(substream) => { + future::Either::Left(stream) => { let info = self.build_info(); - self.pending_replies - .push(crate::protocol::send(substream, info).boxed()); + if self + .active_streams + .try_push( + protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify), + ) + .is_err() + { + warn!("Dropping inbound stream because we are at capacity"); + } else { + self.exchanged_one_periodic_identify = true; + } } - future::Either::Right(fut) => { - if self.inbound_identify_push.replace(fut).is_some() { - warn!( - "New inbound identify push stream from {} while still \ - upgrading previous one. Replacing previous with new.", - self.remote_peer_id, - ); + future::Either::Right(stream) => { + if self + .active_streams + .try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush)) + .is_err() + { + warn!("Dropping inbound identify push stream because we are at capacity"); } } } @@ -180,34 +196,31 @@ impl Handler { >, ) { match output { - future::Either::Left(remote_info) => { - self.handle_incoming_info(&remote_info); + future::Either::Left(stream) => { + if self + .active_streams + .try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify)) + .is_err() + { + warn!("Dropping outbound identify stream because we are at capacity"); + } + } + future::Either::Right(stream) => { + let info = self.build_info(); - self.events - .push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( - remote_info, - ))); + if self + .active_streams + .try_push( + protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush), + ) + .is_err() + { + warn!("Dropping outbound identify push stream because we are at capacity"); + } } - future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationPushed, - )), } } - fn on_dial_upgrade_error( - &mut self, - DialUpgradeError { error: err, .. }: DialUpgradeError< - ::OutboundOpenInfo, - ::OutboundProtocol, - >, - ) { - let err = err.map_upgrade_err(|e| e.into_inner()); - self.events.push(ConnectionHandlerEvent::NotifyBehaviour( - Event::IdentificationError(err), - )); - self.trigger_next_identify.reset(self.interval); - } - fn build_info(&mut self) -> Info { Info { public_key: self.public_key.clone(), @@ -268,13 +281,20 @@ impl ConnectionHandler for Handler { type FromBehaviour = InEvent; type ToBehaviour = Event; type Error = io::Error; - type InboundProtocol = SelectUpgrade>; - type OutboundProtocol = Either>; + type InboundProtocol = + SelectUpgrade, ReadyUpgrade>; + type OutboundProtocol = Either, ReadyUpgrade>; type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ()) + SubstreamProtocol::new( + SelectUpgrade::new( + ReadyUpgrade::new(PROTOCOL_NAME), + ReadyUpgrade::new(PUSH_PROTOCOL_NAME), + ), + (), + ) } fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { @@ -283,21 +303,19 @@ impl ConnectionHandler for Handler { self.external_addresses = addresses; } InEvent::Push => { - let info = self.build_info(); self.events .push(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(Either::Right(Push::outbound(info)), ()), + protocol: SubstreamProtocol::new( + Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)), + (), + ), }); } } } fn connection_keep_alive(&self) -> KeepAlive { - if self.inbound_identify_push.is_some() { - return KeepAlive::Yes; - } - - if !self.pending_replies.is_empty() { + if !self.active_streams.is_empty() { return KeepAlive::Yes; } @@ -317,20 +335,34 @@ impl ConnectionHandler for Handler { // Poll the future that fires when we need to identify the node again. if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) { self.trigger_next_identify.reset(self.interval); - let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(Either::Left(Identify), ()), + let event = ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new( + Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), + (), + ), }; - return Poll::Ready(ev); + return Poll::Ready(event); } - if let Some(Poll::Ready(res)) = self - .inbound_identify_push - .as_mut() - .map(|f| f.poll_unpin(cx)) - { - self.inbound_identify_push.take(); + match self.active_streams.poll_unpin(cx) { + Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => { + self.handle_incoming_info(&remote_info); - if let Ok(remote_push_info) = res { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified( + remote_info, + ))); + } + Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationPushed, + )); + } + Poll::Ready(Ok(Ok(Success::SentIdentify))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::Identification, + )); + } + Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => { if let Some(mut info) = self.remote_info.clone() { info.merge(remote_push_info); self.handle_incoming_info(&info); @@ -340,16 +372,17 @@ impl ConnectionHandler for Handler { )); }; } - } - - // Check for pending replies to send. - if let Poll::Ready(Some(result)) = self.pending_replies.poll_next_unpin(cx) { - let event = result - .map(|()| Event::Identification) - .unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err))); - self.exchanged_one_periodic_identify = true; - - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + Poll::Ready(Ok(Err(e))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Apply(e)), + )); + } + Poll::Ready(Err(Timeout { .. })) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError(StreamUpgradeError::Timeout), + )); + } + Poll::Pending => {} } Poll::Pending @@ -371,8 +404,13 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - self.on_dial_upgrade_error(dial_upgrade_error) + ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { + self.events.push(ConnectionHandlerEvent::NotifyBehaviour( + Event::IdentificationError( + error.map_upgrade_err(|e| void::unreachable(e.into_inner())), + ), + )); + self.trigger_next_identify.reset(self.interval); } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) @@ -392,11 +430,10 @@ impl ConnectionHandler for Handler { self.remote_peer_id ); - let info = self.build_info(); self.events .push(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( - Either::Right(Push::outbound(info)), + Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)), (), ), }); @@ -405,3 +442,10 @@ impl ConnectionHandler for Handler { } } } + +enum Success { + SentIdentify, + ReceivedIdentify(Info), + SentIdentifyPush, + ReceivedIdentifyPush(PushInfo), +} diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 989c94a4d67..5e2891e04e4 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -20,20 +20,15 @@ use crate::proto; use asynchronous_codec::{FramedRead, FramedWrite}; -use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{ - multiaddr, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, - Multiaddr, -}; +use futures::prelude::*; +use libp2p_core::{multiaddr, Multiaddr}; use libp2p_identity as identity; use libp2p_identity::PublicKey; use libp2p_swarm::StreamProtocol; use log::{debug, trace}; use std::convert::TryFrom; -use std::{io, iter, pin::Pin}; +use std::io; use thiserror::Error; -use void::Void; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; @@ -41,28 +36,6 @@ pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/1.0.0"); pub const PUSH_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/id/push/1.0.0"); -/// Substream upgrade protocol for `/ipfs/id/1.0.0`. -#[derive(Debug, Clone)] -pub struct Identify; - -/// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. -#[derive(Debug, Clone)] -pub struct Push(T); -pub struct InboundPush(); -pub struct OutboundPush(Info); - -impl Push { - pub fn inbound() -> Self { - Push(InboundPush()) - } -} - -impl Push { - pub fn outbound(info: Info) -> Self { - Push(OutboundPush(info)) - } -} - /// Identify information of a peer sent in protocol messages. #[derive(Debug, Clone)] pub struct Info { @@ -117,75 +90,7 @@ pub struct PushInfo { pub observed_addr: Option, } -impl UpgradeInfo for Identify { - type Info = StreamProtocol; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(PROTOCOL_NAME) - } -} - -impl InboundUpgrade for Identify { - type Output = C; - type Error = UpgradeError; - type Future = future::Ready>; - - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - future::ok(socket) - } -} - -impl OutboundUpgrade for Identify -where - C: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = Info; - type Error = UpgradeError; - type Future = Pin> + Send>>; - - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - recv_identify(socket).boxed() - } -} - -impl UpgradeInfo for Push { - type Info = StreamProtocol; - type InfoIter = iter::Once; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(PUSH_PROTOCOL_NAME) - } -} - -impl InboundUpgrade for Push -where - C: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = BoxFuture<'static, Result>; - type Error = Void; - type Future = future::Ready>; - - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - // Lazily upgrade stream, thus allowing upgrade to happen within identify's handler. - future::ok(recv_push(socket).boxed()) - } -} - -impl OutboundUpgrade for Push -where - C: AsyncWrite + Unpin + Send + 'static, -{ - type Output = (); - type Error = UpgradeError; - type Future = Pin> + Send>>; - - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - send(socket, self.0 .0).boxed() - } -} - -pub(crate) async fn send(io: T, info: Info) -> Result<(), UpgradeError> +pub(crate) async fn send_identify(io: T, info: Info) -> Result<(), UpgradeError> where T: AsyncWrite + Unpin, { @@ -219,7 +124,7 @@ where Ok(()) } -async fn recv_push(socket: T) -> Result +pub(crate) async fn recv_push(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, { @@ -230,7 +135,7 @@ where Ok(info) } -async fn recv_identify(socket: T) -> Result +pub(crate) async fn recv_identify(socket: T) -> Result where T: AsyncRead + AsyncWrite + Unpin, {