From c6256e1aed40838ae36cbd4e1102927f9c044e4e Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 25 Nov 2022 01:02:10 +0100 Subject: [PATCH] refactor(quic): Add WriteState for substreams --- transports/quic/src/connection.rs | 14 ++++-- transports/quic/src/connection/substream.rs | 52 +++++++++++++-------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/transports/quic/src/connection.rs b/transports/quic/src/connection.rs index 9e292df74e9..9a16a314ecb 100644 --- a/transports/quic/src/connection.rs +++ b/transports/quic/src/connection.rs @@ -27,7 +27,7 @@ use crate::{ }; pub use connecting::Connecting; pub use substream::Substream; -use substream::SubstreamState; +use substream::{SubstreamState, WriteState}; use futures::{channel::mpsc, ready, FutureExt, StreamExt}; use futures_timer::Delay; @@ -277,11 +277,16 @@ impl StreamMuxer for Connection { } quinn_proto::Event::Stream(quinn_proto::StreamEvent::Finished { id }) => { if let Some(substream) = inner.substreams.get_mut(&id) { - substream.is_write_closed = true; + if matches!( + substream.write_state, + WriteState::Open | WriteState::Closing + ) { + substream.write_state = WriteState::Closed; + } if let Some(waker) = substream.write_waker.take() { waker.wake(); } - if let Some(waker) = substream.finished_waker.take() { + if let Some(waker) = substream.close_waker.take() { waker.wake(); } } @@ -291,10 +296,11 @@ impl StreamMuxer for Connection { error_code: _, }) => { if let Some(substream) = inner.substreams.get_mut(&id) { + substream.write_state = WriteState::Stopped; if let Some(waker) = substream.write_waker.take() { waker.wake(); } - if let Some(waker) = substream.finished_waker.take() { + if let Some(waker) = substream.close_waker.take() { waker.wake(); } } diff --git a/transports/quic/src/connection/substream.rs b/transports/quic/src/connection/substream.rs index da166678cc0..a38c6d16c9d 100644 --- a/transports/quic/src/connection/substream.rs +++ b/transports/quic/src/connection/substream.rs @@ -38,13 +38,9 @@ pub struct SubstreamState { /// Waker to wake if the substream becomes writable, closed or stopped. pub write_waker: Option, /// Waker to wake if the substream becomes closed or stopped. - pub finished_waker: Option, + pub close_waker: Option, - /// `true` if the writing side of the stream is closing, i.e. `AsyncWrite::poll_close` - /// was called. - pub is_finishing: bool, - /// `true` if we successfully wrote all data and closed the stream. - pub is_write_closed: bool, + pub write_state: WriteState, } impl SubstreamState { @@ -56,7 +52,7 @@ impl SubstreamState { if let Some(waker) = self.write_waker.take() { waker.wake(); } - if let Some(waker) = self.finished_waker.take() { + if let Some(waker) = self.close_waker.take() { waker.wake(); } } @@ -180,24 +176,25 @@ impl AsyncWrite for Substream { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut inner = self.state.lock(); - if let Ok(Some(reason)) = inner.connection.send_stream(self.id).stopped() { - let err = quinn_proto::FinishError::Stopped(reason); - return Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err))); - } let substream_state = inner.unchecked_substream_state(self.id); - if substream_state.is_write_closed { - return Poll::Ready(Ok(())); - } - if substream_state.is_finishing { - substream_state.finished_waker = Some(cx.waker().clone()); - return Poll::Pending; + match substream_state.write_state { + WriteState::Open => {} + WriteState::Closing => { + substream_state.close_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + WriteState::Closed => return Poll::Ready(Ok(())), + WriteState::Stopped => { + let err = quinn_proto::FinishError::Stopped(0u32.into()); + return Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err))); + } } match inner.connection.send_stream(self.id).finish() { Ok(()) => { let substream_state = inner.unchecked_substream_state(self.id); - substream_state.finished_waker = Some(cx.waker().clone()); - substream_state.is_finishing = true; + substream_state.close_waker = Some(cx.waker().clone()); + substream_state.write_state = WriteState::Closing; Poll::Pending } Err(err @ quinn_proto::FinishError::Stopped(_)) => { @@ -218,7 +215,7 @@ impl Drop for Substream { let mut state = self.state.lock(); state.substreams.remove(&self.id); // Send `STOP_STREAM` if the remote did not finish the stream yet. - // We have to manually check the read stream since we might may have + // We have to manually check the read stream since we might have // received a `FIN` (without any other stream data) after the last // time we tried to read. let mut is_read_done = false; @@ -243,3 +240,18 @@ impl Drop for Substream { } } } + +#[derive(Debug, Default, Clone)] +pub enum WriteState { + /// The stream is open for writing. + #[default] + Open, + /// The writing side of the stream is closing. + Closing, + /// All data was successfully sent to the remote and the stream + /// closed, i.e. a [`StreamEvent::Finished`] was reported for it. + Closed, + /// The stream was stopped by the remote before all data could be + /// sent. + Stopped, +}