Skip to content

Commit

Permalink
Pass the basic test for the first time.
Browse files Browse the repository at this point in the history
  • Loading branch information
umgefahren committed Dec 6, 2023
1 parent 3bc107e commit 50fb5e4
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 49 deletions.
11 changes: 4 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion protocols/autonatv2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ void = "1.0.2"
either = "1.9.0"
futures = "0.3.29"
thiserror = "1.0.50"
scc = "2.0.3"
bytes = "1"
static_assertions = "1.1.0"
tracing = "0.1.40"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
libp2p-swarm-test = { workspace = true }
libp2p-identify = { workspace = true }
libp2p-swarm = { workspace = true, features = ["macros"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"]}

[lints]
workspace = true

Expand Down
2 changes: 2 additions & 0 deletions protocols/autonatv2/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mod behaviour;
mod handler;

pub use behaviour::Behaviour;
67 changes: 52 additions & 15 deletions protocols/autonatv2/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use libp2p_swarm::{
NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm,
};
use rand::{distributions::Standard, seq::SliceRandom, Rng};
use rand_core::RngCore;
use rand_core::{OsRng, RngCore};

use crate::{global_only::IpExt, request_response::DialRequest};

Expand All @@ -36,12 +36,21 @@ impl IntervalTicker {
}
}

pub(crate) struct Config {
pub struct Config {
pub(crate) test_server_count: usize,
pub(crate) max_addrs_count: usize,
}

pub(crate) struct Behaviour<R>
impl Default for Config {
fn default() -> Self {
Self {
test_server_count: 3,
max_addrs_count: 10,
}
}
}

pub struct Behaviour<R = OsRng>
where
R: RngCore + 'static,
{
Expand Down Expand Up @@ -79,7 +88,7 @@ where
if addr_is_local(remote_addr) {
self.local_peers.insert(connection_id);
}
Ok(Either::Left(dial_request::Handler::new()))
Ok(Either::Right(dial_back::Handler::new()))
}

fn handle_established_outbound_connection(
Expand All @@ -93,7 +102,7 @@ where
if addr_is_local(addr) {
self.local_peers.insert(connection_id);
}
Ok(Either::Right(dial_back::Handler::new()))
Ok(Either::Left(dial_request::Handler::new()))
}

fn on_swarm_event(&mut self, event: FromSwarm) {
Expand Down Expand Up @@ -139,9 +148,11 @@ where
connection_id: ConnectionId,
event: <Handler as ConnectionHandler>::ToBehaviour,
) {
self.peers_to_handlers
.entry(peer_id)
.or_insert(connection_id);
if matches!(event, Either::Left(_)) {
self.peers_to_handlers
.entry(peer_id)
.or_insert(connection_id);
}
match event {
Either::Right(Ok(nonce)) => {
if self.pending_nonces.remove(&nonce) {
Expand Down Expand Up @@ -184,12 +195,12 @@ where
self.pending_events
.push_back(ToSwarm::ExternalAddrConfirmed(reachable_addr));
}
Either::Left(dial_request::ToBehaviour::TestCompleted(
Err(dial_request::Error::UnableToConnectOnSelectedAddress { addr: Some(addr) })
))
| Either::Left(dial_request::ToBehaviour::TestCompleted(
Err(dial_request::Error::FailureDuringDialBack { addr: Some(addr) })
)) => {
Either::Left(dial_request::ToBehaviour::TestCompleted(Err(
dial_request::Error::UnableToConnectOnSelectedAddress { addr: Some(addr) },
)))
| Either::Left(dial_request::ToBehaviour::TestCompleted(Err(
dial_request::Error::FailureDuringDialBack { addr: Some(addr) },
))) => {
self.pending_events
.push_back(ToSwarm::ExternalAddrExpired(addr));
}
Expand All @@ -207,7 +218,7 @@ where
if pending_event.is_ready() {
return pending_event;
}
if self.ticker.ready() && !self.known_servers.is_empty() {
if self.ticker.ready() && !self.known_servers.is_empty() && !self.address_candidates.is_empty() {
let mut entries = self.address_candidates.drain().collect::<Vec<_>>();
entries.sort_unstable_by_key(|(_, count)| *count);
let addrs = entries
Expand Down Expand Up @@ -246,13 +257,33 @@ impl<R> Behaviour<R>
where
R: RngCore + 'static,
{
pub fn new(rng: R, config: Config) -> Self {
Self {
local_peers: HashSet::new(),
pending_nonces: HashSet::new(),
known_servers: Vec::new(),
rng,
config,
pending_events: VecDeque::new(),
address_candidates: HashMap::new(),
peers_to_handlers: HashMap::new(),
ticker: IntervalTicker {
interval: Duration::from_secs(0),
last_tick: Instant::now(),
},
}
}

fn submit_req_for_peer(&mut self, peer: PeerId, req: DialRequest) {
if let Some(conn_id) = self.peers_to_handlers.get(&peer) {
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::One(*conn_id),
event: Either::Left(dial_request::FromBehaviour::PerformRequest(req)),
});
if self.pending_events.is_empty() {
println!("is empty")
}
} else {
tracing::debug!(
"There should be a connection to {:?}, but there isn't",
Expand Down Expand Up @@ -281,6 +312,12 @@ where
}
}

impl Default for Behaviour<OsRng> {
fn default() -> Self {
Self::new(OsRng, Config::default())
}
}

fn addr_is_local(addr: &Multiaddr) -> bool {
addr.iter().any(|c| match c {
Protocol::Dns(addr)
Expand Down
8 changes: 4 additions & 4 deletions protocols/autonatv2/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
// TODO: tests
// TODO: Handlers

pub(super) mod dial_back;
pub(super) mod dial_request;
pub mod dial_back;
pub mod dial_request;

use either::Either;
use std::time::Duration;

pub(crate) use dial_request::TestEnd;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10000);
const MAX_CONCURRENT_REQUESTS: usize = 10;

pub(crate) type Handler = Either<dial_request::Handler, dial_back::Handler>;
pub type Handler = Either<dial_request::Handler, dial_back::Handler>;
2 changes: 1 addition & 1 deletion protocols/autonatv2/src/client/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::DEFAULT_TIMEOUT;

pub(crate) type ToBehaviour = io::Result<Nonce>;

pub(crate) struct Handler {
pub struct Handler {
inbound: FuturesSet<io::Result<Nonce>>,
}

Expand Down
21 changes: 11 additions & 10 deletions protocols/autonatv2/src/client/handler/dial_request.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::{channel::oneshot, AsyncRead, AsyncWrite};
use futures::{channel::oneshot, AsyncRead, AsyncWrite, AsyncWriteExt};
use futures_bounded::FuturesSet;
use libp2p_core::{
upgrade::{DeniedUpgrade, ReadyUpgrade},
Expand All @@ -13,7 +13,6 @@ use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use scc::hash_cache::DEFAULT_MAXIMUM_CAPACITY;
use std::{
collections::VecDeque,
convert::identity,
Expand Down Expand Up @@ -64,24 +63,24 @@ pub(crate) enum Error {
}

#[derive(Debug)]
pub(crate) struct TestEnd {
pub struct TestEnd {
pub(crate) dial_request: DialRequest,
pub(crate) suspicious_addr: Vec<Multiaddr>,
pub(crate) reachable_addr: Multiaddr,
}

#[derive(Debug)]
pub(crate) enum ToBehaviour {
pub enum ToBehaviour {
TestCompleted(Result<TestEnd, Error>),
PeerHasServerSupport,
}

#[derive(Debug)]
pub(crate) enum FromBehaviour {
pub enum FromBehaviour {
PerformRequest(DialRequest),
}

pub(crate) struct Handler {
pub struct Handler {
queued_events: VecDeque<
ConnectionHandlerEvent<
<Self as ConnectionHandler>::OutboundProtocol,
Expand All @@ -104,7 +103,7 @@ impl Handler {
pub(crate) fn new() -> Self {
Self {
queued_events: VecDeque::new(),
outbound: FuturesSet::new(DEFAULT_TIMEOUT, DEFAULT_MAXIMUM_CAPACITY),
outbound: FuturesSet::new(DEFAULT_TIMEOUT, 10),
queued_streams: VecDeque::default(),
}
}
Expand Down Expand Up @@ -207,9 +206,10 @@ impl ConnectionHandler for Handler {
},
ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Added(mut added)) => {
if added.any(|p| p.as_ref() == REQUEST_PROTOCOL_NAME) {
self.queued_events.push_back(
ConnectionHandlerEvent::NotifyBehaviour(ToBehaviour::PeerHasServerSupport)
);
self.queued_events
.push_back(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::PeerHasServerSupport,
));
}
}
_ => {}
Expand Down Expand Up @@ -281,6 +281,7 @@ async fn handle_substream(
send_aap_data(&mut substream, num_bytes).await?;
}
Response::Dial(dial_response) => {
substream.close().await?;
return test_end_from_dial_response(dial_request, dial_response, suspicious_addr);
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/autonatv2/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_swarm::StreamProtocol;

mod client;
mod server;
pub mod client;
pub mod server;
mod generated;
mod global_only;
pub(crate) mod request_response;
Expand Down
2 changes: 2 additions & 0 deletions protocols/autonatv2/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mod behaviour;
mod handler;

pub use behaviour::Behaviour;
15 changes: 11 additions & 4 deletions protocols/autonatv2/src/server/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ use libp2p_swarm::{
ConnectionDenied, ConnectionHandler, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler,
ToSwarm,
};
use rand_core::RngCore;
use rand_core::{OsRng, RngCore};
use libp2p_core::multiaddr::Protocol;

use super::handler::{
dial_back,
dial_request::{self, DialBackCommand},
Handler,
};

pub struct Behaviour<R>
pub struct Behaviour<R = OsRng>
where
R: Clone + Send + RngCore + 'static,
{
Expand All @@ -35,11 +36,17 @@ where
rng: R,
}

impl Default for Behaviour<OsRng> {
fn default() -> Self {
Self::new(OsRng)
}
}

impl<R> Behaviour<R>
where
R: RngCore + Send + Clone + 'static,
{
pub(crate) fn new(rng: R) -> Self {
pub fn new(rng: R) -> Self {
Self {
handlers: HashMap::new(),
pending_dial_back: HashMap::new(),
Expand Down Expand Up @@ -95,7 +102,7 @@ where
port_use: PortUse,
) -> Result<<Self as NetworkBehaviour>::ConnectionHandler, ConnectionDenied> {
if port_use == PortUse::New {
self.handlers.insert((addr.clone(), peer), connection_id);
self.handlers.insert((addr.iter().filter(|e| !matches!(e, Protocol::P2p(_))).collect(), peer), connection_id);
}
Ok(Either::Left(dial_back::Handler::new()))
}
Expand Down
3 changes: 1 addition & 2 deletions protocols/autonatv2/src/server/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl Handler {
Self {
pending_nonce: VecDeque::new(),
requested_substream_nonce: VecDeque::new(),
outbound: FuturesSet::new(Duration::from_secs(10), 2),
outbound: FuturesSet::new(Duration::from_secs(10000), 2),
}
}
}
Expand Down Expand Up @@ -105,7 +105,6 @@ impl ConnectionHandler for Handler {
}
_ => {}
}
todo!()
}
}

Expand Down
Loading

0 comments on commit 50fb5e4

Please sign in to comment.