Skip to content

Commit

Permalink
adds watch-list implementation without breaking changes (#1255)
Browse files Browse the repository at this point in the history
* adds streaming watch list operation

Signed-off-by: Ivan Porto Carrero <ivan@flanders.co.nz>

* update the params and docs based on review comments

Signed-off-by: Ivan Porto Carrero <ivan@flanders.co.nz>

---------

Signed-off-by: Ivan Porto Carrero <ivan@flanders.co.nz>
  • Loading branch information
casualjim committed Aug 22, 2023
1 parent d410464 commit 3af6b7f
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 22 deletions.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,7 @@ path = "secret_syncer.rs"
name = "pod_shell_crossterm"
path = "pod_shell_crossterm.rs"
required-features = ["ws"]

[[example]]
name = "namespace_reflector"
path = "namespace_reflector.rs"
49 changes: 49 additions & 0 deletions examples/namespace_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Namespace;
use kube::{
api::Api,
runtime::{predicates, reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let api: Api<Namespace> = Api::all(client);
let (reader, writer) = reflector::store::<Namespace>();

tokio::spawn(async move {
// Show state every 5 seconds of watching
loop {
reader.wait_until_ready().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("Current namespace count: {}", reader.state().len());
// full information with debug logs
for p in reader.state() {
let yaml = serde_yaml::to_string(p.as_ref()).unwrap();
debug!("Namespace {}: \n{}", p.name_any(), yaml);
}
}
});

let stream = watcher(api, watcher::Config::default().streaming_lists())
.default_backoff()
.modify(|ns| {
// memory optimization for our store - we don't care about managed fields/annotations/status
ns.managed_fields_mut().clear();
ns.annotations_mut().clear();
ns.status = None;
})
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::resource_version); // NB: requires an unstable feature

futures::pin_mut!(stream);
while let Some(ns) = stream.try_next().await? {
info!("saw {}", ns.name_any());
}
Ok(())
}
5 changes: 3 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ e2e-job-musl features:
chmod +x e2e/job

k3d:
k3d cluster create main --servers 1 --registry-create main \
k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \
--no-lb --no-rollback \
--k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \
--k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*'
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \
--k3s-arg '--kube-apiserver-arg=feature-gates=WatchList=true'

## RELEASE RELATED

Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub mod rustls_tls {

let mut client_config = if let Some((chain, pkey)) = identity_pem.map(client_auth).transpose()? {
config_builder
.with_single_cert(chain, pkey)
.with_client_auth_cert(chain, pkey)
.map_err(Error::InvalidPrivateKey)?
} else {
config_builder.with_no_client_auth()
Expand Down
75 changes: 75 additions & 0 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,29 @@ pub struct WatchParams {
/// If the feature gate WatchBookmarks is not enabled in apiserver,
/// this field is ignored.
pub bookmarks: bool,

/// Kubernetes 1.27 Streaming Lists
/// `sendInitialEvents=true` may be set together with `watch=true`.
/// In that case, the watch stream will begin with synthetic events to
/// produce the current state of objects in the collection. Once all such
/// events have been sent, a synthetic "Bookmark" event will be sent.
/// The bookmark will report the ResourceVersion (RV) corresponding to the
/// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation.
/// Afterwards, the watch stream will proceed as usual, sending watch events
/// corresponding to changes (subsequent to the RV) to objects watched.
///
/// When `sendInitialEvents` option is set, we require `resourceVersionMatch`
/// option to also be set. The semantic of the watch request is as following:
/// - `resourceVersionMatch` = NotOlderThan
/// is interpreted as "data at least as new as the provided `resourceVersion`"
/// and the bookmark event is send when the state is synced
/// to a `resourceVersion` at least as fresh as the one provided by the ListOptions.
/// If `resourceVersion` is unset, this is interpreted as "consistent read" and the
/// bookmark event is send when the state is synced at least to the moment
/// when request started being processed.
/// - `resourceVersionMatch` set to any other value or unset
/// Invalid error is returned.
pub send_initial_events: bool,
}

impl WatchParams {
Expand All @@ -315,6 +338,11 @@ impl WatchParams {
return Err(Error::Validation("WatchParams::timeout must be < 295s".into()));
}
}
if self.send_initial_events && !self.bookmarks {
return Err(Error::Validation(
"WatchParams::bookmarks must be set when using send_initial_events".into(),
));
}
Ok(())
}

Expand All @@ -334,6 +362,10 @@ impl WatchParams {
if self.bookmarks {
qp.append_pair("allowWatchBookmarks", "true");
}
if self.send_initial_events {
qp.append_pair("sendInitialEvents", "true");
qp.append_pair("resourceVersionMatch", "NotOlderThan");
}
}
}

Expand All @@ -347,6 +379,7 @@ impl Default for WatchParams {
label_selector: None,
field_selector: None,
timeout: None,
send_initial_events: false,
}
}
}
Expand Down Expand Up @@ -401,6 +434,48 @@ impl WatchParams {
self.bookmarks = false;
self
}

/// Kubernetes 1.27 Streaming Lists
/// `sendInitialEvents=true` may be set together with `watch=true`.
/// In that case, the watch stream will begin with synthetic events to
/// produce the current state of objects in the collection. Once all such
/// events have been sent, a synthetic "Bookmark" event will be sent.
/// The bookmark will report the ResourceVersion (RV) corresponding to the
/// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation.
/// Afterwards, the watch stream will proceed as usual, sending watch events
/// corresponding to changes (subsequent to the RV) to objects watched.
///
/// When `sendInitialEvents` option is set, we require `resourceVersionMatch`
/// option to also be set. The semantic of the watch request is as following:
/// - `resourceVersionMatch` = NotOlderThan
/// is interpreted as "data at least as new as the provided `resourceVersion`"
/// and the bookmark event is send when the state is synced
/// to a `resourceVersion` at least as fresh as the one provided by the ListOptions.
/// If `resourceVersion` is unset, this is interpreted as "consistent read" and the
/// bookmark event is send when the state is synced at least to the moment
/// when request started being processed.
/// - `resourceVersionMatch` set to any other value or unset
/// Invalid error is returned.
///
/// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward
/// compatibility reasons) and to false otherwise.
#[must_use]
pub fn initial_events(mut self) -> Self {
self.send_initial_events = true;

self
}

/// Constructor for doing Kubernetes 1.27 Streaming List watches
///
/// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`].
pub fn streaming_lists() -> Self {
Self {
send_initial_events: true,
bookmarks: true, // required
..WatchParams::default()
}
}
}

/// Common query parameters for put/post calls
Expand Down
12 changes: 12 additions & 0 deletions kube-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,18 @@ mod test {
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0"
);
}

#[test]
fn watch_streaming_list() {
let url = corev1::Pod::url_path(&(), Some("ns"));
let wp = WatchParams::default().initial_events();
let req = Request::new(url).watch(&wp, "0").unwrap();
assert_eq!(
req.uri(),
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0"
);
}

#[test]
fn watch_metadata_path() {
let url = corev1::Pod::url_path(&(), Some("ns"));
Expand Down
5 changes: 5 additions & 0 deletions kube-core/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ pub struct Bookmark {
pub struct BookmarkMeta {
/// The only field we need from a Bookmark event.
pub resource_version: String,

/// Kubernetes 1.27 Streaming Lists
/// The rest of the fields are optional and may be empty.
#[serde(default)]
pub annotations: std::collections::BTreeMap<String, String>,
}
Loading

0 comments on commit 3af6b7f

Please sign in to comment.