diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 095f9cf25..cd85f241f 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1087,15 +1087,14 @@ where /// Trigger the reconciliation process for a managed object `ObjectRef` whenever `trigger` emits a value /// - /// For example, this can be used to watch resources once and use the stream to trigger reconciliation and also keep a cache of those objects. - /// That way it's possible to use this up to date cache instead of querying Kubernetes to access those resources + /// This can be used to inject reconciliations for specific objects from an external resource. /// /// # Example: /// /// ```no_run /// # async { - /// # use futures::{StreamExt, TryStreamExt}; - /// # use k8s_openapi::api::core::v1::{ConfigMap, Pod}; + /// # use futures::{StreamExt, Stream, stream, TryStreamExt}; + /// # use k8s_openapi::api::core::v1::{ConfigMap}; /// # use kube::api::Api; /// # use kube::runtime::controller::Action; /// # use kube::runtime::reflector::{ObjectRef, Store}; @@ -1106,37 +1105,34 @@ where /// # use std::sync::Arc; /// # /// # let client: Client = todo!(); - /// # async fn reconcile(_: Arc, _: Arc>) -> Result { Ok(Action::await_change()) } - /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc>) -> Action { Action::await_change() } - /// # - /// // Store can be used in the reconciler instead of querying Kube - /// let (pod_store, writer) = reflector::store(); - /// let pod_stream = watcher(Api::::all(client.clone()), Config::default()) - /// .default_backoff() - /// .reflect(writer) - /// .applied_objects() - /// // Map to the relevant `ObjectRef` to reconcile - /// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap())); + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } + /// # fn watch_external_objects() -> impl Stream { stream::iter(vec![]) } + /// # let ns = "controller-ns".to_string(); + /// struct ExternalObject { + /// name: String, + /// } + /// let external_stream = watch_external_objects().map(|ext| { + /// ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns) + /// }); /// - /// Controller::new(Api::::all(client), Config::default()) - /// .reconcile_on(pod_stream) - /// // The store can be re-used between controllers and even inspected from the reconciler through [Context] - /// .run(reconcile, error_policy, Arc::new(pod_store)) + /// Controller::new(Api::::namespaced(client, &ns), Config::default()) + /// .reconcile_on(external_stream) + /// .run(reconcile, error_policy, Arc::new(())) /// .for_each(|_| future::ready(())) /// .await; /// # }; /// ``` #[cfg(feature = "unstable-runtime-reconcile-on")] #[must_use] - pub fn reconcile_on( - mut self, - trigger: impl Stream, watcher::Error>> + Send + 'static, - ) -> Self { + pub fn reconcile_on(mut self, trigger: impl Stream> + Send + 'static) -> Self { self.trigger_selector.push( trigger - .map_ok(move |obj| ReconcileRequest { - obj_ref: obj, - reason: ReconcileReason::Unknown, + .map(move |obj| { + Ok(ReconcileRequest { + obj_ref: obj, + reason: ReconcileReason::Unknown, + }) }) .boxed(), );