Skip to content

Commit

Permalink
chore: add penalty in latency-based routing
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Aug 12, 2024
1 parent 386d353 commit 68171e3
Showing 1 changed file with 50 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use crate::agent::http_transport::dynamic_routing::{
health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

// Some big value implying that node is unhealthy, should be much bigger than node's latency.
const MAX_LATENCY: Duration = Duration::from_secs(500);
// When a node is detected as unhealthy, we take the following actions:
// - Remove the node entirely from the routing process.
// - Penalize its moving average by adding a specified value to the stored latency window. This ensures that any node exhibiting intermittent outages is appropriately penalized.
const PUNISH_LATENCY: Duration = Duration::from_secs(2);

const WINDOW_SIZE: usize = 15;

Expand All @@ -20,6 +22,8 @@ type LatencyMovAvg = SumTreeSMA<Duration, u32, WINDOW_SIZE>;
#[derive(Clone, Debug)]
struct WeightedNode {
node: Node,
/// Reflects the status of the most recent health check.
is_healthy: bool,
/// Moving mean of latencies measurements.
latency_mov_avg: LatencyMovAvg,
/// Weight of the node (invers of the average latency), used for stochastic weighted random sampling.
Expand Down Expand Up @@ -49,14 +53,14 @@ impl LatencyRoutingSnapshot {
/// Helper function to sample nodes based on their weights.
/// Here weight index is selected based on the input number in range [0, 1]
#[inline(always)]
fn weighted_sample(weights: &[f64], number: f64) -> Option<usize> {
fn weighted_sample(weighted_nodes: &[(f64, &Node)], number: f64) -> Option<usize> {
if !(0.0..=1.0).contains(&number) {
return None;
}
let sum: f64 = weights.iter().sum();
let sum: f64 = weighted_nodes.iter().map(|n| n.0).sum();
let mut weighted_number = number * sum;
for (idx, weight) in weights.iter().enumerate() {
weighted_number -= weight;
for (idx, weighted_node) in weighted_nodes.iter().enumerate() {
weighted_number -= weighted_node.0;
if weighted_number <= 0.0 {
return Some(idx);
}
Expand All @@ -70,18 +74,21 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
}

fn next(&self) -> Option<Node> {
// We select a node based on it's weight, using a stochastic weighted random sampling approach.
let weights = self
.weighted_nodes
.iter()
.map(|n| n.weight)
.collect::<Vec<_>>();
// We select a healthy node based on its weight, using a stochastic weighted random sampling approach.

// Preallocate array for a better efficiency.
let mut healthy_weighted_nodes = Vec::with_capacity(self.weighted_nodes.len());
for n in &self.weighted_nodes {
if n.is_healthy {
healthy_weighted_nodes.push((n.weight, &n.node));
}
}
// Generate a random float in the range [0, 1)
let mut rng = rand::thread_rng();
let rand_num = rng.gen::<f64>();
// Using this random float and an array of weights we get an index of the node.
let idx = weighted_sample(weights.as_slice(), rand_num);
idx.map(|idx| self.weighted_nodes[idx].node.clone())
let idx = weighted_sample(&healthy_weighted_nodes.as_slice(), rand_num);
idx.map(|idx| healthy_weighted_nodes[idx].1.clone())
}

fn sync_nodes(&mut self, nodes: &[Node]) -> bool {
Expand Down Expand Up @@ -116,11 +123,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
return false;
}

// If latency is None (meaning Node is unhealthy), we assign some big value
let latency = health.latency().unwrap_or(MAX_LATENCY);
// If the node is unhealthy, we penalize it's moving average.
let latency = health.latency().unwrap_or(PUNISH_LATENCY);

if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) {
// Node is already in the array (it is not the first update_node() call).
self.weighted_nodes[idx].is_healthy = health.is_healthy();
self.weighted_nodes[idx].latency_mov_avg.add_sample(latency);
let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average();
// As nodes with smaller average latencies are preferred for routing, we use inverted values for weights.
Expand All @@ -131,6 +139,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
latency_mov_avg.add_sample(latency);
let weight = 1.0 / latency_mov_avg.get_average().as_secs_f64();
self.weighted_nodes.push(WeightedNode {
is_healthy: health.is_healthy(),
latency_mov_avg,
node: node.clone(),
weight,
Expand All @@ -152,7 +161,8 @@ mod tests {
node::Node,
snapshot::{
latency_based_routing::{
weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, MAX_LATENCY,
weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode,
PUNISH_LATENCY,
},
routing_snapshot::RoutingSnapshot,
},
Expand Down Expand Up @@ -212,6 +222,7 @@ mod tests {
Duration::from_millis(1500)
);
assert_eq!(weighted_node.weight, 1.0 / 1.5);
assert_eq!(snapshot.next().unwrap(), node);
// Check third update
let health = HealthCheckStatus::new(Some(Duration::from_secs(3)));
let is_updated = snapshot.update_node(&node, health);
Expand All @@ -222,12 +233,25 @@ mod tests {
Duration::from_millis(2000)
);
assert_eq!(weighted_node.weight, 0.5);
assert_eq!(snapshot.next().unwrap(), node);
// Check forth update with none
let health = HealthCheckStatus::new(None);
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0);
let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 6.0) / 4.0);
assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
assert_eq!(snapshot.weighted_nodes.len(), 1);
assert_eq!(snapshot.existing_nodes.len(), 1);
// No nodes returned, as the node is unhealthy.
assert!(snapshot.next().is_none());
// Check fifth update
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 7.0) / 5.0);
assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
assert_eq!(snapshot.weighted_nodes.len(), 1);
Expand All @@ -250,6 +274,7 @@ mod tests {
);
// Add node_1 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
is_healthy: true,
node: node_1.clone(),
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
Expand All @@ -274,6 +299,7 @@ mod tests {
assert!(snapshot.weighted_nodes.is_empty());
// Add node_2 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
is_healthy: true,
node: node_2.clone(),
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
Expand All @@ -289,6 +315,7 @@ mod tests {
assert_eq!(snapshot.weighted_nodes[0].node, node_2);
// Add node_3 to weighted_nodes manually
snapshot.weighted_nodes.push(WeightedNode {
is_healthy: true,
node: node_3,
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
weight: 0.0,
Expand All @@ -308,11 +335,12 @@ mod tests {
#[test]
fn test_weighted_sample() {
// Case 1: empty array
let arr: &[f64] = &[];
let node = Node::new("ic0.com").unwrap();
let arr = &[(0.5, &node)];
let idx = weighted_sample(arr, 0.5);
assert_eq!(idx, None);
// Case 2: single element in array
let arr: &[f64] = &[1.0];
let arr = &[(1.0, &node)];
let idx = weighted_sample(arr, 0.0);
assert_eq!(idx, Some(0));
let idx = weighted_sample(arr, 1.0);
Expand All @@ -323,7 +351,7 @@ mod tests {
let idx = weighted_sample(arr, 1.1);
assert_eq!(idx, None);
// Case 3: two elements in array (second element has twice the weight of the first)
let arr: &[f64] = &[1.0, 2.0]; // prefixed_sum = [1.0, 3.0]
let arr = &[(1.0, &node), (2.0, &node)]; // prefixed_sum = [1.0, 3.0]
let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0
assert_eq!(idx, Some(0));
let idx = weighted_sample(arr, 0.33); // 0.33 * 3.0 < 1.0
Expand All @@ -338,7 +366,7 @@ mod tests {
let idx = weighted_sample(arr, 1.1);
assert_eq!(idx, None);
// Case 4: four elements in array
let arr: &[f64] = &[1.0, 2.0, 1.5, 2.5]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
let arr = &[(1.0, &node), (2.0, &node), (1.5, &node), (2.5, &node)]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0
assert_eq!(idx, Some(0)); // probability ~0.14
let idx = weighted_sample(arr, 0.15); // 0.15 * 7 > 1.0
Expand Down

0 comments on commit 68171e3

Please sign in to comment.