From a1fd69ffae57b7c7a3678fddc93fe7a93c362c42 Mon Sep 17 00:00:00 2001 From: Robert Ream Date: Mon, 10 May 2021 12:28:49 -0700 Subject: [PATCH] high_bandwidth --- srt-tokio/tests/high_bandwidth.rs | 81 +++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 srt-tokio/tests/high_bandwidth.rs diff --git a/srt-tokio/tests/high_bandwidth.rs b/srt-tokio/tests/high_bandwidth.rs new file mode 100644 index 00000000..4dc64001 --- /dev/null +++ b/srt-tokio/tests/high_bandwidth.rs @@ -0,0 +1,81 @@ +use std::{ + collections::VecDeque, + time::{Duration, Instant}, +}; + +use anyhow::Error; +use bytes::Bytes; +use futures::{stream, SinkExt, Stream, StreamExt, TryStreamExt}; +use log::info; +use srt_tokio::SrtSocketBuilder; + +fn stream_exact(duration: Duration) -> impl Stream { + let message = Bytes::from(vec![5; 1024]); + let last = tokio::time::Instant::now(); + stream::unfold((message, last, duration), |(message, last, d)| async move { + tokio::time::sleep_until(last + d).await; + Some((message.clone(), (message, last + d, d))) + }) +} + +#[tokio::test] +async fn high_bandwidth() -> Result<(), Error> { + let _ = pretty_env_logger::try_init(); + + let sender_fut = async { + let mut sock = SrtSocketBuilder::new_connect("127.0.0.1:6654") + .connect() + .await?; + + // send 1gbps (125 MB/s) + let mut stream_gbps = stream_exact(Duration::from_micros(1_000_000 / 1024 / 35)) + .map(|bytes| Ok((Instant::now(), bytes))) + .boxed(); + + info!("Sender all connected"); + + sock.send_all(&mut stream_gbps).await?; + + Ok::<_, Error>(()) + }; + + let recv_fut = async { + let mut sock = SrtSocketBuilder::new_listen() + .local_port(6654) + .connect() + .await?; + + let mut window = VecDeque::new(); + let mut bytes_received = 0; + let window_size = Duration::from_secs(1); + + while let Some((_, bytes)) = sock.try_next().await? { + bytes_received += bytes.len(); + window.push_back((Instant::now(), bytes.len())); + + while let Some((a, bytes)) = window.front() { + if Instant::now() - *a > window_size { + bytes_received -= *bytes; + window.pop_front(); + } else { + break; + } + } + + print!( + "Received {:10.3}MB, rate={:10.3}MB/s\r", + bytes_received as f64 / 1024. / 1024., + bytes_received as f64 / 1024. / 1024. / window_size.as_secs_f64(), + ); + } + + Ok::<_, Error>(()) + }; + + tokio::spawn(sender_fut); + tokio::spawn(recv_fut); + + tokio::time::sleep(Duration::from_secs(60)).await; + + Ok(()) +}