Skip to content

Commit

Permalink
refactor(quic): Add WriteState for substreams
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Nov 25, 2022
1 parent 8bda2c7 commit c6256e1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
14 changes: 10 additions & 4 deletions transports/quic/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
};
pub use connecting::Connecting;
pub use substream::Substream;
use substream::SubstreamState;
use substream::{SubstreamState, WriteState};

use futures::{channel::mpsc, ready, FutureExt, StreamExt};
use futures_timer::Delay;
Expand Down Expand Up @@ -277,11 +277,16 @@ impl StreamMuxer for Connection {
}
quinn_proto::Event::Stream(quinn_proto::StreamEvent::Finished { id }) => {
if let Some(substream) = inner.substreams.get_mut(&id) {
substream.is_write_closed = true;
if matches!(
substream.write_state,
WriteState::Open | WriteState::Closing
) {
substream.write_state = WriteState::Closed;
}
if let Some(waker) = substream.write_waker.take() {
waker.wake();
}
if let Some(waker) = substream.finished_waker.take() {
if let Some(waker) = substream.close_waker.take() {
waker.wake();
}
}
Expand All @@ -291,10 +296,11 @@ impl StreamMuxer for Connection {
error_code: _,
}) => {
if let Some(substream) = inner.substreams.get_mut(&id) {
substream.write_state = WriteState::Stopped;
if let Some(waker) = substream.write_waker.take() {
waker.wake();
}
if let Some(waker) = substream.finished_waker.take() {
if let Some(waker) = substream.close_waker.take() {
waker.wake();
}
}
Expand Down
52 changes: 32 additions & 20 deletions transports/quic/src/connection/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@ pub struct SubstreamState {
/// Waker to wake if the substream becomes writable, closed or stopped.
pub write_waker: Option<Waker>,
/// Waker to wake if the substream becomes closed or stopped.
pub finished_waker: Option<Waker>,
pub close_waker: Option<Waker>,

/// `true` if the writing side of the stream is closing, i.e. `AsyncWrite::poll_close`
/// was called.
pub is_finishing: bool,
/// `true` if we successfully wrote all data and closed the stream.
pub is_write_closed: bool,
pub write_state: WriteState,
}

impl SubstreamState {
Expand All @@ -56,7 +52,7 @@ impl SubstreamState {
if let Some(waker) = self.write_waker.take() {
waker.wake();
}
if let Some(waker) = self.finished_waker.take() {
if let Some(waker) = self.close_waker.take() {
waker.wake();
}
}
Expand Down Expand Up @@ -180,24 +176,25 @@ impl AsyncWrite for Substream {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let mut inner = self.state.lock();

if let Ok(Some(reason)) = inner.connection.send_stream(self.id).stopped() {
let err = quinn_proto::FinishError::Stopped(reason);
return Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err)));
}
let substream_state = inner.unchecked_substream_state(self.id);
if substream_state.is_write_closed {
return Poll::Ready(Ok(()));
}
if substream_state.is_finishing {
substream_state.finished_waker = Some(cx.waker().clone());
return Poll::Pending;
match substream_state.write_state {
WriteState::Open => {}
WriteState::Closing => {
substream_state.close_waker = Some(cx.waker().clone());
return Poll::Pending;
}
WriteState::Closed => return Poll::Ready(Ok(())),
WriteState::Stopped => {
let err = quinn_proto::FinishError::Stopped(0u32.into());
return Poll::Ready(Err(io::Error::new(io::ErrorKind::ConnectionReset, err)));
}
}

match inner.connection.send_stream(self.id).finish() {
Ok(()) => {
let substream_state = inner.unchecked_substream_state(self.id);
substream_state.finished_waker = Some(cx.waker().clone());
substream_state.is_finishing = true;
substream_state.close_waker = Some(cx.waker().clone());
substream_state.write_state = WriteState::Closing;
Poll::Pending
}
Err(err @ quinn_proto::FinishError::Stopped(_)) => {
Expand All @@ -218,7 +215,7 @@ impl Drop for Substream {
let mut state = self.state.lock();
state.substreams.remove(&self.id);
// Send `STOP_STREAM` if the remote did not finish the stream yet.
// We have to manually check the read stream since we might may have
// We have to manually check the read stream since we might have
// received a `FIN` (without any other stream data) after the last
// time we tried to read.
let mut is_read_done = false;
Expand All @@ -243,3 +240,18 @@ impl Drop for Substream {
}
}
}

#[derive(Debug, Default, Clone)]
pub enum WriteState {
/// The stream is open for writing.
#[default]
Open,
/// The writing side of the stream is closing.
Closing,
/// All data was successfully sent to the remote and the stream
/// closed, i.e. a [`StreamEvent::Finished`] was reported for it.
Closed,
/// The stream was stopped by the remote before all data could be
/// sent.
Stopped,
}

0 comments on commit c6256e1

Please sign in to comment.