Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select links based on message Priority & Reliability #1360

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c6cfcfc
Add wip `QoS`-based priority-to-link dispatch impl
fuzzypixelz Aug 26, 2024
ef89a22
Improve `QoS` state machine
fuzzypixelz Sep 4, 2024
76549d4
Add `PriorityRange` negotiation tests
fuzzypixelz Sep 4, 2024
88c6bb6
Refactor link selection function
fuzzypixelz Sep 4, 2024
9b38c31
Minor edits
fuzzypixelz Sep 4, 2024
0e07c4a
Add Link selectioh tests
fuzzypixelz Sep 4, 2024
6603f24
Minor edits
fuzzypixelz Sep 4, 2024
b35190d
More minor edits
fuzzypixelz Sep 4, 2024
8f90bc0
Never disobey Clippy
fuzzypixelz Sep 4, 2024
daaceb1
Implement Reliability negotiation
fuzzypixelz Sep 4, 2024
c5552e4
Apply negotiated Reliability to Link config
fuzzypixelz Sep 5, 2024
5e6b579
Document Endpoint `reliability` metadata
fuzzypixelz Sep 5, 2024
ae674b7
I'm sorry Clippy
fuzzypixelz Sep 5, 2024
d917643
Make `PriorityRange` inclusive
fuzzypixelz Sep 5, 2024
c98763a
Clippy lints are inevitable
fuzzypixelz Sep 5, 2024
184ba14
Make Reliability negotiation stricter
fuzzypixelz Sep 5, 2024
3ff6918
Refactor negotiation tests
fuzzypixelz Sep 5, 2024
95365a2
We are still not `core::error::Error`
fuzzypixelz Sep 5, 2024
410d75d
Use `RangeInclusive`
fuzzypixelz Sep 5, 2024
53154db
Clippy at it again
fuzzypixelz Sep 5, 2024
0799e76
Split `State` into `StateOpen` and `StateAccept`
fuzzypixelz Sep 6, 2024
4eddb98
Remove `NewLinkUnicast`
fuzzypixelz Sep 6, 2024
211f885
Fix test typos
fuzzypixelz Sep 6, 2024
4c45695
Patch `Link::src` and `Link::dst` with negotiated metadata
fuzzypixelz Sep 6, 2024
743c199
Optimize `QoS` extension overhead
fuzzypixelz Sep 6, 2024
608e095
Implement `Display` instead of `ToString` for `PriorityRange`
fuzzypixelz Sep 6, 2024
1ca7513
Fix typo (metdata -> metadata)
fuzzypixelz Sep 6, 2024
6663f0b
Fix `n_exts` in `INIT` codec
fuzzypixelz Sep 6, 2024
ae0f027
Add missing `'static` lifetime in const
fuzzypixelz Sep 6, 2024
adde290
Don't compare `Link` to `TransportLinkUnicast`
fuzzypixelz Sep 9, 2024
657dba0
Don't set Link Reliability if not configured
fuzzypixelz Sep 9, 2024
0780d95
Update DEFAULT_CONFIG
fuzzypixelz Sep 16, 2024
7757f24
Move metadata docs to `Endpoint`
fuzzypixelz Sep 17, 2024
ac3953d
Add Endpoint examples
fuzzypixelz Sep 17, 2024
a6d3796
Fix doc list items without indentation
fuzzypixelz Sep 17, 2024
3e76e8d
Update Endpoint links in DEFAULT_CONFIG
fuzzypixelz Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,20 @@
},
},
link: {
/// An optional whitelist of protocols to be used for accepting and opening sessions.
/// If not configured, all the supported protocols are automatically whitelisted.
/// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"]
/// For example, to only enable "tls" and "quic":
// protocols: ["tls", "quic"],
/// An optional whitelist of protocols to be used for accepting and opening sessions. If not
/// configured, all the supported protocols are automatically whitelisted. The supported
/// protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] For
/// example, to only enable "tls" and "quic": protocols: ["tls", "quic"],
///
/// ## Endpoint metadata
fuzzypixelz marked this conversation as resolved.
Show resolved Hide resolved
///
/// **priorities**: a range bounded inclusively below and above (e.g. `2..4` signifies
/// priorities 2, 3 and 4). This value is used to select the link used for transmission based
/// on the Priority of the message in question.
///
/// **reliability**: either "best_effort" or "reliable". This value is used to select the link
/// used for transmission based on the Reliability of the message in question.
///
/// Configure the zenoh TX parameters of a link
tx: {
/// The resolution in bits to be used for the message sequence numbers.
Expand Down Expand Up @@ -387,7 +396,7 @@
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
26 changes: 26 additions & 0 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -59,6 +60,7 @@ where
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_qos_optimized.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
Expand Down Expand Up @@ -98,6 +100,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
Mallets marked this conversation as resolved.
Show resolved Hide resolved
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -173,6 +179,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -190,6 +197,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -229,6 +241,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -255,6 +268,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -269,6 +283,7 @@ where
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_qos_optimized.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
Expand Down Expand Up @@ -311,6 +326,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above: both ext_qos and ext_qos_optimized could be sent on the network.
If ext_qos_optimized then I don't see much the need of sending as well ext_qos.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -389,6 +408,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -406,6 +426,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -446,6 +471,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down
3 changes: 3 additions & 0 deletions commons/zenoh-protocol/src/core/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ impl fmt::Debug for AddressMut<'_> {
pub struct Metadata<'a>(pub(super) &'a str);

impl<'a> Metadata<'a> {
pub const RELIABILITY: &'static str = "reliability";
pub const PRIORITIES: &'static str = "priorities";

pub fn as_str(&self) -> &'a str {
self.0
}
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-protocol/src/core/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl Locator {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

pub fn to_endpoint(&self) -> EndPoint {
self.0.clone()
}
}

impl From<EndPoint> for Locator {
Expand Down
180 changes: 177 additions & 3 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use alloc::{
};
use core::{
convert::{From, TryFrom, TryInto},
fmt,
fmt::{self, Display},
hash::Hash,
ops::{Deref, RangeInclusive},
str::FromStr,
};

use serde::Serialize;
pub use uhlc::{Timestamp, NTP64};
use zenoh_keyexpr::OwnedKeyExpr;
use zenoh_result::{bail, zerror};
Expand Down Expand Up @@ -295,7 +297,7 @@ impl EntityGlobalIdProto {
}

#[repr(u8)]
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize)]
pub enum Priority {
Control = 0,
RealTime = 1,
Expand All @@ -308,6 +310,115 @@ pub enum Priority {
Background = 7,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize)]
/// A [`Priority`] range bounded inclusively below and above.
pub struct PriorityRange(RangeInclusive<Priority>);

impl Deref for PriorityRange {
type Target = RangeInclusive<Priority>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl PriorityRange {
pub fn new(range: RangeInclusive<Priority>) -> Self {
Self(range)
}

/// Returns `true` if `self` is a superset of `other`.
pub fn includes(&self, other: &PriorityRange) -> bool {
self.start() <= other.start() && other.end() <= self.end()
}

pub fn len(&self) -> usize {
*self.end() as usize - *self.start() as usize + 1
}

pub fn is_empty(&self) -> bool {
false
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8);
let end = rng.gen_range((start + 1)..=Priority::MIN as u8);

Self(Priority::try_from(start).unwrap()..=Priority::try_from(end).unwrap())
}
}

impl Display for PriorityRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}..={}", *self.start() as u8, *self.end() as u8)
}
}

#[derive(Debug)]
pub enum InvalidPriorityRange {
InvalidSyntax { found: String },
InvalidBound { message: String },
}

impl Display for InvalidPriorityRange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InvalidPriorityRange::InvalidSyntax { found } => write!(f, "invalid PriorityRange string, expected an range of the form `start..=end` but found {found}"),
InvalidPriorityRange::InvalidBound { message } => write!(f, "invalid PriorityRange bound: {message}"),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for InvalidPriorityRange {}

impl FromStr for PriorityRange {
type Err = InvalidPriorityRange;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut metadata = s.split("..=");
fuzzypixelz marked this conversation as resolved.
Show resolved Hide resolved

let start = metadata
.next()
.ok_or_else(|| InvalidPriorityRange::InvalidSyntax {
found: s.to_string(),
})?
.parse::<u8>()
.map(Priority::try_from)
.map_err(|err| InvalidPriorityRange::InvalidBound {
message: err.to_string(),
})?
.map_err(|err| InvalidPriorityRange::InvalidBound {
message: err.to_string(),
})?;

let end = metadata
.next()
.ok_or_else(|| InvalidPriorityRange::InvalidSyntax {
found: s.to_string(),
})?
.parse::<u8>()
.map(Priority::try_from)
.map_err(|err| InvalidPriorityRange::InvalidBound {
message: err.to_string(),
})?
.map_err(|err| InvalidPriorityRange::InvalidBound {
message: err.to_string(),
})?;

if metadata.next().is_some() {
return Err(InvalidPriorityRange::InvalidSyntax {
found: s.to_string(),
});
};

Ok(PriorityRange::new(start..=end))
}
}

impl Priority {
/// Default
pub const DEFAULT: Self = Self::Data;
Expand Down Expand Up @@ -342,7 +453,7 @@ impl TryFrom<u8> for Priority {
}
}

#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize)]
#[repr(u8)]
pub enum Reliability {
#[default]
Expand All @@ -353,6 +464,16 @@ pub enum Reliability {
impl Reliability {
pub const DEFAULT: Self = Self::Reliable;

const BEST_EFFORT_STR: &'static str = "best_effort";
const RELIABLE_STR: &'static str = "reliable";

pub fn as_str(&self) -> &str {
match self {
Reliability::BestEffort => Reliability::BEST_EFFORT_STR,
Reliability::Reliable => Reliability::RELIABLE_STR,
}
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Expand All @@ -367,6 +488,59 @@ impl Reliability {
}
}

impl From<bool> for Reliability {
fn from(value: bool) -> Self {
if value {
Reliability::Reliable
} else {
Reliability::BestEffort
}
}
}

impl From<Reliability> for bool {
fn from(value: Reliability) -> Self {
match value {
Reliability::BestEffort => false,
Reliability::Reliable => true,
}
}
}

#[derive(Debug)]
pub struct InvalidReliability {
found: String,
}

impl Display for InvalidReliability {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"invalid Reliability string, expected `{}` or `{}` but found {}",
Reliability::BEST_EFFORT_STR,
Reliability::RELIABLE_STR,
self.found
)
}
}

#[cfg(feature = "std")]
impl std::error::Error for InvalidReliability {}

impl FromStr for Reliability {
type Err = InvalidReliability;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
Reliability::RELIABLE_STR => Ok(Reliability::Reliable),
Reliability::BEST_EFFORT_STR => Ok(Reliability::BestEffort),
other => Err(InvalidReliability {
found: other.to_string(),
}),
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub struct Channel {
pub priority: Priority,
Expand Down
Loading