add controller::Config and debounce period to scheduler #1265

Merged Aug 8, 2023 (6 commits)
Changes from 4 commits
2 changes: 1 addition & 1 deletion kube-client/src/client/mod.rs
@@ -156,7 +156,7 @@ impl Client {
// Error requesting
.or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
// Error from another middleware
.unwrap_or_else(|err| Error::Service(err))
.unwrap_or_else(Error::Service)
})?;
Ok(res)
}
69 changes: 54 additions & 15 deletions kube-runtime/src/controller/mod.rs
@@ -7,9 +7,9 @@
store::{Store, Writer},
ObjectRef,
},
scheduler::{scheduler, ScheduleRequest},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
@@ -246,12 +246,14 @@
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
#[allow(clippy::needless_pass_by_value)]
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
@@ -276,7 +278,7 @@
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
run_at: Instant::now(),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
@@ -291,7 +293,7 @@
)),
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
Runner::new(scheduler(s), move |request| {
Runner::new(debounced_scheduler(s, config.debounce), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
@@ -417,6 +419,29 @@
}
}

/// Accumulates all options that can be used on a [`Controller`] invocation.
#[derive(Clone, Debug, Default)]
pub struct Config {
debounce: Duration,
}

impl Config {
/// The debounce duration used to deduplicate reconciliation requests.
///
/// When set to a non-zero duration, debouncing is enabled in the [`Scheduler`], resulting
/// in __trailing edge debouncing__ of reconciler requests.
/// This option can help reduce the number of unnecessary reconciler calls
/// when using multiple controller relations, or during rapid phase transitions.
///
/// ## Warning
/// This option delays (and keeps delaying) reconcile requests for an object while
/// that object keeps being updated. It can **permanently hide** updates from your reconciler
/// if set too high on objects that are updated frequently (like nodes).
pub fn debounce(&mut self, debounce: Duration) {
self.debounce = debounce;
}
}
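
As a small illustrative sketch (not taken from this diff, and with an arbitrary one-second value), the setter mutates a Config in place rather than consuming it:

use std::time::Duration;
use kube_runtime::controller::Config;

fn main() {
    // Reconcile requests arriving within 1s of each other collapse into a single
    // trailing-edge reconciliation; Duration::ZERO (the default) disables debouncing.
    let mut config = Config::default();
    config.debounce(Duration::from_secs(1));
    // `config` can then be handed to `Controller::with_config` or `applier`.
}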

/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
@@ -505,6 +530,7 @@
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
dyntype: K::DynamicType,
reader: Store<K>,
config: Config,
}

impl<K> Controller<K>
@@ -516,11 +542,11 @@
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] controls to the possible subset of objects of `K` that you want to manage
/// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
/// and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
/// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
#[must_use]
pub fn new(main_api: Api<K>, wc: Config) -> Self
pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self

(Codecov: added line #L549 was not covered by tests)
where
K::DynamicType: Default,
{
@@ -531,17 +557,17 @@
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// to watch - in the Api's configured scope - and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
///
/// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
///
/// [`Config`]: crate::watcher::Config
/// [`watcher::Config`]: crate::watcher::Config
/// [`Api`]: kube_client::Api
/// [`dynamic`]: kube_client::core::dynamic
/// [`Config::default`]: crate::watcher::Config::default
pub fn new_with(main_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {

(Codecov: added line #L570 was not covered by tests)
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
@@ -564,6 +590,7 @@
],
dyntype,
reader,
config: Default::default(),

(Codecov: added line #L593 was not covered by tests)
}
}

@@ -649,9 +676,17 @@
],
dyntype,
reader,
config: Default::default(),

(Codecov: added line #L679 was not covered by tests)
}
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
self.config = config;
self

(Codecov: added lines #L685-L687 were not covered by tests)
}
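
For context, a rough end-to-end sketch of how this option would be wired up (not part of this diff; `ConfigMap`, the `reconcile` and `error_policy` stubs, and the 500ms value are illustrative assumptions, using the `kube` facade crate's re-exports):

use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    runtime::{
        controller::{Action, Config},
        watcher, Controller,
    },
    Api, Client,
};

async fn reconcile(_obj: Arc<ConfigMap>, _ctx: Arc<()>) -> Result<Action, kube::Error> {
    // Real logic would inspect the object here; just requeue periodically.
    Ok(Action::requeue(Duration::from_secs(300)))
}

fn error_policy(_obj: Arc<ConfigMap>, _err: &kube::Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(5))
}

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let cms: Api<ConfigMap> = Api::default_namespaced(client);

    // Collapse rapid-fire triggers so only the trailing request is reconciled.
    let mut config = Config::default();
    config.debounce(Duration::from_millis(500));

    Controller::new(cms, watcher::Config::default())
        .with_config(config)
        .run(reconcile, error_policy, Arc::new(()))
        .for_each(|res| async move { println!("reconciled: {res:?}") })
        .await;
    Ok(())
}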

/// Specify the backoff policy for "trigger" watches
///
/// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
@@ -683,7 +718,7 @@
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
wc: Config,
wc: watcher::Config,
) -> Self {
self.owns_with(api, (), wc)
}
@@ -696,7 +731,7 @@
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
wc: Config,
wc: watcher::Config,
) -> Self
where
Child::DynamicType: Debug + Eq + Hash + Clone,
@@ -847,7 +882,7 @@
pub fn watches<Other, I>(
self,
api: Api<Other>,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
@@ -867,7 +902,7 @@
mut self,
api: Api<Other>,
dyntype: Other::DynamicType,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
@@ -1214,6 +1249,7 @@
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
self.config,

(Codecov: added line #L1252 was not covered by tests)
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
@@ -1228,7 +1264,7 @@
applier,
reflector::{self, ObjectRef},
watcher::{self, metadata_watcher, watcher, Event},
Controller,
Config, Controller,
};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
@@ -1295,6 +1331,8 @@

let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let mut config = Config::default();
config.debounce(Duration::from_millis(1));
let applier = applier(
|obj, _| {
Box::pin(async move {
@@ -1307,6 +1345,7 @@
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
config,
);
pin_mut!(applier);
for i in 0..items {
2 changes: 2 additions & 0 deletions kube-runtime/src/controller/runner.rs
@@ -158,6 +158,8 @@ mod tests {
let mut count = 0;
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
// The debounce period needs to be zero because a debounce period > 0
// would lead to the second request being discarded.
Runner::new(scheduler(sched_rx), |_| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
2 changes: 1 addition & 1 deletion kube-runtime/src/lib.rs
@@ -29,7 +29,7 @@ pub mod utils;
pub mod wait;
pub mod watcher;

pub use controller::{applier, Controller};
pub use controller::{applier, Config, Controller};
pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
98 changes: 90 additions & 8 deletions kube-runtime/src/scheduler.rs
@@ -8,6 +8,7 @@ use std::{
hash::Hash,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::Instant;
use tokio_util::time::delay_queue::{self, DelayQueue};
@@ -44,15 +45,22 @@ pub struct Scheduler<T, R> {
/// Incoming queue of scheduling requests.
#[pin]
requests: Fuse<R>,
/// Debounce time to allow for deduplication of requests. It is added to the request's
/// initial expiration time. If another request with the same message arrives before
/// the original request expires, the debounce period is also added to the new request's
/// expiration time. This allows a request to be emitted only if the scheduler is
/// "uninterrupted" for the configured debounce period. Its primary purpose is to
/// deduplicate requests that expire instantly.
debounce: Duration,
}

impl<T, R: Stream> Scheduler<T, R> {
fn new(requests: R) -> Self {
fn new(requests: R, debounce: Duration) -> Self {
Self {
queue: DelayQueue::new(),
scheduled: HashMap::new(),
pending: HashSet::new(),
requests: requests.fuse(),
debounce,
}
}
}
@@ -67,12 +75,15 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
return;
}
match self.scheduled.entry(request.message) {
// If the new request is supposed to run earlier than the current entry's scheduled
// time (e.g. the new request is user-triggered and the current entry is the
// reconciler's usual retry), then give priority to the new request.
Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
// Old entry will run after the new request, so replace it..
let entry = old_entry.get_mut();
// TODO: this should add a little delay here to actually debounce
self.queue.reset_at(&entry.queue_key, request.run_at);
entry.run_at = request.run_at;
self.queue
.reset_at(&entry.queue_key, request.run_at + *self.debounce);
entry.run_at = request.run_at + *self.debounce;
old_entry.replace_key();
}
Entry::Occupied(_old_entry) => {
@@ -82,8 +93,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
// No old entry, we're free to go!
let message = entry.key().clone();
entry.insert(ScheduledEntry {
run_at: request.run_at,
queue_key: self.queue.insert_at(message, request.run_at),
run_at: request.run_at + *self.debounce,
queue_key: self.queue.insert_at(message, request.run_at + *self.debounce),
});
}
}
@@ -203,14 +214,29 @@
///
/// The [`Scheduler`] terminates as soon as `requests` does.
pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
Scheduler::new(requests)
Scheduler::new(requests, Duration::ZERO)
}

/// Stream transformer that delays and deduplicates [`Stream`] items.
///
/// The debounce period lets the scheduler deduplicate requests that ask to be
/// emitted instantly, by making sure we wait for the configured period of time
/// to receive an uninterrupted request before actually emitting it.
///
/// For more info, see [`scheduler()`].
#[allow(clippy::module_name_repetitions)]
pub fn debounced_scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
requests: S,
debounce: Duration,
) -> Scheduler<T, S> {
Scheduler::new(requests, debounce)
}
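
As a rough illustration (not from this diff; the message string, the one-second debounce, and the current-thread tokio runtime are arbitrary choices), two requests for the same message inside the debounce window collapse into a single emission, mirroring the dedup test below:

use std::time::Duration;
use futures::{stream, StreamExt};
use kube_runtime::scheduler::{debounced_scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let now = Instant::now();
    // Two requests for the same message, both asking to run immediately...
    let requests = stream::iter([
        ScheduleRequest { message: "reconcile-foo", run_at: now },
        ScheduleRequest { message: "reconcile-foo", run_at: now },
    ]);
    // ...are deduplicated: a single emission once the 1s debounce window has
    // passed without another request for the same message.
    let emitted: Vec<&str> = debounced_scheduler(requests, Duration::from_secs(1))
        .collect()
        .await;
    assert_eq!(emitted, vec!["reconcile-foo"]);
}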

#[cfg(test)]
mod tests {
use crate::utils::KubeRuntimeStreamExt;

use super::{scheduler, ScheduleRequest};
use super::{debounced_scheduler, scheduler, ScheduleRequest};
use derivative::Derivative;
use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
use std::task::Poll;
@@ -447,4 +473,60 @@ mod tests {
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
}

#[tokio::test]
async fn scheduler_should_add_debounce_to_a_request() {
pause();

let now = Instant::now();
let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));

sched_tx
.send(ScheduleRequest {
message: SingletonMessage(1),
run_at: now,
})
.await
.unwrap();
advance(Duration::from_secs(1)).await;
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(3)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 1);
}

#[tokio::test]
async fn scheduler_should_dedup_message_within_debounce_period() {
pause();

let mut now = Instant::now();
let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(3));

sched_tx
.send(ScheduleRequest {
message: SingletonMessage(1),
run_at: now,
})
.await
.unwrap();
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(1)).await;

now = Instant::now();
sched_tx
.send(ScheduleRequest {
message: SingletonMessage(2),
run_at: now,
})
.await
.unwrap();
// Check that the initial request was indeed deduplicated.
advance(Duration::from_millis(2500)).await;
assert!(poll!(scheduler.next()).is_pending());

advance(Duration::from_secs(3)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 2);
assert!(poll!(scheduler.next()).is_pending());
}
}