Skip to content

Commit

Permalink
feat(s2n-quic-core): add stream state enums (#2132)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Mar 8, 2024
1 parent a14804f commit d0c9f76
Show file tree
Hide file tree
Showing 6 changed files with 989 additions and 0 deletions.
1 change: 1 addition & 0 deletions quic/s2n-quic-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod iter;
pub mod limits;
#[cfg(feature = "alloc")]
pub mod ops;
pub mod state;
mod type_;

pub use error::*;
Expand Down
63 changes: 63 additions & 0 deletions quic/s2n-quic-core/src/stream/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::ensure;
use core::fmt;

pub type Result<T> = core::result::Result<(), Error<T>>;

macro_rules! transition {
($state:ident, $valid:pat => $target:expr) => {{
ensure!(*$state != $target, Err(Error::NoOp { current: $target }));
ensure!(
matches!($state, $valid),
Err(Error::InvalidTransition {
current: $state.clone(),
target: $target
})
);
#[cfg(feature = "tracing")]
{
tracing::debug!(prev = ?$state, next = ?$target);
}
*$state = $target;
Ok(())
}};
}

macro_rules! is {
($($state:ident)|+, $function:ident) => {
#[inline]
pub fn $function(&self) -> bool {
matches!(self, $(Self::$state)|*)
}
};
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error<T> {
NoOp { current: T },
InvalidTransition { current: T, target: T },
}

impl<T: fmt::Debug> fmt::Display for Error<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NoOp { current } => {
write!(f, "state is already set to {current:?}")
}
Self::InvalidTransition { current, target } => {
write!(f, "invalid transition from {current:?} to {target:?}",)
}
}
}
}

#[cfg(feature = "std")]
impl<T: fmt::Debug> std::error::Error for Error<T> {}

mod recv;
mod send;

pub use recv::Receiver;
pub use send::Sender;
124 changes: 124 additions & 0 deletions quic/s2n-quic-core/src/stream/state/recv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

//= https://www.rfc-editor.org/rfc/rfc9000#section-3.2
//# o
//# | Recv STREAM / STREAM_DATA_BLOCKED / RESET_STREAM
//# | Create Bidirectional Stream (Sending)
//# | Recv MAX_STREAM_DATA / STOP_SENDING (Bidirectional)
//# | Create Higher-Numbered Stream
//# v
//# +-------+
//# | Recv | Recv RESET_STREAM
//# | |-----------------------.
//# +-------+ |
//# | |
//# | Recv STREAM + FIN |
//# v |
//# +-------+ |
//# | Size | Recv RESET_STREAM |
//# | Known |---------------------->|
//# +-------+ |
//# | |
//# | Recv All Data |
//# v v
//# +-------+ Recv RESET_STREAM +-------+
//# | Data |--- (optional) --->| Reset |
//# | Recvd | Recv All Data | Recvd |
//# +-------+<-- (optional) ----+-------+
//# | |
//# | App Read All Data | App Read Reset
//# v v
//# +-------+ +-------+
//# | Data | | Reset |
//# | Read | | Read |
//# +-------+ +-------+

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Receiver {
#[default]
Recv,
SizeKnown,
DataRecvd,
DataRead,
ResetRecvd,
ResetRead,
}

impl Receiver {
is!(Recv, is_receiving);
is!(SizeKnown, is_size_known);
is!(DataRecvd, is_data_received);
is!(DataRead, is_data_read);
is!(ResetRecvd, is_reset_received);
is!(ResetRead, is_reset_read);
is!(DataRead | ResetRead, is_terminal);

#[inline]
pub fn on_receive_fin(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, Recv => SizeKnown)
}

#[inline]
pub fn on_receive_all_data(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, SizeKnown => DataRecvd)
}

#[inline]
pub fn on_app_read_all_data(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, DataRecvd => DataRead)
}

#[inline]
pub fn on_reset(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, Recv | SizeKnown => ResetRecvd)
}

#[inline]
pub fn on_app_read_reset(&mut self) -> Result<Self> {
use Receiver::*;
transition!(self, ResetRecvd => ResetRead)
}
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_debug_snapshot;

#[test]
#[cfg_attr(miri, ignore)]
fn snapshots() {
let mut outcomes = vec![];
let states = [
Receiver::Recv,
Receiver::SizeKnown,
Receiver::DataRecvd,
Receiver::DataRead,
Receiver::ResetRecvd,
Receiver::ResetRead,
];
for state in states {
macro_rules! push {
($event:ident) => {
let mut target = state.clone();
let result = target.$event().map(|_| target);
outcomes.push((state.clone(), stringify!($event), result));
};
}
push!(on_receive_fin);
push!(on_receive_all_data);
push!(on_app_read_all_data);
push!(on_reset);
push!(on_app_read_reset);
}

assert_debug_snapshot!(outcomes);
}
}
137 changes: 137 additions & 0 deletions quic/s2n-quic-core/src/stream/state/send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

//= https://www.rfc-editor.org/rfc/rfc9000#section-3.1
//# o
//# | Create Stream (Sending)
//# | Peer Creates Bidirectional Stream
//# v
//# +-------+
//# | Ready | Send RESET_STREAM
//# | |-----------------------.
//# +-------+ |
//# | |
//# | Send STREAM / |
//# | STREAM_DATA_BLOCKED |
//# v |
//# +-------+ |
//# | Send | Send RESET_STREAM |
//# | |---------------------->|
//# +-------+ |
//# | |
//# | Send STREAM + FIN |
//# v v
//# +-------+ +-------+
//# | Data | Send RESET_STREAM | Reset |
//# | Sent |------------------>| Sent |
//# +-------+ +-------+
//# | |
//# | Recv All ACKs | Recv ACK
//# v v
//# +-------+ +-------+
//# | Data | | Reset |
//# | Recvd | | Recvd |
//# +-------+ +-------+

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum Sender {
#[default]
Ready,
Send,
DataSent,
DataRecvd,
/// An additional state for implementations to separate queueing a RESET_STREAM from actually
/// sending it
ResetQueued,
ResetSent,
ResetRecvd,
}

impl Sender {
is!(Ready, is_ready);
is!(Send, is_sending);
is!(DataSent, is_data_sent);
is!(DataRecvd, is_data_received);
is!(ResetQueued, is_reset_queued);
is!(ResetSent, is_reset_sent);
is!(ResetRecvd, is_reset_received);
is!(DataRecvd | ResetRecvd, is_terminal);

#[inline]
pub fn on_send_stream(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, Ready => Send)
}

#[inline]
pub fn on_send_fin(&mut self) -> Result<Self> {
use Sender::*;
// we can jump from Ready to DataSent even though the
// diagram doesn't explicitly highlight this transition
transition!(self, Ready | Send => DataSent)
}

#[inline]
pub fn on_queue_reset(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, Ready | Send | DataSent => ResetQueued)
}

#[inline]
pub fn on_send_reset(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, Ready | Send | DataSent | ResetQueued => ResetSent)
}

#[inline]
pub fn on_recv_all_acks(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, DataSent | ResetQueued => DataRecvd)
}

#[inline]
pub fn on_recv_reset_ack(&mut self) -> Result<Self> {
use Sender::*;
transition!(self, ResetSent => ResetRecvd)
}
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_debug_snapshot;

#[test]
#[cfg_attr(miri, ignore)]
fn snapshots() {
let mut outcomes = vec![];
let states = [
Sender::Ready,
Sender::Send,
Sender::DataSent,
Sender::DataRecvd,
Sender::ResetQueued,
Sender::ResetSent,
Sender::ResetRecvd,
];
for state in states {
macro_rules! push {
($event:ident) => {
let mut target = state.clone();
let result = target.$event().map(|_| target);
outcomes.push((state.clone(), stringify!($event), result));
};
}
push!(on_send_stream);
push!(on_send_fin);
push!(on_queue_reset);
push!(on_send_reset);
push!(on_recv_all_acks);
push!(on_recv_reset_ack);
}

assert_debug_snapshot!(outcomes);
}
}
Loading

0 comments on commit d0c9f76

Please sign in to comment.