diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 985566972..0ce18ea4d 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -213,3 +213,7 @@ path = "secret_syncer.rs" name = "pod_shell_crossterm" path = "pod_shell_crossterm.rs" required-features = ["ws"] + +[[example]] +name = "namespace_reflector" +path = "namespace_reflector.rs" diff --git a/examples/namespace_reflector.rs b/examples/namespace_reflector.rs new file mode 100644 index 000000000..6d7af6385 --- /dev/null +++ b/examples/namespace_reflector.rs @@ -0,0 +1,49 @@ +use futures::TryStreamExt; +use k8s_openapi::api::core::v1::Namespace; +use kube::{ + api::Api, + runtime::{predicates, reflector, watcher, WatchStreamExt}, + Client, ResourceExt, +}; +use tracing::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let client = Client::try_default().await?; + + let api: Api = Api::all(client); + let (reader, writer) = reflector::store::(); + + tokio::spawn(async move { + // Show state every 5 seconds of watching + loop { + reader.wait_until_ready().await.unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + info!("Current namespace count: {}", reader.state().len()); + // full information with debug logs + for p in reader.state() { + let yaml = serde_yaml::to_string(p.as_ref()).unwrap(); + debug!("Namespace {}: \n{}", p.name_any(), yaml); + } + } + }); + + let stream = watcher(api, watcher::Config::default().streaming_lists()) + .default_backoff() + .modify(|ns| { + // memory optimization for our store - we don't care about managed fields/annotations/status + ns.managed_fields_mut().clear(); + ns.annotations_mut().clear(); + ns.status = None; + }) + .reflect(writer) + .applied_objects() + .predicate_filter(predicates::resource_version); // NB: requires an unstable feature + + futures::pin_mut!(stream); + while let Some(ns) = stream.try_next().await? { + info!("saw {}", ns.name_any()); + } + Ok(()) +} diff --git a/justfile b/justfile index ed7d4ab18..31139ee54 100644 --- a/justfile +++ b/justfile @@ -83,11 +83,12 @@ e2e-job-musl features: chmod +x e2e/job k3d: - k3d cluster create main --servers 1 --registry-create main \ + k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \ --no-lb --no-rollback \ --k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \ --k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \ - --k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' + --k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \ + --k3s-arg '--kube-apiserver-arg=feature-gates=WatchList=true' ## RELEASE RELATED diff --git a/kube-client/src/client/tls.rs b/kube-client/src/client/tls.rs index 3c84477f9..45785a8c9 100644 --- a/kube-client/src/client/tls.rs +++ b/kube-client/src/client/tls.rs @@ -53,7 +53,7 @@ pub mod rustls_tls { let mut client_config = if let Some((chain, pkey)) = identity_pem.map(client_auth).transpose()? { config_builder - .with_single_cert(chain, pkey) + .with_client_auth_cert(chain, pkey) .map_err(Error::InvalidPrivateKey)? } else { config_builder.with_no_client_auth() diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index 93d333f19..41eb04b26 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -305,6 +305,29 @@ pub struct WatchParams { /// If the feature gate WatchBookmarks is not enabled in apiserver, /// this field is ignored. pub bookmarks: bool, + + /// Kubernetes 1.27 Streaming Lists + /// `sendInitialEvents=true` may be set together with `watch=true`. + /// In that case, the watch stream will begin with synthetic events to + /// produce the current state of objects in the collection. Once all such + /// events have been sent, a synthetic "Bookmark" event will be sent. + /// The bookmark will report the ResourceVersion (RV) corresponding to the + /// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation. + /// Afterwards, the watch stream will proceed as usual, sending watch events + /// corresponding to changes (subsequent to the RV) to objects watched. + /// + /// When `sendInitialEvents` option is set, we require `resourceVersionMatch` + /// option to also be set. The semantic of the watch request is as following: + /// - `resourceVersionMatch` = NotOlderThan + /// is interpreted as "data at least as new as the provided `resourceVersion`" + /// and the bookmark event is send when the state is synced + /// to a `resourceVersion` at least as fresh as the one provided by the ListOptions. + /// If `resourceVersion` is unset, this is interpreted as "consistent read" and the + /// bookmark event is send when the state is synced at least to the moment + /// when request started being processed. + /// - `resourceVersionMatch` set to any other value or unset + /// Invalid error is returned. + pub send_initial_events: bool, } impl WatchParams { @@ -315,6 +338,11 @@ impl WatchParams { return Err(Error::Validation("WatchParams::timeout must be < 295s".into())); } } + if self.send_initial_events && !self.bookmarks { + return Err(Error::Validation( + "WatchParams::bookmarks must be set when using send_initial_events".into(), + )); + } Ok(()) } @@ -334,6 +362,10 @@ impl WatchParams { if self.bookmarks { qp.append_pair("allowWatchBookmarks", "true"); } + if self.send_initial_events { + qp.append_pair("sendInitialEvents", "true"); + qp.append_pair("resourceVersionMatch", "NotOlderThan"); + } } } @@ -347,6 +379,7 @@ impl Default for WatchParams { label_selector: None, field_selector: None, timeout: None, + send_initial_events: false, } } } @@ -401,6 +434,48 @@ impl WatchParams { self.bookmarks = false; self } + + /// Kubernetes 1.27 Streaming Lists + /// `sendInitialEvents=true` may be set together with `watch=true`. + /// In that case, the watch stream will begin with synthetic events to + /// produce the current state of objects in the collection. Once all such + /// events have been sent, a synthetic "Bookmark" event will be sent. + /// The bookmark will report the ResourceVersion (RV) corresponding to the + /// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation. + /// Afterwards, the watch stream will proceed as usual, sending watch events + /// corresponding to changes (subsequent to the RV) to objects watched. + /// + /// When `sendInitialEvents` option is set, we require `resourceVersionMatch` + /// option to also be set. The semantic of the watch request is as following: + /// - `resourceVersionMatch` = NotOlderThan + /// is interpreted as "data at least as new as the provided `resourceVersion`" + /// and the bookmark event is send when the state is synced + /// to a `resourceVersion` at least as fresh as the one provided by the ListOptions. + /// If `resourceVersion` is unset, this is interpreted as "consistent read" and the + /// bookmark event is send when the state is synced at least to the moment + /// when request started being processed. + /// - `resourceVersionMatch` set to any other value or unset + /// Invalid error is returned. + /// + /// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward + /// compatibility reasons) and to false otherwise. + #[must_use] + pub fn initial_events(mut self) -> Self { + self.send_initial_events = true; + + self + } + + /// Constructor for doing Kubernetes 1.27 Streaming List watches + /// + /// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`]. + pub fn streaming_lists() -> Self { + Self { + send_initial_events: true, + bookmarks: true, // required + ..WatchParams::default() + } + } } /// Common query parameters for put/post calls diff --git a/kube-core/src/request.rs b/kube-core/src/request.rs index 2490c925e..93dd5d8af 100644 --- a/kube-core/src/request.rs +++ b/kube-core/src/request.rs @@ -522,6 +522,18 @@ mod test { "/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0" ); } + + #[test] + fn watch_streaming_list() { + let url = corev1::Pod::url_path(&(), Some("ns")); + let wp = WatchParams::default().initial_events(); + let req = Request::new(url).watch(&wp, "0").unwrap(); + assert_eq!( + req.uri(), + "/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0" + ); + } + #[test] fn watch_metadata_path() { let url = corev1::Pod::url_path(&(), Some("ns")); diff --git a/kube-core/src/watch.rs b/kube-core/src/watch.rs index 4155d490e..f2423af8d 100644 --- a/kube-core/src/watch.rs +++ b/kube-core/src/watch.rs @@ -59,4 +59,9 @@ pub struct Bookmark { pub struct BookmarkMeta { /// The only field we need from a Bookmark event. pub resource_version: String, + + /// Kubernetes 1.27 Streaming Lists + /// The rest of the fields are optional and may be empty. + #[serde(default)] + pub annotations: std::collections::BTreeMap, } diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 0b3815ccc..150ed0f75 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -122,6 +122,13 @@ enum State { continue_token: Option, objects: Vec, }, + /// Kubernetes 1.27 Streaming Lists + /// The initial watch is in progress + IntialWatch { + objects: Vec, + #[derivative(Debug = "ignore")] + stream: BoxStream<'static, kube_client::Result>>, + }, /// The initial LIST was successful, so we should move on to starting the actual watch. InitListed { resource_version: String }, /// The watch is in progress, from this point we just return events from the server. @@ -192,6 +199,16 @@ pub enum ListSemantic { Any, } +/// Configurable watcher listwatch semantics +#[derive(Clone, Default, Debug, PartialEq)] +pub enum InitialListStrategy { + #[default] + ListWatch, + /// Kubernetes 1.27 Streaming Lists + /// https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists + StreamingList, +} + /// Accumulates all options that can be used on the watcher invocation. #[derive(Clone, Debug, PartialEq)] pub struct Config { @@ -215,14 +232,31 @@ pub struct Config { /// Semantics for list calls. /// /// Configures re-list for performance vs. consistency. + /// + /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. pub list_semantic: ListSemantic, + /// Kubernetes 1.27 Streaming Lists + /// Control how the watcher fetches the initial list of objects. + /// + /// ListWatch: The watcher will fetch the initial list of objects using a list call. + /// StreamingList: The watcher will fetch the initial list of objects using a watch call. + /// + /// StreamingList is more efficient than ListWatch, but it requires the server to support + /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27). + /// + /// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists + /// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details + pub initial_list_strategy: InitialListStrategy, + /// Maximum number of objects retrieved per list operation resyncs. /// /// This can reduce the memory consumption during resyncs, at the cost of requiring more /// API roundtrips to complete. /// /// Defaults to 500. Note that `None` represents unbounded. + /// + /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. pub page_size: Option, /// Enables watch events with type "BOOKMARK". @@ -243,6 +277,7 @@ impl Default for Config { // same default page size limit as client-go // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31 page_size: Some(500), + initial_list_strategy: InitialListStrategy::ListWatch, } } } @@ -289,6 +324,8 @@ impl Config { } /// Sets list semantic to configure re-list performance and consistency + /// + /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. #[must_use] pub fn list_semantic(mut self, semantic: ListSemantic) -> Self { self.list_semantic = semantic; @@ -296,6 +333,8 @@ impl Config { } /// Sets list semantic to `Any` to improve list performance + /// + /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. #[must_use] pub fn any_semantic(self) -> Self { self.list_semantic(ListSemantic::Any) @@ -315,12 +354,22 @@ impl Config { /// /// This can reduce the memory consumption during resyncs, at the cost of requiring more /// API roundtrips to complete. + /// + /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. #[must_use] pub fn page_size(mut self, page_size: u32) -> Self { self.page_size = Some(page_size); self } + /// Kubernetes 1.27 Streaming Lists + /// Sets list semantic to `Stream` to make use of watch bookmarks + #[must_use] + pub fn streaming_lists(mut self) -> Self { + self.initial_list_strategy = InitialListStrategy::StreamingList; + self + } + /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests. fn to_list_params(&self) -> ListParams { let (resource_version, version_match) = match self.list_semantic { @@ -346,6 +395,7 @@ impl Config { field_selector: self.field_selector.clone(), timeout: self.timeout, bookmarks: self.bookmarks, + send_initial_events: self.initial_list_strategy == InitialListStrategy::StreamingList, } } } @@ -414,35 +464,103 @@ where State::Empty { continue_token, mut objects, + } => match wc.initial_list_strategy { + InitialListStrategy::ListWatch => { + let mut lp = wc.to_list_params(); + lp.continue_token = continue_token; + match api.list(&lp).await { + Ok(list) => { + objects.extend(list.items); + if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { + (None, State::Empty { + continue_token: Some(continue_token), + objects, + }) + } else if let Some(resource_version) = + list.metadata.resource_version.filter(|s| !s.is_empty()) + { + (Some(Ok(Event::Restarted(objects))), State::InitListed { + resource_version, + }) + } else { + (Some(Err(Error::NoResourceVersion)), State::default()) + } + } + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch list error with 403: {err:?}"); + } else { + debug!("watch list error: {err:?}"); + } + (Some(Err(err).map_err(Error::InitialListFailed)), State::default()) + } + } + } + InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await { + Ok(stream) => (None, State::IntialWatch { stream, objects }), + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch initlist error with 403: {err:?}"); + } else { + debug!("watch initlist error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchStartFailed)), State::default()) + } + }, + }, + State::IntialWatch { + mut objects, + mut stream, } => { - let mut lp = wc.to_list_params(); - lp.continue_token = continue_token; - match api.list(&lp).await { - Ok(list) => { - objects.extend(list.items); - if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { - (None, State::Empty { - continue_token: Some(continue_token), - objects, + match stream.next().await { + Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { + objects.push(obj); + (None, State::IntialWatch { objects, stream }) + } + Some(Ok(WatchEvent::Deleted(obj))) => { + objects.retain(|o| o.name_any() != obj.name_any() && o.namespace() != obj.namespace()); + (None, State::IntialWatch { objects, stream }) + } + Some(Ok(WatchEvent::Bookmark(bm))) => { + let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); + if marks_initial_end { + (Some(Ok(Event::Restarted(objects))), State::Watching { + resource_version: bm.metadata.resource_version, + stream, }) - } else if let Some(resource_version) = - list.metadata.resource_version.filter(|s| !s.is_empty()) - { - (Some(Ok(Event::Restarted(objects))), State::InitListed { - resource_version, + } else { + (None, State::Watching { + resource_version: bm.metadata.resource_version, + stream, }) + } + } + Some(Ok(WatchEvent::Error(err))) => { + // HTTP GONE, means we have desynced and need to start over and re-list :( + let new_state = if err.code == 410 { + State::default() + } else { + State::IntialWatch { objects, stream } + }; + if err.code == 403 { + warn!("watcher watchevent error 403: {err:?}"); } else { - (Some(Err(Error::NoResourceVersion)), State::default()) + debug!("error watchevent error: {err:?}"); } + (Some(Err(err).map_err(Error::WatchError)), new_state) } - Err(err) => { + Some(Err(err)) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch list error with 403: {err:?}"); + warn!("watcher error 403: {err:?}"); } else { - debug!("watch list error: {err:?}"); + debug!("watcher error: {err:?}"); } - (Some(Err(err).map_err(Error::InitialListFailed)), State::default()) + (Some(Err(err).map_err(Error::WatchFailed)), State::IntialWatch { + objects, + stream, + }) } + None => (None, State::default()), } } State::InitListed { resource_version } => {