Skip to content

Commit

Permalink
Update the scheduler message when preponing
Browse files Browse the repository at this point in the history
In practice this should update the reconciliation reason, fixing #1114
  • Loading branch information
nightkr committed Jul 8, 2023
1 parent db585dd commit 51c1b1c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = "0.4.0"
async-trait = "0.1.64"
hashbrown = "0.14.0"

[dependencies.k8s-openapi]
version = "0.18.0"
Expand Down
64 changes: 62 additions & 2 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Delays and deduplicates [`Stream`] items

use futures::{stream::Fuse, Stream, StreamExt};
use hashbrown::{hash_map::Entry, HashMap};
use pin_project::pin_project;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::HashSet,
hash::Hash,
pin::Pin,
task::{Context, Poll},
Expand All @@ -30,8 +31,13 @@ pub struct Scheduler<T, R> {
///
/// To ensure that the metadata is kept up-to-date, use `schedule_message` and
/// `poll_pop_queue_message` rather than manipulating this directly.
///
/// NOTE: `scheduled` should be considered to hold the "canonical" representation of the message.
/// Always pull the message out of `scheduled` once it has been retrieved from `queue`.
queue: DelayQueue<T>,
/// Metadata for all currently scheduled messages. Used to detect duplicate messages.
///
/// `scheduled` is considered to hold the "canonical" representation of the message.
scheduled: HashMap<T, ScheduledEntry>,
/// Messages that are scheduled to have happened, but have been held using `hold_unless`.
pending: HashSet<T>,
Expand Down Expand Up @@ -67,6 +73,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
// TODO: this should add a little delay here to actually debounce
self.queue.reset_at(&entry.queue_key, request.run_at);
entry.run_at = request.run_at;
old_entry.replace_key();
}
Entry::Occupied(_old_entry) => {
// Old entry will run before the new request, so ignore the new request..
Expand Down Expand Up @@ -96,7 +103,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
match self.queue.poll_expired(cx) {
Poll::Ready(Some(msg)) => {
let msg = msg.into_inner();
self.scheduled.remove(&msg).expect(
let (msg, _) = self.scheduled.remove_entry(&msg).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
if can_take_message(&msg) {
Expand Down Expand Up @@ -204,6 +211,7 @@ mod tests {
use crate::utils::KubeRuntimeStreamExt;

use super::{scheduler, ScheduleRequest};
use derivative::Derivative;
use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
use std::task::Poll;
use tokio::time::{advance, pause, sleep, Duration, Instant};
Expand Down Expand Up @@ -392,4 +400,56 @@ mod tests {
scheduler.next().now_or_never().unwrap().unwrap();
assert!(poll!(scheduler.next()).is_pending());
}

#[tokio::test]
async fn scheduler_should_overwrite_message_with_soonest_version() {
// Message type that is always considered equal to itself
#[derive(Derivative, Eq, Clone, Debug)]
#[derivative(PartialEq, Hash)]
struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8);

pause();

let now = Instant::now();
let scheduler = scheduler(
stream::iter([
ScheduleRequest {
message: SingletonMessage(1),
run_at: now + Duration::from_secs(2),
},
ScheduleRequest {
message: SingletonMessage(2),
run_at: now + Duration::from_secs(1),
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![2]);
}

#[tokio::test]
async fn scheduler_should_not_overwrite_message_with_later_version() {
// Message type that is always considered equal to itself
#[derive(Derivative, Eq, Clone, Debug)]
#[derivative(PartialEq, Hash)]
struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8);

pause();

let now = Instant::now();
let scheduler = scheduler(
stream::iter([
ScheduleRequest {
message: SingletonMessage(1),
run_at: now + Duration::from_secs(1),
},
ScheduleRequest {
message: SingletonMessage(2),
run_at: now + Duration::from_secs(2),
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
}
}

0 comments on commit 51c1b1c

Please sign in to comment.