Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm)!: Allow NetworkBehaviours to manage incoming connections #3099

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d501019
Introduce `NetworkBehaviour::DialPayload`
thomaseizinger Nov 9, 2022
77c85f6
WIP Migrate production code
thomaseizinger Nov 9, 2022
118ffb6
Deprecate `new_handler`
thomaseizinger Nov 9, 2022
377c351
WIP: Completely remove `IntoConnectionHandler`
thomaseizinger Nov 14, 2022
9972eb4
Set supported protocols upon connection establishment
thomaseizinger Nov 15, 2022
fb9cdbc
Remove TODOs
thomaseizinger Nov 15, 2022
a96780d
Fix bad boolean logic
thomaseizinger Nov 15, 2022
3c9d98e
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 15, 2022
900edef
Fix gossipsub tests
thomaseizinger Nov 15, 2022
d1eea3a
Fix clippy warning
thomaseizinger Nov 15, 2022
98bf927
Update docs
thomaseizinger Nov 15, 2022
1cef941
Reduce diff
thomaseizinger Nov 15, 2022
ca3ef3e
Fix clippy errors
thomaseizinger Nov 15, 2022
a7f0685
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 16, 2022
d95b038
Update swarm/src/behaviour.rs
thomaseizinger Nov 16, 2022
edbbe41
Add changelog entry
thomaseizinger Nov 17, 2022
fef8d85
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 18, 2022
a942248
Remove unnecessary bounds
thomaseizinger Nov 18, 2022
0578134
Remove old example
thomaseizinger Nov 18, 2022
f7def2b
fmt
thomaseizinger Nov 18, 2022
a5a728b
Make `new_handler` fallible
thomaseizinger Nov 18, 2022
27e1b2d
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 19, 2022
19348e9
Allow each `NetworkBehaviour` to have their own `ConnectionDenied` re…
thomaseizinger Nov 22, 2022
9c55601
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 23, 2022
527eeda
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 29, 2022
a269e9d
Revert "Allow each `NetworkBehaviour` to have their own `ConnectionDe…
thomaseizinger Dec 7, 2022
dcb4f96
Always box cause for denied connection
thomaseizinger Dec 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use libp2p_request_response::{
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
};
use libp2p_swarm::behaviour::THandlerInEvent;
use libp2p_swarm::{
DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p_swarm::{DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use std::{
collections::{HashMap, VecDeque},
iter,
Expand Down Expand Up @@ -304,7 +302,6 @@ impl Behaviour {
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = <RequestResponse<AutoNatCodec> as NetworkBehaviour>::ConnectionHandler;
type OutEvent = Event;
type DialPayload = ();

fn inject_connection_established(
&mut self,
Expand Down Expand Up @@ -358,7 +355,7 @@ impl NetworkBehaviour for Behaviour {
peer: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
handler: Self::ConnectionHandler,
remaining_established: usize,
) {
self.inner
Expand All @@ -371,14 +368,9 @@ impl NetworkBehaviour for Behaviour {
}
}

fn inject_dial_failure(
&mut self,
peer: Option<PeerId>,
initial_event: Option<Self::DialPayload>,
error: &DialError,
) {
self.inner.inject_dial_failure(peer, initial_event, error);
if let Some(event) = self.as_server().on_outbound_dial_error(peer, error) {
fn inject_dial_failure(&mut self, _peer_id: Option<PeerId>, _error: &DialError) {
self.inner.inject_dial_failure(_peer_id, _error);
if let Some(event) = self.as_server().on_outbound_dial_error(_peer_id, _error) {
self.pending_out_events
.push_back(Event::InboundProbe(event));
}
Expand Down Expand Up @@ -472,8 +464,12 @@ impl NetworkBehaviour for Behaviour {
}
}

fn new_handler(&mut self) -> Self::ConnectionHandler {
self.inner.new_handler()
fn new_handler(
&mut self,
peer: &PeerId,
connected_point: &ConnectedPoint,
) -> Self::ConnectionHandler {
self.inner.new_handler(peer, connected_point)
}

fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
Expand Down
1 change: 0 additions & 1 deletion protocols/autonat/src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
.override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0"))
.addresses(addrs)
.build(),
dial_payload: (),
});
}
Err((status_text, error)) => {
Expand Down
144 changes: 60 additions & 84 deletions protocols/dcutr/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::behaviour::THandlerInEvent;
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerUpgrErr, DialError, IntoConnectionHandler,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
dummy, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
Expand Down Expand Up @@ -75,6 +75,9 @@ pub struct Behaviour {
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,
}

type Handler =
Either<handler::relayed::Handler, Either<handler::direct::Handler, dummy::ConnectionHandler>>;

impl Behaviour {
pub fn new() -> Self {
Behaviour {
Expand All @@ -85,26 +88,17 @@ impl Behaviour {
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = handler::Prototype;
type ConnectionHandler = Handler;
type OutEvent = Event;
type DialPayload = handler::Prototype;

fn new_handler(&mut self) -> Self::ConnectionHandler {
handler::Prototype::UnknownConnection
}

fn new_inbound_handler(&mut self) -> Self::ConnectionHandler {
handler::Prototype::UnknownConnection
}

fn new_outbound_handler(
fn new_handler(
&mut self,
dial_payload: Option<Self::DialPayload>,
peer: &PeerId,
connected_point: &ConnectedPoint,
) -> Self::ConnectionHandler {
match dial_payload {
Some(prototype) => prototype,
None => handler::Prototype::UnknownConnection,
}
// handler::Prototype::UnknownConnection

todo!()
}

fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
Expand Down Expand Up @@ -156,50 +150,46 @@ impl NetworkBehaviour for Behaviour {
}
}

fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
dial_payload: Option<Self::DialPayload>,
_error: &DialError,
) {
if let Some(handler::Prototype::DirectConnection {
relayed_connection_id,
role: handler::Role::Initiator { attempt },
}) = dial_payload
{
let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known.");
if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
self.queued_actions.push_back(ActionBuilder::Connect {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
attempt: attempt + 1,
});
} else {
self.queued_actions.extend([
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(
handler::relayed::Command::UpgradeFinishedDontKeepAlive,
),
}
.into(),
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id,
error: UpgradeError::Dial,
})
.into(),
]);
}
}
fn inject_dial_failure(&mut self, _peer_id: Option<PeerId>, _error: &DialError) {
// TODO: Track state in behaviour
// if let Some(handler::Prototype::DirectConnection {
// relayed_connection_id,
// role: handler::Role::Initiator { attempt },
// }) = dial_payload
// {
// let peer_id = _peer_id.expect("Peer of `Prototype::DirectConnection` is always known.");
// if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
// self.queued_actions.push_back(ActionBuilder::Connect {
// peer_id,
// handler: NotifyHandler::One(relayed_connection_id),
// attempt: attempt + 1,
// });
// } else {
// self.queued_actions.extend([
// NetworkBehaviourAction::NotifyHandler {
// peer_id,
// handler: NotifyHandler::One(relayed_connection_id),
// event: Either::Left(
// handler::relayed::Command::UpgradeFinishedDontKeepAlive,
// ),
// }
// .into(),
// NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed {
// remote_peer_id: peer_id,
// error: UpgradeError::Dial,
// })
// .into(),
// ]);
// }
// }
}

fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
connected_point: &ConnectedPoint,
_handler: <<Self as NetworkBehaviour>::ConnectionHandler as IntoConnectionHandler>::Handler,
_handler: Self::ConnectionHandler,
_remaining_established: usize,
) {
if !connected_point.is_relayed() {
Expand All @@ -221,7 +211,7 @@ impl NetworkBehaviour for Behaviour {
&mut self,
event_source: PeerId,
connection: ConnectionId,
handler_event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
handler_event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
match handler_event {
Either::Left(handler::relayed::Event::InboundConnectRequest {
Expand Down Expand Up @@ -259,10 +249,10 @@ impl NetworkBehaviour for Behaviour {
.addresses(remote_addrs)
.condition(dial_opts::PeerCondition::Always)
.build(),
dial_payload: handler::Prototype::DirectConnection {
relayed_connection_id: connection,
role: handler::Role::Listener,
},
// dial_payload: handler::Prototype::DirectConnection {
// relayed_connection_id: connection,
// role: handler::Role::Listener,
// },
}
.into(),
);
Expand All @@ -287,10 +277,10 @@ impl NetworkBehaviour for Behaviour {
.addresses(remote_addrs)
.override_role()
.build(),
dial_payload: handler::Prototype::DirectConnection {
relayed_connection_id: connection,
role: handler::Role::Initiator { attempt },
},
// dial_payload: handler::Prototype::DirectConnection {
// relayed_connection_id: connection,
// role: handler::Role::Initiator { attempt },
// },
}
.into(),
);
Expand Down Expand Up @@ -325,13 +315,8 @@ impl NetworkBehaviour for Behaviour {
&mut self,
_cx: &mut Context<'_>,
poll_parameters: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
Self::OutEvent,
THandlerInEvent<Self::ConnectionHandler>,
handler::Prototype,
>,
> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self::ConnectionHandler>>>
{
if let Some(action) = self.queued_actions.pop_front() {
return Poll::Ready(action.build(poll_parameters));
}
Expand All @@ -343,7 +328,7 @@ impl NetworkBehaviour for Behaviour {
/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`]
/// before being returned in [`Behaviour::poll`].
enum ActionBuilder {
Done(NetworkBehaviourAction<Event, THandlerInEvent<handler::Prototype>, handler::Prototype>),
Done(NetworkBehaviourAction<Event, THandlerInEvent<Handler>>),
Connect {
attempt: u8,
handler: NotifyHandler,
Expand All @@ -356,16 +341,8 @@ enum ActionBuilder {
},
}

impl From<NetworkBehaviourAction<Event, THandlerInEvent<handler::Prototype>, handler::Prototype>>
for ActionBuilder
{
fn from(
action: NetworkBehaviourAction<
Event,
THandlerInEvent<handler::Prototype>,
handler::Prototype,
>,
) -> Self {
impl From<NetworkBehaviourAction<Event, THandlerInEvent<Handler>>> for ActionBuilder {
fn from(action: NetworkBehaviourAction<Event, THandlerInEvent<Handler>>) -> Self {
Self::Done(action)
}
}
Expand All @@ -374,8 +351,7 @@ impl ActionBuilder {
fn build(
self,
poll_parameters: &mut impl PollParameters,
) -> NetworkBehaviourAction<Event, THandlerInEvent<handler::Prototype>, handler::Prototype>
{
) -> NetworkBehaviourAction<Event, THandlerInEvent<Handler>> {
let obs_addrs = || {
poll_parameters
.external_addresses()
Expand Down
75 changes: 38 additions & 37 deletions protocols/dcutr/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use libp2p_core::upgrade::{self, DeniedUpgrade};
use libp2p_core::{ConnectedPoint, PeerId};
use libp2p_swarm::dummy;
use libp2p_swarm::handler::SendWrapper;
use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler};
use libp2p_swarm::ConnectionHandler;

pub mod direct;
pub mod relayed;
Expand All @@ -43,39 +43,40 @@ pub enum Role {
Listener,
}

impl IntoConnectionHandler for Prototype {
type Handler = Either<relayed::Handler, Either<direct::Handler, dummy::ConnectionHandler>>;

fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
match self {
Self::UnknownConnection => {
if endpoint.is_relayed() {
Either::Left(relayed::Handler::new(endpoint.clone()))
} else {
Either::Right(Either::Right(dummy::ConnectionHandler))
}
}
Self::DirectConnection {
relayed_connection_id,
..
} => {
assert!(
!endpoint.is_relayed(),
"`Prototype::DirectConnection` is never created for relayed connection."
);
Either::Right(Either::Left(direct::Handler::new(relayed_connection_id)))
}
}
}

fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
match self {
Prototype::UnknownConnection => upgrade::EitherUpgrade::A(SendWrapper(
upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}),
)),
Prototype::DirectConnection { .. } => {
upgrade::EitherUpgrade::A(SendWrapper(upgrade::EitherUpgrade::B(DeniedUpgrade)))
}
}
}
}
// TODO
// impl IntoConnectionHandler for Prototype {
// type Handler = Either<relayed::Handler, Either<direct::Handler, dummy::ConnectionHandler>>;
//
// fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
// match self {
// Self::UnknownConnection => {
// if endpoint.is_relayed() {
// Either::Left(relayed::Handler::new(endpoint.clone()))
// } else {
// Either::Right(Either::Right(dummy::ConnectionHandler))
// }
// }
// Self::DirectConnection {
// relayed_connection_id,
// ..
// } => {
// assert!(
// !endpoint.is_relayed(),
// "`Prototype::DirectConnection` is never created for relayed connection."
// );
// Either::Right(Either::Left(direct::Handler::new(relayed_connection_id)))
// }
// }
// }
//
// fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
// match self {
// Prototype::UnknownConnection => upgrade::EitherUpgrade::A(SendWrapper(
// upgrade::EitherUpgrade::A(protocol::inbound::Upgrade {}),
// )),
// Prototype::DirectConnection { .. } => {
// upgrade::EitherUpgrade::A(SendWrapper(upgrade::EitherUpgrade::B(DeniedUpgrade)))
// }
// }
// }
// }
Loading