diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 42c666f33284..74604cf8f957 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -14,6 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io = "1.3.1" data-encoding = "2.3.2" dns-parser = "0.8.0" +fnv = "1.0.7" futures = "0.3.13" if-watch = "0.2.0" lazy_static = "1.4.0" @@ -27,5 +28,6 @@ void = "1.0.2" [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } +env_logger = "0.9.0" libp2p = { path = "../.." } tokio = { version = "1.2.0", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index ee32b8502760..5b915e7b86db 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -18,79 +18,33 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::dns::{build_query, build_query_response, build_service_discovery_response}; -use crate::query::MdnsPacket; -use crate::IPV4_MDNS_MULTICAST_ADDRESS; -use async_io::{Async, Timer}; +use crate::instance::Instance; +use crate::MdnsConfig; +use async_io::Timer; +use fnv::FnvHashMap; use futures::prelude::*; use if_watch::{IfEvent, IfWatcher}; use libp2p_core::connection::ListenerId; -use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; +use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::{ protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, }; use smallvec::SmallVec; -use socket2::{Domain, Socket, Type}; -use std::{ - cmp, - collections::VecDeque, - fmt, io, iter, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, - pin::Pin, - task::Context, - task::Poll, - time::{Duration, Instant}, -}; - -/// Configuration for mDNS. -#[derive(Clone, Debug)] -pub struct MdnsConfig { - /// TTL to use for mdns records. - pub ttl: Duration, - /// Interval at which to poll the network for new peers. This isn't - /// necessary during normal operation but avoids the case that an - /// initial packet was lost and not discovering any peers until a new - /// peer joins the network. Receiving an mdns packet resets the timer - /// preventing unnecessary traffic. - pub query_interval: Duration, - /// IP address for multicast. - pub multicast_addr: IpAddr, -} - -impl Default for MdnsConfig { - fn default() -> Self { - Self { - ttl: Duration::from_secs(6 * 60), - query_interval: Duration::from_secs(5 * 60), - multicast_addr: *IPV4_MDNS_MULTICAST_ADDRESS, - } - } -} +use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; /// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds /// them to the topology. #[derive(Debug)] pub struct Mdns { - /// Main socket for listening. - recv_socket: Async, - - /// Query socket for making queries. - send_socket: Async, + /// Instance config. + config: MdnsConfig, /// Iface watcher. if_watch: IfWatcher, - /// Buffer used for receiving data from the main socket. - /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 - /// bytes, if it can be ensured that all participating devices can handle such large packets. - /// For computers with several interfaces and IP addresses responses can easily reach sizes in - /// the range of 3000 bytes, so 4096 seems sensible for now. For more information see - /// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46). - recv_buffer: [u8; 4096], - - /// Buffers pending to send on the main socket. - send_buffer: VecDeque>, + /// Mdns instances. + instances: FnvHashMap, /// List of nodes that we have discovered, the address, and when their TTL expires. /// @@ -102,77 +56,18 @@ pub struct Mdns { /// /// `None` if `discovered_nodes` is empty. closest_expiration: Option, - - /// Queued events. - events: VecDeque, - - /// Discovery interval. - query_interval: Duration, - - /// Record ttl. - ttl: Duration, - - /// Discovery timer. - timeout: Timer, - - // Multicast address. - multicast_addr: IpAddr, } impl Mdns { /// Builds a new `Mdns` behaviour. pub async fn new(config: MdnsConfig) -> io::Result { - let recv_socket = match config.multicast_addr { - IpAddr::V4(_) => { - let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?; - socket.set_reuse_address(true)?; - #[cfg(unix)] - socket.set_reuse_port(true)?; - socket.bind(&SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 5353).into())?; - socket.set_multicast_loop_v4(true)?; - socket.set_multicast_ttl_v4(255)?; - Async::new(UdpSocket::from(socket))? - } - IpAddr::V6(_) => { - let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?; - socket.set_reuse_address(true)?; - #[cfg(unix)] - socket.set_reuse_port(true)?; - socket.bind(&SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 5353).into())?; - socket.set_multicast_loop_v6(true)?; - Async::new(UdpSocket::from(socket))? - } - }; - let send_socket = { - let addr = match config.multicast_addr { - IpAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), - IpAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), - }; - - let socket = std::net::UdpSocket::bind(addr)?; - Async::new(socket)? - }; let if_watch = if_watch::IfWatcher::new().await?; - // randomize timer to prevent all converging and firing at the same time. - let query_interval = { - use rand::Rng; - let mut rng = rand::thread_rng(); - let jitter = rng.gen_range(0..100); - config.query_interval + Duration::from_millis(jitter) - }; Ok(Self { - recv_socket, - send_socket, + config, if_watch, - recv_buffer: [0; 4096], - send_buffer: Default::default(), - discovered_nodes: SmallVec::new(), - closest_expiration: None, - events: Default::default(), - query_interval, - ttl: config.ttl, - timeout: Timer::interval(query_interval), - multicast_addr: config.multicast_addr, + instances: Default::default(), + discovered_nodes: Default::default(), + closest_expiration: Default::default(), }) } @@ -185,87 +80,6 @@ impl Mdns { pub fn discovered_nodes(&self) -> impl ExactSizeIterator { self.discovered_nodes.iter().map(|(p, _, _)| p) } - - fn reset_timer(&mut self) { - self.timeout.set_interval(self.query_interval); - } - - fn fire_timer(&mut self) { - self.timeout - .set_interval_at(Instant::now(), self.query_interval); - } - - fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { - match packet { - MdnsPacket::Query(query) => { - self.reset_timer(); - log::trace!("sending response"); - for packet in build_query_response( - query.query_id(), - *params.local_peer_id(), - params.listened_addresses(), - self.ttl, - ) { - self.send_buffer.push_back(packet); - } - } - MdnsPacket::Response(response) => { - // We replace the IP address with the address we observe the - // remote as and the address they listen on. - let obs_ip = Protocol::from(response.remote_addr().ip()); - let obs_port = Protocol::Udp(response.remote_addr().port()); - let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect(); - - let mut discovered: SmallVec<[_; 4]> = SmallVec::new(); - for peer in response.discovered_peers() { - if peer.id() == params.local_peer_id() { - continue; - } - - let new_expiration = Instant::now() + peer.ttl(); - - let mut addrs: Vec = Vec::new(); - for addr in peer.addresses() { - if let Some(new_addr) = address_translation(addr, &observed) { - addrs.push(new_addr.clone()) - } - addrs.push(addr.clone()) - } - - for addr in addrs { - if let Some((_, _, cur_expires)) = self - .discovered_nodes - .iter_mut() - .find(|(p, a, _)| p == peer.id() && *a == addr) - { - *cur_expires = cmp::max(*cur_expires, new_expiration); - } else { - self.discovered_nodes - .push((*peer.id(), addr.clone(), new_expiration)); - discovered.push((*peer.id(), addr)); - } - } - } - - self.closest_expiration = self - .discovered_nodes - .iter() - .fold(None, |exp, &(_, _, elem_exp)| { - Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp)) - }) - .map(Timer::at); - - self.events - .push_back(MdnsEvent::Discovered(DiscoveredAddrsIter { - inner: discovered.into_iter(), - })); - } - MdnsPacket::ServiceDiscovery(disc) => { - let resp = build_service_discovery_response(disc.query_id(), self.ttl); - self.send_buffer.push_back(resp); - } - } - } } impl NetworkBehaviour for Mdns { @@ -277,10 +91,9 @@ impl NetworkBehaviour for Mdns { } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let now = Instant::now(); self.discovered_nodes .iter() - .filter(move |(p, _, expires)| p == peer_id && *expires > now) + .filter(|(peer, _, _)| peer == peer_id) .map(|(_, addr, _)| addr.clone()) .collect() } @@ -295,7 +108,10 @@ impl NetworkBehaviour for Mdns { } fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) { - self.fire_timer(); + log::trace!("waking instances because listening address changed"); + for (_, instance) in &mut self.instances { + instance.fire_timer(); + } } fn poll( @@ -303,123 +119,79 @@ impl NetworkBehaviour for Mdns { cx: &mut Context<'_>, params: &mut impl PollParameters, ) -> Poll> { + // Poll ifwatch. while let Poll::Ready(event) = Pin::new(&mut self.if_watch).poll(cx) { - let socket = self.recv_socket.get_ref(); match event { Ok(IfEvent::Up(inet)) => { - if inet.addr().is_loopback() { + let addr = inet.addr(); + if addr.is_loopback() { continue; } - match self.multicast_addr { - IpAddr::V4(multicast) => { - if let IpAddr::V4(addr) = inet.addr() { - log::trace!("joining multicast on iface {}", addr); - if let Err(err) = socket.join_multicast_v4(&multicast, &addr) { - log::error!("join multicast failed: {}", err); - } else { - self.fire_timer(); - } - } - } - IpAddr::V6(multicast) => { - if let IpAddr::V6(addr) = inet.addr() { - log::trace!("joining multicast on iface {}", addr); - if let Err(err) = socket.join_multicast_v6(&multicast, 0) { - log::error!("join multicast failed: {}", err); - } else { - self.fire_timer(); - } - } - } - } - } - Ok(IfEvent::Down(inet)) => { - if inet.addr().is_loopback() { + if inet.addr().is_ipv6() { continue; } - match self.multicast_addr { - IpAddr::V4(multicast) => { - if let IpAddr::V4(addr) = inet.addr() { - log::trace!("leaving multicast on iface {}", addr); - if let Err(err) = socket.leave_multicast_v4(&multicast, &addr) { - log::error!("leave multicast failed: {}", err); - } - } - } - IpAddr::V6(multicast) => { - if let IpAddr::V6(addr) = inet.addr() { - log::trace!("leaving multicast on iface {}", addr); - if let Err(err) = socket.leave_multicast_v6(&multicast, 0) { - log::error!("leave multicast failed: {}", err); - } + if !self.instances.contains_key(&addr) { + match Instance::new(addr, self.config.clone()) { + Ok(instance) => { + self.instances.insert(addr, instance); } + Err(err) => log::error!("failed to create instance: {}", err), } } } - Err(err) => log::error!("if watch returned an error: {}", err), - } - } - // Poll receive socket. - while self.recv_socket.poll_readable(cx).is_ready() { - match self - .recv_socket - .recv_from(&mut self.recv_buffer) - .now_or_never() - { - Some(Ok((len, from))) => { - if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) - { - self.inject_mdns_packet(packet, params); + Ok(IfEvent::Down(inet)) => { + if self.instances.contains_key(&inet.addr()) { + log::info!("dropping instance {}", inet.addr()); + self.instances.remove(&inet.addr()); } } - Some(Err(err)) => log::error!("Failed reading datagram: {}", err), - _ => {} + Err(err) => log::error!("if watch returned an error: {}", err), } } - // Send responses. - while self.send_socket.poll_writable(cx).is_ready() { - if let Some(packet) = self.send_buffer.pop_front() { - match self - .send_socket - .send_to(&packet, SocketAddr::new(self.multicast_addr, 5353)) - .now_or_never() + // Emit discovered event. + let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); + for (_, instance) in &mut self.instances { + while let Some((peer, addr, expiration)) = instance.poll(cx, params) { + if let Some((_, _, cur_expires)) = self + .discovered_nodes + .iter_mut() + .find(|(p, a, _)| *p == peer && *a == addr) { - Some(Ok(_)) => {} - Some(Err(err)) => log::error!("{}", err), - None => self.send_buffer.push_front(packet), + *cur_expires = cmp::max(*cur_expires, expiration); + } else { + self.discovered_nodes.push((peer, addr.clone(), expiration)); + discovered.push((peer, addr)); } - } else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { - log::trace!("sending query"); - self.send_buffer.push_back(build_query()); - } else { - break; } } - // Emit discovered event. - if let Some(event) = self.events.pop_front() { + if !discovered.is_empty() { + let event = MdnsEvent::Discovered(DiscoveredAddrsIter { + inner: discovered.into_iter(), + }); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } // Emit expired event. - if let Some(ref mut closest_expiration) = self.closest_expiration { - if let Poll::Ready(now) = Pin::new(closest_expiration).poll(cx) { - let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); - while let Some(pos) = self - .discovered_nodes - .iter() - .position(|(_, _, exp)| *exp <= now) - { - let (peer_id, addr, _) = self.discovered_nodes.remove(pos); - expired.push((peer_id, addr)); - } - - if !expired.is_empty() { - let event = MdnsEvent::Expired(ExpiredAddrsIter { - inner: expired.into_iter(), - }); - - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } + let now = Instant::now(); + let mut closest_expiration = None; + let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); + self.discovered_nodes.retain(|(peer, addr, expiration)| { + if *expiration <= now { + expired.push((*peer, addr.clone())); + return false; } + closest_expiration = Some(closest_expiration.unwrap_or(*expiration).min(*expiration)); + true + }); + if !expired.is_empty() { + let event = MdnsEvent::Expired(ExpiredAddrsIter { + inner: expired.into_iter(), + }); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + if let Some(closest_expiration) = closest_expiration { + let mut timer = Timer::at(closest_expiration); + Pin::new(&mut timer).poll(cx).is_pending(); + self.closest_expiration = Some(timer); } Poll::Pending } diff --git a/protocols/mdns/src/instance.rs b/protocols/mdns/src/instance.rs new file mode 100644 index 000000000000..4bf96bd2551e --- /dev/null +++ b/protocols/mdns/src/instance.rs @@ -0,0 +1,228 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::dns::{build_query, build_query_response, build_service_discovery_response}; +use crate::query::MdnsPacket; +use crate::MdnsConfig; +use async_io::{Async, Timer}; +use futures::prelude::*; +use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId}; +use libp2p_swarm::PollParameters; +use socket2::{Domain, Socket, Type}; +use std::{ + collections::VecDeque, + io, iter, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + pin::Pin, + task::Context, + time::{Duration, Instant}, +}; + +/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds +/// them to the topology. +#[derive(Debug)] +pub struct Instance { + /// Address this instance is bound to. + addr: IpAddr, + /// Receive socket. + recv_socket: Async, + /// Send socket. + send_socket: Async, + /// Buffer used for receiving data from the main socket. + /// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000 + /// bytes, if it can be ensured that all participating devices can handle such large packets. + /// For computers with several interfaces and IP addresses responses can easily reach sizes in + /// the range of 3000 bytes, so 4096 seems sensible for now. For more information see + /// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46). + recv_buffer: [u8; 4096], + /// Buffers pending to send on the main socket. + send_buffer: VecDeque>, + /// Discovery interval. + query_interval: Duration, + /// Discovery timer. + timeout: Timer, + /// Multicast address. + multicast_addr: IpAddr, + /// Discovered addresses. + discovered: VecDeque<(PeerId, Multiaddr, Instant)>, + /// TTL + ttl: Duration, +} + +impl Instance { + /// Builds a new `Instance`. + pub fn new(addr: IpAddr, config: MdnsConfig) -> io::Result { + log::info!("creating instance on iface {}", addr); + let recv_socket = match addr { + IpAddr::V4(addr) => { + let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(socket2::Protocol::UDP))?; + socket.set_reuse_address(true)?; + #[cfg(unix)] + socket.set_reuse_port(true)?; + socket.bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 5353).into())?; + socket.set_multicast_loop_v4(true)?; + socket.set_multicast_ttl_v4(255)?; + socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; + Async::new(UdpSocket::from(socket))? + } + IpAddr::V6(_) => { + let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(socket2::Protocol::UDP))?; + socket.set_reuse_address(true)?; + #[cfg(unix)] + socket.set_reuse_port(true)?; + socket.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 5353).into())?; + socket.set_multicast_loop_v6(true)?; + // TODO: find interface matching addr. + socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; + Async::new(UdpSocket::from(socket))? + } + }; + let send_socket = Async::new(UdpSocket::bind(SocketAddr::new(addr, 0))?)?; + // randomize timer to prevent all converging and firing at the same time. + let query_interval = { + use rand::Rng; + let mut rng = rand::thread_rng(); + let jitter = rng.gen_range(0..100); + config.query_interval + Duration::from_millis(jitter) + }; + let multicast_addr = match addr { + IpAddr::V4(_) => IpAddr::V4(*crate::IPV4_MDNS_MULTICAST_ADDRESS), + IpAddr::V6(_) => IpAddr::V6(*crate::IPV6_MDNS_MULTICAST_ADDRESS), + }; + Ok(Self { + addr, + recv_socket, + send_socket, + recv_buffer: [0; 4096], + send_buffer: Default::default(), + discovered: Default::default(), + query_interval, + timeout: Timer::interval_at(Instant::now(), query_interval), + multicast_addr, + ttl: config.ttl, + }) + } + + pub fn reset_timer(&mut self) { + self.timeout.set_interval(self.query_interval); + } + + pub fn fire_timer(&mut self) { + self.timeout + .set_interval_at(Instant::now(), self.query_interval); + } + + fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) { + log::trace!("received packet on iface {} {:?}", self.addr, packet); + match packet { + MdnsPacket::Query(query) => { + self.reset_timer(); + log::trace!("sending response on iface {}", self.addr); + for packet in build_query_response( + query.query_id(), + *params.local_peer_id(), + params.listened_addresses(), + self.ttl, + ) { + self.send_buffer.push_back(packet); + } + } + MdnsPacket::Response(response) => { + // We replace the IP address with the address we observe the + // remote as and the address they listen on. + let obs_ip = Protocol::from(response.remote_addr().ip()); + let obs_port = Protocol::Udp(response.remote_addr().port()); + let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect(); + + for peer in response.discovered_peers() { + if peer.id() == params.local_peer_id() { + continue; + } + + let new_expiration = Instant::now() + peer.ttl(); + + let mut addrs: Vec = Vec::new(); + for addr in peer.addresses() { + if let Some(new_addr) = address_translation(addr, &observed) { + addrs.push(new_addr.clone()) + } + addrs.push(addr.clone()) + } + + for addr in addrs { + self.discovered + .push_back((*peer.id(), addr, new_expiration)); + } + } + } + MdnsPacket::ServiceDiscovery(disc) => { + let resp = build_service_discovery_response(disc.query_id(), self.ttl); + self.send_buffer.push_back(resp); + } + } + } + + pub fn poll( + &mut self, + cx: &mut Context, + params: &impl PollParameters, + ) -> Option<(PeerId, Multiaddr, Instant)> { + // Poll receive socket. + while self.recv_socket.poll_readable(cx).is_ready() { + match self + .recv_socket + .recv_from(&mut self.recv_buffer) + .now_or_never() + { + Some(Ok((len, from))) => { + if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) + { + self.inject_mdns_packet(packet, params); + } + } + Some(Err(err)) => log::error!("Failed reading datagram: {}", err), + None => unreachable!(), + } + } + // Send responses. + while self.send_socket.poll_writable(cx).is_ready() { + if let Some(packet) = self.send_buffer.pop_front() { + match self + .send_socket + .send_to(&packet, SocketAddr::new(self.multicast_addr, 5353)) + .now_or_never() + { + Some(Ok(_)) => log::trace!("sent packet on iface {}", self.addr), + Some(Err(err)) => { + log::error!("error sending packet on iface {}: {}", self.addr, err) + } + None => self.send_buffer.push_front(packet), + } + } else if Pin::new(&mut self.timeout).poll_next(cx).is_ready() { + log::trace!("sending query on iface {}", self.addr); + self.send_buffer.push_back(build_query()); + } else { + break; + } + } + // Emit discovered event. + self.discovered.pop_front() + } +} diff --git a/protocols/mdns/src/lib.rs b/protocols/mdns/src/lib.rs index d8e51bdd0a7b..1aceea74702e 100644 --- a/protocols/mdns/src/lib.rs +++ b/protocols/mdns/src/lib.rs @@ -30,7 +30,15 @@ //! struct will automatically discover other libp2p nodes on the local network. //! use lazy_static::lazy_static; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::time::Duration; + +mod behaviour; +mod dns; +mod instance; +mod query; + +pub use crate::behaviour::{Mdns, MdnsEvent}; /// The DNS service name for all libp2p peers used to query for addresses. const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; @@ -38,13 +46,29 @@ const SERVICE_NAME: &[u8] = b"_p2p._udp.local"; const META_QUERY_SERVICE: &[u8] = b"_services._dns-sd._udp.local"; lazy_static! { - pub static ref IPV4_MDNS_MULTICAST_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(224, 0, 0, 251)); - pub static ref IPV6_MDNS_MULTICAST_ADDRESS: IpAddr = - IpAddr::V6(Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0xFB)); + pub static ref IPV4_MDNS_MULTICAST_ADDRESS: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 251); + pub static ref IPV6_MDNS_MULTICAST_ADDRESS: Ipv6Addr = + Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0xFB); } -pub use crate::behaviour::{Mdns, MdnsConfig, MdnsEvent}; +/// Configuration for mDNS. +#[derive(Clone, Debug)] +pub struct MdnsConfig { + /// TTL to use for mdns records. + pub ttl: Duration, + /// Interval at which to poll the network for new peers. This isn't + /// necessary during normal operation but avoids the case that an + /// initial packet was lost and not discovering any peers until a new + /// peer joins the network. Receiving an mdns packet resets the timer + /// preventing unnecessary traffic. + pub query_interval: Duration, +} -mod behaviour; -mod dns; -mod query; +impl Default for MdnsConfig { + fn default() -> Self { + Self { + ttl: Duration::from_secs(6 * 60), + query_interval: Duration::from_secs(5 * 60), + } + } +} diff --git a/protocols/mdns/tests/smoke.rs b/protocols/mdns/tests/smoke.rs index 707ad9f39ab3..1d8b8b7a7592 100644 --- a/protocols/mdns/tests/smoke.rs +++ b/protocols/mdns/tests/smoke.rs @@ -21,7 +21,7 @@ use futures::StreamExt; use libp2p::{ identity, - mdns::{Mdns, MdnsConfig, MdnsEvent, IPV6_MDNS_MULTICAST_ADDRESS}, + mdns::{Mdns, MdnsConfig, MdnsEvent}, swarm::{Swarm, SwarmEvent}, PeerId, }; @@ -39,6 +39,7 @@ async fn create_swarm(config: MdnsConfig) -> Result, Box> } async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { + env_logger::try_init().ok(); let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; let mut discovered_a = false; @@ -78,29 +79,15 @@ async fn run_discovery_test(config: MdnsConfig) -> Result<(), Box> { } #[async_std::test] -async fn test_discovery_async_std_ipv4() -> Result<(), Box> { +async fn test_discovery_async_std() -> Result<(), Box> { run_discovery_test(MdnsConfig::default()).await } -#[async_std::test] -async fn test_discovery_async_std_ipv6() -> Result<(), Box> { - let mut config = MdnsConfig::default(); - config.multicast_addr = *IPV6_MDNS_MULTICAST_ADDRESS; - run_discovery_test(config).await -} - #[tokio::test] -async fn test_discovery_tokio_ipv4() -> Result<(), Box> { +async fn test_discovery_tokio() -> Result<(), Box> { run_discovery_test(MdnsConfig::default()).await } -#[tokio::test] -async fn test_discovery_tokio_ipv6() -> Result<(), Box> { - let mut config = MdnsConfig::default(); - config.multicast_addr = *IPV6_MDNS_MULTICAST_ADDRESS; - run_discovery_test(config).await -} - async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box> { let mut a = create_swarm(config.clone()).await?; let mut b = create_swarm(config).await?; @@ -133,11 +120,11 @@ async fn run_peer_expiration_test(config: MdnsConfig) -> Result<(), Box Result<(), Box> { +async fn test_expired_async_std() -> Result<(), Box> { + env_logger::try_init().ok(); let config = MdnsConfig { - ttl: Duration::from_millis(500), - query_interval: Duration::from_secs(1), - ..Default::default() + ttl: Duration::from_secs(1), + query_interval: Duration::from_secs(10), }; async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) @@ -146,39 +133,12 @@ async fn test_expired_async_std_ipv4() -> Result<(), Box> { .map_err(|e| Box::new(e) as Box) } -#[async_std::test] -async fn test_expired_async_std_ipv6() -> Result<(), Box> { - let config = MdnsConfig { - ttl: Duration::from_millis(500), - query_interval: Duration::from_secs(1), - multicast_addr: *IPV6_MDNS_MULTICAST_ADDRESS, - }; - - async_std::future::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) - .await - .map(|_| ()) - .map_err(|e| Box::new(e) as Box) -} - -#[tokio::test] -async fn test_expired_tokio_ipv4() -> Result<(), Box> { - let config = MdnsConfig { - ttl: Duration::from_millis(500), - query_interval: Duration::from_secs(1), - ..Default::default() - }; - - tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config)) - .await - .unwrap() -} - #[tokio::test] -async fn test_expired_tokio_ipv6() -> Result<(), Box> { +async fn test_expired_tokio() -> Result<(), Box> { + env_logger::try_init().ok(); let config = MdnsConfig { - ttl: Duration::from_millis(500), - query_interval: Duration::from_secs(1), - multicast_addr: *IPV6_MDNS_MULTICAST_ADDRESS, + ttl: Duration::from_secs(1), + query_interval: Duration::from_secs(10), }; tokio::time::timeout(Duration::from_secs(6), run_peer_expiration_test(config))