Skip to content

Commit

Permalink
refactor(swarm)!: don't be generic over Transport (#3272)
Browse files Browse the repository at this point in the history
Ever since we moved `Pool` into `libp2p-swarm`, we always use it with the same `Transport`: `Boxed`. It is thus unnecessary for us to be overly generic over what kind of `Transport` we are using. This allows us to remove a few type parameters from the implementation which overall simplifies things.

This is technically a breaking change because I am removing a type parameter from two exported type aliases:

- `PendingInboundConnectionError`
- `PendingOutboundConnectionError`

Those have always only be used with `std::io::Error` in our API but it is still a breaking change.
  • Loading branch information
thomaseizinger authored Dec 23, 2022
1 parent aca3454 commit 5782a96
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 98 deletions.
6 changes: 2 additions & 4 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,8 @@ enum PendingInboundConnectionError {
ConnectionLimit,
}

impl<TTransErr> From<&libp2p_swarm::PendingInboundConnectionError<TTransErr>>
for PendingInboundConnectionError
{
fn from(error: &libp2p_swarm::PendingInboundConnectionError<TTransErr>) -> Self {
impl From<&libp2p_swarm::PendingInboundConnectionError> for PendingInboundConnectionError {
fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self {
match error {
libp2p_swarm::PendingInboundConnectionError::WrongPeerId { .. } => {
PendingInboundConnectionError::WrongPeerId
Expand Down
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].

- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
These two types are always used with `std::io::Error`. See [PR 3272].

[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272

# 0.41.1

Expand Down
7 changes: 3 additions & 4 deletions swarm/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single
/// connection.
pub type PendingOutboundConnectionError<TTransErr> =
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>;
pub type PendingOutboundConnectionError =
PendingConnectionError<Vec<(Multiaddr, TransportError<io::Error>)>>;

/// Errors that can occur in the context of a pending incoming `Connection`.
pub type PendingInboundConnectionError<TTransErr> =
PendingConnectionError<TransportError<TTransErr>>;
pub type PendingInboundConnectionError = PendingConnectionError<TransportError<io::Error>>;

/// Errors that can occur in the context of a pending `Connection`.
#[derive(Debug)]
Expand Down
48 changes: 16 additions & 32 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError,
PendingInboundConnectionError, PendingOutboundConnectionError,
},
transport::{Transport, TransportError},
transport::TransportError,
ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId,
};
use concurrent_dial::ConcurrentDial;
Expand Down Expand Up @@ -79,9 +79,8 @@ impl ExecSwitch {
}

/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<THandler, TTrans>
pub struct Pool<THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
local_id: PeerId,
Expand Down Expand Up @@ -124,10 +123,10 @@ where

/// Sender distributed to pending tasks for reporting events back
/// to the pool.
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent<TTrans>>,
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,

/// Receiver for events reported from pending tasks.
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent<TTrans>>,
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,

/// Sender distributed to established tasks for reporting events back
/// to the pool.
Expand Down Expand Up @@ -213,7 +212,7 @@ impl<THandler> PendingConnection<THandler> {
}
}

impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
impl<THandler: IntoConnectionHandler> fmt::Debug for Pool<THandler> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Pool")
.field("counters", &self.counters)
Expand All @@ -223,10 +222,7 @@ impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THa

/// Event that can happen on the `Pool`.
#[derive(Debug)]
pub enum PoolEvent<THandler: IntoConnectionHandler, TTrans>
where
TTrans: Transport,
{
pub enum PoolEvent<THandler: IntoConnectionHandler> {
/// A new connection has been established.
ConnectionEstablished {
id: ConnectionId,
Expand All @@ -239,7 +235,7 @@ where
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<TTrans::Error>)>>,
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
/// How long it took to establish this connection.
established_in: std::time::Duration,
},
Expand Down Expand Up @@ -272,7 +268,7 @@ where
/// The ID of the failed connection.
id: ConnectionId,
/// The error that occurred.
error: PendingOutboundConnectionError<TTrans::Error>,
error: PendingOutboundConnectionError,
/// The handler that was supposed to handle the connection.
handler: THandler,
/// The (expected) peer of the failed connection.
Expand All @@ -288,7 +284,7 @@ where
/// Local connection address.
local_addr: Multiaddr,
/// The error that occurred.
error: PendingInboundConnectionError<TTrans::Error>,
error: PendingInboundConnectionError,
/// The handler that was supposed to handle the connection.
handler: THandler,
},
Expand All @@ -312,10 +308,9 @@ where
},
}

impl<THandler, TTrans> Pool<THandler, TTrans>
impl<THandler> Pool<THandler>
where
THandler: IntoConnectionHandler,
TTrans: Transport,
{
/// Creates a new empty `Pool`.
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
Expand Down Expand Up @@ -429,12 +424,9 @@ where
}
}

impl<THandler, TTrans> Pool<THandler, TTrans>
impl<THandler> Pool<THandler>
where
THandler: IntoConnectionHandler,
TTrans: Transport + 'static,
TTrans::Output: Send + 'static,
TTrans::Error: Send + 'static,
{
/// Adds a pending outgoing connection to the pool in the form of a `Future`
/// that establishes and negotiates the connection.
Expand All @@ -448,22 +440,15 @@ where
'static,
(
Multiaddr,
Result<
<TTrans as Transport>::Output,
TransportError<<TTrans as Transport>::Error>,
>,
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
),
>,
>,
peer: Option<PeerId>,
handler: THandler,
role_override: Endpoint,
dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TTrans: Send,
TTrans::Dial: Send + 'static,
{
) -> Result<ConnectionId, (ConnectionLimit, THandler)> {
if let Err(limit) = self.counters.check_max_pending_outgoing() {
return Err((limit, handler));
};
Expand Down Expand Up @@ -515,7 +500,7 @@ where
info: IncomingInfo<'_>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
{
let endpoint = info.create_connected_point();

Expand Down Expand Up @@ -552,9 +537,8 @@ where
}

/// Polls the connection pool for events.
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler, TTrans>>
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler>>
where
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
THandler: IntoConnectionHandler + 'static,
THandler::Handler: ConnectionHandler + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
Expand Down Expand Up @@ -677,7 +661,7 @@ where
),
};

let error: Result<(), PendingInboundConnectionError<_>> = self
let error = self
.counters
// Check general established connection limit.
.check_max_established(&endpoint)
Expand Down
42 changes: 16 additions & 26 deletions swarm/src/connection/pool/concurrent_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,38 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
transport::{Transport, TransportError},
Multiaddr,
};
use crate::{transport::TransportError, Multiaddr};
use futures::{
future::{BoxFuture, Future},
ready,
stream::{FuturesUnordered, StreamExt},
};
use libp2p_core::muxing::StreamMuxerBox;
use libp2p_core::PeerId;
use std::{
num::NonZeroU8,
pin::Pin,
task::{Context, Poll},
};

type Dial<TTrans> = BoxFuture<
type Dial = BoxFuture<
'static,
(
Multiaddr,
Result<<TTrans as Transport>::Output, TransportError<<TTrans as Transport>::Error>>,
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
),
>;

pub struct ConcurrentDial<TTrans: Transport> {
dials: FuturesUnordered<Dial<TTrans>>,
pending_dials: Box<dyn Iterator<Item = Dial<TTrans>> + Send>,
errors: Vec<(Multiaddr, TransportError<TTrans::Error>)>,
pub struct ConcurrentDial {
dials: FuturesUnordered<Dial>,
pending_dials: Box<dyn Iterator<Item = Dial> + Send>,
errors: Vec<(Multiaddr, TransportError<std::io::Error>)>,
}

impl<TTrans: Transport> Unpin for ConcurrentDial<TTrans> {}
impl Unpin for ConcurrentDial {}

impl<TTrans> ConcurrentDial<TTrans>
where
TTrans: Transport + Send + 'static,
TTrans::Output: Send,
TTrans::Error: Send,
TTrans::Dial: Send + 'static,
{
pub(crate) fn new(pending_dials: Vec<Dial<TTrans>>, concurrency_factor: NonZeroU8) -> Self {
impl ConcurrentDial {
pub(crate) fn new(pending_dials: Vec<Dial>, concurrency_factor: NonZeroU8) -> Self {
let mut pending_dials = pending_dials.into_iter();

let dials = FuturesUnordered::new();
Expand All @@ -75,20 +68,17 @@ where
}
}

impl<TTrans> Future for ConcurrentDial<TTrans>
where
TTrans: Transport,
{
impl Future for ConcurrentDial {
type Output = Result<
// Either one dial succeeded, returning the negotiated [`PeerId`], the address, the
// muxer and the addresses and errors of the dials that failed before.
(
Multiaddr,
TTrans::Output,
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
(PeerId, StreamMuxerBox),
Vec<(Multiaddr, TransportError<std::io::Error>)>,
),
// Or all dials failed, thus returning the address and error for each dial.
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
Vec<(Multiaddr, TransportError<std::io::Error>)>,
>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Expand Down
35 changes: 13 additions & 22 deletions swarm/src/connection/pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
connection::{
self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
},
transport::{Transport, TransportError},
transport::TransportError,
ConnectionHandler, Multiaddr, PeerId,
};
use futures::{
Expand All @@ -35,6 +35,7 @@ use futures::{
SinkExt, StreamExt,
};
use libp2p_core::connection::ConnectionId;
use libp2p_core::muxing::StreamMuxerBox;
use std::pin::Pin;
use void::Void;

Expand All @@ -48,26 +49,19 @@ pub enum Command<T> {
Close,
}

#[derive(Debug)]
pub enum PendingConnectionEvent<TTrans>
where
TTrans: Transport,
{
pub enum PendingConnectionEvent {
ConnectionEstablished {
id: ConnectionId,
output: TTrans::Output,
output: (PeerId, StreamMuxerBox),
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<TTrans::Error>)>)>,
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
},
/// A pending connection failed.
PendingFailed {
id: ConnectionId,
error: Either<
PendingOutboundConnectionError<TTrans::Error>,
PendingInboundConnectionError<TTrans::Error>,
>,
error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
},
}

Expand Down Expand Up @@ -97,14 +91,12 @@ pub enum EstablishedConnectionEvent<THandler: ConnectionHandler> {
},
}

pub async fn new_for_pending_outgoing_connection<TTrans>(
pub async fn new_for_pending_outgoing_connection(
connection_id: ConnectionId,
dial: ConcurrentDial<TTrans>,
dial: ConcurrentDial,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where
TTrans: Transport,
{
mut events: mpsc::Sender<PendingConnectionEvent>,
) {
match futures::future::select(abort_receiver, Box::pin(dial)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events
Expand Down Expand Up @@ -135,14 +127,13 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
}
}

pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
pub async fn new_for_pending_incoming_connection<TFut>(
connection_id: ConnectionId,
future: TFut,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
mut events: mpsc::Sender<PendingConnectionEvent>,
) where
TTrans: Transport,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
{
match futures::future::select(abort_receiver, Box::pin(future)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
Expand Down
Loading

0 comments on commit 5782a96

Please sign in to comment.