Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm)!: Allow NetworkBehaviours to manage incoming connections #3099

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d501019
Introduce `NetworkBehaviour::DialPayload`
thomaseizinger Nov 9, 2022
77c85f6
WIP Migrate production code
thomaseizinger Nov 9, 2022
118ffb6
Deprecate `new_handler`
thomaseizinger Nov 9, 2022
377c351
WIP: Completely remove `IntoConnectionHandler`
thomaseizinger Nov 14, 2022
9972eb4
Set supported protocols upon connection establishment
thomaseizinger Nov 15, 2022
fb9cdbc
Remove TODOs
thomaseizinger Nov 15, 2022
a96780d
Fix bad boolean logic
thomaseizinger Nov 15, 2022
3c9d98e
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 15, 2022
900edef
Fix gossipsub tests
thomaseizinger Nov 15, 2022
d1eea3a
Fix clippy warning
thomaseizinger Nov 15, 2022
98bf927
Update docs
thomaseizinger Nov 15, 2022
1cef941
Reduce diff
thomaseizinger Nov 15, 2022
ca3ef3e
Fix clippy errors
thomaseizinger Nov 15, 2022
a7f0685
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 16, 2022
d95b038
Update swarm/src/behaviour.rs
thomaseizinger Nov 16, 2022
edbbe41
Add changelog entry
thomaseizinger Nov 17, 2022
fef8d85
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 18, 2022
a942248
Remove unnecessary bounds
thomaseizinger Nov 18, 2022
0578134
Remove old example
thomaseizinger Nov 18, 2022
f7def2b
fmt
thomaseizinger Nov 18, 2022
a5a728b
Make `new_handler` fallible
thomaseizinger Nov 18, 2022
27e1b2d
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 19, 2022
19348e9
Allow each `NetworkBehaviour` to have their own `ConnectionDenied` re…
thomaseizinger Nov 22, 2022
9c55601
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 23, 2022
527eeda
Merge branch 'master' into 2824-remove-into-connection-handler
thomaseizinger Nov 29, 2022
a269e9d
Revert "Allow each `NetworkBehaviour` to have their own `ConnectionDe…
thomaseizinger Dec 7, 2022
dcb4f96
Always box cause for denied connection
thomaseizinger Dec 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use libp2p_request_response::{
handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
};
use libp2p_swarm::behaviour::THandlerInEvent;
use libp2p_swarm::{
DialError, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
Expand Down Expand Up @@ -303,6 +304,7 @@ impl Behaviour {
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = <RequestResponse<AutoNatCodec> as NetworkBehaviour>::ConnectionHandler;
type OutEvent = Event;
type DialPayload = ();

fn inject_connection_established(
&mut self,
Expand Down Expand Up @@ -372,10 +374,10 @@ impl NetworkBehaviour for Behaviour {
fn inject_dial_failure(
&mut self,
peer: Option<PeerId>,
handler: Self::ConnectionHandler,
initial_event: Option<Self::DialPayload>,
error: &DialError,
) {
self.inner.inject_dial_failure(peer, handler, error);
self.inner.inject_dial_failure(peer, initial_event, error);
if let Some(event) = self.as_server().on_outbound_dial_error(peer, error) {
self.pending_out_events
.push_back(Event::InboundProbe(event));
Expand Down Expand Up @@ -487,14 +489,8 @@ impl NetworkBehaviour for Behaviour {
self.inner.inject_event(peer_id, conn, event)
}

fn inject_listen_failure(
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ConnectionHandler,
) {
self.inner
.inject_listen_failure(local_addr, send_back_addr, handler)
fn inject_listen_failure(&mut self, local_addr: &Multiaddr, send_back_addr: &Multiaddr) {
self.inner.inject_listen_failure(local_addr, send_back_addr)
}

fn inject_new_listener(&mut self, id: ListenerId) {
Expand All @@ -512,7 +508,7 @@ impl NetworkBehaviour for Behaviour {

type Action = NetworkBehaviourAction<
<Behaviour as NetworkBehaviour>::OutEvent,
<Behaviour as NetworkBehaviour>::ConnectionHandler,
THandlerInEvent<<Behaviour as NetworkBehaviour>::ConnectionHandler>,
>;

// Trait implemented for `AsClient` as `AsServer` to handle events from the inner [`RequestResponse`] Protocol.
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonat/src/behaviour/as_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use libp2p_request_response::{
};
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
DialError, NetworkBehaviourAction, PollParameters,
};
use std::{
collections::{HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
.override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0"))
.addresses(addrs)
.build(),
handler: self.inner.new_handler(),
dial_payload: (),
});
}
Err((status_text, error)) => {
Expand Down
51 changes: 41 additions & 10 deletions protocols/dcutr/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use either::Either;
use libp2p_core::connection::{ConnectedPoint, ConnectionId};
use libp2p_core::multiaddr::Protocol;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::behaviour::THandlerInEvent;
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerUpgrErr, DialError, IntoConnectionHandler,
Expand Down Expand Up @@ -86,11 +87,26 @@ impl Behaviour {
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = handler::Prototype;
type OutEvent = Event;
type DialPayload = handler::Prototype;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

fn new_handler(&mut self) -> Self::ConnectionHandler {
handler::Prototype::UnknownConnection
}

fn new_inbound_handler(&mut self) -> Self::ConnectionHandler {
handler::Prototype::UnknownConnection
}

fn new_outbound_handler(
&mut self,
dial_payload: Option<Self::DialPayload>,
) -> Self::ConnectionHandler {
match dial_payload {
Some(prototype) => prototype,
None => handler::Prototype::UnknownConnection,
}
}

fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
vec![]
}
Expand Down Expand Up @@ -143,13 +159,13 @@ impl NetworkBehaviour for Behaviour {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
dial_payload: Option<Self::DialPayload>,
_error: &DialError,
) {
if let handler::Prototype::DirectConnection {
if let Some(handler::Prototype::DirectConnection {
relayed_connection_id,
role: handler::Role::Initiator { attempt },
} = handler
}) = dial_payload
{
let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known.");
if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
Expand Down Expand Up @@ -243,7 +259,7 @@ impl NetworkBehaviour for Behaviour {
.addresses(remote_addrs)
.condition(dial_opts::PeerCondition::Always)
.build(),
handler: handler::Prototype::DirectConnection {
dial_payload: handler::Prototype::DirectConnection {
relayed_connection_id: connection,
role: handler::Role::Listener,
},
Expand Down Expand Up @@ -271,7 +287,7 @@ impl NetworkBehaviour for Behaviour {
.addresses(remote_addrs)
.override_role()
.build(),
handler: handler::Prototype::DirectConnection {
dial_payload: handler::Prototype::DirectConnection {
relayed_connection_id: connection,
role: handler::Role::Initiator { attempt },
},
Expand Down Expand Up @@ -309,7 +325,13 @@ impl NetworkBehaviour for Behaviour {
&mut self,
_cx: &mut Context<'_>,
poll_parameters: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<
NetworkBehaviourAction<
Self::OutEvent,
THandlerInEvent<Self::ConnectionHandler>,
handler::Prototype,
>,
> {
if let Some(action) = self.queued_actions.pop_front() {
return Poll::Ready(action.build(poll_parameters));
}
Expand All @@ -321,7 +343,7 @@ impl NetworkBehaviour for Behaviour {
/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`]
/// before being returned in [`Behaviour::poll`].
enum ActionBuilder {
Done(NetworkBehaviourAction<Event, handler::Prototype>),
Done(NetworkBehaviourAction<Event, THandlerInEvent<handler::Prototype>, handler::Prototype>),
Connect {
attempt: u8,
handler: NotifyHandler,
Expand All @@ -334,8 +356,16 @@ enum ActionBuilder {
},
}

impl From<NetworkBehaviourAction<Event, handler::Prototype>> for ActionBuilder {
fn from(action: NetworkBehaviourAction<Event, handler::Prototype>) -> Self {
impl From<NetworkBehaviourAction<Event, THandlerInEvent<handler::Prototype>, handler::Prototype>>
for ActionBuilder
{
fn from(
action: NetworkBehaviourAction<
Event,
THandlerInEvent<handler::Prototype>,
handler::Prototype,
>,
) -> Self {
Self::Done(action)
}
}
Expand All @@ -344,7 +374,8 @@ impl ActionBuilder {
fn build(
self,
poll_parameters: &mut impl PollParameters,
) -> NetworkBehaviourAction<Event, handler::Prototype> {
) -> NetworkBehaviourAction<Event, THandlerInEvent<handler::Prototype>, handler::Prototype>
{
let obs_addrs = || {
poll_parameters
.external_addresses()
Expand Down
18 changes: 7 additions & 11 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use cuckoofilter::{CuckooError, CuckooFilter};
use fnv::FnvHashSet;
use libp2p_core::{connection::ConnectionId, PeerId};
use libp2p_core::{ConnectedPoint, Multiaddr};
use libp2p_swarm::behaviour::THandlerInEvent;
use libp2p_swarm::{
dial_opts::DialOpts, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler,
PollParameters,
Expand All @@ -41,12 +42,7 @@ use std::{collections::VecDeque, iter};
/// Network behaviour that handles the floodsub protocol.
pub struct Floodsub {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<
NetworkBehaviourAction<
FloodsubEvent,
OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>,
>,
>,
events: VecDeque<NetworkBehaviourAction<FloodsubEvent, FloodsubRpc>>,

config: FloodsubConfig,

Expand Down Expand Up @@ -107,10 +103,9 @@ impl Floodsub {
}

if self.target_peers.insert(peer_id) {
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(peer_id).build(),
handler,
dial_payload: (),
});
}
}
Expand Down Expand Up @@ -281,6 +276,7 @@ impl Floodsub {
impl NetworkBehaviour for Floodsub {
type ConnectionHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent;
type DialPayload = ();

fn new_handler(&mut self) -> Self::ConnectionHandler {
Default::default()
Expand Down Expand Up @@ -339,10 +335,9 @@ impl NetworkBehaviour for Floodsub {
// We can be disconnected by the remote in case of inactivity for example, so we always
// try to reconnect.
if self.target_peers.contains(id) {
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(*id).build(),
handler,
dial_payload: (),
});
}
}
Expand Down Expand Up @@ -470,7 +465,8 @@ impl NetworkBehaviour for Floodsub {
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self::ConnectionHandler>>>
{
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Expand Down
12 changes: 5 additions & 7 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl From<MessageAuthenticity> for PublishConfig {
}

type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, Arc<GossipsubHandlerIn>>;
NetworkBehaviourAction<GossipsubEvent, Arc<GossipsubHandlerIn>>;

/// Network behaviour that handles the gossipsub protocol.
///
Expand Down Expand Up @@ -1139,10 +1139,9 @@ where
if !self.peer_topics.contains_key(peer_id) {
// Connect to peer
debug!("Connecting to explicit peer {:?}", peer_id);
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(*peer_id).build(),
handler,
dial_payload: (),
});
}
}
Expand Down Expand Up @@ -1637,11 +1636,9 @@ where
// mark as px peer
self.px_peers.insert(peer_id);

// dial peer
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(peer_id).build(),
handler,
dial_payload: (),
});
}
}
Expand Down Expand Up @@ -3045,6 +3042,7 @@ where
{
type ConnectionHandler = GossipsubHandler;
type OutEvent = GossipsubEvent;
type DialPayload = ();

fn new_handler(&mut self) -> Self::ConnectionHandler {
let protocol_config = ProtocolConfig::new(
Expand Down Expand Up @@ -3437,7 +3435,7 @@ where
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, GossipsubHandlerIn>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event.map_in(|e: Arc<GossipsubHandlerIn>| {
// clone send event reference if others references are present
Expand Down
22 changes: 16 additions & 6 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,10 @@ fn test_explicit_peer_gets_connected() {
.events
.iter()
.filter(|e| match e {
NetworkBehaviourAction::Dial { opts, handler: _ } => opts.get_peer_id() == Some(peer),
NetworkBehaviourAction::Dial {
opts,
initial_in_event: _,
} => opts.get_peer_id() == Some(peer),
_ => false,
})
.count();
Expand Down Expand Up @@ -1397,8 +1400,10 @@ fn test_explicit_peer_reconnects() {
gs.events
.iter()
.filter(|e| match e {
NetworkBehaviourAction::Dial { opts, handler: _ } =>
opts.get_peer_id() == Some(*peer),
NetworkBehaviourAction::Dial {
opts,
initial_in_event: _,
} => opts.get_peer_id() == Some(*peer),
_ => false,
})
.count(),
Expand All @@ -1413,8 +1418,10 @@ fn test_explicit_peer_reconnects() {
gs.events
.iter()
.filter(|e| match e {
NetworkBehaviourAction::Dial { opts, handler: _ } =>
opts.get_peer_id() == Some(*peer),
NetworkBehaviourAction::Dial {
opts,
initial_in_event: _,
} => opts.get_peer_id() == Some(*peer),
_ => false,
})
.count()
Expand Down Expand Up @@ -1794,7 +1801,10 @@ fn test_connect_to_px_peers_on_handle_prune() {
.events
.iter()
.filter_map(|e| match e {
NetworkBehaviourAction::Dial { opts, handler: _ } => opts.get_peer_id(),
NetworkBehaviourAction::Dial {
opts,
initial_in_event: _,
} => opts.get_peer_id(),
_ => None,
})
.collect();
Expand Down
Loading