Skip to content

Commit

Permalink
Document Controller::reconcile_on and remove Err input requirement (
Browse files Browse the repository at this point in the history
#1304)

* Better docs for `Controller::reconcile_on`

The unstable method currently suggests that this method can be used to help share a store with the reconciler.
This is actually nothing specific to `reconcile_on`, and you can do the same with the streams interface with `watches_stream`.

We made the `reconcile_on` right before `watches_stream` became a thing so this makes sense.

Have reworded the example to highlight that this has a better use-case with actually getting arbitrary third-party info,
and then mapping that to kubernetes objects.

First example that came to mind was using an IntervalStream with tokio and just cycle through a bunch of objects, but there may be a better example that does not pull in the extra dev dep.

Signed-off-by: clux <[email protected]>

* do the same as on kube.rs

Signed-off-by: clux <[email protected]>

* Update kube-runtime/src/controller/mod.rs

Co-authored-by: David Herberth <[email protected]>
Signed-off-by: Eirik A <[email protected]>

* use david's suggestion

Signed-off-by: clux <[email protected]>

* no need for send + static

Signed-off-by: clux <[email protected]>

* stop pretending to handle errors in reconcile_on

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
Signed-off-by: Eirik A <[email protected]>
Co-authored-by: David Herberth <[email protected]>
  • Loading branch information
clux and Dav1dde authored Oct 10, 2023
1 parent 68ae55c commit 3d7ebdd
Showing 1 changed file with 22 additions and 26 deletions.
48 changes: 22 additions & 26 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,15 +1087,14 @@ where

/// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
///
/// For example, this can be used to watch resources once and use the stream to trigger reconciliation and also keep a cache of those objects.
/// That way it's possible to use this up to date cache instead of querying Kubernetes to access those resources
/// This can be used to inject reconciliations for specific objects from an external resource.
///
/// # Example:
///
/// ```no_run
/// # async {
/// # use futures::{StreamExt, TryStreamExt};
/// # use k8s_openapi::api::core::v1::{ConfigMap, Pod};
/// # use futures::{StreamExt, Stream, stream, TryStreamExt};
/// # use k8s_openapi::api::core::v1::{ConfigMap};
/// # use kube::api::Api;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::reflector::{ObjectRef, Store};
Expand All @@ -1106,37 +1105,34 @@ where
/// # use std::sync::Arc;
/// #
/// # let client: Client = todo!();
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Store<Pod>>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Store<Pod>>) -> Action { Action::await_change() }
/// #
/// // Store can be used in the reconciler instead of querying Kube
/// let (pod_store, writer) = reflector::store();
/// let pod_stream = watcher(Api::<Pod>::all(client.clone()), Config::default())
/// .default_backoff()
/// .reflect(writer)
/// .applied_objects()
/// // Map to the relevant `ObjectRef<K>` to reconcile
/// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap()));
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
/// # let ns = "controller-ns".to_string();
/// struct ExternalObject {
/// name: String,
/// }
/// let external_stream = watch_external_objects().map(|ext| {
/// ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
/// });
///
/// Controller::new(Api::<ConfigMap>::all(client), Config::default())
/// .reconcile_on(pod_stream)
/// // The store can be re-used between controllers and even inspected from the reconciler through [Context]
/// .run(reconcile, error_policy, Arc::new(pod_store))
/// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
/// .reconcile_on(external_stream)
/// .run(reconcile, error_policy, Arc::new(()))
/// .for_each(|_| future::ready(()))
/// .await;
/// # };
/// ```
#[cfg(feature = "unstable-runtime-reconcile-on")]
#[must_use]
pub fn reconcile_on(
mut self,
trigger: impl Stream<Item = Result<ObjectRef<K>, watcher::Error>> + Send + 'static,
) -> Self {
pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
self.trigger_selector.push(
trigger
.map_ok(move |obj| ReconcileRequest {
obj_ref: obj,
reason: ReconcileReason::Unknown,
.map(move |obj| {
Ok(ReconcileRequest {
obj_ref: obj,
reason: ReconcileReason::Unknown,
})
})
.boxed(),
);
Expand Down

0 comments on commit 3d7ebdd

Please sign in to comment.