Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixup watcher docs, and watchlist doc examples #1284

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,3 @@ path = "secret_syncer.rs"
name = "pod_shell_crossterm"
path = "pod_shell_crossterm.rs"
required-features = ["ws"]

[[example]]
name = "namespace_reflector"
path = "namespace_reflector.rs"
6 changes: 6 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ cargo run --example node_watcher
cargo run --example dynamic_watcher
```

The `node_` and `pod_` watcher also allows using [Kubernetes 1.27 Streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists) via `WATCHLIST=1`:

```sh
WATCHLIST=1 RUST_LOG=info,kube=debug cargo run --example pod_watcher
```

### controllers
Main example requires you creating the custom resource first:

Expand Down
24 changes: 15 additions & 9 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use k8s_openapi::api::{core::v1::ObjectReference, events::v1::Event};
use kube::{
api::Api,
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
use tracing::info;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let events: Api<Event> = Api::all(client);
let wc = watcher::Config::default();

let ew = watcher(events, wc).applied_objects();
let ew = watcher(events, watcher::Config::default()).applied_objects();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand All @@ -27,10 +25,18 @@ async fn main() -> anyhow::Result<()> {
// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
info!(
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
"{}: {} ({})",
ev.regarding.map(fmt_obj_ref).unwrap_or_default(),
ev.reason.unwrap_or_default(),
ev.note.unwrap_or_default(),
);
Ok(())
}

fn fmt_obj_ref(oref: ObjectReference) -> String {
format!(
"{}/{}",
oref.kind.unwrap_or_default(),
oref.name.unwrap_or_default()
)
}
49 changes: 0 additions & 49 deletions examples/namespace_reflector.rs

This file was deleted.

8 changes: 7 additions & 1 deletion examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let wc = watcher::Config::default().labels("beta.kubernetes.io/arch=amd64");
let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
let wc = if use_watchlist {
// requires WatchList feature gate on 1.27 or later
watcher::Config::default().streaming_lists()
} else {
watcher::Config::default()
};
let obs = watcher(nodes, wc).default_backoff().applied_objects();

pin_mut!(obs);
Expand Down
9 changes: 8 additions & 1 deletion examples/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let api = Api::<Pod>::default_namespaced(client);
let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
let wc = if use_watchlist {
// requires WatchList feature gate on 1.27 or later
watcher::Config::default().streaming_lists()
} else {
watcher::Config::default()
};

watcher(api, watcher::Config::default())
watcher(api, wc)
.applied_objects()
.default_backoff()
.try_for_each(|p| async move {
Expand Down
6 changes: 3 additions & 3 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl ListParams {
} else {
// When there's a continue token, we don't want to set resourceVersion
if let Some(rv) = &self.resource_version {
if rv != "0" || (rv == "0" && self.limit.is_none()) {
if rv != "0" || self.limit.is_none() {
qp.append_pair("resourceVersion", rv.as_str());

match &self.version_match {
Expand Down Expand Up @@ -189,7 +189,7 @@ impl ListParams {

/// Sets an arbitary resource version match strategy
///
/// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotGreaterThan`
/// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotOlderThan`
/// requires an explicit `resource_version` set to pass request validation.
#[must_use]
pub fn matching(mut self, version_match: VersionMatch) -> Self {
Expand Down Expand Up @@ -472,7 +472,7 @@ impl WatchParams {

/// Constructor for doing Kubernetes 1.27 Streaming List watches
///
/// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`].
/// Enables [`VersionMatch::NotOlderThan`] semantics and [`WatchParams::send_initial_events`].
pub fn streaming_lists() -> Self {
Self {
send_initial_events: true,
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ pub struct Config {
impl Config {
/// The debounce duration used to deduplicate reconciliation requests.
///
/// When set to a non-zero duration, debouncing is enabled in the [`Scheduler`] resulting
/// in __trailing edge debouncing__ of reqonciler requests.
/// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
/// resulting in __trailing edge debouncing__ of reqonciler requests.
/// This option can help to reduce the amount of unnecessary reconciler calls
/// when using multiple controller relations, or during rapid phase transitions.
///
Expand Down
2 changes: 0 additions & 2 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ where
S::Ok: Debug,
S::Error: Debug,
{
// `arc_with_non_send_sync` false positive: https://github.com/rust-lang/rust-clippy/issues/11076
#[allow(clippy::arc_with_non_send_sync)]
let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
(
SplitCase {
Expand Down
2 changes: 0 additions & 2 deletions kube-runtime/src/utils/stream_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ impl<S: Stream> Stream for StreamSubscribe<S> {

match item {
Poll::Ready(Some(item)) => {
// `arc_with_non_send_sync` false positive: https://github.com/rust-lang/rust-clippy/issues/11076
#[allow(clippy::arc_with_non_send_sync)]
let item = Arc::new(item);
this.sender.send(Some(item.clone())).ok();
Poll::Ready(Some(item))
Expand Down
24 changes: 15 additions & 9 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,19 @@ pub enum ListSemantic {
}

/// Configurable watcher listwatch semantics

#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
/// List first, then watch from given resouce version
///
/// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
/// When using this mode, you can configure the page_size on the watcher.
#[default]
ListWatch,
/// Kubernetes 1.27 Streaming Lists
/// https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
///
/// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
/// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
StreamingList,
}

Expand Down Expand Up @@ -233,10 +240,9 @@ pub struct Config {
///
/// Configures re-list for performance vs. consistency.
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
pub list_semantic: ListSemantic,

/// Kubernetes 1.27 Streaming Lists
/// Control how the watcher fetches the initial list of objects.
///
/// ListWatch: The watcher will fetch the initial list of objects using a list call.
Expand All @@ -245,8 +251,8 @@ pub struct Config {
/// StreamingList is more efficient than ListWatch, but it requires the server to support
/// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
///
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
/// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
/// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
/// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
pub initial_list_strategy: InitialListStrategy,

/// Maximum number of objects retrieved per list operation resyncs.
Expand All @@ -256,7 +262,7 @@ pub struct Config {
///
/// Defaults to 500. Note that `None` represents unbounded.
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
pub page_size: Option<u32>,

/// Enables watch events with type "BOOKMARK".
Expand Down Expand Up @@ -325,7 +331,7 @@ impl Config {

/// Sets list semantic to configure re-list performance and consistency
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
#[must_use]
pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
self.list_semantic = semantic;
Expand All @@ -334,7 +340,7 @@ impl Config {

/// Sets list semantic to `Any` to improve list performance
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
#[must_use]
pub fn any_semantic(self) -> Self {
self.list_semantic(ListSemantic::Any)
Expand All @@ -355,7 +361,7 @@ impl Config {
/// This can reduce the memory consumption during resyncs, at the cost of requiring more
/// API roundtrips to complete.
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
#[must_use]
pub fn page_size(mut self, page_size: u32) -> Self {
self.page_size = Some(page_size);
Expand Down
Loading