Skip to content

Commit

Permalink
Implement Sink for Io.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman S. Borschel committed Feb 15, 2021
1 parent 24d5464 commit eb6c732
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 131 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ readme = "README.md"
edition = "2018"

[dependencies]
futures = { version = "0.3.4", default-features = false, features = ["std"] }
futures = { version = "0.3.12", default-features = false, features = ["std"] }
log = "0.4.8"
nohash-hasher = "0.2"
parking_lot = "0.11"
Expand Down
48 changes: 15 additions & 33 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ use futures::{
channel::{mpsc, oneshot},
future::{self, Either},
prelude::*,
stream::{Fuse, FusedStream}
stream::{Fuse, FusedStream},
sink::SinkExt,
};
use nohash_hasher::IntMap;
use std::{fmt, io, sync::Arc, task::{Context, Poll}};
Expand Down Expand Up @@ -395,9 +396,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
let socket = &mut self.socket;
let io = future::poll_fn(move |cx| {
// Progress writing.
match socket.get_mut().poll_send::<()>(cx, None) {
frame::PollSend::Pending(_) => {}
frame::PollSend::Ready(res) => {
match socket.get_mut().poll_ready_unpin(cx) {
Poll::Pending => {}
Poll::Ready(res) => {
res.or(Err(ConnectionError::Closed))?;
if stream_receiver_paused {
return Poll::Ready(Result::Ok(IoEvent::OutboundReady))
Expand Down Expand Up @@ -883,7 +884,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
/// Try to flush the underlying I/O stream, without waiting for it.
async fn flush_nowait(&mut self) -> Result<()> {
future::poll_fn(|cx| {
let _ = self.socket.get_mut().poll_flush(cx)?;
let _ = self.socket.get_mut().poll_flush_unpin(cx)?;
Poll::Ready(Ok(()))
}).await
}
Expand Down Expand Up @@ -1009,37 +1010,18 @@ async fn send<T: AsyncRead + AsyncWrite + Unpin>(
control_receiver: &mut Pausable<mpsc::Receiver<ControlCommand>>,
frame: impl Into<Frame<()>>
) -> Result<()> {
let mut frame = Some(frame.into());
future::poll_fn(move |cx| {
let next = frame.take().expect("Frame has not yet been taken by `io`.");
match io.get_mut().poll_send(cx, Some(next)) {
frame::PollSend::Pending(Some(f)) => {
debug_assert!(stream_receiver.is_paused());
log::debug!("{}: send: Prior write pending. Waiting.", id);
frame = Some(f);
return Poll::Pending
}
frame::PollSend::Pending(None) => {
// The frame could not yet fully be written to the underlying
// socket, so we pause the processing of commands in order to
// pause writing while still being able to read from the socket.
// The only frames that may still be sent while commands are paused
// are as a reaction to frames being read, which in turn allows
// the remote to make progress eventually, if it should
// currently be blocked on writing. In this way unnecessary
// deadlocks between peers blocked on writing are avoided.
log::trace!("{}: send: Write pending. Continuing with paused command streams.", id);
SinkExt::feed(io.get_mut(), frame.into()).await?;
// Check if the write "goes through" or is pending due to back-pressure
// from the underlying socket.
future::poll_fn(|cx| {
match io.get_mut().poll_ready_unpin(cx)? {
Poll::Pending => {
log::debug!("{}: send: Write pending. Continuing with paused command streams.", id);
stream_receiver.pause();
control_receiver.pause();
return Poll::Ready(Ok(()))
}
frame::PollSend::Ready(Err(e)) => {
return Poll::Ready(Err(e.into()))
}
frame::PollSend::Ready(Ok(())) => {
// Note: We leave the unpausing of the command streams to `Connection::next()`.
return Poll::Ready(Ok(()))
return Poll::Ready(Result::Ok(()))
}
Poll::Ready(()) => return Poll::Ready(Ok(()))
}
}).await
}
Expand Down
2 changes: 1 addition & 1 deletion src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::future::Either;
use header::{Header, StreamId, Data, WindowUpdate, GoAway, Ping};
use std::{convert::TryInto, num::TryFromIntError};

pub(crate) use io::{Io, PollSend};
pub(crate) use io::Io;
pub use io::FrameDecodeError;

/// A Yamux message frame consisting of header and body.
Expand Down
152 changes: 56 additions & 96 deletions src/frame/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,80 +33,65 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
max_body_len: max_frame_body_len
}
}
}

/// The stages of writing a new `Frame`.
#[derive(Debug)]
enum WriteState {
Init,
Header {
header: [u8; header::HEADER_SIZE],
buffer: Vec<u8>,
offset: usize
},
Body {
buffer: Vec<u8>,
offset: usize
}
}

impl<T: AsyncRead + AsyncWrite + Unpin> Sink<Frame<()>> for Io<T> {
type Error = io::Error;

/// Continue writing on the underlying connection, possibly with a new frame.
///
/// If there is no data pending to be written from a prior frame, an attempt
/// is made at writing the new frame, if any.
///
/// If there is data pending to be written from a prior frame, an attempt is made to
/// complete the pending writes before accepting the new frame, if any. If
/// the new frame cannot be accepted due to pending writes, it is returned.
pub(crate) fn poll_send<A>(&mut self, cx: &mut Context<'_>, mut frame: Option<Frame<A>>)
-> PollSend<A, io::Result<()>>
{
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>> {
let this = Pin::into_inner(self);
loop {
match &mut self.write_state {
WriteState::Init => {
if let Some(f) = frame.take() {
let header = header::encode(&f.header);
let buffer = f.body;
self.write_state = WriteState::Header { header, buffer, offset: 0 };
} else {
return PollSend::Ready(Ok(()))
}
}
match &mut this.write_state {
WriteState::Init => return Poll::Ready(Ok(())),
WriteState::Header { header, buffer, ref mut offset } => {
match Pin::new(&mut self.io).poll_write(cx, &header[*offset ..]) {
Poll::Pending => {
if let Some(f) = frame.take() {
return PollSend::Pending(Some(f))
}
return PollSend::Pending(None)
}
Poll::Ready(Err(e)) => {
if let Some(f) = frame.take() {
return PollSend::Pending(Some(f))
}
return PollSend::Ready(Err(e))
}
match Pin::new(&mut this.io).poll_write(cx, &header[*offset ..]) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(n)) => {
if n == 0 {
return PollSend::Ready(Err(io::ErrorKind::WriteZero.into()))
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
*offset += n;
if *offset == header.len() {
if buffer.len() > 0 {
let buffer = std::mem::take(buffer);
self.write_state = WriteState::Body { buffer, offset: 0 };
this.write_state = WriteState::Body { buffer, offset: 0 };
} else {
self.write_state = WriteState::Init;
this.write_state = WriteState::Init;
}
}
}
}
}
WriteState::Body { buffer, ref mut offset } => {
match Pin::new(&mut self.io).poll_write(cx, &buffer[*offset ..]) {
Poll::Pending => {
if let Some(f) = frame.take() {
return PollSend::Pending(Some(f))
}
return PollSend::Pending(None)
}
Poll::Ready(Err(e)) => {
if let Some(f) = frame.take() {
return PollSend::Pending(Some(f))
}
return PollSend::Ready(Err(e))
}
match Pin::new(&mut this.io).poll_write(cx, &buffer[*offset ..]) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(n)) => {
if n == 0 {
return PollSend::Ready(Err(io::ErrorKind::WriteZero.into()))
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
*offset += n;
if *offset == buffer.len() {
self.write_state = WriteState::Init;
this.write_state = WriteState::Init;
}
}
}
Expand All @@ -115,54 +100,29 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
}
}

pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.poll_send(cx, Option::<Frame<()>>::None) {
PollSend::Pending(_) => return Poll::Pending,
PollSend::Ready(r) => r?
}
Pin::new(&mut self.io).poll_flush(cx)
}

pub(crate) async fn close(&mut self) -> io::Result<()> {
future::poll_fn(|cx| self.poll_flush(cx)).await?;
self.io.close().await
fn start_send(self: Pin<&mut Self>, f: Frame<()>) -> Result<(), Self::Error> {
let header = header::encode(&f.header);
let buffer = f.body;
self.get_mut().write_state = WriteState::Header { header, buffer, offset: 0 };
Ok(())
}

#[cfg(test)]
async fn flush(&mut self) -> io::Result<()> {
future::poll_fn(|cx| self.poll_flush(cx)).await
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>> {
let this = Pin::into_inner(self);
ready!(this.poll_ready_unpin(cx))?;
Pin::new(&mut this.io).poll_flush(cx)
}

#[cfg(test)]
async fn send<A>(&mut self, frame: Frame<A>) -> io::Result<()> {
let mut frame = Some(frame);
future::poll_fn(|cx| {
match self.poll_send(cx, Some(frame.take().expect("`frame` not yet taken"))) {
PollSend::Pending(f) => { frame = f; return Poll::Pending }
PollSend::Ready(r) => return Poll::Ready(r)
}
}).await
}
}

/// The result of [`Io::poll_send`].
pub enum PollSend<A, B> {
Pending(Option<Frame<A>>),
Ready(B),
}

/// The stages of writing a new `Frame`.
#[derive(Debug)]
enum WriteState {
Init,
Header {
header: [u8; header::HEADER_SIZE],
buffer: Vec<u8>,
offset: usize
},
Body {
buffer: Vec<u8>,
offset: usize
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>> {
let this = Pin::into_inner(self);
ready!(this.poll_ready_unpin(cx))?;
Pin::new(&mut this.io).poll_close(cx)
}
}

Expand Down

0 comments on commit eb6c732

Please sign in to comment.