From 743c199819f0a303a9a2174617974a87c6ca1b67 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 6 Sep 2024 09:40:31 +0000 Subject: [PATCH] Optimize `QoS` extension overhead --- commons/zenoh-codec/src/transport/init.rs | 24 +++++++ commons/zenoh-protocol/src/transport/init.rs | 7 +++ .../src/unicast/establishment/accept.rs | 8 ++- .../src/unicast/establishment/ext/qos.rs | 62 +++++++++++++------ .../src/unicast/establishment/open.rs | 8 ++- 5 files changed, 86 insertions(+), 23 deletions(-) diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index c559fdbd5..c2609f265 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -45,6 +45,7 @@ where resolution, batch_size, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -98,6 +99,10 @@ where n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } + if let Some(qos_optimized) = ext_qos_optimized.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (qos_optimized, n_exts != 0))?; + } #[cfg(feature = "shared-memory")] if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; @@ -173,6 +178,7 @@ where // Extensions let mut ext_qos = None; + let mut ext_qos_optimized = None; #[cfg(feature = "shared-memory")] let mut ext_shm = None; let mut ext_auth = None; @@ -190,6 +196,11 @@ where ext_qos = Some(q); has_ext = ext; } + ext::QoSOptimized::ID => { + let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?; + ext_qos_optimized = Some(q); + has_ext = ext; + } #[cfg(feature = "shared-memory")] ext::Shm::ID => { let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?; @@ -229,6 +240,7 @@ where resolution, batch_size, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -255,6 +267,7 @@ where batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -311,6 +324,10 @@ where n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } + if let Some(qos_optimized) = ext_qos_optimized.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (qos_optimized, n_exts != 0))?; + } #[cfg(feature = "shared-memory")] if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; @@ -389,6 +406,7 @@ where // Extensions let mut ext_qos = None; + let mut ext_qos_optimized = None; #[cfg(feature = "shared-memory")] let mut ext_shm = None; let mut ext_auth = None; @@ -406,6 +424,11 @@ where ext_qos = Some(q); has_ext = ext; } + ext::QoSOptimized::ID => { + let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?; + ext_qos_optimized = Some(q); + has_ext = ext; + } #[cfg(feature = "shared-memory")] ext::Shm::ID => { let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?; @@ -446,6 +469,7 @@ where batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 0a4e97f95..7650035d4 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -115,6 +115,7 @@ pub struct InitSyn { pub resolution: Resolution, pub batch_size: BatchSize, pub ext_qos: Option, + pub ext_qos_optimized: Option, #[cfg(feature = "shared-memory")] pub ext_shm: Option, pub ext_auth: Option, @@ -133,6 +134,7 @@ pub mod ext { /// # QoS extension /// Used to negotiate the use of QoS pub type QoS = zextz64!(0x1, false); + pub type QoSOptimized = zextunit!(0x1, false); /// # Shm extension /// Used as challenge for probing shared memory capabilities @@ -171,6 +173,7 @@ impl InitSyn { let resolution = Resolution::rand(); let batch_size: BatchSize = rng.gen(); let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); + let ext_qos_optimized = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -185,6 +188,7 @@ impl InitSyn { resolution, batch_size, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -204,6 +208,7 @@ pub struct InitAck { pub batch_size: BatchSize, pub cookie: ZSlice, pub ext_qos: Option, + pub ext_qos_optimized: Option, #[cfg(feature = "shared-memory")] pub ext_shm: Option, pub ext_auth: Option, @@ -232,6 +237,7 @@ impl InitAck { let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); + let ext_qos_optimized = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -247,6 +253,7 @@ impl InitAck { batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index b66b4981e..91ad3cac3 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -224,7 +224,10 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { // Extension QoS self.ext_qos - .recv_init_syn((&mut state.transport.ext_qos, init_syn.ext_qos)) + .recv_init_syn(( + &mut state.transport.ext_qos, + (init_syn.ext_qos, init_syn.ext_qos_optimized), + )) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -284,7 +287,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { let (mut state, input) = input; // Extension QoS - let ext_qos = self + let (ext_qos, ext_qos_optimized) = self .ext_qos .send_init_ack(&state.transport.ext_qos) .await @@ -381,6 +384,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { batch_size: state.transport.batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index d094b7607..350794ff7 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -41,7 +41,6 @@ impl<'a> QoSFsm<'a> { } } -// TODO(fuzzypixelz): Fallback to ZExtUnit QoS matching QoS::Disabled or QoS::Enabled { reliability: None, priorities: None } #[derive(Clone, Debug, PartialEq, Eq)] enum State { NoQoS, @@ -153,19 +152,38 @@ impl State { } } - fn to_ext(&self) -> Option { - if self.is_qos() { - Some(init::ext::QoS::new(self.to_u64())) - } else { - None + fn to_exts(&self) -> (Option, Option) { + match self { + State::NoQoS => (None, None), + State::QoS { + reliability: None, + priorities: None, + } => (None, Some(init::ext::QoSOptimized::new())), + State::QoS { + reliability: Some(_), + .. + } + | State::QoS { + priorities: Some(_), + .. + } => (Some(init::ext::QoS::new(self.to_u64())), None), } } - fn try_from_ext(ext: Option) -> ZResult { - if let Some(ext) = ext { - State::try_from_u64(ext.value) - } else { - Ok(State::NoQoS) + fn try_from_exts( + (qos, qos_optimized): (Option, Option), + ) -> ZResult { + match (qos, qos_optimized) { + (Some(_), Some(_)) => Err(zerror!( + "Extensions QoS and QoSOptimized cannot both be enabled at once" + ) + .into()), + (None, None) => Ok(State::NoQoS), + (None, Some(_)) => Ok(State::QoS { + reliability: None, + priorities: None, + }), + (Some(qos), None) => State::try_from_u64(qos.value), } } @@ -251,15 +269,18 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { type Error = ZError; type SendInitSynIn = &'a StateOpen; - type SendInitSynOut = Option; + type SendInitSynOut = (Option, Option); async fn send_init_syn( self, state: Self::SendInitSynIn, ) -> Result { - Ok(state.0.to_ext()) + Ok(state.0.to_exts()) } - type RecvInitAckIn = (&'a mut StateOpen, Option); + type RecvInitAckIn = ( + &'a mut StateOpen, + (Option, Option), + ); type RecvInitAckOut = (); async fn recv_init_ack( self, @@ -267,7 +288,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { ) -> Result { let (state_self, other_ext) = input; - let state_other = State::try_from_ext(other_ext)?; + let state_other = State::try_from_exts(other_ext)?; let ( State::QoS { @@ -401,7 +422,10 @@ where impl<'a> AcceptFsm for &'a QoSFsm<'a> { type Error = ZError; - type RecvInitSynIn = (&'a mut StateAccept, Option); + type RecvInitSynIn = ( + &'a mut StateAccept, + (Option, Option), + ); type RecvInitSynOut = (); async fn recv_init_syn( self, @@ -409,7 +433,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { ) -> Result { let (state_self, other_ext) = input; - let state_other = State::try_from_ext(other_ext)?; + let state_other = State::try_from_exts(other_ext)?; let ( State::QoS { @@ -464,12 +488,12 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { } type SendInitAckIn = &'a StateAccept; - type SendInitAckOut = Option; + type SendInitAckOut = (Option, Option); async fn send_init_ack( self, state: Self::SendInitAckIn, ) -> Result { - Ok(state.0.to_ext()) + Ok(state.0.to_exts()) } type RecvOpenSynIn = (&'a mut StateAccept, Option); diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 0805dad65..f85cdbe9f 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -139,7 +139,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { let (link, state, input) = input; // Extension QoS - let ext_qos = self + let (ext_qos, ext_qos_optimized) = self .ext_qos .send_init_syn(&state.transport.ext_qos) .await @@ -199,6 +199,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { batch_size: state.transport.batch_size, resolution: state.transport.resolution, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -302,7 +303,10 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { // Extension QoS self.ext_qos - .recv_init_ack((&mut state.transport.ext_qos, init_ack.ext_qos)) + .recv_init_ack(( + &mut state.transport.ext_qos, + (init_ack.ext_qos, init_ack.ext_qos_optimized), + )) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?;