Skip to content

Commit

Permalink
refactor(identify): use ReadyUpgrade for {In,Out}boundUpgrade
Browse files Browse the repository at this point in the history
Related: libp2p#2863.

Pull-Request: libp2p#4563.
  • Loading branch information
dgarus authored Sep 28, 2023
1 parent 665181e commit ecdd0ff
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 179 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"]
asynchronous-codec = "0.6"
futures = "0.3.28"
futures-timer = "3.0.2"
futures-bounded = { workspace = true }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
Expand Down
200 changes: 122 additions & 78 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Identify, InboundPush, OutboundPush, Push, UpgradeError};
use crate::protocol::{Info, PushInfo};
use crate::protocol::{Info, PushInfo, UpgradeError};
use crate::{protocol, PROTOCOL_NAME, PUSH_PROTOCOL_NAME};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use futures_bounded::Timeout;
use futures_timer::Delay;
use libp2p_core::upgrade::SelectUpgrade;
use libp2p_core::upgrade::{ReadyUpgrade, SelectUpgrade};
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
Expand All @@ -42,21 +41,27 @@ use smallvec::SmallVec;
use std::collections::HashSet;
use std::{io, task::Context, task::Poll, time::Duration};

const STREAM_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct Handler {
remote_peer_id: PeerId,
inbound_identify_push: Option<BoxFuture<'static, Result<PushInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
[ConnectionHandlerEvent<Either<Identify, Push<OutboundPush>>, (), Event, io::Error>; 4],
[ConnectionHandlerEvent<
Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>,
(),
Event,
io::Error,
>; 4],
>,

/// Pending identification replies, awaiting being sent.
pending_replies: FuturesUnordered<BoxFuture<'static, Result<(), UpgradeError>>>,
active_streams: futures_bounded::FuturesSet<Result<Success, UpgradeError>>,

/// Future that fires when we need to identify the node again.
trigger_next_identify: Delay,
Expand Down Expand Up @@ -125,9 +130,11 @@ impl Handler {
) -> Self {
Self {
remote_peer_id,
inbound_identify_push: Default::default(),
events: SmallVec::new(),
pending_replies: FuturesUnordered::new(),
active_streams: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
trigger_next_identify: Delay::new(initial_delay),
exchanged_one_periodic_identify: false,
interval,
Expand All @@ -152,19 +159,28 @@ impl Handler {
>,
) {
match output {
future::Either::Left(substream) => {
future::Either::Left(stream) => {
let info = self.build_info();

self.pending_replies
.push(crate::protocol::send(substream, info).boxed());
if self
.active_streams
.try_push(
protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentify),
)
.is_err()
{
warn!("Dropping inbound stream because we are at capacity");
} else {
self.exchanged_one_periodic_identify = true;
}
}
future::Either::Right(fut) => {
if self.inbound_identify_push.replace(fut).is_some() {
warn!(
"New inbound identify push stream from {} while still \
upgrading previous one. Replacing previous with new.",
self.remote_peer_id,
);
future::Either::Right(stream) => {
if self
.active_streams
.try_push(protocol::recv_push(stream).map_ok(Success::ReceivedIdentifyPush))
.is_err()
{
warn!("Dropping inbound identify push stream because we are at capacity");
}
}
}
Expand All @@ -180,34 +196,31 @@ impl Handler {
>,
) {
match output {
future::Either::Left(remote_info) => {
self.handle_incoming_info(&remote_info);
future::Either::Left(stream) => {
if self
.active_streams
.try_push(protocol::recv_identify(stream).map_ok(Success::ReceivedIdentify))
.is_err()
{
warn!("Dropping outbound identify stream because we are at capacity");
}
}
future::Either::Right(stream) => {
let info = self.build_info();

self.events
.push(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
if self
.active_streams
.try_push(
protocol::send_identify(stream, info).map_ok(|_| Success::SentIdentifyPush),
)
.is_err()
{
warn!("Dropping outbound identify push stream because we are at capacity");
}
}
future::Either::Right(()) => self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
)),
}
}

fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { error: err, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
let err = err.map_upgrade_err(|e| e.into_inner());
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(err),
));
self.trigger_next_identify.reset(self.interval);
}

fn build_info(&mut self) -> Info {
Info {
public_key: self.public_key.clone(),
Expand Down Expand Up @@ -268,13 +281,20 @@ impl ConnectionHandler for Handler {
type FromBehaviour = InEvent;
type ToBehaviour = Event;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<Identify, Push<InboundPush>>;
type OutboundProtocol = Either<Identify, Push<OutboundPush>>;
type InboundProtocol =
SelectUpgrade<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundProtocol = Either<ReadyUpgrade<StreamProtocol>, ReadyUpgrade<StreamProtocol>>;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ())
SubstreamProtocol::new(
SelectUpgrade::new(
ReadyUpgrade::new(PROTOCOL_NAME),
ReadyUpgrade::new(PUSH_PROTOCOL_NAME),
),
(),
)
}

fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
Expand All @@ -283,21 +303,19 @@ impl ConnectionHandler for Handler {
self.external_addresses = addresses;
}
InEvent::Push => {
let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Either::Right(Push::outbound(info)), ()),
protocol: SubstreamProtocol::new(
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
if self.inbound_identify_push.is_some() {
return KeepAlive::Yes;
}

if !self.pending_replies.is_empty() {
if !self.active_streams.is_empty() {
return KeepAlive::Yes;
}

Expand All @@ -317,20 +335,34 @@ impl ConnectionHandler for Handler {
// Poll the future that fires when we need to identify the node again.
if let Poll::Ready(()) = self.trigger_next_identify.poll_unpin(cx) {
self.trigger_next_identify.reset(self.interval);
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Either::Left(Identify), ()),
let event = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)),
(),
),
};
return Poll::Ready(ev);
return Poll::Ready(event);
}

if let Some(Poll::Ready(res)) = self
.inbound_identify_push
.as_mut()
.map(|f| f.poll_unpin(cx))
{
self.inbound_identify_push.take();
match self.active_streams.poll_unpin(cx) {
Poll::Ready(Ok(Ok(Success::ReceivedIdentify(remote_info)))) => {
self.handle_incoming_info(&remote_info);

if let Ok(remote_push_info) = res {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Event::Identified(
remote_info,
)));
}
Poll::Ready(Ok(Ok(Success::SentIdentifyPush))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationPushed,
));
}
Poll::Ready(Ok(Ok(Success::SentIdentify))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::Identification,
));
}
Poll::Ready(Ok(Ok(Success::ReceivedIdentifyPush(remote_push_info)))) => {
if let Some(mut info) = self.remote_info.clone() {
info.merge(remote_push_info);
self.handle_incoming_info(&info);
Expand All @@ -340,16 +372,17 @@ impl ConnectionHandler for Handler {
));
};
}
}

// Check for pending replies to send.
if let Poll::Ready(Some(result)) = self.pending_replies.poll_next_unpin(cx) {
let event = result
.map(|()| Event::Identification)
.unwrap_or_else(|err| Event::IdentificationError(StreamUpgradeError::Apply(err)));
self.exchanged_one_periodic_identify = true;

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
Poll::Ready(Ok(Err(e))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Apply(e)),
));
}
Poll::Ready(Err(Timeout { .. })) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(StreamUpgradeError::Timeout),
));
}
Poll::Pending => {}
}

Poll::Pending
Expand All @@ -371,8 +404,13 @@ impl ConnectionHandler for Handler {
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.events.push(ConnectionHandlerEvent::NotifyBehaviour(
Event::IdentificationError(
error.map_upgrade_err(|e| void::unreachable(e.into_inner())),
),
));
self.trigger_next_identify.reset(self.interval);
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
Expand All @@ -392,11 +430,10 @@ impl ConnectionHandler for Handler {
self.remote_peer_id
);

let info = self.build_info();
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
Either::Right(Push::outbound(info)),
Either::Right(ReadyUpgrade::new(PUSH_PROTOCOL_NAME)),
(),
),
});
Expand All @@ -405,3 +442,10 @@ impl ConnectionHandler for Handler {
}
}
}

enum Success {
SentIdentify,
ReceivedIdentify(Info),
SentIdentifyPush,
ReceivedIdentifyPush(PushInfo),
}
Loading

0 comments on commit ecdd0ff

Please sign in to comment.