Skip to content

Commit

Permalink
Move forwards to fewer bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
umgefahren committed Dec 6, 2023
1 parent 50fb5e4 commit d34782e
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 140 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/autonatv2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ thiserror = "1.0.50"
bytes = "1"
static_assertions = "1.1.0"
tracing = "0.1.40"
unsigned-varint = { workspace = true, features = ["futures"] }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
Expand Down
20 changes: 16 additions & 4 deletions protocols/autonatv2/src/client/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ use libp2p_core::{multiaddr::Protocol, transport::PortUse, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
behaviour::{ConnectionEstablished, ExternalAddrConfirmed},
dial_opts::{DialOpts, PeerCondition},
ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, FromSwarm,
NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm,
};
use rand::{distributions::Standard, seq::SliceRandom, Rng};
use rand::{seq::SliceRandom, Rng};
use rand_core::{OsRng, RngCore};

use crate::{global_only::IpExt, request_response::DialRequest};
Expand Down Expand Up @@ -66,6 +65,7 @@ where
>,
>,
address_candidates: HashMap<Multiaddr, usize>,
already_tested: HashSet<Multiaddr>,
peers_to_handlers: HashMap<PeerId, ConnectionId>,
ticker: IntervalTicker,
}
Expand Down Expand Up @@ -108,7 +108,10 @@ where
fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
*self.address_candidates.entry(addr.clone()).or_default() += 1;
if !self.already_tested.contains(addr) {
println!("external addr: {addr}");
*self.address_candidates.entry(addr.clone()).or_default() += 1;
}
}
FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed { addr }) => {
self.address_candidates.remove(addr);
Expand Down Expand Up @@ -218,7 +221,10 @@ where
if pending_event.is_ready() {
return pending_event;
}
if self.ticker.ready() && !self.known_servers.is_empty() && !self.address_candidates.is_empty() {
if self.ticker.ready()
&& !self.known_servers.is_empty()
&& !self.address_candidates.is_empty()
{
let mut entries = self.address_candidates.drain().collect::<Vec<_>>();
entries.sort_unstable_by_key(|(_, count)| *count);
let addrs = entries
Expand All @@ -227,6 +233,7 @@ where
.map(|(addr, _)| addr)
.take(self.config.max_addrs_count)
.collect::<Vec<_>>();
self.already_tested.extend(addrs.iter().cloned());
let peers = if self.known_servers.len() < self.config.test_server_count {
self.known_servers.clone()
} else {
Expand Down Expand Up @@ -267,6 +274,7 @@ where
pending_events: VecDeque::new(),
address_candidates: HashMap::new(),
peers_to_handlers: HashMap::new(),
already_tested: HashSet::new(),
ticker: IntervalTicker {
interval: Duration::from_secs(0),
last_tick: Instant::now(),
Expand Down Expand Up @@ -310,6 +318,10 @@ where
}
Poll::Pending
}

pub fn inject_test_addr(&mut self, addr: Multiaddr) {
*self.address_candidates.entry(addr).or_default() += 1;
}
}

impl Default for Behaviour<OsRng> {
Expand Down
8 changes: 4 additions & 4 deletions protocols/autonatv2/src/client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
// TODO: tests
// TODO: Handlers

pub mod dial_back;
pub mod dial_request;
pub(crate) mod dial_back;
pub(crate) mod dial_request;

use either::Either;
use std::time::Duration;

pub(crate) use dial_request::TestEnd;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10000);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONCURRENT_REQUESTS: usize = 10;

pub type Handler = Either<dial_request::Handler, dial_back::Handler>;
pub(crate) type Handler = Either<dial_request::Handler, dial_back::Handler>;
11 changes: 7 additions & 4 deletions protocols/autonatv2/src/client/handler/dial_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use std::{
task::{Context, Poll},
};

use either::Either;
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use futures_bounded::{FuturesSet, Timeout};
use futures_bounded::FuturesSet;
use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade};
use libp2p_swarm::{
handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError},
Expand All @@ -15,7 +14,7 @@ use libp2p_swarm::{

use crate::{request_response::DialBack, Nonce};

use super::DEFAULT_TIMEOUT;
use super::{DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS};

pub(crate) type ToBehaviour = io::Result<Nonce>;

Expand All @@ -26,7 +25,7 @@ pub struct Handler {
impl Handler {
pub(crate) fn new() -> Self {
Self {
inbound: FuturesSet::new(DEFAULT_TIMEOUT, 2),
inbound: FuturesSet::new(DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS),
}
}
}
Expand Down Expand Up @@ -84,6 +83,10 @@ impl ConnectionHandler for Handler {
_ => {}
}
}

fn connection_keep_alive(&self) -> bool {
false
}
}

async fn perform_dial_back(mut stream: impl AsyncRead + AsyncWrite + Unpin) -> io::Result<u64> {
Expand Down
30 changes: 19 additions & 11 deletions protocols/autonatv2/src/client/handler/dial_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
task::{Context, Poll},
};

use crate::request_response::Coder;
use crate::{
generated::structs::{mod_DialResponse::ResponseStatus, DialStatus},
request_response::{
Expand All @@ -30,10 +31,10 @@ use crate::{
REQUEST_PROTOCOL_NAME, REQUEST_UPGRADE,
};

use super::DEFAULT_TIMEOUT;
use super::{DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS};

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
pub enum Error {
#[error("io error")]
Io(#[from] io::Error),
#[error("invalid referenced address index: {index} (max number of addr: {max})")]
Expand Down Expand Up @@ -103,12 +104,13 @@ impl Handler {
pub(crate) fn new() -> Self {
Self {
queued_events: VecDeque::new(),
outbound: FuturesSet::new(DEFAULT_TIMEOUT, 10),
outbound: FuturesSet::new(DEFAULT_TIMEOUT, MAX_CONCURRENT_REQUESTS),
queued_streams: VecDeque::default(),
}
}

fn perform_request(&mut self, req: DialRequest) {
println!("{req:?}");
let (tx, rx) = oneshot::channel();
self.queued_streams.push_back(tx);
self.queued_events
Expand Down Expand Up @@ -235,14 +237,15 @@ async fn start_substream_handle(

async fn handle_substream(
dial_request: DialRequest,
mut substream: impl AsyncRead + AsyncWrite + Unpin,
substream: impl AsyncRead + AsyncWrite + Unpin,
) -> Result<TestEnd, Error> {
Request::Dial(dial_request.clone())
.write_into(&mut substream)
let mut coder = Coder::new(substream);
coder
.send_request(Request::Dial(dial_request.clone()))
.await?;
let mut suspicious_addr = Vec::new();
loop {
match Response::read_from(&mut substream).await? {
match coder.next_response().await? {
Response::Data(DialDataRequest {
addr_idx,
num_bytes,
Expand Down Expand Up @@ -278,10 +281,11 @@ async fn handle_substream(
}
}

send_aap_data(&mut substream, num_bytes).await?;
println!("Time to bpay the tribute");
send_aap_data(&mut coder, num_bytes).await?;
}
Response::Dial(dial_response) => {
substream.close().await?;
coder.close().await?;
return test_end_from_dial_response(dial_request, dial_response, suspicious_addr);
}
}
Expand Down Expand Up @@ -328,7 +332,10 @@ fn test_end_from_dial_response(
}
}

async fn send_aap_data(mut substream: impl AsyncWrite + Unpin, num_bytes: usize) -> io::Result<()> {
async fn send_aap_data<I>(substream: &mut Coder<I>, num_bytes: usize) -> io::Result<()>
where
I: AsyncWrite + Unpin,
{
let count_full = num_bytes / DATA_FIELD_LEN_UPPER_BOUND;
let partial_len = num_bytes % DATA_FIELD_LEN_UPPER_BOUND;
for req in repeat(DATA_FIELD_LEN_UPPER_BOUND)
Expand All @@ -337,7 +344,8 @@ async fn send_aap_data(mut substream: impl AsyncWrite + Unpin, num_bytes: usize)
.filter(|e| *e > 0)
.map(|data_count| Request::Data(DialDataResponse { data_count }))
{
req.write_into(&mut substream).await?;
println!("Data req: {req:?}");
substream.send_request(req).await?;
}
Ok(())
}
2 changes: 1 addition & 1 deletion protocols/autonatv2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_swarm::StreamProtocol;

pub mod client;
pub mod server;
mod generated;
mod global_only;
pub(crate) mod request_response;
pub mod server;

pub(crate) const REQUEST_PROTOCOL_NAME: StreamProtocol =
StreamProtocol::new("/libp2p/autonat/2/dial-request");
Expand Down
Loading

0 comments on commit d34782e

Please sign in to comment.