Skip to content

Commit

Permalink
Add Predicate trait to allow combination + fallbacks (#1228)
Browse files Browse the repository at this point in the history
* Add `Predicate` trait to allow implementing `Fallback`

Signed-off-by: clux <sszynrae@gmail.com>

* propagate predicate properly

Signed-off-by: clux <sszynrae@gmail.com>

* add a way to combine predicates also

Signed-off-by: clux <sszynrae@gmail.com>

* make combination better by hashing the output

Signed-off-by: clux <sszynrae@gmail.com>

* import

Signed-off-by: clux <sszynrae@gmail.com>

---------

Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed Jun 20, 2023
1 parent 2ad3827 commit 5806a2d
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 31 deletions.
6 changes: 3 additions & 3 deletions examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Node;
use kube::{
api::{Api, ResourceExt},
runtime::{predicates, reflector, watcher, WatchStreamExt},
runtime::{predicates, reflector, watcher, Predicate, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
let (reader, writer) = reflector::store();
let rf = reflector(writer, watcher(nodes, wc))
.applied_objects()
.predicate_filter(predicates::labels); // NB: requires an unstable feature
.predicate_filter(predicates::labels.combine(predicates::annotations)); // NB: requires an unstable feature

// Periodically read our state in the background
tokio::spawn(async move {
Expand All @@ -34,7 +34,7 @@ async fn main() -> anyhow::Result<()> {
// Log applied events with changes from the reflector
pin_mut!(rf);
while let Some(node) = rf.try_next().await? {
info!("saw node {} with hitherto unseen labels", node.name_any());
info!("saw node {} with new labels/annots", node.name_any());
}

Ok(())
Expand Down
6 changes: 4 additions & 2 deletions examples/pod_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::Api,
runtime::{reflector, watcher, WatchStreamExt},
runtime::{predicates, reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::*;
Expand Down Expand Up @@ -37,7 +37,9 @@ async fn main() -> anyhow::Result<()> {
})
});

let rf = reflector(writer, stream).applied_objects();
let rf = reflector(writer, stream)
.applied_objects()
.predicate_filter(predicates::resource_version); // NB: requires an unstable feature
futures::pin_mut!(rf);

while let Some(pod) = rf.try_next().await? {
Expand Down
1 change: 0 additions & 1 deletion kube-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ mod test {
assert!(format!("{err}").contains("non-zero resource_version is required when using an Exact match"));
}


#[test]
fn list_not_older() {
let url = corev1::Pod::url_path(&(), Some("ns"));
Expand Down
3 changes: 2 additions & 1 deletion kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ pub use scheduler::scheduler;
pub use utils::WatchStreamExt;
pub use watcher::{metadata_watcher, watcher};

#[cfg(feature = "unstable-runtime-predicates")] pub use utils::predicates;
#[cfg(feature = "unstable-runtime-predicates")]
pub use utils::{predicates, Predicate};
pub use wait::conditions;
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod watch_ext;
pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
#[cfg(feature = "unstable-runtime-predicates")]
pub use predicate::{predicates, PredicateFilter};
pub use predicate::{predicates, Predicate, PredicateFilter};
pub use stream_backoff::StreamBackoff;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use stream_subscribe::StreamSubscribe;
Expand Down
116 changes: 97 additions & 19 deletions kube-runtime/src/utils/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,125 @@ use core::{
use futures::{ready, Stream};
use kube_client::Resource;
use pin_project::pin_project;
use std::{collections::HashMap, hash::Hash};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
};

fn hash<T: Hash>(t: &T) -> u64 {
let mut hasher = DefaultHasher::new();
t.hash(&mut hasher);
hasher.finish()
}

/// A predicate is a hasher of Kubernetes objects stream filtering
pub trait Predicate<K> {
/// A predicate only needs to implement optional hashing when keys exist
fn hash_property(&self, obj: &K) -> Option<u64>;

/// Returns a `Predicate` that falls back to an alternate property if the first does not exist
///
/// # Usage
///
/// ```
/// # use k8s_openapi::api::core::v1::Pod;
/// use kube::runtime::{predicates, Predicate};
/// # fn blah<K>(a: impl Predicate<K>) {}
/// let pred = predicates::generation.fallback(predicates::resource_version);
/// blah::<Pod>(pred);
/// ```
fn fallback<F: Predicate<K>>(self, f: F) -> Fallback<Self, F>
where
Self: Sized,
{
Fallback(self, f)
}

/// Returns a `Predicate` that combines all available hashes
///
/// # Usage
///
/// ```
/// # use k8s_openapi::api::core::v1::Pod;
/// use kube::runtime::{predicates, Predicate};
/// # fn blah<K>(a: impl Predicate<K>) {}
/// let pred = predicates::labels.combine(predicates::annotations);
/// blah::<Pod>(pred);
/// ```
fn combine<F: Predicate<K>>(self, f: F) -> Combine<Self, F>
where
Self: Sized,
{
Combine(self, f)
}
}

impl<K, F: Fn(&K) -> Option<u64>> Predicate<K> for F {
fn hash_property(&self, obj: &K) -> Option<u64> {
(self)(obj)
}
}

/// See [`Predicate::fallback`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Fallback<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Predicate<K> for Fallback<A, B>
where
A: Predicate<K>,
B: Predicate<K>,
{
fn hash_property(&self, obj: &K) -> Option<u64> {
self.0.hash_property(obj).or_else(|| self.1.hash_property(obj))
}
}
/// See [`Predicate::combine`]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Combine<A, B>(pub(super) A, pub(super) B);
impl<A, B, K> Predicate<K> for Combine<A, B>
where
A: Predicate<K>,
B: Predicate<K>,
{
fn hash_property(&self, obj: &K) -> Option<u64> {
match (self.0.hash_property(obj), self.1.hash_property(obj)) {
// pass on both missing properties so people can chain .fallback
(None, None) => None,
// but any other combination of properties are hashed together
(a, b) => Some(hash(&(a, b))),
}
}
}

#[allow(clippy::pedantic)]
#[pin_project]
/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
#[must_use = "streams do nothing unless polled"]
pub struct PredicateFilter<St, K: Resource, Func> {
pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
#[pin]
stream: St,
predicate: Func,
predicate: P,
cache: HashMap<ObjectRef<K>, u64>,
}
impl<St, K, F> PredicateFilter<St, K, F>
impl<St, K, P> PredicateFilter<St, K, P>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
F: Fn(&K) -> Option<u64> + 'static,
P: Predicate<K>,
{
pub(super) fn new(stream: St, predicate: F) -> Self {
pub(super) fn new(stream: St, predicate: P) -> Self {
Self {
stream,
predicate,
cache: HashMap::new(),
}
}
}
impl<St, K, F> Stream for PredicateFilter<St, K, F>
impl<St, K, P> Stream for PredicateFilter<St, K, P>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
K::DynamicType: Default + Eq + Hash,
F: Fn(&K) -> Option<u64> + 'static,
P: Predicate<K>,
{
type Item = Result<K, Error>;

Expand All @@ -46,7 +133,7 @@ where
Poll::Ready(loop {
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = (me.predicate)(&obj) {
if let Some(val) = me.predicate.hash_property(&obj) {
let key = ObjectRef::from_obj(&obj);
let changed = if let Some(old) = me.cache.get(&key) {
*old != val
Expand Down Expand Up @@ -82,17 +169,8 @@ where
///
/// Functional rewrite of the [controller-runtime/predicate module](https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/predicate/predicate.go).
pub mod predicates {
use super::hash;
use kube_client::{Resource, ResourceExt};
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};

fn hash<T: Hash>(t: &T) -> u64 {
let mut hasher = DefaultHasher::new();
t.hash(&mut hasher);
hasher.finish()
}

/// Hash the generation of a Resource K
pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
Expand Down
7 changes: 3 additions & 4 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "unstable-runtime-predicates")]
use crate::utils::predicate::PredicateFilter;
use crate::utils::predicate::{Predicate, PredicateFilter};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
Expand Down Expand Up @@ -42,7 +42,6 @@ pub trait WatchStreamExt: Stream {
EventFlatten::new(self, true)
}


/// Filter out a flattened stream on [`predicates`](crate::predicates).
///
/// This will filter out repeat calls where the predicate returns the same result.
Expand Down Expand Up @@ -72,11 +71,11 @@ pub trait WatchStreamExt: Stream {
/// # }
/// ```
#[cfg(feature = "unstable-runtime-predicates")]
fn predicate_filter<K, F>(self, predicate: F) -> PredicateFilter<Self, K, F>
fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
where
Self: Stream<Item = Result<K, watcher::Error>> + Sized,
K: Resource + 'static,
F: Fn(&K) -> Option<u64> + 'static,
P: Predicate<K> + 'static,
{
PredicateFilter::new(self, predicate)
}
Expand Down

0 comments on commit 5806a2d

Please sign in to comment.