Skip to content

Commit

Permalink
Optimize QoS extension overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Sep 6, 2024
1 parent 8da03bb commit f873bc9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 22 deletions.
24 changes: 24 additions & 0 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down Expand Up @@ -98,6 +99,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -173,6 +178,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -190,6 +196,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -229,6 +240,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -255,6 +267,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down Expand Up @@ -311,6 +324,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -389,6 +406,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -406,6 +424,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -446,6 +469,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down
7 changes: 7 additions & 0 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub struct InitSyn {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub ext_qos: Option<ext::QoS>,
pub ext_qos_optimized: Option<ext::QoSOptimized>,
#[cfg(feature = "shared-memory")]
pub ext_shm: Option<ext::Shm>,
pub ext_auth: Option<ext::Auth>,
Expand All @@ -133,6 +134,7 @@ pub mod ext {
/// # QoS extension
/// Used to negotiate the use of QoS
pub type QoS = zextz64!(0x1, false);
pub type QoSOptimized = zextunit!(0x1, false);

/// # Shm extension
/// Used as challenge for probing shared memory capabilities
Expand Down Expand Up @@ -171,6 +173,7 @@ impl InitSyn {
let resolution = Resolution::rand();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
let ext_qos_optimized = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand All @@ -185,6 +188,7 @@ impl InitSyn {
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -204,6 +208,7 @@ pub struct InitAck {
pub batch_size: BatchSize,
pub cookie: ZSlice,
pub ext_qos: Option<ext::QoS>,
pub ext_qos_optimized: Option<ext::QoSOptimized>,
#[cfg(feature = "shared-memory")]
pub ext_shm: Option<ext::Shm>,
pub ext_auth: Option<ext::Auth>,
Expand Down Expand Up @@ -232,6 +237,7 @@ impl InitAck {
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
let ext_qos_optimized = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand All @@ -247,6 +253,7 @@ impl InitAck {
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down
8 changes: 6 additions & 2 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {

// Extension QoS
self.ext_qos
.recv_init_syn((&mut state.transport.ext_qos, init_syn.ext_qos))
.recv_init_syn((
&mut state.transport.ext_qos,
(init_syn.ext_qos, init_syn.ext_qos_optimized),
))
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

Expand Down Expand Up @@ -279,7 +282,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
let (mut state, input) = input;

// Extension QoS
let ext_qos = self
let (ext_qos, ext_qos_optimized) = self
.ext_qos
.send_init_ack(&state.transport.ext_qos)
.await
Expand Down Expand Up @@ -376,6 +379,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
batch_size: state.transport.batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down
61 changes: 43 additions & 18 deletions io/zenoh-transport/src/unicast/establishment/ext/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,38 @@ impl State {
}
}

fn to_ext(&self) -> Option<init::ext::QoS> {
if self.is_qos() {
Some(init::ext::QoS::new(self.to_u64()))
} else {
None
fn to_exts(&self) -> (Option<init::ext::QoS>, Option<init::ext::QoSOptimized>) {
match self {
State::NoQoS => (None, None),
State::QoS {
reliability: None,
priorities: None,
} => (None, Some(init::ext::QoSOptimized::new())),
State::QoS {
reliability: Some(_),
..
}
| State::QoS {
priorities: Some(_),
..
} => (Some(init::ext::QoS::new(self.to_u64())), None),
}
}

fn try_from_ext(ext: Option<init::ext::QoS>) -> ZResult<Self> {
if let Some(ext) = ext {
State::try_from_u64(ext.value)
} else {
Ok(State::NoQoS)
fn try_from_exts(
(qos, qos_optimized): (Option<init::ext::QoS>, Option<init::ext::QoSOptimized>),
) -> ZResult<Self> {
match (qos, qos_optimized) {
(Some(_), Some(_)) => Err(zerror!(
"Extensions QoS and QoSOptimized cannot both be enabled at once"
)
.into()),
(None, None) => Ok(State::NoQoS),
(None, Some(_)) => Ok(State::QoS {
reliability: None,
priorities: None,
}),
(Some(qos), None) => State::try_from_u64(qos.value),
}
}

Expand Down Expand Up @@ -251,23 +270,26 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = Option<init::ext::QoS>;
type SendInitSynOut = (Option<init::ext::QoS>, Option<init::ext::QoSOptimized>);
async fn send_init_syn(
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
Ok(state.0.to_ext())
Ok(state.0.to_exts())
}

type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::QoS>);
type RecvInitAckIn = (
&'a mut StateOpen,
(Option<init::ext::QoS>, Option<init::ext::QoSOptimized>),
);
type RecvInitAckOut = ();
async fn recv_init_ack(
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state_self, other_ext) = input;

let state_other = State::try_from_ext(other_ext)?;
let state_other = State::try_from_exts(other_ext)?;

let (
State::QoS {
Expand Down Expand Up @@ -401,15 +423,18 @@ where
impl<'a> AcceptFsm for &'a QoSFsm<'a> {
type Error = ZError;

type RecvInitSynIn = (&'a mut StateAccept, Option<init::ext::QoS>);
type RecvInitSynIn = (
&'a mut StateAccept,
(Option<init::ext::QoS>, Option<init::ext::QoSOptimized>),
);
type RecvInitSynOut = ();
async fn recv_init_syn(
self,
input: Self::RecvInitSynIn,
) -> Result<Self::RecvInitSynOut, Self::Error> {
let (state_self, other_ext) = input;

let state_other = State::try_from_ext(other_ext)?;
let state_other = State::try_from_exts(other_ext)?;

let (
State::QoS {
Expand Down Expand Up @@ -464,12 +489,12 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> {
}

type SendInitAckIn = &'a StateAccept;
type SendInitAckOut = Option<init::ext::QoS>;
type SendInitAckOut = (Option<init::ext::QoS>, Option<init::ext::QoSOptimized>);
async fn send_init_ack(
self,
state: Self::SendInitAckIn,
) -> Result<Self::SendInitAckOut, Self::Error> {
Ok(state.0.to_ext())
Ok(state.0.to_exts())
}

type RecvOpenSynIn = (&'a mut StateAccept, Option<open::ext::QoS>);
Expand Down
8 changes: 6 additions & 2 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {
let (link, state, input) = input;

// Extension QoS
let ext_qos = self
let (ext_qos, ext_qos_optimized) = self
.ext_qos
.send_init_syn(&state.transport.ext_qos)
.await
Expand Down Expand Up @@ -199,6 +199,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {
batch_size: state.transport.batch_size,
resolution: state.transport.resolution,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down Expand Up @@ -302,7 +303,10 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {

// Extension QoS
self.ext_qos
.recv_init_ack((&mut state.transport.ext_qos, init_ack.ext_qos))
.recv_init_ack((
&mut state.transport.ext_qos,
(init_ack.ext_qos, init_ack.ext_qos_optimized),
))
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

Expand Down

0 comments on commit f873bc9

Please sign in to comment.