Allow reads concurrent to pending writes. #112

Merged
merged 15 commits into libp2p:develop on Apr 12, 2021

Conversation

@romanb (Contributor) commented Feb 11, 2021

Currently, all write operations (e.g. for a particular frame sent by a substream) are .awaited for completion within the Connection::next() loop, thus prohibiting reading from the underlying socket while the write is pending. This can lead to unexpected write deadlocks between two peers in specific, though likely uncommon, situations. See #104 for further details and a test.

This PR proposes to allow concurrent reads while a frame write is pending, thereby propagating pending write back-pressure via the paused command channels, rather than by "blocking" Connection::next() entirely until such a write completes.

Notably, no new buffers are introduced: we still send one frame at a time from the context of a Connection. However, when a frame write cannot complete, the command channels are paused and I/O reads can continue while the write is pending. The paused command channels exert write back-pressure on the streams and the API, ensuring that the only frames we still try to send are those produced as a result of reading a frame. Such a write does wait for the completion of the prior pending send operation, if any, but because it happens in reaction to reading a frame, the remote can usually make write progress again, should it have been blocked on writing before being able to read again. Unexpected write deadlocks between peers that otherwise appear to read and write concurrently on their substreams can thus be avoided. I am not entirely sure that there is no situation left in which a write deadlock can be provoked, but I did not want to introduce new "pending frames" buffers, which would again be bounded and raise the question of what to do when they are also full. I thus came up with this refinement of the status quo without new buffers.

The previously ignored test introduced in #104 is hereby enabled.
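For illustration, the pausing mechanism boils down to a small stream wrapper that can be switched into a state where it stops yielding items. A minimal sketch of the idea (the crate's actual Pausable wrapper may differ in detail):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::Stream;

/// A stream wrapper that yields `Poll::Pending` while paused, thereby
/// exercising back-pressure on whoever feeds the wrapped channel.
struct Pausable<S> {
    paused: bool,
    inner: S,
}

impl<S: Stream + Unpin> Pausable<S> {
    fn pause(&mut self) {
        self.paused = true;
    }

    fn unpause(&mut self) {
        self.paused = false;
    }
}

impl<S: Stream + Unpin> Stream for Pausable<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if !self.paused {
            return Pin::new(&mut self.inner).poll_next(cx);
        }
        // No waker is registered here; the connection loop itself
        // re-polls the wrapper after calling `unpause`.
        Poll::Pending
    }
}
```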

@romanb added the enhancement (New feature or request) label on Feb 11, 2021
@romanb (Contributor, Author) commented Feb 12, 2021

I'm not entirely sure if there may not still be situations where a write deadlock can be provoked but I did not want to introduce new "pending frames" buffers, [..]

To be fair, one can say that I essentially introduced a buffer of 1 on top of the send buffer / capacity of the connection in order to break this particular deadlock. The question then is whether that is worthwhile doing and can be considered a small enhancement, or is just more complexity for too little gain. Fundamentally, one eventually needs to either wait/block on sending or abort the connection if writes cannot proceed, as it is not possible to only read (i.e. at least window updates must be sent).
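To picture the "buffer of 1": the frame sink holds at most one partially written frame in its write state, along the lines of the WriteState in frame/io.rs (simplified sketch; 12 is the Yamux header size, and the exact field types follow the crate's frame module):

```rust
/// At most one frame is in flight: either nothing is being written, or
/// the 12-byte header or the body of a single frame has been partially
/// written, with `offset` tracking progress. A new frame can only be
/// accepted in the `Init` state.
enum WriteState {
    Init,
    Header { header: [u8; 12], buffer: Vec<u8>, offset: usize },
    Body { buffer: Vec<u8>, offset: usize },
}
```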

@mxinden (Member) left a comment


I like the general structure of the approach taken here, namely to stop polling the stream- and command-receivers when the socket is busy sending an item, as well as the introduction of a one-item send buffer into Io.

It did take me a bit of time to fully understand the concepts introduced in this pull request (PollSend) as well as how they interact with the existing Pausable wrapper.

I have not yet put enough thought into the suggestions below to consider them a solid review, so please take them with a grain of salt. I have played with your changes locally, but I am not yet at a state where my local changes compile.

  • How about implementing Sink for Io, keeping your new "buffer of 1 on top of the send buffer"?
  • How about polling self.stream_receiver and self.command_receiver in Connection::next only when self.socket.poll_ready returns Poll::Ready(Ok(()))?
  • How about using SinkExt::feed instead of the custom send function in Connection::on_control_command, Connection::on_stream_command and Connection::on_frame?

I would expect that using the concepts from the futures library, namely Sink and SinkExt, instead of our own PollSend and Pausable makes this pull request easier to understand, given that one is likely already familiar with the former.
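For reference, the feed/flush split works as follows. This is a self-contained example against an in-memory channel rather than the yamux Io sink:

```rust
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

fn main() {
    block_on(async {
        let (mut tx, mut rx) = mpsc::channel::<u32>(8);

        // `feed` hands an item to the sink but, unlike `send`, does
        // not flush afterwards; flushing happens as a separate step,
        // e.g. once per iteration of a main loop.
        tx.feed(1).await.unwrap();
        tx.feed(2).await.unwrap();
        tx.flush().await.unwrap();

        assert_eq!(rx.next().await, Some(1));
        assert_eq!(rx.next().await, Some(2));
    });
}
```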

What do you think? Happy to expand on my ideas, or try to provide a compiling version.

My work-in-progress changes on top of this pull request
diff --git a/src/connection.rs b/src/connection.rs
index 60bed2c..355af6c 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -392,27 +392,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                     num_terminated += 1;
                     Either::Left(future::pending())
                 } else {
-                    let socket = &mut self.socket;
-                    let io = future::poll_fn(move |cx| {
-                        // Progress writing.
-                        match socket.get_mut().poll_send::<()>(cx, None) {
-                            frame::PollSend::Pending(_) => {}
-                            frame::PollSend::Ready(res) => {
-                                res.or(Err(ConnectionError::Closed))?;
-                                if stream_receiver_paused {
-                                    return Poll::Ready(Result::Ok(IoEvent::OutboundReady))
-                                }
-                            }
-                        }
-                        // Progress reading.
-                        let next_frame = match futures::ready!(socket.poll_next_unpin(cx)) {
-                            None => Ok(None),
-                            Some(Err(e)) => Err(e.into()),
-                            Some(Ok(f)) => Ok(Some(f))
-                        };
-                        Poll::Ready(Ok(IoEvent::Inbound(next_frame)))
-                    });
-                    Either::Right(io)
+                    Either::Right(self.socket.try_next().err_into())
                 };
 
             let mut next_stream_command =
@@ -420,7 +400,19 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                     num_terminated += 1;
                     Either::Left(future::pending())
                 } else {
-                    Either::Right(self.stream_receiver.next())
+                    Either::Right(
+                        async {
+                            // mxinden: Only consider polling the stream_receiver when the socket is ready to
+                            // accept a new item.
+                            //
+                            // mxinden: Likely does not pass the borrow checker with the socket
+                            // being borrowed both here and below for the next_control_command.
+                            future::poll_fn(|cx| {
+                                self.socket.poll_ready(cx)
+                            }).await;
+                            self.stream_receiver.next()
+                        }
+                    )
                 };
 
             let mut next_control_command =
@@ -428,7 +420,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                     num_terminated += 1;
                     Either::Left(future::pending())
                 } else {
-                    Either::Right(self.control_receiver.next())
+                    Either::Right(
+                        async {
+                            // mxinden: Only consider polling the control_receiver when the socket is ready
+                            // to accept a new item.
+                            future::poll_fn(|cx| {
+                                self.socket.poll_ready(cx)
+                            }).await;
+                            self.control_receiver.next()
+
+                        }
+                    )
                 };
 
             if num_terminated == 3 {
@@ -504,7 +507,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                     let mut frame = Frame::window_update(id, extra_credit);
                     frame.header_mut().syn();
                     log::trace!("{}/{}: sending initial {}", self.id, id, frame.header());
-                    self.send(frame).await.or(Err(ConnectionError::Closed))?
+                    // mxinden: Use SinkExt::feed which will send the item into the sink, but does
+                    // not consecutively flush it.
+                    self.socket.get_mut().feed(frame).await.or(Err(ConnectionError::Closed))?
                 }
                 let stream = {
                     let config = self.config.clone();
@@ -557,10 +562,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
         Ok(())
     }
 
-    async fn send(&mut self, f: impl Into<Frame<()>>) -> Result<()> {
-        send(self.id, &mut self.socket, &mut self.stream_receiver, &mut self.control_receiver, f).await
-    }
-
     /// Process a command from one of our `Stream`s.
     async fn on_stream_command(&mut self, cmd: Option<StreamCommand>) -> Result<()> {
         match cmd {
@@ -996,54 +997,6 @@ enum IoEvent {
     OutboundReady,
 }
 
-/// Sends a frame on the given `io` stream.
-///
-/// If the frame is taken by `io` but cannot be fully sent, the command
-/// receivers are paused, without waiting for completion.
-///
-/// If a prior send operation is still pending, waits for its completion.
-async fn send<T: AsyncRead + AsyncWrite + Unpin>(
-    id: Id,
-    io: &mut Fuse<frame::Io<T>>,
-    stream_receiver: &mut Pausable<mpsc::Receiver<StreamCommand>>,
-    control_receiver: &mut Pausable<mpsc::Receiver<ControlCommand>>,
-    frame: impl Into<Frame<()>>
-) -> Result<()> {
-    let mut frame = Some(frame.into());
-    future::poll_fn(move |cx| {
-        let next = frame.take().expect("Frame has not yet been taken by `io`.");
-        match io.get_mut().poll_send(cx, Some(next)) {
-            frame::PollSend::Pending(Some(f)) => {
-                debug_assert!(stream_receiver.is_paused());
-                log::debug!("{}: send: Prior write pending. Waiting.", id);
-                frame = Some(f);
-                return Poll::Pending
-            }
-            frame::PollSend::Pending(None) => {
-                // The frame could not yet fully be written to the underlying
-                // socket, so we pause the processing of commands in order to
-                // pause writing while still being able to read from the socket.
-                // The only frames that may still be sent while commands are paused
-                // are as a reaction to frames being read, which in turn allows
-                // the remote to make progress eventually, if it should
-                // currently be blocked on writing. In this way unnecessary
-                // deadlocks between peers blocked on writing are avoided.
-                log::trace!("{}: send: Write pending. Continuing with paused command streams.", id);
-                stream_receiver.pause();
-                control_receiver.pause();
-                return Poll::Ready(Ok(()))
-            }
-            frame::PollSend::Ready(Err(e)) => {
-                return Poll::Ready(Err(e.into()))
-            }
-            frame::PollSend::Ready(Ok(())) => {
-                // Note: We leave the unpausing of the command streams to `Connection::next()`.
-                return Poll::Ready(Ok(()))
-            }
-        }
-    }).await
-}
-
 /// Turn a Yamux [`Connection`] into a [`futures::Stream`].
 pub fn into_stream<T>(c: Connection<T>) -> impl futures::stream::Stream<Item = Result<Stream>>
 where
diff --git a/src/frame/io.rs b/src/frame/io.rs
index f909aec..13f6f4f 100644
--- a/src/frame/io.rs
+++ b/src/frame/io.rs
@@ -34,45 +34,48 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
         }
     }
 
-    /// Continue writing on the underlying connection, possibly with a new frame.
-    ///
-    /// If there is no data pending to be written from a prior frame, an attempt
-    /// is made at writing the new frame, if any.
-    ///
-    /// If there is data pending to be written from a prior frame, an attempt is made to
-    /// complete the pending writes before accepting the new frame, if any. If
-    /// the new frame cannot be accepted due to pending writes, it is returned.
-    pub(crate) fn poll_send<A>(&mut self, cx: &mut Context<'_>, mut frame: Option<Frame<A>>)
-        -> PollSend<A, io::Result<()>>
-    {
+    #[cfg(test)]
+    async fn flush(&mut self) -> io::Result<()> {
+        future::poll_fn(|cx| self.poll_flush(cx)).await
+    }
+
+    #[cfg(test)]
+    async fn send<A>(&mut self, frame: Frame<A>) -> io::Result<()> {
+        let mut frame = Some(frame);
+        future::poll_fn(|cx| {
+            match self.poll_send(cx, Some(frame.take().expect("`frame` not yet taken"))) {
+                PollSend::Pending(f) => { frame = f; return Poll::Pending }
+                PollSend::Ready(r) => return Poll::Ready(r)
+            }
+        }).await
+    }
+
+    pub(crate) async fn close(&mut self) -> io::Result<()> {
+        future::poll_fn(|cx| self.poll_flush(cx)).await?;
+        self.io.close().await
+    }
+}
+
+impl <T: AsyncWrite + Unpin, A> Sink<Frame<A>> for Io<T> {
+    type Error = io::Error;
+
+    fn poll_ready(
+        self: Pin<&mut Self>,
+        cx: &mut Context
+    ) -> Poll<Result<(), Self::Error>> {
         loop {
             match &mut self.write_state {
                 WriteState::Init => {
-                    if let Some(f) = frame.take() {
-                        let header = header::encode(&f.header);
-                        let buffer = f.body;
-                        self.write_state = WriteState::Header { header, buffer, offset: 0 };
-                    } else {
-                        return PollSend::Ready(Ok(()))
-                    }
+                    return Poll::Ready(Ok(()))
                 }
                 WriteState::Header { header, buffer, ref mut offset } => {
-                    match Pin::new(&mut self.io).poll_write(cx, &header[*offset ..]) {
-                        Poll::Pending => {
-                            if let Some(f) = frame.take() {
-                                return PollSend::Pending(Some(f))
-                            }
-                            return PollSend::Pending(None)
-                        }
-                        Poll::Ready(Err(e)) => {
-                            if let Some(f) = frame.take() {
-                                return PollSend::Pending(Some(f))
-                            }
+                    match ready!(Pin::new(&mut self.io).poll_write(cx, &header[*offset ..])) {
+                        Err(e) => {
                             return PollSend::Ready(Err(e))
                         }
-                        Poll::Ready(Ok(n)) => {
+                        Ok(n) => {
                             if n == 0 {
-                                return PollSend::Ready(Err(io::ErrorKind::WriteZero.into()))
+                                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
                             }
                             *offset += n;
                             if *offset == header.len() {
@@ -87,20 +90,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
                     }
                 }
                 WriteState::Body { buffer, ref mut offset } => {
-                    match Pin::new(&mut self.io).poll_write(cx, &buffer[*offset ..]) {
-                        Poll::Pending => {
-                            if let Some(f) = frame.take() {
-                                return PollSend::Pending(Some(f))
-                            }
-                            return PollSend::Pending(None)
+                    match ready!(Pin::new(&mut self.io).poll_write(cx, &buffer[*offset ..])) {
+                        Err(e) => {
+                            return Poll::Ready(Err(e))
                         }
-                        Poll::Ready(Err(e)) => {
-                            if let Some(f) = frame.take() {
-                                return PollSend::Pending(Some(f))
-                            }
-                            return PollSend::Ready(Err(e))
-                        }
-                        Poll::Ready(Ok(n)) => {
+                        Ok(n) => {
                             if n == 0 {
                                 return PollSend::Ready(Err(io::ErrorKind::WriteZero.into()))
                             }
@@ -115,33 +109,42 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Io<T> {
         }
     }
 
-    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-        match self.poll_send(cx, Option::<Frame<()>>::None) {
-            PollSend::Pending(_) => return Poll::Pending,
-            PollSend::Ready(r) => r?
+    /// Continue writing on the underlying connection, possibly with a new frame.
+    ///
+    /// If there is no data pending to be written from a prior frame, an attempt
+    /// is made at writing the new frame, if any.
+    ///
+    /// If there is data pending to be written from a prior frame, an attempt is made to
+    /// complete the pending writes before accepting the new frame, if any. If
+    /// the new frame cannot be accepted due to pending writes, it is returned.
+    pub(crate) fn start_send(self: Pin<&mut Self>, mut f: Frame<A>)
+        -> Result<(), Self::Error>
+    {
+        loop {
+            match &mut self.write_state {
+                WriteState::Init => {
+                    let header = header::encode(&f.header);
+                    let buffer = f.body;
+                    self.write_state = WriteState::Header { header, buffer, offset: 0 };
+                }
+                WriteState::Header { header, buffer, ref mut offset } => {
+                    panic!();
+                }
+                WriteState::Body { buffer, ref mut offset } => {
+                    panic!();
+                }
+            }
         }
-        Pin::new(&mut self.io).poll_flush(cx)
-    }
-
-    pub(crate) async fn close(&mut self) -> io::Result<()> {
-        future::poll_fn(|cx| self.poll_flush(cx)).await?;
-        self.io.close().await
     }
 
-    #[cfg(test)]
-    async fn flush(&mut self) -> io::Result<()> {
-        future::poll_fn(|cx| self.poll_flush(cx)).await
+    pub(crate) fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.poll_ready(cx)
     }
-
-    #[cfg(test)]
-    async fn send<A>(&mut self, frame: Frame<A>) -> io::Result<()> {
-        let mut frame = Some(frame);
-        future::poll_fn(|cx| {
-            match self.poll_send(cx, Some(frame.take().expect("`frame` not yet taken"))) {
-                PollSend::Pending(f) => { frame = f; return Poll::Pending }
-                PollSend::Ready(r) => return Poll::Ready(r)
-            }
-        }).await
+    fn poll_close(
+        self: Pin<&mut Self>,
+        cx: &mut Context
+    ) -> Poll<Result<(), Self::Error>> {
+        unimplemented!();
     }
 }

@romanb (Contributor, Author) commented Feb 15, 2021

Thanks for the review. I will take a closer look at your suggestions and report back here. I also want to give this some more thought and testing. Meanwhile, I think it makes sense to release the current state in develop, so I will prepare a PR for such a release today.

@romanb (Contributor, Author) commented Feb 15, 2021

My work-in-progress changes on top of this pull request.

As you mentioned in a comment, the main problem with the changes in connection.rs is that you cannot mutably borrow self.socket across multiple futures like this, which is why at least this particular approach seems to be a dead end.
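Reduced to a toy example (illustrative types, not the actual Connection fields), the conflict and the single-borrow alternative look like this:

```rust
use std::task::Poll;

use futures::{executor::block_on, future};

struct Socket {
    ready: bool,
}

impl Socket {
    fn poll_ready(&mut self) -> Poll<()> {
        if self.ready { Poll::Ready(()) } else { Poll::Pending }
    }
}

fn main() {
    let mut socket = Socket { ready: true };

    // Two separate `async` blocks, one per command channel, cannot
    // both capture `&mut socket` (rejected with E0499). A single
    // future that first checks socket readiness and only then polls
    // the receivers gets by with one mutable borrow:
    block_on(future::poll_fn(|_cx| match socket.poll_ready() {
        Poll::Pending => Poll::Pending,
        Poll::Ready(()) => {
            // ... only now poll stream_receiver / control_receiver ...
            Poll::Ready(())
        }
    }));
}
```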

@romanb (Contributor, Author) commented Feb 15, 2021

As a first improvement based on your suggestions, Io now implements Sink in eb6c732.

@romanb (Contributor, Author) commented Feb 22, 2021

A small update: the write deadlock seems to be relatively easy to reproduce with TCP in the context of the test in tests/concurrent.rs by playing with the TCP send buffer sizes, which I did on master, on develop, and on this branch for comparison. To be clear upfront, and as already stated earlier, the changes in this PR cannot completely avoid a mutual write deadlock under all circumstances: at the end of the day, the TCP send buffers need to be sized according to the expected throughput and traffic pattern. Nevertheless, the change in this PR, namely to continue reading from the socket while the sending of the current frame is Pending, seems to reduce the likelihood of running into such mutual blocking on writes significantly.

This holds for both WindowUpdateModes, but special attention should be given to the fact that with WindowUpdateMode::OnRead, all window updates are sent from the context of a Stream via StreamCommands. That means that once the stream command channel is paused because the current write is Pending in the Connection, window updates are subject to back-pressure as well, and the only remaining case in which the Connection may try to send a frame while continuing to read is upon encountering a protocol violation. So with WindowUpdateMode::OnRead a mutual write deadlock is very unlikely to occur (unless the client code is at fault by not reading from streams until a large write has completed). My testing with tests/concurrent.rs confirmed this even with send buffer sizes smaller than a single Yamux frame, i.e. even that works on this branch without issues. With WindowUpdateMode::OnReceive a mutual write deadlock can still occur, primarily due to the intermittent sending of window updates from Connection::next_stream(), which must await the sending of the previous frame, if any.
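For anyone wanting to reproduce this, shrinking the TCP send buffer can be done along the following lines. This is a sketch using the socket2 crate; the exact test setup is not shown in this thread, so the buffer size here is illustrative:

```rust
use std::net::{SocketAddr, TcpListener};

use socket2::{Domain, Socket, Type};

fn main() -> std::io::Result<()> {
    let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;

    // Ask the OS for a deliberately small send buffer (SO_SNDBUF) so
    // that writes hit back-pressure quickly. The kernel may round the
    // value up or double it for bookkeeping.
    socket.set_send_buffer_size(16 * 1024)?;

    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    socket.bind(&addr.into())?;
    socket.listen(128)?;

    let listener: TcpListener = socket.into();
    println!("listening on {}", listener.local_addr()?);
    Ok(())
}
```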

For anyone reading this without following all the details, I would like to emphasise again that even with the latest release a mutual write deadlock is unlikely to be encountered in practice, as it requires sending large amounts of data simultaneously from both endpoints, coupled with insufficiently sized TCP send buffers, and increasing the TCP send buffers can always resolve the problem. Still, as a result of further testing I am convinced that the changes proposed here, namely to pause the command streams and continue reading from the socket while the current frame write is pending, make such a deadlock even more unlikely and are hence beneficial and desirable.

@mxinden (Member) left a comment


Thanks for the additional refactorings.

I would like to suggest one more change, namely removing the Pausable wrapper from the stream command channel. Related to that suggestion, and as already raised earlier, we ran into the issue of multiple borrows of socket (see comment).

I found a way around the borrow issue. romanb#1 implements the suggestion. It compiles and passes all tests, including the deadlock test. I hope the patch is self-explanatory; please let me know in case it is not.

What do you think @romanb? Would this be a simplification?

@mxinden (Member) left a comment


Thanks, especially for the thorough testing.

@mxinden (Member) commented Mar 17, 2021

As far as I can tell this pull request is ready to be merged. @tomaka, would you mind approving it so that it can be merged?

@tomaka tomaka merged commit 5509775 into libp2p:develop Apr 12, 2021
@appaquet appaquet mentioned this pull request Feb 17, 2022
mxinden pushed a commit that referenced this pull request Feb 25, 2022
In #112, flushing is no longer awaited; instead it is attempted on each iteration of the next loop in Connection. Unfortunately, when using wasm-ext with Noise, frames may never get written until the next iteration is triggered by another event (an incoming frame or a stream/control event).

The fix here is to make sure that flushing the I/O socket gets progressed in the
main loop.
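In sketch form (illustrative signature, not the crate's actual code), the fix amounts to driving poll_flush on every iteration of the loop:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Sink;

/// Drive a pending flush on the socket sink. Called on every
/// iteration of the connection's main loop, so buffered frames reach
/// the wire even when no other event wakes the loop.
fn progress_flush<F, S: Sink<F> + Unpin>(
    socket: &mut S,
    cx: &mut Context<'_>,
) -> Poll<Result<(), S::Error>> {
    // `Pending` is fine here: the sink registers the waker, and the
    // loop is polled again once the socket becomes writable.
    Pin::new(socket).poll_flush(cx)
}
```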