Filter duplicate events from ClusterGroup reconcile (#87)
- Opt in to the unstable-runtime feature to control and reuse object
  streams from watches.
- Reduce the cache size by clearing managed_fields before objects are stored.
- Set a finalizer on the ClusterGroup object to ensure deletion events
  are not missed and the object is recreated.

Signed-off-by: Danil-Grigorev <[email protected]>
Danil-Grigorev authored Aug 1, 2024
1 parent e925cdc commit 11fb534
Showing 9 changed files with 164 additions and 43 deletions.
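
Before the per-file diffs, here is a minimal, self-contained sketch of the watch pipeline this commit adopts, assuming kube 0.93 with the unstable-runtime feature enabled (see Cargo.toml below). ConfigMap and the no-op reconcile/error-policy closures are illustrative stand-ins, not part of this codebase:

use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::runtime::{controller::Action, predicates, reflector, watcher, Controller, WatchStreamExt};
use kube::{Api, Client, ResourceExt as _};

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;

    // One watch stream feeds both the cache and the controller.
    let (reader, writer) = reflector::store();
    let stream = watcher(Api::<ConfigMap>::all(client), watcher::Config::default())
        .default_backoff()
        // Shrink the cache: managed fields are never read by the reconciler.
        .modify(|cm| cm.managed_fields_mut().clear())
        .reflect(writer)
        .touched_objects()
        // Drop events whose resourceVersion was already seen, i.e. duplicates.
        .predicate_filter(predicates::resource_version);

    // unstable-runtime API: drive the controller from the prepared stream
    // instead of letting it open a second watch of its own.
    Controller::for_stream(stream, reader)
        .run(
            |_cm: Arc<ConfigMap>, _ctx| async { Ok::<_, kube::Error>(Action::await_change()) },
            |_cm, _err: &kube::Error, _ctx| Action::requeue(Duration::from_secs(5)),
            Arc::new(()),
        )
        .for_each(|_| futures::future::ready(()))
        .await;
    Ok(())
}

The same modify/reflect/touched_objects/predicate_filter chain appears three times in src/controller.rs below; predicate_filter(predicates::resource_version) is what actually filters the duplicate events named in the commit title.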
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -34,7 +34,7 @@ actix-web = "4.4.0"
 futures = "0.3.28"
 tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
 k8s-openapi = { version = "0.22", features = ["latest", "schemars"] }
-kube = { version = "0.93.0", features = ["runtime", "client", "derive"]}
+kube = { version = "0.93.1", features = ["runtime", "client", "derive", "unstable-runtime"]}
 schemars = { version = "0.8.21", features = ["chrono"] }
 serde = { version = "1.0.203", features = ["derive"] }
 serde_json = "1.0.117"
4 changes: 2 additions & 2 deletions justfile
@@ -1,5 +1,5 @@
 NAME := "cluster-api-fleet-controller"
-KUBE_VERSION := env_var_or_default('KUBE_VERSION', '1.26.3')
+KUBE_VERSION := env_var_or_default('KUBE_VERSION', '1.30.0')
 ORG := "ghcr.io/rancher-sandbox"
 TAG := "dev"
 HOME_DIR := env_var('HOME')
@@ -135,7 +135,7 @@ install-fleet: _create-out-dir
     API_SERVER_URL=`kubectl get nodes -o json | jq -r '.items[0].status.addresses[] | select(.type=="InternalIP").address'`
     API_SERVER_URL=https://${API_SERVER_URL}:6443
     helm -n cattle-fleet-system install --version v0.10.1-rc.1 --create-namespace --wait fleet-crd fleet/fleet-crd
-    helm install --version v0.10.1-rc.1 --create-namespace -n cattle-fleet-system --set apiServerURL=$API_SERVER_URL --set-file apiServerCA={{OUT_DIR}}/ca.pem fleet fleet/fleet --wait
+    helm install --version v0.10.1-rc.1 --create-namespace -n cattle-fleet-system --set bootstrap.enabled=false --set apiServerURL=$API_SERVER_URL --set-file apiServerCA={{OUT_DIR}}/ca.pem fleet fleet/fleet --wait

 # Install cluster api and any providers
 install-capi: _download-clusterctl
132 changes: 104 additions & 28 deletions src/controller.rs
@@ -11,6 +11,8 @@ use futures::channel::mpsc;
 use futures::StreamExt;

 use k8s_openapi::api::core::v1::Namespace;
+use kube::runtime::{metadata_watcher, predicates, reflector, watcher, WatchStreamExt};
+use kube::ResourceExt as _;
 use kube::{
     api::Api,
     client::Client,
@@ -21,6 +23,9 @@ use kube::{
 };
 use tokio::sync::Mutex;

+use std::future;
+
+use std::ops::Deref;
 use std::sync::Arc;
 use tokio::{sync::RwLock, time::Duration};
 use tracing::{self, warn};
@@ -71,8 +76,6 @@ pub async fn run_cluster_controller(state: State) {
     let client = Client::try_default()
         .await
         .expect("failed to create kube Client");
-    let clusters = Api::<Cluster>::all(client.clone());
-    let fleet = Api::<fleet_cluster::Cluster>::all(client.clone());

     let config_api: Api<FleetAddonConfig> = Api::all(client.clone());
     let config = config_api
@@ -81,9 +84,9 @@
         .expect("failed to get FleetAddonConfig resource")
         .unwrap_or_default();

-    let (invoke_reconcile, namespace_trigger) = mpsc::channel(0);
-    let clusters = Controller::new(
-        clusters,
+    let (reader, writer) = reflector::store();
+    let clusters = watcher(
+        Api::<Cluster>::all(client.clone()),
         Config::default()
             .labels_from(
                 &config
@@ -92,15 +95,33 @@
             )
             .any_semantic(),
     )
-    .owns(fleet, Config::default().any_semantic())
-    .reconcile_all_on(namespace_trigger)
-    .shutdown_on_signal()
-    .run(
-        Cluster::reconcile,
-        error_policy,
-        state.to_context(client.clone()),
+    .default_backoff()
+    .modify(|c| {
+        c.managed_fields_mut().clear();
+    })
+    .reflect(writer)
+    .touched_objects()
+    .predicate_filter(predicates::resource_version);
+
+    let fleet = metadata_watcher(
+        Api::<fleet_cluster::Cluster>::all(client.clone()),
+        Config::default().any_semantic(),
     )
-    .for_each(|_| futures::future::ready(()));
+    .modify(|g| g.managed_fields_mut().clear())
+    .touched_objects()
+    .predicate_filter(predicates::resource_version);
+
+    let (invoke_reconcile, namespace_trigger) = mpsc::channel(0);
+    let clusters = Controller::for_stream(clusters, reader)
+        .owns_stream(fleet)
+        .reconcile_all_on(namespace_trigger)
+        .shutdown_on_signal()
+        .run(
+            Cluster::reconcile,
+            error_policy,
+            state.to_context(client.clone()),
+        )
+        .for_each(|_| futures::future::ready(()));

     if config
         .namespace_selector()
@@ -110,7 +131,8 @@ pub async fn run_cluster_controller(state: State) {
         return clusters.await;
     }

-    let ns_controller = Controller::new(
+    let (reader, writer) = reflector::store();
+    let namespaces = metadata_watcher(
         Api::<Namespace>::all(client.clone()),
         Config::default()
             .labels_from(
@@ -120,13 +142,24 @@
             )
             .any_semantic(),
     )
-    .shutdown_on_signal()
-    .run(
-        Cluster::reconcile_ns,
-        Cluster::ns_trigger_error_policy,
-        Arc::new(Mutex::new(invoke_reconcile)),
-    )
-    .for_each(|_| futures::future::ready(()));
+    .default_backoff()
+    .modify(|ns| {
+        ns.managed_fields_mut().clear();
+        ns.annotations_mut().clear();
+        ns.labels_mut().clear();
+    })
+    .reflect(writer)
+    .touched_objects()
+    .predicate_filter(predicates::resource_version);
+
+    let ns_controller = Controller::for_stream(namespaces, reader)
+        .shutdown_on_signal()
+        .run(
+            Cluster::reconcile_ns,
+            Cluster::ns_trigger_error_policy,
+            Arc::new(Mutex::new(invoke_reconcile)),
+        )
+        .for_each(|_| futures::future::ready(()));

     tokio::join!(clusters, ns_controller);
 }
@@ -136,20 +169,63 @@ pub async fn run_cluster_class_controller(state: State) {
     let client = Client::try_default()
         .await
         .expect("failed to create kube Client");
-    let cluster_classes = Api::<ClusterClass>::all(client.clone());
-    let fleet_groups = Api::<ClusterGroup>::all(client.clone());

-    Controller::new(cluster_classes, Config::default().any_semantic())
-        .owns(fleet_groups, Config::default().any_semantic())
+    let (reader, writer) = reflector::store_shared(1024);
+    let subscriber = writer
+        .subscribe()
+        .expect("subscribe for cluster group updates successfully");
+    let fleet_groups = watcher(
+        Api::<ClusterGroup>::all(client.clone()),
+        Config::default().any_semantic(),
+    )
+    .default_backoff()
+    .modify(|cg| {
+        cg.managed_fields_mut().clear();
+        cg.status = None;
+    })
+    .reflect_shared(writer)
+    .touched_objects()
+    .predicate_filter(predicates::resource_version)
+    .for_each(|_| futures::future::ready(()));
+
+    let group_controller = Controller::for_shared_stream(subscriber.clone(), reader)
+        .shutdown_on_signal()
+        .run(
+            ClusterGroup::reconcile,
+            error_policy,
+            state.to_context(client.clone()),
+        )
+        .for_each(|_| futures::future::ready(()));
+
+    let (reader, writer) = reflector::store();
+    let cluster_classes = watcher(
+        Api::<ClusterClass>::all(client.clone()),
+        Config::default().any_semantic(),
+    )
+    .default_backoff()
+    .modify(|cc| cc.managed_fields_mut().clear())
+    .reflect(writer)
+    .touched_objects()
+    .predicate_filter(predicates::resource_version);
+
+    let filtered = subscriber
+        .map(|s| Ok(s.deref().clone()))
+        .predicate_filter(crate::predicates::generation_with_deletion)
+        .filter_map(|s| future::ready(s.ok().map(Arc::new)));
+    let cluster_class_controller = Controller::for_stream(cluster_classes, reader)
+        .owns_shared_stream(filtered)
         .shutdown_on_signal()
         .run(
             ClusterClass::reconcile,
             error_policy,
             state.to_context(client.clone()),
         )
-        .filter_map(|x| async move { std::result::Result::ok(x) })
-        .for_each(|_| futures::future::ready(()))
-        .await
+        .for_each(|_| futures::future::ready(()));
+
+    tokio::select! {
+        _ = fleet_groups => {},
+        _ = futures::future::join(group_controller, cluster_class_controller) => {},
+    };
 }

 fn error_policy(doc: Arc<impl kube::Resource>, error: &Error, ctx: Arc<Context>) -> Action {
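
A note on the shape of run_cluster_class_controller after this change: the ClusterGroup watch is opened once, cached through reflector::store_shared(1024) (the argument bounds each subscriber's event buffer), and fanned out. One clone of the subscriber drives the new ClusterGroup reconciler via Controller::for_shared_stream; the other is narrowed by crate::predicates::generation_with_deletion and handed to the ClusterClass controller via owns_shared_stream, so both controllers observe the same event stream without a second API watch. The closing tokio::select! keeps the root fleet_groups stream polled; if it terminates, the function returns and both dependent controllers are dropped with it.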
4 changes: 2 additions & 2 deletions src/controllers/cluster.rs
@@ -58,7 +58,7 @@ impl Cluster {
             None | Some(ClusterTopology { .. }) => self.labels().clone(),
         };

-        let agent_tolerations = Some(vec![ClusterAgentTolerations{
+        let agent_tolerations = Some(vec![ClusterAgentTolerations {
             effect: Some("NoSchedule".into()),
             operator: Some("Equal".into()),
             key: Some("node.kubernetes.io/not-ready".into()),
@@ -207,7 +207,7 @@
     }

     pub async fn reconcile_ns(
-        _: Arc<Namespace>,
+        _: Arc<impl Resource>,
         invoke_reconcile: Arc<Mutex<Sender<()>>>,
     ) -> crate::Result<Action> {
         let mut sender = invoke_reconcile.lock().await;
34 changes: 34 additions & 0 deletions src/controllers/cluster_group.rs
@@ -0,0 +1,34 @@
+use crate::api::fleet_addon_config::FleetAddonConfig;
+use crate::api::fleet_clustergroup::ClusterGroup;
+use crate::Result;
+
+use kube::runtime::controller::Action;
+
+use std::sync::Arc;
+
+use super::controller::{patch, Context, FleetBundle, FleetController};
+use super::{GroupSyncError, SyncError};
+
+impl FleetBundle for ClusterGroup {
+    // Applies finalizer on the existing ClusterGroup object, so the deletion event is not missed
+    async fn sync(&self, ctx: Arc<Context>) -> Result<Action> {
+        patch(ctx.clone(), self.clone())
+            .await
+            .map_err(Into::<GroupSyncError>::into)
+            .map_err(Into::<SyncError>::into)?;
+
+        Ok(Action::await_change())
+    }
+}
+
+impl FleetController for ClusterGroup {
+    type Bundle = ClusterGroup;
+
+    async fn to_bundle(
+        &self,
+        _ctx: Arc<Context>,
+        _config: &FleetAddonConfig,
+    ) -> Result<Self::Bundle> {
+        Ok(self.clone())
+    }
+}
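
Why the finalizer matters here: without one, a ClusterGroup can be deleted between watch events and the controller only ever sees the tombstone; with the finalizer applied by patch, deletion blocks until the controller reacts, so the deletion event is always observed and the group can be recreated. As a rough sketch of the idea with plain kube APIs (the helper and finalizer name below are hypothetical; the addon's own patch helper in super::controller is what actually runs):

use kube::api::{Api, Patch, PatchParams};
use serde_json::json;

use crate::api::fleet_clustergroup::ClusterGroup;

// Hypothetical helper: merge-patch a finalizer onto a ClusterGroup so its
// deletion produces an observable watch event instead of a silent removal.
// Note: a JSON merge patch replaces the whole finalizers list, which is only
// safe when this controller is the sole owner of that list.
async fn ensure_finalizer(groups: &Api<ClusterGroup>, name: &str) -> kube::Result<()> {
    let patch = json!({
        "metadata": { "finalizers": ["fleet.addons.example/finalizer"] }
    });
    groups
        .patch(name, &PatchParams::default(), &Patch::Merge(&patch))
        .await?;
    Ok(())
}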
1 change: 1 addition & 0 deletions src/controllers/mod.rs
@@ -68,4 +68,5 @@ pub enum PatchError {

 pub mod cluster;
 pub mod cluster_class;
+pub mod cluster_group;
 pub mod controller;
1 change: 1 addition & 0 deletions src/lib.rs
@@ -38,6 +38,7 @@ pub mod controller;
 pub use crate::controller::*;
 pub mod api;
 pub mod controllers;
+pub mod predicates;

 /// Log and trace integrations
 pub mod telemetry;
9 changes: 9 additions & 0 deletions src/predicates.rs
@@ -0,0 +1,9 @@
+use kube::runtime::predicates;
+use kube::ResourceExt;
+
+pub fn generation_with_deletion(obj: &impl ResourceExt) -> Option<u64> {
+    match obj.meta().deletion_timestamp {
+        Some(_) => predicates::resource_version(obj),
+        None => predicates::generation(obj),
+    }
+}
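
Why the two branches: predicates::generation typically changes only when an object's spec changes, which suppresses status-only churn, but deleting an object sets deletionTimestamp without bumping metadata.generation, so a pure generation predicate would swallow deletion events. Falling back to resource_version for objects carrying a deletionTimestamp lets deletions through while still deduplicating everything else. This is the predicate applied to the shared ClusterGroup subscriber in src/controller.rs above:

let filtered = subscriber
    .map(|s| Ok(s.deref().clone()))
    .predicate_filter(crate::predicates::generation_with_deletion)
    .filter_map(|s| future::ready(s.ok().map(Arc::new)));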
