Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tests of libp2p-ping #1321

Merged
merged 2 commits into from
Nov 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions protocols/ping/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ wasm-timer = "0.2"
void = "1.0"

[dev-dependencies]
async-std = "1.0"
libp2p-tcp = { version = "0.13.0", path = "../../transports/tcp" }
libp2p-secio = { version = "0.13.0", path = "../../protocols/secio" }
libp2p-yamux = { version = "0.13.0", path = "../../muxers/yamux" }
quickcheck = "0.9.0"
tokio = "0.1"
tokio-tcp = "0.1"
33 changes: 11 additions & 22 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,10 @@ where
mod tests {
use super::*;

use async_std::net::TcpStream;
use futures::future;
use quickcheck::*;
use rand::Rng;
use tokio_tcp::TcpStream;
use tokio::runtime::current_thread::Runtime;

impl Arbitrary for PingConfig {
fn arbitrary<G: Gen>(g: &mut G) -> PingConfig {
Expand All @@ -280,46 +279,36 @@ mod tests {
}
}

fn tick(h: &mut PingHandler<TcpStream>) -> Result<
ProtocolsHandlerEvent<protocol::Ping, (), PingResult>,
PingFailure
> {
futures::executor::block_on(future::poll_fn(|| h.poll() ))
fn tick(h: &mut PingHandler<TcpStream>)
-> ProtocolsHandlerEvent<protocol::Ping, (), PingResult, PingFailure>
{
futures::executor::block_on(future::poll_fn(|cx| h.poll(cx) ))
}

#[test]
fn ping_interval() {
fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool {
let mut h = PingHandler::<TcpStream>::new(cfg);

// The first ping is scheduled "immediately".
let start = h.next_ping.deadline();
assert!(start <= Instant::now());

// Send ping
match tick(&mut h) {
Ok(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ }) => {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: _ } => {
// The handler must use the configured timeout.
assert_eq!(protocol.timeout(), &h.config.timeout);
// The next ping must be scheduled no earlier than the ping timeout.
assert!(h.next_ping.deadline() >= start + h.config.timeout);
}
e => panic!("Unexpected event: {:?}", e)
}

let now = Instant::now();

// Receive pong
h.inject_fully_negotiated_outbound(ping_rtt, ());
match tick(&mut h) {
Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt }))) => {
ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { rtt })) => {
// The handler must report the given RTT.
assert_eq!(rtt, ping_rtt);
// The next ping must be scheduled no earlier than the ping interval.
assert!(now + h.config.interval <= h.next_ping.deadline());
}
e => panic!("Unexpected event: {:?}", e)
}

true
}

Expand All @@ -333,20 +322,20 @@ mod tests {
for _ in 0 .. h.config.max_failures.get() - 1 {
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
match tick(&mut h) {
Ok(ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout))) => {}
ProtocolsHandlerEvent::Custom(Err(PingFailure::Timeout)) => {}
e => panic!("Unexpected event: {:?}", e)
}
}
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
match tick(&mut h) {
Err(PingFailure::Timeout) => {
ProtocolsHandlerEvent::Close(PingFailure::Timeout) => {
assert_eq!(h.failures, h.config.max_failures.get());
}
e => panic!("Unexpected event: {:?}", e)
}
h.inject_fully_negotiated_outbound(Duration::from_secs(1), ());
match tick(&mut h) {
Ok(ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. }))) => {
ProtocolsHandlerEvent::Custom(Ok(PingSuccess::Ping { .. })) => {
// A success resets the counter for consecutive failures.
assert_eq!(h.failures, 0);
}
Expand Down
34 changes: 13 additions & 21 deletions protocols/ping/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,31 +121,23 @@ mod tests {
let mut listener = MemoryTransport.listen_on(mem_addr).unwrap();

let listener_addr =
if let Ok(Poll::Ready(Some(ListenerEvent::NewAddress(a)))) = listener.poll() {
if let Some(Some(Ok(ListenerEvent::NewAddress(a)))) = listener.next().now_or_never() {
a
} else {
panic!("MemoryTransport not listening on an address!");
};

async_std::task::spawn(async move {
let listener_event = listener.next().await.unwrap();
let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap();
let conn = listener_upgrade.await.unwrap();
upgrade::apply_inbound(conn, Ping::default()).await.unwrap();
});

let server = listener
.into_future()
.map_err(|(e, _)| e)
.and_then(|(listener_event, _)| {
let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap();
let conn = listener_upgrade.wait().unwrap();
upgrade::apply_inbound(conn, Ping::default())
.map_err(|e| panic!(e))
});

let client = MemoryTransport.dial(listener_addr).unwrap()
.and_then(|c| {
upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1)
.map_err(|e| panic!(e))
});

let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(server.map_err(|e| panic!(e)));
let rtt = runtime.block_on(client).expect("RTT");
assert!(rtt > Duration::from_secs(0));
async_std::task::block_on(async move {
let c = MemoryTransport.dial(listener_addr).unwrap().await.unwrap();
let rtt = upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1).await.unwrap();
assert!(rtt > Duration::from_secs(0));
});
}
}
77 changes: 32 additions & 45 deletions protocols/ping/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,18 @@
use libp2p_core::{
Multiaddr,
PeerId,
Negotiated,
identity,
muxing::StreamMuxerBox,
transport::{Transport, boxed::Boxed},
either::EitherError,
upgrade::{self, UpgradeError}
};
use libp2p_ping::*;
use libp2p_yamux::{self as yamux, Yamux};
use libp2p_secio::{SecioConfig, SecioOutput, SecioError};
use libp2p_secio::{SecioConfig, SecioError};
use libp2p_swarm::Swarm;
use libp2p_tcp::{TcpConfig, TcpTransStream};
use futures::{future, prelude::*};
use std::{io, time::Duration, sync::mpsc::sync_channel};
use tokio::runtime::Runtime;
use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc};
use std::{io, time::Duration};

#[test]
fn ping() {
Expand All @@ -48,64 +46,53 @@ fn ping() {
let (peer2_id, trans) = mk_transport();
let mut swarm2 = Swarm::new(trans, Ping::new(cfg), peer2_id.clone());

let (tx, rx) = sync_channel::<Multiaddr>(1);
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);

let pid1 = peer1_id.clone();
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
let mut listening = false;
Swarm::listen_on(&mut swarm1, addr).unwrap();
let peer1 = future::poll_fn(move || -> Result<_, ()> {

let peer1 = async move {
while let Some(_) = swarm1.next().now_or_never() {}

for l in Swarm::listeners(&swarm1) {
tx.send(l.clone()).await.unwrap();
}

loop {
match swarm1.poll().expect("Error while polling swarm") {
Async::Ready(Some(PingEvent { peer, result })) => match result {
Ok(PingSuccess::Ping { rtt }) =>
return Ok(Async::Ready((pid1.clone(), peer, rtt))),
_ => {}
match swarm1.next().await.unwrap().unwrap() {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid1.clone(), peer, rtt)
},
_ => {
if !listening {
for l in Swarm::listeners(&swarm1) {
tx.send(l.clone()).unwrap();
listening = true;
}
}
return Ok(Async::NotReady)
}
_ => {}
}
}
});
};

let pid2 = peer2_id.clone();
let mut dialing = false;
let peer2 = future::poll_fn(move || -> Result<_, ()> {
let peer2 = async move {
Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap();

loop {
match swarm2.poll().expect("Error while polling swarm") {
Async::Ready(Some(PingEvent { peer, result })) => match result {
Ok(PingSuccess::Ping { rtt }) =>
return Ok(Async::Ready((pid2.clone(), peer, rtt))),
_ => {}
match swarm2.next().await.unwrap().unwrap() {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid2.clone(), peer, rtt)
},
_ => {
if !dialing {
Swarm::dial_addr(&mut swarm2, rx.recv().unwrap()).unwrap();
dialing = true;
}
return Ok(Async::NotReady)
}
_ => {}
}
}
});
};

let result = peer1.select(peer2).map_err(|e| panic!(e));
let ((p1, p2, rtt), _) = futures::executor::block_on(result).unwrap();
let result = future::select(Box::pin(peer1), Box::pin(peer2));
let ((p1, p2, rtt), _) = futures::executor::block_on(result).factor_first();
assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id);
assert!(rtt < Duration::from_millis(50));
}

fn mk_transport() -> (
PeerId,
Boxed<
(PeerId, Yamux<Negotiated<SecioOutput<Negotiated<TcpTransStream>>>>),
(PeerId, StreamMuxerBox),
EitherError<EitherError<io::Error, UpgradeError<SecioError>>, UpgradeError<io::Error>>
>
) {
Expand All @@ -115,8 +102,8 @@ fn mk_transport() -> (
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(id_keys))
.multiplex(yamux::Config::default())
.multiplex(libp2p_yamux::Config::default())
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.boxed();
(peer_id, transport)
}