Skip to content

Commit

Permalink
feat: Implement idle connection timeout (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariusc93 authored Sep 10, 2023
1 parent dfadba4 commit 00e85e2
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# 0.4.2 [unreleased]
- feat: Implement idle connections timeout [PR 98]
- chore: Add UninitializedIpfs::set_listening_addrs and minor changes
- chore: Add peer to dht when discovered over mdns

[PR 98]: https://github.com/dariusc93/rust-ipfs/pull/98

# 0.4.1
- fix: Dont close connections on ping error [PR 95]

Expand Down
24 changes: 17 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ pub struct IpfsOptions {
/// Enables ipv6 for mdns
pub mdns_ipv6: bool,

/// Keep connection alive
pub keep_alive: bool,

/// Enables dcutr
pub dcutr: bool,

Expand Down Expand Up @@ -227,6 +224,9 @@ pub struct IpfsOptions {

pub keystore: Keystore,

/// Connection idle
pub connection_idle: Duration,

/// Repo Provider option
pub provider: RepoProvider,
/// The span for tracing purposes, `None` value is converted to `tracing::trace_span!("ipfs")`.
Expand Down Expand Up @@ -265,7 +265,6 @@ impl Default for IpfsOptions {
disable_kad: Default::default(),
disable_bitswap: Default::default(),
bitswap_config: Default::default(),
keep_alive: Default::default(),
relay_server: Default::default(),
relay_server_config: Default::default(),
kad_configuration: Default::default(),
Expand All @@ -275,6 +274,7 @@ impl Default for IpfsOptions {
addr_config: Default::default(),
provider: Default::default(),
keystore: Keystore::in_memory(),
connection_idle: Duration::from_secs(30),
listening_addrs: vec![],
port_mapping: false,
transport_configuration: None,
Expand Down Expand Up @@ -598,6 +598,16 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
self
}

/// Set timeout for idle connections
pub fn set_idle_connection_timeout(mut self, duration: u64) -> Self {
let duration = match duration > 0 {
true => duration,
false => 30
};
self.options.connection_idle = Duration::from_secs(duration);
self
}

/// Set swarm configuration
pub fn set_swarm_configuration(mut self, config: crate::p2p::SwarmConfig) -> Self {
self.options.swarm_configuration = Some(config);
Expand Down Expand Up @@ -668,9 +678,9 @@ impl<C: NetworkBehaviour<ToSwarm = void::Void> + Send> UninitializedIpfs<C> {
}

/// Enable keep alive
pub fn enable_keepalive(mut self) -> Self {
self.options.keep_alive = true;
self
#[deprecated(note = "use UninitializedIpfs::set_idle_connection(u64::MAX)")]
pub fn enable_keepalive(self) -> Self {
self.set_idle_connection_timeout(u64::MAX)
}

/// Disables kademlia
Expand Down
10 changes: 4 additions & 6 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::gossipsub::GossipsubStream;
use super::{addressbook, protocol};
use super::{addressbook, connection_idle, protocol};
use bytes::Bytes;
use libp2p_allow_block_list::BlockedPeers;

Expand Down Expand Up @@ -29,7 +29,6 @@ use libp2p::relay::client::Behaviour as RelayClient;
use libp2p::relay::client::{self, Transport as ClientTransport};
use libp2p::relay::Behaviour as Relay;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::keep_alive::Behaviour as KeepAliveBehaviour;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{autonat, StreamProtocol};
use std::borrow::Cow;
Expand All @@ -49,7 +48,6 @@ where
pub kademlia: Toggle<Kademlia<MemoryStore>>,
pub ping: Ping,
pub identify: Identify,
pub keepalive: Toggle<KeepAliveBehaviour>,
pub pubsub: GossipsubStream,
pub autonat: autonat::Behaviour,
pub upnp: Toggle<libp2p_nat::Behaviour>,
Expand All @@ -58,6 +56,7 @@ where
pub relay_client: Toggle<RelayClient>,
pub dcutr: Toggle<Dcutr>,
pub addressbook: addressbook::Behaviour,
pub connection_idle: connection_idle::Behaviour,
pub peerbook: peerbook::Behaviour,
pub protocol: protocol::Behaviour,
pub custom: Toggle<C>,
Expand Down Expand Up @@ -388,8 +387,6 @@ where
.then_some(Bitswap::new(peer_id, repo, Default::default()).await)
.into();

let keepalive = options.keep_alive.then(KeepAliveBehaviour::default).into();

let ping = Ping::new(options.ping_config.unwrap_or_default());

let identify = Identify::new(
Expand Down Expand Up @@ -461,14 +458,14 @@ where

let block_list = libp2p_allow_block_list::Behaviour::default();
let protocol = protocol::Behaviour::default();
let connection_idle = connection_idle::Behaviour::new(options.connection_idle);
let custom = Toggle::from(custom);

Ok((
Behaviour {
mdns,
kademlia,
bitswap,
keepalive,
ping,
identify,
autonat,
Expand All @@ -482,6 +479,7 @@ where
addressbook,
protocol,
custom,
connection_idle
},
transport,
))
Expand Down
87 changes: 87 additions & 0 deletions src/p2p/connection_idle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
mod handler;

use std::{
task::{Context, Poll},
time::Duration,
};

use libp2p::{
core::Endpoint,
swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};

pub struct Behaviour {
idle: Duration,
}

impl Behaviour {
pub fn new(idle: Duration) -> Behaviour {
Behaviour { idle }
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = handler::Handler;
type ToSwarm = void::Void;

fn handle_pending_inbound_connection(
&mut self,
_: ConnectionId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<(), ConnectionDenied> {
Ok(())
}

fn handle_pending_outbound_connection(
&mut self,
_: ConnectionId,
_: Option<PeerId>,
_: &[Multiaddr],
_: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
Ok(vec![])
}

fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(handler::Handler::new(self.idle))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(handler::Handler::new(self.idle))
}

fn on_connection_handler_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: THandlerOutEvent<Self>,
) {
}

fn on_swarm_event(&mut self, _: FromSwarm<Self::ConnectionHandler>) {}

fn poll(
&mut self,
_: &mut Context,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
Poll::Pending
}
}
70 changes: 70 additions & 0 deletions src/p2p/connection_idle/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::{
task::{Context, Poll},
time::{Duration, Instant},
};

use libp2p::{
core::upgrade::DeniedUpgrade,
swarm::{handler::ConnectionEvent, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol},
};
use void::Void;

#[derive(Clone, Debug)]
pub struct Handler {
keep_alive: KeepAlive,
}

impl Handler {
pub fn new(idle: Duration) -> Self {
Self {
keep_alive: KeepAlive::Until(Instant::now() + idle),
}
}
}

impl libp2p::swarm::ConnectionHandler for Handler {
type FromBehaviour = Void;
type ToBehaviour = Void;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type InboundOpenInfo = ();
type OutboundOpenInfo = Void;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}

fn on_behaviour_event(&mut self, v: Self::FromBehaviour) {
void::unreachable(v)
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}

fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
Poll::Pending
}

fn on_connection_event(
&mut self,
_: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
}
}
11 changes: 7 additions & 4 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! P2P handling for IPFS nodes.
use std::convert::TryInto;
use std::num::{NonZeroU8, NonZeroUsize};
use std::time::Duration;

use crate::error::Error;
use crate::repo::Repo;
Expand All @@ -19,6 +20,7 @@ use tracing::Span;

pub(crate) mod addr;
pub(crate) mod addressbook;
pub(crate) mod connection_idle;
pub(crate) mod peerbook;
pub mod protocol;

Expand Down Expand Up @@ -139,12 +141,12 @@ pub struct SwarmOptions {
pub addrbook_config: Option<AddressBookConfig>,
/// UPnP/PortMapping
pub portmapping: bool,
/// Keep alive
pub keep_alive: bool,
/// Relay client
pub relay: bool,
/// Enables dcutr
pub dcutr: bool,
/// Connection idle
pub connection_idle: Duration,
}

impl From<&IpfsOptions> for SwarmOptions {
Expand All @@ -163,12 +165,13 @@ impl From<&IpfsOptions> for SwarmOptions {
let disable_bitswap = options.disable_bitswap;
let bitswap_config = options.bitswap_config.clone();

let keep_alive = options.keep_alive;
let identify_config = options.identify_configuration.clone();
let portmapping = options.port_mapping;
let pubsub_config = options.pubsub_config.clone();
let addrbook_config = options.addr_config;

let connection_idle = options.connection_idle;

SwarmOptions {
bootstrap,
mdns,
Expand All @@ -183,11 +186,11 @@ impl From<&IpfsOptions> for SwarmOptions {
kad_config,
kad_store_config,
ping_config,
keep_alive,
identify_config,
portmapping,
addrbook_config,
pubsub_config,
connection_idle,
}
}
}
Expand Down

0 comments on commit 00e85e2

Please sign in to comment.