Skip to content

Commit

Permalink
better doc examples
Browse files Browse the repository at this point in the history
Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed Jul 16, 2023
1 parent 437ab66 commit 562b6c1
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
19 changes: 10 additions & 9 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ where
/// General setup:
/// ```no_run
/// use kube::{Api, Client, CustomResource};
/// use kube::runtime::{controller::{Controller, Action}, watcher, reflector};
/// use kube::runtime::{controller::{Controller, Action}, watcher};
/// # use serde::{Deserialize, Serialize};
/// # use tokio::time::Duration;
/// use futures::StreamExt;
Expand Down Expand Up @@ -589,7 +589,9 @@ where
/// # async fn doc(client: kube::Client) {
/// let api: Api<Deployment> = Api::default_namespaced(client);
/// let (reader, writer) = reflector::store();
/// let deploys = reflector(writer, watcher(api, watcher::Config::default()))
/// let deploys = watcher(api, watcher::Config::default())
/// .default_backoff()
/// .reflect(writer)
/// .applied_objects()
/// .predicate_filter(predicates::generation);
///
Expand Down Expand Up @@ -1053,13 +1055,12 @@ where
/// #
/// // Store can be used in the reconciler instead of querying Kube
/// let (pod_store, writer) = reflector::store();
/// let pod_stream = reflector(
/// writer,
/// watcher(Api::<Pod>::all(client.clone()), Config::default()),
/// )
/// .applied_objects()
/// // Map to the relevant `ObjectRef<K>` to reconcile
/// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap()));
/// let pod_stream = watcher(Api::<Pod>::all(client.clone()), Config::default())
/// .default_backoff()
/// .reflect(writer)
/// .applied_objects()
/// // Map to the relevant `ObjectRef<K>` to reconcile
/// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap()));
///
/// Controller::new(Api::<ConfigMap>::all(client), Config::default())
/// .reconcile_on(pod_stream)
Expand Down
42 changes: 42 additions & 0 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,48 @@ pub trait WatchStreamExt: Stream {
///
/// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
/// This populates a [`Store`] as the stream is polled.
///
/// ## Usage
/// ```no_run
/// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
/// # use std::time::Duration;
/// # use tracing::{info, warn};
/// use kube::{Api, Client, ResourceExt};
/// use kube_runtime::{watcher, WatchStreamExt, reflector};
/// use k8s_openapi::api::apps::v1::Deployment;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
///
/// let deploys: Api<Deployment> = Api::default_namespaced(client);
/// let (reader, writer) = reflector::store::<Deployment>();
///
/// tokio::spawn(async move {
/// // start polling the store once the reader is ready
/// reader.wait_until_ready().await.unwrap();
/// loop {
/// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
/// info!("Current {} deploys: {:?}", names.len(), names);
/// tokio::time::sleep(Duration::from_secs(10)).await;
/// }
/// });
///
/// // configure the watcher stream and populate the store while polling
/// watcher(deploys, watcher::Config::default())
/// .reflect(writer)
/// .applied_objects()
/// .for_each(|res| async move {
/// match res {
/// Ok(o) => info!("saw {}", o.name_any()),
/// Err(e) => warn!("watcher error: {}", e),
/// }
/// })
/// .await;
///
/// # Ok(())
/// # }
/// ```
///
/// [`Store`]: crate::reflector::Store
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
where
Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
Expand Down

0 comments on commit 562b6c1

Please sign in to comment.