diff --git a/Cargo.lock b/Cargo.lock index ac6bc52d79f..cef33101b5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2767,7 +2767,7 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.44.1" +version = "0.44.2" dependencies = [ "arrayvec", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 321e81a24a1..db54152709f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.45.0", path = "protocols/gossipsub" } libp2p-identify = { version = "0.43.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.1" } -libp2p-kad = { version = "0.44.1", path = "protocols/kad" } +libp2p-kad = { version = "0.44.2", path = "protocols/kad" } libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" } libp2p-metrics = { version = "0.13.0", path = "misc/metrics" } libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" } diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 899f2519a98..35cc73e14b0 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.44.2 - unreleased + +- Allow to explicitly set `Mode::{Client,Server}`. + See [PR 4132] + +[PR 4132]: https://github.com/libp2p/rust-libp2p/pull/4132 + ## 0.44.1 - Expose `KBucketDistance`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 72a90fbdb6c..5b71b384283 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = "1.65.0" description = "Kademlia protocol for libp2p" -version = "0.44.1" +version = "0.44.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 7df17c91e1a..a594946ecb6 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -52,7 +52,7 @@ use smallvec::SmallVec; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::fmt; use std::num::NonZeroUsize; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; use std::time::Duration; use std::vec; use thiserror::Error; @@ -114,6 +114,8 @@ pub struct Kademlia { local_peer_id: PeerId, mode: Mode, + auto_mode: bool, + no_events_waker: Option, /// The record storage. store: TStore, @@ -456,6 +458,8 @@ where local_peer_id: id, connections: Default::default(), mode: Mode::Client, + auto_mode: true, + no_events_waker: None, } } @@ -990,6 +994,94 @@ where id } + /// Set the [`Mode`] in which we should operate. + /// + /// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`]. + /// + /// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode. + /// To reactivate the automatic configuration, pass [`None`] instead. + pub fn set_mode(&mut self, mode: Option) { + match mode { + Some(mode) => { + self.mode = mode; + self.auto_mode = false; + self.reconfigure_mode(); + } + None => { + self.auto_mode = true; + self.determine_mode_from_external_addresses(); + } + } + + if let Some(waker) = self.no_events_waker.take() { + waker.wake(); + } + } + + fn reconfigure_mode(&mut self) { + if self.connections.is_empty() { + return; + } + + let num_connections = self.connections.len(); + + log::debug!( + "Re-configuring {} established connection{}", + num_connections, + if num_connections > 1 { "s" } else { "" } + ); + + self.queued_events + .extend( + self.connections + .iter() + .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::One(*conn_id), + event: KademliaHandlerIn::ReconfigureMode { + new_mode: self.mode, + }, + }), + ); + } + + fn determine_mode_from_external_addresses(&mut self) { + self.mode = match (self.external_addresses.as_slice(), self.mode) { + ([], Mode::Server) => { + log::debug!("Switching to client-mode because we no longer have any confirmed external addresses"); + + Mode::Client + } + ([], Mode::Client) => { + // Previously client-mode, now also client-mode because no external addresses. + + Mode::Client + } + (confirmed_external_addresses, Mode::Client) => { + if log::log_enabled!(log::Level::Debug) { + let confirmed_external_addresses = + to_comma_separated_list(confirmed_external_addresses); + + log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable"); + } + + Mode::Server + } + (confirmed_external_addresses, Mode::Server) => { + debug_assert!( + !confirmed_external_addresses.is_empty(), + "Previous match arm handled empty list" + ); + + // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam. + + Mode::Server + } + }; + + self.reconfigure_mode(); + } + /// Processes discovered peers from a successful request in an iterative `Query`. fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I) where @@ -2428,6 +2520,8 @@ where // If no new events have been queued either, signal `NotReady` to // be polled again later. if self.queued_events.is_empty() { + self.no_events_waker = Some(cx.waker().clone()); + return Poll::Pending; } } @@ -2437,60 +2531,8 @@ where self.listen_addresses.on_swarm_event(&event); let external_addresses_changed = self.external_addresses.on_swarm_event(&event); - self.mode = match (self.external_addresses.as_slice(), self.mode) { - ([], Mode::Server) => { - log::debug!("Switching to client-mode because we no longer have any confirmed external addresses"); - - Mode::Client - } - ([], Mode::Client) => { - // Previously client-mode, now also client-mode because no external addresses. - - Mode::Client - } - (confirmed_external_addresses, Mode::Client) => { - if log::log_enabled!(log::Level::Debug) { - let confirmed_external_addresses = - to_comma_separated_list(confirmed_external_addresses); - - log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable"); - } - - Mode::Server - } - (confirmed_external_addresses, Mode::Server) => { - debug_assert!( - !confirmed_external_addresses.is_empty(), - "Previous match arm handled empty list" - ); - - // Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam. - - Mode::Server - } - }; - - if external_addresses_changed && !self.connections.is_empty() { - let num_connections = self.connections.len(); - - log::debug!( - "External addresses changed, re-configuring {} established connection{}", - num_connections, - if num_connections > 1 { "s" } else { "" } - ); - - self.queued_events - .extend( - self.connections - .iter() - .map(|(conn_id, peer_id)| ToSwarm::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::One(*conn_id), - event: KademliaHandlerIn::ReconfigureMode { - new_mode: self.mode, - }, - }), - ); + if self.auto_mode && external_addresses_changed { + self.determine_mode_from_external_addresses(); } match event { diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index fbf8a0b0e12..ccdb06b885d 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -68,7 +68,7 @@ pub use behaviour::{ AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult, BootstrapError, BootstrapOk, BootstrapResult, GetClosestPeersError, GetClosestPeersOk, GetClosestPeersResult, GetProvidersError, GetProvidersOk, GetProvidersResult, GetRecordError, - GetRecordOk, GetRecordResult, InboundRequest, NoKnownPeers, PeerRecord, PutRecordContext, + GetRecordOk, GetRecordResult, InboundRequest, Mode, NoKnownPeers, PeerRecord, PutRecordContext, PutRecordError, PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, QueryMut, QueryRef, QueryResult, QueryStats, RoutingUpdate, }; diff --git a/protocols/kad/tests/client_mode.rs b/protocols/kad/tests/client_mode.rs index afd35ab0686..b2530569518 100644 --- a/protocols/kad/tests/client_mode.rs +++ b/protocols/kad/tests/client_mode.rs @@ -1,7 +1,7 @@ use libp2p_identify as identify; use libp2p_identity as identity; use libp2p_kad::store::MemoryStore; -use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent}; +use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent, Mode}; use libp2p_swarm::Swarm; use libp2p_swarm_test::SwarmExt; @@ -111,6 +111,50 @@ async fn adding_an_external_addresses_activates_server_mode_on_existing_connecti } } +#[async_std::test] +async fn set_client_to_server_mode() { + let _ = env_logger::try_init(); + + let mut client = Swarm::new_ephemeral(MyBehaviour::new); + client.behaviour_mut().kad.set_mode(Some(Mode::Client)); + + let mut server = Swarm::new_ephemeral(MyBehaviour::new); + + server.listen().await; + client.connect(&mut server).await; + + let server_peer_id = *server.local_peer_id(); + + match libp2p_swarm_test::drive(&mut client, &mut server).await { + ( + [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })], + [MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(identify::Event::Received { info, .. })], + ) => { + assert_eq!(peer, server_peer_id); + assert!(info + .protocols + .iter() + .all(|proto| libp2p_kad::PROTOCOL_NAME.ne(proto))) + } + other => panic!("Unexpected events: {other:?}"), + } + + client.behaviour_mut().kad.set_mode(Some(Mode::Server)); + + match libp2p_swarm_test::drive(&mut client, &mut server).await { + ( + [MyBehaviourEvent::Identify(_)], + [MyBehaviourEvent::Identify(identify::Event::Received { info, .. }), MyBehaviourEvent::Kad(_)], + ) => { + assert!(info + .protocols + .iter() + .any(|proto| libp2p_kad::PROTOCOL_NAME.eq(proto))) + } + other => panic!("Unexpected events: {other:?}"), + } +} + #[derive(libp2p_swarm::NetworkBehaviour)] #[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct MyBehaviour {