diff --git a/quic/s2n-quic-core/src/stream/mod.rs b/quic/s2n-quic-core/src/stream/mod.rs index 9f27a1bd69..a38c8db632 100644 --- a/quic/s2n-quic-core/src/stream/mod.rs +++ b/quic/s2n-quic-core/src/stream/mod.rs @@ -7,6 +7,7 @@ pub mod iter; pub mod limits; #[cfg(feature = "alloc")] pub mod ops; +pub mod state; mod type_; pub use error::*; diff --git a/quic/s2n-quic-core/src/stream/state.rs b/quic/s2n-quic-core/src/stream/state.rs new file mode 100644 index 0000000000..5d82abb175 --- /dev/null +++ b/quic/s2n-quic-core/src/stream/state.rs @@ -0,0 +1,63 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::ensure; +use core::fmt; + +pub type Result = core::result::Result<(), Error>; + +macro_rules! transition { + ($state:ident, $valid:pat => $target:expr) => {{ + ensure!(*$state != $target, Err(Error::NoOp { current: $target })); + ensure!( + matches!($state, $valid), + Err(Error::InvalidTransition { + current: $state.clone(), + target: $target + }) + ); + #[cfg(feature = "tracing")] + { + tracing::debug!(prev = ?$state, next = ?$target); + } + *$state = $target; + Ok(()) + }}; +} + +macro_rules! is { + ($($state:ident)|+, $function:ident) => { + #[inline] + pub fn $function(&self) -> bool { + matches!(self, $(Self::$state)|*) + } + }; +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Error { + NoOp { current: T }, + InvalidTransition { current: T, target: T }, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NoOp { current } => { + write!(f, "state is already set to {current:?}") + } + Self::InvalidTransition { current, target } => { + write!(f, "invalid transition from {current:?} to {target:?}",) + } + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Error {} + +mod recv; +mod send; + +pub use recv::Receiver; +pub use send::Sender; diff --git a/quic/s2n-quic-core/src/stream/state/recv.rs b/quic/s2n-quic-core/src/stream/state/recv.rs new file mode 100644 index 0000000000..a57d42bbf0 --- /dev/null +++ b/quic/s2n-quic-core/src/stream/state/recv.rs @@ -0,0 +1,124 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +//= https://www.rfc-editor.org/rfc/rfc9000#section-3.2 +//# o +//# | Recv STREAM / STREAM_DATA_BLOCKED / RESET_STREAM +//# | Create Bidirectional Stream (Sending) +//# | Recv MAX_STREAM_DATA / STOP_SENDING (Bidirectional) +//# | Create Higher-Numbered Stream +//# v +//# +-------+ +//# | Recv | Recv RESET_STREAM +//# | |-----------------------. +//# +-------+ | +//# | | +//# | Recv STREAM + FIN | +//# v | +//# +-------+ | +//# | Size | Recv RESET_STREAM | +//# | Known |---------------------->| +//# +-------+ | +//# | | +//# | Recv All Data | +//# v v +//# +-------+ Recv RESET_STREAM +-------+ +//# | Data |--- (optional) --->| Reset | +//# | Recvd | Recv All Data | Recvd | +//# +-------+<-- (optional) ----+-------+ +//# | | +//# | App Read All Data | App Read Reset +//# v v +//# +-------+ +-------+ +//# | Data | | Reset | +//# | Read | | Read | +//# +-------+ +-------+ + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub enum Receiver { + #[default] + Recv, + SizeKnown, + DataRecvd, + DataRead, + ResetRecvd, + ResetRead, +} + +impl Receiver { + is!(Recv, is_receiving); + is!(SizeKnown, is_size_known); + is!(DataRecvd, is_data_received); + is!(DataRead, is_data_read); + is!(ResetRecvd, is_reset_received); + is!(ResetRead, is_reset_read); + is!(DataRead | ResetRead, is_terminal); + + #[inline] + pub fn on_receive_fin(&mut self) -> Result { + use Receiver::*; + transition!(self, Recv => SizeKnown) + } + + #[inline] + pub fn on_receive_all_data(&mut self) -> Result { + use Receiver::*; + transition!(self, SizeKnown => DataRecvd) + } + + #[inline] + pub fn on_app_read_all_data(&mut self) -> Result { + use Receiver::*; + transition!(self, DataRecvd => DataRead) + } + + #[inline] + pub fn on_reset(&mut self) -> Result { + use Receiver::*; + transition!(self, Recv | SizeKnown => ResetRecvd) + } + + #[inline] + pub fn on_app_read_reset(&mut self) -> Result { + use Receiver::*; + transition!(self, ResetRecvd => ResetRead) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use insta::assert_debug_snapshot; + + #[test] + #[cfg_attr(miri, ignore)] + fn snapshots() { + let mut outcomes = vec![]; + let states = [ + Receiver::Recv, + Receiver::SizeKnown, + Receiver::DataRecvd, + Receiver::DataRead, + Receiver::ResetRecvd, + Receiver::ResetRead, + ]; + for state in states { + macro_rules! push { + ($event:ident) => { + let mut target = state.clone(); + let result = target.$event().map(|_| target); + outcomes.push((state.clone(), stringify!($event), result)); + }; + } + push!(on_receive_fin); + push!(on_receive_all_data); + push!(on_app_read_all_data); + push!(on_reset); + push!(on_app_read_reset); + } + + assert_debug_snapshot!(outcomes); + } +} diff --git a/quic/s2n-quic-core/src/stream/state/send.rs b/quic/s2n-quic-core/src/stream/state/send.rs new file mode 100644 index 0000000000..4917206c19 --- /dev/null +++ b/quic/s2n-quic-core/src/stream/state/send.rs @@ -0,0 +1,137 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; + +//= https://www.rfc-editor.org/rfc/rfc9000#section-3.1 +//# o +//# | Create Stream (Sending) +//# | Peer Creates Bidirectional Stream +//# v +//# +-------+ +//# | Ready | Send RESET_STREAM +//# | |-----------------------. +//# +-------+ | +//# | | +//# | Send STREAM / | +//# | STREAM_DATA_BLOCKED | +//# v | +//# +-------+ | +//# | Send | Send RESET_STREAM | +//# | |---------------------->| +//# +-------+ | +//# | | +//# | Send STREAM + FIN | +//# v v +//# +-------+ +-------+ +//# | Data | Send RESET_STREAM | Reset | +//# | Sent |------------------>| Sent | +//# +-------+ +-------+ +//# | | +//# | Recv All ACKs | Recv ACK +//# v v +//# +-------+ +-------+ +//# | Data | | Reset | +//# | Recvd | | Recvd | +//# +-------+ +-------+ + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub enum Sender { + #[default] + Ready, + Send, + DataSent, + DataRecvd, + /// An additional state for implementations to separate queueing a RESET_STREAM from actually + /// sending it + ResetQueued, + ResetSent, + ResetRecvd, +} + +impl Sender { + is!(Ready, is_ready); + is!(Send, is_sending); + is!(DataSent, is_data_sent); + is!(DataRecvd, is_data_received); + is!(ResetQueued, is_reset_queued); + is!(ResetSent, is_reset_sent); + is!(ResetRecvd, is_reset_received); + is!(DataRecvd | ResetRecvd, is_terminal); + + #[inline] + pub fn on_send_stream(&mut self) -> Result { + use Sender::*; + transition!(self, Ready => Send) + } + + #[inline] + pub fn on_send_fin(&mut self) -> Result { + use Sender::*; + // we can jump from Ready to DataSent even though the + // diagram doesn't explicitly highlight this transition + transition!(self, Ready | Send => DataSent) + } + + #[inline] + pub fn on_queue_reset(&mut self) -> Result { + use Sender::*; + transition!(self, Ready | Send | DataSent => ResetQueued) + } + + #[inline] + pub fn on_send_reset(&mut self) -> Result { + use Sender::*; + transition!(self, Ready | Send | DataSent | ResetQueued => ResetSent) + } + + #[inline] + pub fn on_recv_all_acks(&mut self) -> Result { + use Sender::*; + transition!(self, DataSent | ResetQueued => DataRecvd) + } + + #[inline] + pub fn on_recv_reset_ack(&mut self) -> Result { + use Sender::*; + transition!(self, ResetSent => ResetRecvd) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use insta::assert_debug_snapshot; + + #[test] + #[cfg_attr(miri, ignore)] + fn snapshots() { + let mut outcomes = vec![]; + let states = [ + Sender::Ready, + Sender::Send, + Sender::DataSent, + Sender::DataRecvd, + Sender::ResetQueued, + Sender::ResetSent, + Sender::ResetRecvd, + ]; + for state in states { + macro_rules! push { + ($event:ident) => { + let mut target = state.clone(); + let result = target.$event().map(|_| target); + outcomes.push((state.clone(), stringify!($event), result)); + }; + } + push!(on_send_stream); + push!(on_send_fin); + push!(on_queue_reset); + push!(on_send_reset); + push!(on_recv_all_acks); + push!(on_recv_reset_ack); + } + + assert_debug_snapshot!(outcomes); + } +} diff --git a/quic/s2n-quic-core/src/stream/state/snapshots/s2n_quic_core__stream__state__recv__tests__snapshots.snap b/quic/s2n-quic-core/src/stream/state/snapshots/s2n_quic_core__stream__state__recv__tests__snapshots.snap new file mode 100644 index 0000000000..4b25635dee --- /dev/null +++ b/quic/s2n-quic-core/src/stream/state/snapshots/s2n_quic_core__stream__state__recv__tests__snapshots.snap @@ -0,0 +1,283 @@ +--- +source: quic/s2n-quic-core/src/stream/state/recv.rs +expression: outcomes +--- +[ + ( + Recv, + "on_receive_fin", + Ok( + SizeKnown, + ), + ), + ( + Recv, + "on_receive_all_data", + Err( + InvalidTransition { + current: Recv, + target: DataRecvd, + }, + ), + ), + ( + Recv, + "on_app_read_all_data", + Err( + InvalidTransition { + current: Recv, + target: DataRead, + }, + ), + ), + ( + Recv, + "on_reset", + Ok( + ResetRecvd, + ), + ), + ( + Recv, + "on_app_read_reset", + Err( + InvalidTransition { + current: Recv, + target: ResetRead, + }, + ), + ), + ( + SizeKnown, + "on_receive_fin", + Err( + NoOp { + current: SizeKnown, + }, + ), + ), + ( + SizeKnown, + "on_receive_all_data", + Ok( + DataRecvd, + ), + ), + ( + SizeKnown, + "on_app_read_all_data", + Err( + InvalidTransition { + current: SizeKnown, + target: DataRead, + }, + ), + ), + ( + SizeKnown, + "on_reset", + Ok( + ResetRecvd, + ), + ), + ( + SizeKnown, + "on_app_read_reset", + Err( + InvalidTransition { + current: SizeKnown, + target: ResetRead, + }, + ), + ), + ( + DataRecvd, + "on_receive_fin", + Err( + InvalidTransition { + current: DataRecvd, + target: SizeKnown, + }, + ), + ), + ( + DataRecvd, + "on_receive_all_data", + Err( + NoOp { + current: DataRecvd, + }, + ), + ), + ( + DataRecvd, + "on_app_read_all_data", + Ok( + DataRead, + ), + ), + ( + DataRecvd, + "on_reset", + Err( + InvalidTransition { + current: DataRecvd, + target: ResetRecvd, + }, + ), + ), + ( + DataRecvd, + "on_app_read_reset", + Err( + InvalidTransition { + current: DataRecvd, + target: ResetRead, + }, + ), + ), + ( + DataRead, + "on_receive_fin", + Err( + InvalidTransition { + current: DataRead, + target: SizeKnown, + }, + ), + ), + ( + DataRead, + "on_receive_all_data", + Err( + InvalidTransition { + current: DataRead, + target: DataRecvd, + }, + ), + ), + ( + DataRead, + "on_app_read_all_data", + Err( + NoOp { + current: DataRead, + }, + ), + ), + ( + DataRead, + "on_reset", + Err( + InvalidTransition { + current: DataRead, + target: ResetRecvd, + }, + ), + ), + ( + DataRead, + "on_app_read_reset", + Err( + InvalidTransition { + current: DataRead, + target: ResetRead, + }, + ), + ), + ( + ResetRecvd, + "on_receive_fin", + Err( + InvalidTransition { + current: ResetRecvd, + target: SizeKnown, + }, + ), + ), + ( + ResetRecvd, + "on_receive_all_data", + Err( + InvalidTransition { + current: ResetRecvd, + target: DataRecvd, + }, + ), + ), + ( + ResetRecvd, + "on_app_read_all_data", + Err( + InvalidTransition { + current: ResetRecvd, + target: DataRead, + }, + ), + ), + ( + ResetRecvd, + "on_reset", + Err( + NoOp { + current: ResetRecvd, + }, + ), + ), + ( + ResetRecvd, + "on_app_read_reset", + Ok( + ResetRead, + ), + ), + ( + ResetRead, + "on_receive_fin", + Err( + InvalidTransition { + current: ResetRead, + target: SizeKnown, + }, + ), + ), + ( + ResetRead, + "on_receive_all_data", + Err( + InvalidTransition { + current: ResetRead, + target: DataRecvd, + }, + ), + ), + ( + ResetRead, + "on_app_read_all_data", + Err( + InvalidTransition { + current: ResetRead, + target: DataRead, + }, + ), + ), + ( + ResetRead, + "on_reset", + Err( + InvalidTransition { + current: ResetRead, + target: ResetRecvd, + }, + ), + ), + ( + ResetRead, + "on_app_read_reset", + Err( + NoOp { + current: ResetRead, + }, + ), + ), +] diff --git a/quic/s2n-quic-core/src/stream/state/snapshots/s2n_quic_core__stream__state__send__tests__snapshots.snap b/quic/s2n-quic-core/src/stream/state/snapshots/s2n_quic_core__stream__state__send__tests__snapshots.snap new file mode 100644 index 0000000000..21615b2d11 --- /dev/null +++ b/quic/s2n-quic-core/src/stream/state/snapshots/s2n_quic_core__stream__state__send__tests__snapshots.snap @@ -0,0 +1,381 @@ +--- +source: quic/s2n-quic-core/src/stream/state/send.rs +expression: outcomes +--- +[ + ( + Ready, + "on_send_stream", + Ok( + Send, + ), + ), + ( + Ready, + "on_send_fin", + Ok( + DataSent, + ), + ), + ( + Ready, + "on_queue_reset", + Ok( + ResetQueued, + ), + ), + ( + Ready, + "on_send_reset", + Ok( + ResetSent, + ), + ), + ( + Ready, + "on_recv_all_acks", + Err( + InvalidTransition { + current: Ready, + target: DataRecvd, + }, + ), + ), + ( + Ready, + "on_recv_reset_ack", + Err( + InvalidTransition { + current: Ready, + target: ResetRecvd, + }, + ), + ), + ( + Send, + "on_send_stream", + Err( + NoOp { + current: Send, + }, + ), + ), + ( + Send, + "on_send_fin", + Ok( + DataSent, + ), + ), + ( + Send, + "on_queue_reset", + Ok( + ResetQueued, + ), + ), + ( + Send, + "on_send_reset", + Ok( + ResetSent, + ), + ), + ( + Send, + "on_recv_all_acks", + Err( + InvalidTransition { + current: Send, + target: DataRecvd, + }, + ), + ), + ( + Send, + "on_recv_reset_ack", + Err( + InvalidTransition { + current: Send, + target: ResetRecvd, + }, + ), + ), + ( + DataSent, + "on_send_stream", + Err( + InvalidTransition { + current: DataSent, + target: Send, + }, + ), + ), + ( + DataSent, + "on_send_fin", + Err( + NoOp { + current: DataSent, + }, + ), + ), + ( + DataSent, + "on_queue_reset", + Ok( + ResetQueued, + ), + ), + ( + DataSent, + "on_send_reset", + Ok( + ResetSent, + ), + ), + ( + DataSent, + "on_recv_all_acks", + Ok( + DataRecvd, + ), + ), + ( + DataSent, + "on_recv_reset_ack", + Err( + InvalidTransition { + current: DataSent, + target: ResetRecvd, + }, + ), + ), + ( + DataRecvd, + "on_send_stream", + Err( + InvalidTransition { + current: DataRecvd, + target: Send, + }, + ), + ), + ( + DataRecvd, + "on_send_fin", + Err( + InvalidTransition { + current: DataRecvd, + target: DataSent, + }, + ), + ), + ( + DataRecvd, + "on_queue_reset", + Err( + InvalidTransition { + current: DataRecvd, + target: ResetQueued, + }, + ), + ), + ( + DataRecvd, + "on_send_reset", + Err( + InvalidTransition { + current: DataRecvd, + target: ResetSent, + }, + ), + ), + ( + DataRecvd, + "on_recv_all_acks", + Err( + NoOp { + current: DataRecvd, + }, + ), + ), + ( + DataRecvd, + "on_recv_reset_ack", + Err( + InvalidTransition { + current: DataRecvd, + target: ResetRecvd, + }, + ), + ), + ( + ResetQueued, + "on_send_stream", + Err( + InvalidTransition { + current: ResetQueued, + target: Send, + }, + ), + ), + ( + ResetQueued, + "on_send_fin", + Err( + InvalidTransition { + current: ResetQueued, + target: DataSent, + }, + ), + ), + ( + ResetQueued, + "on_queue_reset", + Err( + NoOp { + current: ResetQueued, + }, + ), + ), + ( + ResetQueued, + "on_send_reset", + Ok( + ResetSent, + ), + ), + ( + ResetQueued, + "on_recv_all_acks", + Ok( + DataRecvd, + ), + ), + ( + ResetQueued, + "on_recv_reset_ack", + Err( + InvalidTransition { + current: ResetQueued, + target: ResetRecvd, + }, + ), + ), + ( + ResetSent, + "on_send_stream", + Err( + InvalidTransition { + current: ResetSent, + target: Send, + }, + ), + ), + ( + ResetSent, + "on_send_fin", + Err( + InvalidTransition { + current: ResetSent, + target: DataSent, + }, + ), + ), + ( + ResetSent, + "on_queue_reset", + Err( + InvalidTransition { + current: ResetSent, + target: ResetQueued, + }, + ), + ), + ( + ResetSent, + "on_send_reset", + Err( + NoOp { + current: ResetSent, + }, + ), + ), + ( + ResetSent, + "on_recv_all_acks", + Err( + InvalidTransition { + current: ResetSent, + target: DataRecvd, + }, + ), + ), + ( + ResetSent, + "on_recv_reset_ack", + Ok( + ResetRecvd, + ), + ), + ( + ResetRecvd, + "on_send_stream", + Err( + InvalidTransition { + current: ResetRecvd, + target: Send, + }, + ), + ), + ( + ResetRecvd, + "on_send_fin", + Err( + InvalidTransition { + current: ResetRecvd, + target: DataSent, + }, + ), + ), + ( + ResetRecvd, + "on_queue_reset", + Err( + InvalidTransition { + current: ResetRecvd, + target: ResetQueued, + }, + ), + ), + ( + ResetRecvd, + "on_send_reset", + Err( + InvalidTransition { + current: ResetRecvd, + target: ResetSent, + }, + ), + ), + ( + ResetRecvd, + "on_recv_all_acks", + Err( + InvalidTransition { + current: ResetRecvd, + target: DataRecvd, + }, + ), + ), + ( + ResetRecvd, + "on_recv_reset_ack", + Err( + NoOp { + current: ResetRecvd, + }, + ), + ), +]