Allow reads concurrent to pending writes. #112
Conversation
Thereby propagate pending write back-pressure via the paused command channels, rather than by "blocking" `Connection::next()` itself until a write completes. Notably, no new buffers are introduced. When a frame write cannot complete, the command channels are paused and I/O reads can continue while the write is pending. The paused command channels exert write back-pressure on the streams and the API, and they ensure that the only frames we still try to send are those produced as a result of reading a frame. These sends do wait for completion of the prior pending send operation, if any, but since they happen as a result of reading a frame, the remote can in turn write again, should it have been waiting to do so before it can read again. Unexpected write deadlocks between peers which otherwise read and write concurrently on substreams can thus be avoided.
To be fair, one can say that I essentially introduced a buffer of one frame on top of the send buffer / capacity of the connection in order to break this particular deadlock. The question then is whether that is worthwhile and can be considered a small enhancement, or is just more complexity for too little gain. Fundamentally, one eventually needs to either wait/block on sending or abort the connection if writes cannot proceed, as it is not possible to only read (i.e. at least window updates must be sent).
I like the general structure of the approach taken here, namely to stop polling the stream- and command-receivers when the socket is busy sending an item, as well as introducing a one-item send buffer into `Io`.

It did take me a bit of time to fully understand the concepts introduced in this pull request (`PollSend`) as well as how they interact with the existing `Pausable` wrapper.
I have not yet put enough thought into the suggestions below to consider them a solid review, so please take them with a grain of salt. I have played with your changes locally, but I am not yet at a state where my local changes compile.
- How about implementing `Sink` for `Io`, keeping your new "buffer of 1 on top of the send buffer"?
- How about, in `Connection::next`, only polling `self.stream_receiver` and `self.command_receiver` when `self.socket.poll_ready` returns `Poll::Ready(Ok(()))`?
- How about using `SinkExt::feed` in `Connection::on_control_command`, `Connection::on_stream_command` and `Connection::on_frame` instead of the custom `send` function?
I would expect that using the concepts from the `futures` library, namely `Sink` and `SinkExt`, instead of our own `PollSend` and `Pausable`, would make this pull request easier to understand, given that one is likely already familiar with the former concepts.
What do you think? Happy to expand on my ideas, or try to provide a compiling version.
My work-in-progress changes on top of this pull request
diff --git a/src/connection.rs b/src/connection.rs
index 60bed2c..355af6c 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -392,27 +392,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
num_terminated += 1;
Either::Left(future::pending())
} else {
- let socket = &mut self.socket;
- let io = future::poll_fn(move |cx| {
- // Progress writing.
- match socket.get_mut().poll_send::<()>(cx, None) {
- frame::PollSend::Pending(_) => {}
- frame::PollSend::Ready(res) => {
- res.or(Err(ConnectionError::Closed))?;
- if stream_receiver_paused {
- return Poll::Ready(Result::Ok(IoEvent::OutboundReady))
- }
- }
- }
- // Progress reading.
- let next_frame = match futures::ready!(socket.poll_next_unpin(cx)) {
- None => Ok(None),
- Some(Err(e)) => Err(e.into()),
- Some(Ok(f)) => Ok(Some(f))
- };
- Poll::Ready(Ok(IoEvent::Inbound(next_frame)))
- });
- Either::Right(io)
+ Either::Right(self.socket.try_next().err_into())
};
let mut next_stream_command =
@@ -420,7 +400,19 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
num_terminated += 1;
Either::Left(future::pending())
} else {
- Either::Right(self.stream_receiver.next())
+ Either::Right(
+ async {
+ // mxinden: Only consider polling the stream_receiver when the socket is ready to
+ // accept a new item.
+ //
+ // mxinden: Likely does not pass the borrow checker with the socket
+ // being borrowed both here and below for the next_control_command.
+ future::poll_fn(|cx| {
+ self.socket.poll_ready(cx)
+ }).await;
+ self.stream_receiver.next()
+ }
+ )
};
let mut next_control_command =
@@ -428,7 +420,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
num_terminated += 1;
Either::Left(future::pending())
} else {
- Either::Right(self.control_receiver.next())
+ Either::Right(
+ async {
+ // mxinden: Only consider polling the control_receiver when the socket is ready
+ // to accept a new item.
+ future::poll_fn(|cx| {
+ self.socket.poll_ready(cx)
+ }).await;
+ self.control_receiver.next()
+ }
+ )
};
if num_terminated == 3 {
@@ -504,7 +507,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
let mut frame = Frame::window_update(id, extra_credit);
frame.header_mut().syn();
log::trace!("{}/{}: sending initial {}", self.id, id, frame.header());
- self.send(frame).await.or(Err(ConnectionError::Closed))?
+ // mxinden: Use SinkExt::feed which will send the item into the sink, but does
+ // not consecutively flush it.
+ self.socket.get_mut().feed(frame).await.or(Err(ConnectionError::Closed))?
}
let stream = {
let config = self.config.clone();
@@ -557,10 +562,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
Ok(())
}
- async fn send(&mut self, f: impl Into<Frame<()>>) -> Result<()> {
- send(self.id, &mut self.socket, &mut self.stream_receiver, &mut self.control_receiver, f).await
- }
-
/// Process a command from one of our `Stream`s.
async fn on_stream_command(&mut self, cmd: Option<StreamCommand>) -> Result<()> {
match cmd {
@@ -996,54 +997,6 @@ enum IoEvent {
OutboundReady,
}
-/// Sends a frame on the given `io` stream.
-///
-/// If the frame is taken by `io` but cannot be fully sent, the command
-/// receivers are paused, without waiting for completion.
-///
-/// If a prior send operation is still pending, waits for its completion.
-async fn send<T: AsyncRead + AsyncWrite + Unpin>(
- id: Id,
- io: &mut Fuse<frame::Io<T>>,
- stream_receiver: &mut Pausable<mpsc::Receiver<StreamCommand>>,
- control_receiver: &mut Pausable<mpsc::Receiver<ControlCommand>>,
- frame: impl Into<Frame<()>>
-) -> Result<()> {
- let mut frame = Some(frame.into());
- future::poll_fn(move |cx| {
- let next = frame.take().expect("Frame has not yet been taken by `io`.");
- match io.get_mut().poll_send(cx, Some(next)) {
- frame::PollSend::Pending(Some(f)) => {
- debug_assert!(stream_receiver.is_paused());
- log::debug!("{}: send: Prior write pending. Waiting.", id);
- frame = Some(f);
- return Poll::Pending
- }
- frame::PollSend::Pending(None) => {
- // The frame could not yet fully be written to the underlying
- // socket, so we pause the processing of commands in order to
- // pause writing while still being able to read from the socket.
- // The only frames that may still be sent while commands are paused
- // are as a reaction to frames being read, which in turn allows
- // the remote to make progress eventually, if it should
- // currently be blocked on writing. In this way unnecessary
- // deadlocks between peers blocked on writing are avoided.
- log::trace!("{}: send: Write pending. Continuing with paused command streams.", id);
- stream_receiver.pause();
- control_receiver.pause();
- return Poll::Ready(Ok(()))
- }
- frame::PollSend::Ready(Err(e)) => {
- return Poll::Ready(Err(e.into()))
- }
- frame::PollSend::Ready(Ok(())) => {
- // Note: We leave the unpausing of the command streams to `Connection::next()`.
- return Poll::Ready(Ok(()))
- }
- }
- }).await
-}
-
/// Turn a Yamux [`Connection`] into a [`futures::Stream`].
pub fn into_stream<T>(c: Connection<T>) -> impl futures::stream::Stream<Item = Result<Stream>>
where
diff --git a/src/frame/io.rs b/src/frame/io.rs
index f909aec..13f6f4f 100644
--- a/src/frame/io.rs
+++ b/src/frame/io.rs
@@ -34,45 +34,48 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
}
}
- /// Continue writing on the underlying connection, possibly with a new frame.
- ///
- /// If there is no data pending to be written from a prior frame, an attempt
- /// is made at writing the new frame, if any.
- ///
- /// If there is data pending to be written from a prior frame, an attempt is made to
- /// complete the pending writes before accepting the new frame, if any. If
- /// the new frame cannot be accepted due to pending writes, it is returned.
- pub(crate) fn poll_send<A>(&mut self, cx: &mut Context<'_>, mut frame: Option<Frame<A>>)
- -> PollSend<A, io::Result<()>>
- {
+ #[cfg(test)]
+ async fn flush(&mut self) -> io::Result<()> {
+ future::poll_fn(|cx| self.poll_flush(cx)).await
+ }
+
+ #[cfg(test)]
+ async fn send<A>(&mut self, frame: Frame<A>) -> io::Result<()> {
+ let mut frame = Some(frame);
+ future::poll_fn(|cx| {
+ match self.poll_send(cx, Some(frame.take().expect("`frame` not yet taken"))) {
+ PollSend::Pending(f) => { frame = f; return Poll::Pending }
+ PollSend::Ready(r) => return Poll::Ready(r)
+ }
+ }).await
+ }
+
+ pub(crate) async fn close(&mut self) -> io::Result<()> {
+ future::poll_fn(|cx| self.poll_flush(cx)).await?;
+ self.io.close().await
+ }
+}
+
+impl <T: AsyncWrite + Unpin, A> Sink<Frame<A>> for Io<T> {
+ type Error = io::Error;
+
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ cx: &mut Context
+ ) -> Poll<Result<(), Self::Error>> {
loop {
match &mut self.write_state {
WriteState::Init => {
- if let Some(f) = frame.take() {
- let header = header::encode(&f.header);
- let buffer = f.body;
- self.write_state = WriteState::Header { header, buffer, offset: 0 };
- } else {
- return PollSend::Ready(Ok(()))
- }
+ return Poll::Ready(Ok(()))
}
WriteState::Header { header, buffer, ref mut offset } => {
- match Pin::new(&mut self.io).poll_write(cx, &header[*offset ..]) {
- Poll::Pending => {
- if let Some(f) = frame.take() {
- return PollSend::Pending(Some(f))
- }
- return PollSend::Pending(None)
- }
- Poll::Ready(Err(e)) => {
- if let Some(f) = frame.take() {
- return PollSend::Pending(Some(f))
- }
+ match ready!(Pin::new(&mut self.io).poll_write(cx, &header[*offset ..])) {
+ Err(e) => {
return PollSend::Ready(Err(e))
}
- Poll::Ready(Ok(n)) => {
+ Ok(n) => {
if n == 0 {
- return PollSend::Ready(Err(io::ErrorKind::WriteZero.into()))
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
*offset += n;
if *offset == header.len() {
@@ -87,20 +90,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
}
}
WriteState::Body { buffer, ref mut offset } => {
- match Pin::new(&mut self.io).poll_write(cx, &buffer[*offset ..]) {
- Poll::Pending => {
- if let Some(f) = frame.take() {
- return PollSend::Pending(Some(f))
- }
- return PollSend::Pending(None)
+ match ready!(Pin::new(&mut self.io).poll_write(cx, &buffer[*offset ..])) {
+ Err(e) => {
+ return Poll::Ready(Err(e))
}
- Poll::Ready(Err(e)) => {
- if let Some(f) = frame.take() {
- return PollSend::Pending(Some(f))
- }
- return PollSend::Ready(Err(e))
- }
- Poll::Ready(Ok(n)) => {
+ Ok(n) => {
if n == 0 {
return PollSend::Ready(Err(io::ErrorKind::WriteZero.into()))
}
@@ -115,33 +109,42 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
}
}
- pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- match self.poll_send(cx, Option::<Frame<()>>::None) {
- PollSend::Pending(_) => return Poll::Pending,
- PollSend::Ready(r) => r?
+ /// Continue writing on the underlying connection, possibly with a new frame.
+ ///
+ /// If there is no data pending to be written from a prior frame, an attempt
+ /// is made at writing the new frame, if any.
+ ///
+ /// If there is data pending to be written from a prior frame, an attempt is made to
+ /// complete the pending writes before accepting the new frame, if any. If
+ /// the new frame cannot be accepted due to pending writes, it is returned.
+ pub(crate) fn start_send(self: Pin<&mut Self>, mut f: Frame<A>)
+ -> Result<(), Self::Error>
+ {
+ loop {
+ match &mut self.write_state {
+ WriteState::Init => {
+ let header = header::encode(&f.header);
+ let buffer = f.body;
+ self.write_state = WriteState::Header { header, buffer, offset: 0 };
+ }
+ WriteState::Header { header, buffer, ref mut offset } => {
+ panic!();
+ }
+ WriteState::Body { buffer, ref mut offset } => {
+ panic!();
+ }
+ }
}
- Pin::new(&mut self.io).poll_flush(cx)
- }
-
- pub(crate) async fn close(&mut self) -> io::Result<()> {
- future::poll_fn(|cx| self.poll_flush(cx)).await?;
- self.io.close().await
}
- #[cfg(test)]
- async fn flush(&mut self) -> io::Result<()> {
- future::poll_fn(|cx| self.poll_flush(cx)).await
+ pub(crate) fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_ready(cx)
}
-
- #[cfg(test)]
- async fn send<A>(&mut self, frame: Frame<A>) -> io::Result<()> {
- let mut frame = Some(frame);
- future::poll_fn(|cx| {
- match self.poll_send(cx, Some(frame.take().expect("`frame` not yet taken"))) {
- PollSend::Pending(f) => { frame = f; return Poll::Pending }
- PollSend::Ready(r) => return Poll::Ready(r)
- }
- }).await
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context
+ ) -> Poll<Result<(), Self::Error>> {
+ unimplemented!();
}
}
Thanks for the review. I will take a closer look at your suggestions and report back here. I also want to give this some more thought and testing. Meanwhile, I think it makes sense to release the current state in …
As you mentioned in a comment, the main problem with the changes in …
As a first improvement based on your suggestions, …
A small update: the write deadlock seems to be relatively easy to reproduce with TCP in the context of the test in …

For anyone reading this without following all the details, I'd like to emphasise again that even with the latest release a mutual write deadlock is unlikely to be encountered in practice: it requires sending large amounts of data simultaneously from both endpoints, coupled with insufficiently sized TCP send buffers, and increasing the TCP send buffers can always resolve the problem. Still, as a result of further testing I am convinced that the changes proposed here, namely pausing the command streams and continuing to read from the socket while the current frame write is pending, make such a deadlock even more unlikely to occur and are hence beneficial and desirable.
Thanks for the additional refactorings.
I would like to suggest one more change, namely the removal of the `Pausable` wrapper on the stream command channel. Related to that suggestion, and already raised earlier, we ran into the issue of multiple borrows of `socket` (see comment).

I found a way around the borrow issue. romanb#1 implements the suggestion. It compiles and passes all tests, including the deadlock test. I hope the patch is self-explanatory; please let me know in case it isn't.
What do you think @romanb? Would this be a simplification?
src/connection: Remove Pausable wrapper around stream command
…-write-concurrent
Thanks, especially for the thorough testing.
As far as I can tell, this pull request is ready to be merged. @tomaka, would you mind approving it so it can be merged?
In #112, flushing is no longer awaited but is instead attempted at each iteration of the `next` loop in `Connection`. Unfortunately, when using wasm-ext with Noise, frames may never get written until the next iteration is triggered by another event (an incoming frame or stream/control events). The fix here is to make sure that flushing of the I/O socket is progressed in the main loop.
Currently, all write operations (e.g. for a particular frame sent by a substream) are `.await`ed for completion within the `Connection::next()` loop, thus prohibiting reading from the underlying socket while the write is pending. This can lead to unexpected write deadlocks between two peers in specific, though likely uncommon, situations. See #104 for further details and a test.

This PR proposes to allow concurrent reads while a frame write is pending, thereby propagating pending write back-pressure via the paused command channels rather than by "blocking" `Connection::next()` entirely until such a write completes.

Notably, no new buffers are introduced. We still send one frame at a time from the context of a `Connection`. However, when a frame write cannot complete, the command channels are paused and I/O reads can continue while the write is pending. The paused command channels exert write back-pressure on the streams and the API, and they ensure that the only frames we still try to send are those produced as a result of reading a frame. These writes then indeed wait for completion of the prior pending send operation, if any, but since they happen as a result of reading a frame, the remote can in turn usually write again, should it have been waiting to do so before it can read again. Unexpected write deadlocks between peers which otherwise appear to read and write concurrently on substreams can thus be avoided.

I'm not entirely sure that there are no remaining situations in which a write deadlock can be provoked, but I did not want to introduce new "pending frames" buffers, which would again be bounded and raise the question of what to do when they, too, are full. I thus came up with this refinement of the status quo without new buffers.

The previously ignored test introduced in #104 is hereby enabled.