diff --git a/Cargo.lock b/Cargo.lock index 66efb5211f2..4f5d739fca6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,16 +2520,19 @@ dependencies = [ "futures", "futures-bounded", "libp2p-core", + "libp2p-identify", "libp2p-identity", "libp2p-swarm", + "libp2p-swarm-test", "quick-protobuf", "quick-protobuf-codec", "rand 0.8.5", "rand_core 0.6.4", - "scc", "static_assertions", "thiserror", + "tokio", "tracing", + "tracing-subscriber", "void", ] @@ -5072,12 +5075,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "scc" -version = "2.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca628bbcc4be16ffaeae429444cf4538d7a23b1aa5457cc9ce9a220286befbc3" - [[package]] name = "schannel" version = "0.1.22" diff --git a/protocols/autonatv2/Cargo.toml b/protocols/autonatv2/Cargo.toml index 8729108ab08..7a38ec38313 100644 --- a/protocols/autonatv2/Cargo.toml +++ b/protocols/autonatv2/Cargo.toml @@ -21,11 +21,17 @@ void = "1.0.2" either = "1.9.0" futures = "0.3.29" thiserror = "1.0.50" -scc = "2.0.3" bytes = "1" static_assertions = "1.1.0" tracing = "0.1.40" +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +libp2p-swarm-test = { workspace = true } +libp2p-identify = { workspace = true } +libp2p-swarm = { workspace = true, features = ["macros"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"]} + [lints] workspace = true diff --git a/protocols/autonatv2/src/client.rs b/protocols/autonatv2/src/client.rs index dff2397252c..be1f6fd8df0 100644 --- a/protocols/autonatv2/src/client.rs +++ b/protocols/autonatv2/src/client.rs @@ -1,2 +1,4 @@ mod behaviour; mod handler; + +pub use behaviour::Behaviour; diff --git a/protocols/autonatv2/src/client/behaviour.rs b/protocols/autonatv2/src/client/behaviour.rs index d82e4eac17b..9dae7b31ea6 100644 --- a/protocols/autonatv2/src/client/behaviour.rs +++ b/protocols/autonatv2/src/client/behaviour.rs @@ -14,7 +14,7 @@ use libp2p_swarm::{ NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm, }; use rand::{distributions::Standard, seq::SliceRandom, Rng}; -use rand_core::RngCore; +use rand_core::{OsRng, RngCore}; use crate::{global_only::IpExt, request_response::DialRequest}; @@ -36,12 +36,21 @@ impl IntervalTicker { } } -pub(crate) struct Config { +pub struct Config { pub(crate) test_server_count: usize, pub(crate) max_addrs_count: usize, } -pub(crate) struct Behaviour +impl Default for Config { + fn default() -> Self { + Self { + test_server_count: 3, + max_addrs_count: 10, + } + } +} + +pub struct Behaviour where R: RngCore + 'static, { @@ -79,7 +88,7 @@ where if addr_is_local(remote_addr) { self.local_peers.insert(connection_id); } - Ok(Either::Left(dial_request::Handler::new())) + Ok(Either::Right(dial_back::Handler::new())) } fn handle_established_outbound_connection( @@ -93,7 +102,7 @@ where if addr_is_local(addr) { self.local_peers.insert(connection_id); } - Ok(Either::Right(dial_back::Handler::new())) + Ok(Either::Left(dial_request::Handler::new())) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -139,9 +148,11 @@ where connection_id: ConnectionId, event: ::ToBehaviour, ) { - self.peers_to_handlers - .entry(peer_id) - .or_insert(connection_id); + if matches!(event, Either::Left(_)) { + self.peers_to_handlers + .entry(peer_id) + .or_insert(connection_id); + } match event { Either::Right(Ok(nonce)) => { if self.pending_nonces.remove(&nonce) { @@ -184,12 +195,12 @@ where self.pending_events .push_back(ToSwarm::ExternalAddrConfirmed(reachable_addr)); } - Either::Left(dial_request::ToBehaviour::TestCompleted( - Err(dial_request::Error::UnableToConnectOnSelectedAddress { addr: Some(addr) }) - )) - | Either::Left(dial_request::ToBehaviour::TestCompleted( - Err(dial_request::Error::FailureDuringDialBack { addr: Some(addr) }) - )) => { + Either::Left(dial_request::ToBehaviour::TestCompleted(Err( + dial_request::Error::UnableToConnectOnSelectedAddress { addr: Some(addr) }, + ))) + | Either::Left(dial_request::ToBehaviour::TestCompleted(Err( + dial_request::Error::FailureDuringDialBack { addr: Some(addr) }, + ))) => { self.pending_events .push_back(ToSwarm::ExternalAddrExpired(addr)); } @@ -207,7 +218,7 @@ where if pending_event.is_ready() { return pending_event; } - if self.ticker.ready() && !self.known_servers.is_empty() { + if self.ticker.ready() && !self.known_servers.is_empty() && !self.address_candidates.is_empty() { let mut entries = self.address_candidates.drain().collect::>(); entries.sort_unstable_by_key(|(_, count)| *count); let addrs = entries @@ -246,6 +257,23 @@ impl Behaviour where R: RngCore + 'static, { + pub fn new(rng: R, config: Config) -> Self { + Self { + local_peers: HashSet::new(), + pending_nonces: HashSet::new(), + known_servers: Vec::new(), + rng, + config, + pending_events: VecDeque::new(), + address_candidates: HashMap::new(), + peers_to_handlers: HashMap::new(), + ticker: IntervalTicker { + interval: Duration::from_secs(0), + last_tick: Instant::now(), + }, + } + } + fn submit_req_for_peer(&mut self, peer: PeerId, req: DialRequest) { if let Some(conn_id) = self.peers_to_handlers.get(&peer) { self.pending_events.push_back(ToSwarm::NotifyHandler { @@ -253,6 +281,9 @@ where handler: NotifyHandler::One(*conn_id), event: Either::Left(dial_request::FromBehaviour::PerformRequest(req)), }); + if self.pending_events.is_empty() { + println!("is empty") + } } else { tracing::debug!( "There should be a connection to {:?}, but there isn't", @@ -281,6 +312,12 @@ where } } +impl Default for Behaviour { + fn default() -> Self { + Self::new(OsRng, Config::default()) + } +} + fn addr_is_local(addr: &Multiaddr) -> bool { addr.iter().any(|c| match c { Protocol::Dns(addr) diff --git a/protocols/autonatv2/src/client/handler.rs b/protocols/autonatv2/src/client/handler.rs index 9e8389bb630..f156d7b30c7 100644 --- a/protocols/autonatv2/src/client/handler.rs +++ b/protocols/autonatv2/src/client/handler.rs @@ -6,15 +6,15 @@ // TODO: tests // TODO: Handlers -pub(super) mod dial_back; -pub(super) mod dial_request; +pub mod dial_back; +pub mod dial_request; use either::Either; use std::time::Duration; pub(crate) use dial_request::TestEnd; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10000); const MAX_CONCURRENT_REQUESTS: usize = 10; -pub(crate) type Handler = Either; +pub type Handler = Either; diff --git a/protocols/autonatv2/src/client/handler/dial_back.rs b/protocols/autonatv2/src/client/handler/dial_back.rs index f4505335f2c..e883a45715e 100644 --- a/protocols/autonatv2/src/client/handler/dial_back.rs +++ b/protocols/autonatv2/src/client/handler/dial_back.rs @@ -19,7 +19,7 @@ use super::DEFAULT_TIMEOUT; pub(crate) type ToBehaviour = io::Result; -pub(crate) struct Handler { +pub struct Handler { inbound: FuturesSet>, } diff --git a/protocols/autonatv2/src/client/handler/dial_request.rs b/protocols/autonatv2/src/client/handler/dial_request.rs index a26fe4e458d..9a0ceecec12 100644 --- a/protocols/autonatv2/src/client/handler/dial_request.rs +++ b/protocols/autonatv2/src/client/handler/dial_request.rs @@ -1,4 +1,4 @@ -use futures::{channel::oneshot, AsyncRead, AsyncWrite}; +use futures::{channel::oneshot, AsyncRead, AsyncWrite, AsyncWriteExt}; use futures_bounded::FuturesSet; use libp2p_core::{ upgrade::{DeniedUpgrade, ReadyUpgrade}, @@ -13,7 +13,6 @@ use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; -use scc::hash_cache::DEFAULT_MAXIMUM_CAPACITY; use std::{ collections::VecDeque, convert::identity, @@ -64,24 +63,24 @@ pub(crate) enum Error { } #[derive(Debug)] -pub(crate) struct TestEnd { +pub struct TestEnd { pub(crate) dial_request: DialRequest, pub(crate) suspicious_addr: Vec, pub(crate) reachable_addr: Multiaddr, } #[derive(Debug)] -pub(crate) enum ToBehaviour { +pub enum ToBehaviour { TestCompleted(Result), PeerHasServerSupport, } #[derive(Debug)] -pub(crate) enum FromBehaviour { +pub enum FromBehaviour { PerformRequest(DialRequest), } -pub(crate) struct Handler { +pub struct Handler { queued_events: VecDeque< ConnectionHandlerEvent< ::OutboundProtocol, @@ -104,7 +103,7 @@ impl Handler { pub(crate) fn new() -> Self { Self { queued_events: VecDeque::new(), - outbound: FuturesSet::new(DEFAULT_TIMEOUT, DEFAULT_MAXIMUM_CAPACITY), + outbound: FuturesSet::new(DEFAULT_TIMEOUT, 10), queued_streams: VecDeque::default(), } } @@ -207,9 +206,10 @@ impl ConnectionHandler for Handler { }, ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Added(mut added)) => { if added.any(|p| p.as_ref() == REQUEST_PROTOCOL_NAME) { - self.queued_events.push_back( - ConnectionHandlerEvent::NotifyBehaviour(ToBehaviour::PeerHasServerSupport) - ); + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::PeerHasServerSupport, + )); } } _ => {} @@ -281,6 +281,7 @@ async fn handle_substream( send_aap_data(&mut substream, num_bytes).await?; } Response::Dial(dial_response) => { + substream.close().await?; return test_end_from_dial_response(dial_request, dial_response, suspicious_addr); } } diff --git a/protocols/autonatv2/src/lib.rs b/protocols/autonatv2/src/lib.rs index 5e357b5cf0e..e64b9d480d4 100644 --- a/protocols/autonatv2/src/lib.rs +++ b/protocols/autonatv2/src/lib.rs @@ -1,8 +1,8 @@ use libp2p_core::upgrade::ReadyUpgrade; use libp2p_swarm::StreamProtocol; -mod client; -mod server; +pub mod client; +pub mod server; mod generated; mod global_only; pub(crate) mod request_response; diff --git a/protocols/autonatv2/src/server.rs b/protocols/autonatv2/src/server.rs index dff2397252c..be1f6fd8df0 100644 --- a/protocols/autonatv2/src/server.rs +++ b/protocols/autonatv2/src/server.rs @@ -1,2 +1,4 @@ mod behaviour; mod handler; + +pub use behaviour::Behaviour; diff --git a/protocols/autonatv2/src/server/behaviour.rs b/protocols/autonatv2/src/server/behaviour.rs index a96d412a0b7..0771e83c13f 100644 --- a/protocols/autonatv2/src/server/behaviour.rs +++ b/protocols/autonatv2/src/server/behaviour.rs @@ -11,7 +11,8 @@ use libp2p_swarm::{ ConnectionDenied, ConnectionHandler, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, ToSwarm, }; -use rand_core::RngCore; +use rand_core::{OsRng, RngCore}; +use libp2p_core::multiaddr::Protocol; use super::handler::{ dial_back, @@ -19,7 +20,7 @@ use super::handler::{ Handler, }; -pub struct Behaviour +pub struct Behaviour where R: Clone + Send + RngCore + 'static, { @@ -35,11 +36,17 @@ where rng: R, } +impl Default for Behaviour { + fn default() -> Self { + Self::new(OsRng) + } +} + impl Behaviour where R: RngCore + Send + Clone + 'static, { - pub(crate) fn new(rng: R) -> Self { + pub fn new(rng: R) -> Self { Self { handlers: HashMap::new(), pending_dial_back: HashMap::new(), @@ -95,7 +102,7 @@ where port_use: PortUse, ) -> Result<::ConnectionHandler, ConnectionDenied> { if port_use == PortUse::New { - self.handlers.insert((addr.clone(), peer), connection_id); + self.handlers.insert((addr.iter().filter(|e| !matches!(e, Protocol::P2p(_))).collect(), peer), connection_id); } Ok(Either::Left(dial_back::Handler::new())) } diff --git a/protocols/autonatv2/src/server/handler/dial_back.rs b/protocols/autonatv2/src/server/handler/dial_back.rs index 4f8d7915063..45a3fba8480 100644 --- a/protocols/autonatv2/src/server/handler/dial_back.rs +++ b/protocols/autonatv2/src/server/handler/dial_back.rs @@ -32,7 +32,7 @@ impl Handler { Self { pending_nonce: VecDeque::new(), requested_substream_nonce: VecDeque::new(), - outbound: FuturesSet::new(Duration::from_secs(10), 2), + outbound: FuturesSet::new(Duration::from_secs(10000), 2), } } } @@ -105,7 +105,6 @@ impl ConnectionHandler for Handler { } _ => {} } - todo!() } } diff --git a/protocols/autonatv2/src/server/handler/dial_request.rs b/protocols/autonatv2/src/server/handler/dial_request.rs index f80a1385a6a..554675f5b20 100644 --- a/protocols/autonatv2/src/server/handler/dial_request.rs +++ b/protocols/autonatv2/src/server/handler/dial_request.rs @@ -7,8 +7,9 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, - AsyncRead, AsyncWrite, SinkExt, StreamExt, + AsyncRead, AsyncWrite, AsyncWriteExt, SinkExt, StreamExt, }; +use futures::future::FusedFuture; use futures_bounded::FuturesSet; use libp2p_core::{ upgrade::{DeniedUpgrade, ReadyUpgrade}, @@ -60,7 +61,7 @@ where observed_multiaddr, dial_back_cmd_sender, dial_back_cmd_receiver, - inbound: FuturesSet::new(Duration::from_secs(10), 2), + inbound: FuturesSet::new(Duration::from_secs(1000), 2), rng, } } @@ -105,6 +106,7 @@ where Poll::Pending => {} } if let Poll::Ready(Some(cmd)) = self.dial_back_cmd_receiver.poll_next_unpin(cx) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Ok(cmd))); } Poll::Pending @@ -146,6 +148,10 @@ where _ => {} } } + + fn connection_keep_alive(&self) -> bool { + true + } } enum HandleFail { @@ -233,7 +239,12 @@ async fn handle_request_internal( .send(dial_back_cmd) .await .map_err(|_| HandleFail::InternalError(idx))?; - let dial_back = rx.await.map_err(|_| HandleFail::InternalError(idx))?; + if rx.is_terminated() { + println!("is terminated"); + } + let dial_back = rx.await.map_err(|e| { + HandleFail::InternalError(idx) + })?; if dial_back != DialBack::Ok { return Err(HandleFail::DialBack { idx, @@ -260,5 +271,6 @@ async fn handle_request( .await .unwrap_or_else(|e| e.into()); Response::Dial(response).write_into(&mut stream).await?; + stream.close().await?; Ok(()) } diff --git a/protocols/autonatv2/tests/autonatv2.rs b/protocols/autonatv2/tests/autonatv2.rs new file mode 100644 index 00000000000..c3acdd3c462 --- /dev/null +++ b/protocols/autonatv2/tests/autonatv2.rs @@ -0,0 +1,103 @@ +use std::task::Poll; + +use futures::StreamExt; +use libp2p_swarm::{Swarm, SwarmEvent, ToSwarm}; +use libp2p_swarm_test::SwarmExt; +use tracing_subscriber::EnvFilter; + +#[tokio::test] +async fn foo() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let mut alice = new_server().await; + let cor_server_peer = alice.local_peer_id().clone(); + let mut bob = new_client().await; + let cor_client_peer = bob.local_peer_id().clone(); + bob.connect(&mut alice).await; + match libp2p_swarm_test::drive(&mut alice, &mut bob).await { + ( + [CombinedServerEvent::Identify(libp2p_identify::Event::Sent { + peer_id: client_peer_sent, + }), CombinedServerEvent::Identify(libp2p_identify::Event::Received { + peer_id: client_peer_recv, + .. + })], + [CombinedClientEvent::Identify(libp2p_identify::Event::Sent { + peer_id: server_peer_sent, + }), CombinedClientEvent::Identify(libp2p_identify::Event::Received { + peer_id: server_peer_recv, + .. + })], + ) => { + assert_eq!(server_peer_sent, cor_server_peer); + assert_eq!(client_peer_sent, cor_client_peer); + assert_eq!(server_peer_recv, cor_server_peer); + assert_eq!(client_peer_recv, cor_client_peer); + } + e => panic!("unexpected event: {:?}", e), + } + match libp2p_swarm_test::drive(&mut alice, &mut bob).await { + ( + [ + SwarmEvent::Dialing { .. }, + SwarmEvent::ConnectionEstablished { .. }, + SwarmEvent::Behaviour(CombinedServerEvent::Identify(_)), + SwarmEvent::Behaviour(CombinedServerEvent::Identify(_)), + SwarmEvent::NewExternalAddrCandidate { ..}, + ],[ + SwarmEvent::NewExternalAddrCandidate { address: addr_new }, + SwarmEvent::IncomingConnection { .. }, + SwarmEvent::ConnectionEstablished { .. }, + SwarmEvent::Behaviour(CombinedClientEvent::Identify(_)), + SwarmEvent::Behaviour(CombinedClientEvent::Identify(_)), + SwarmEvent::NewExternalAddrCandidate { address: addr_snd}, + SwarmEvent::ExternalAddrConfirmed { address: addr_ok} + ] + ) => { + assert_eq!(addr_new, addr_snd); + assert_eq!(addr_snd, addr_ok); + } + _ => todo!() + } + +} + +async fn new_server() -> Swarm { + let mut node = Swarm::new_ephemeral(|identity| CombinedServer { + autonat: libp2p_autonatv2::server::Behaviour::default(), + identify: libp2p_identify::Behaviour::new(libp2p_identify::Config::new( + "/libp2p-test/1.0.0".into(), + identity.public().clone(), + )), + }); + node.listen().with_tcp_addr_external().await; + + node +} + +async fn new_client() -> Swarm { + let mut node = Swarm::new_ephemeral(|identity| CombinedClient { + autonat: libp2p_autonatv2::client::Behaviour::default(), + identify: libp2p_identify::Behaviour::new(libp2p_identify::Config::new( + "/libp2p-test/1.0.0".into(), + identity.public().clone(), + )), + }); + node.listen().with_tcp_addr_external().await; + node +} + +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct CombinedServer { + autonat: libp2p_autonatv2::server::Behaviour, + identify: libp2p_identify::Behaviour, +} + +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct CombinedClient { + autonat: libp2p_autonatv2::client::Behaviour, + identify: libp2p_identify::Behaviour, +}