Skip to content

Commit

Permalink
Continue poc work & get tests to pass
Browse files Browse the repository at this point in the history
Signed-off-by: Matei David <[email protected]>
  • Loading branch information
mateiidavid committed Aug 4, 2023
1 parent 48cb59b commit 54c2508
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 83 deletions.
6 changes: 3 additions & 3 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use kube::{
use serde::de::DeserializeOwned;
use tracing::*;

use std::{env, fmt::Debug};
use std::{env, fmt::Debug, sync::Arc};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -39,9 +39,9 @@ async fn main() -> anyhow::Result<()> {
}

async fn handle_events<
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + DeserializeOwned + 'static,
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + Sync + DeserializeOwned + 'static,
>(
stream: impl Stream<Item = watcher::Result<Event<K>>> + Send + 'static,
stream: impl Stream<Item = watcher::Result<Event<Arc<K>>>> + Send + 'static,
ar: &ApiResource,
) -> anyhow::Result<()> {
let mut items = stream.applied_objects().boxed();
Expand Down
10 changes: 6 additions & 4 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
Expand Down Expand Up @@ -25,12 +27,12 @@ async fn main() -> anyhow::Result<()> {
}

// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
fn handle_event(ev: Arc<Event>) -> anyhow::Result<()> {
info!(
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
ev.message.as_ref().unwrap().trim(),
ev.involved_object.kind.as_ref().unwrap(),
ev.involved_object.name.as_ref().unwrap()
);
Ok(())
}
8 changes: 5 additions & 3 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
Expand Down Expand Up @@ -31,9 +33,9 @@ async fn main() -> anyhow::Result<()> {
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
enum Watched {
Config(ConfigMap),
Deploy(Deployment),
Secret(Secret),
Config(Arc<ConfigMap>),
Deploy(Arc<Deployment>),
Secret(Arc<Secret>),
}
while let Some(o) = combo_stream.try_next().await? {
match o {
Expand Down
7 changes: 5 additions & 2 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
Expand Down Expand Up @@ -25,12 +27,13 @@ async fn main() -> anyhow::Result<()> {
}

// A simple node problem detector
async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result<()> {
async fn check_for_node_failures(events: &Api<Event>, o: Arc<Node>) -> anyhow::Result<()> {
let name = o.name_any();
// Nodes often modify a lot - only print broken nodes
if let Some(true) = o.spec.unwrap().unschedulable {
if let Some(true) = o.spec.clone().unwrap().unschedulable {
let failed = o
.status
.clone()
.unwrap()
.conditions
.unwrap()
Expand Down
10 changes: 5 additions & 5 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ where
/// # type CustomResource = ConfigMap;
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
/// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<DaemonSet> = Api::all(client.clone());
/// let cr: Api<CustomResource> = Api::all(client.clone());
Expand Down Expand Up @@ -1207,7 +1207,7 @@ where
applier(
move |obj, ctx| {
CancelableJoinHandle::spawn(
reconciler(obj.clone(), ctx).into_future().in_current_span(),
reconciler(obj, ctx).into_future().in_current_span(),
&Handle::current(),
)
},
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod tests {
// and returns a WatchEvent generic over a resource `K`
fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<Event<K>>> + Send,
T: Stream<Item = watcher::Result<Event<Arc<K>>>> + Send,
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
{
x
Expand Down Expand Up @@ -1312,14 +1312,14 @@ mod tests {
);
pin_mut!(applier);
for i in 0..items {
let obj = ConfigMap {
let obj = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some(format!("cm-{i}")),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
});
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}
Expand Down
36 changes: 26 additions & 10 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,30 @@ mod tests {
distributions::{Bernoulli, Uniform},
Rng,
};
use std::collections::{BTreeMap, HashMap};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};

#[tokio::test]
async fn reflector_applied_should_add_object() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
let cm = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
});
reflector(
store_w,
stream::iter(vec![Ok(watcher::Event::Applied(cm.clone()))]),
)
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(cm.as_ref()));
}

#[tokio::test]
Expand All @@ -150,6 +153,9 @@ mod tests {
}),
..cm.clone()
};
let cm = Arc::new(cm);
let updated_cm = Arc::new(updated_cm);

reflector(
store_w,
stream::iter(vec![
Expand All @@ -160,20 +166,23 @@ mod tests {
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&updated_cm));
assert_eq!(
store.get(&ObjectRef::from_obj(&cm)).as_deref(),
Some(updated_cm.as_ref())
);
}

#[tokio::test]
async fn reflector_deleted_should_remove_object() {
let store_w = store::Writer::default();
let store = store_w.as_reader();
let cm = ConfigMap {
let cm = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some("a".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
});
reflector(
store_w,
stream::iter(vec![
Expand Down Expand Up @@ -205,6 +214,10 @@ mod tests {
},
..ConfigMap::default()
};

let cm_a = Arc::new(cm_a);
let cm_b = Arc::new(cm_b);

reflector(
store_w,
stream::iter(vec![
Expand All @@ -216,7 +229,10 @@ mod tests {
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm_a)), None);
assert_eq!(store.get(&ObjectRef::from_obj(&cm_b)).as_deref(), Some(&cm_b));
assert_eq!(
store.get(&ObjectRef::from_obj(&cm_b)).as_deref(),
Some(cm_b.as_ref())
);
}

#[tokio::test]
Expand All @@ -231,14 +247,14 @@ mod tests {
stream::iter((0_u32..100_000).map(|gen| {
let item = rng.sample(item_dist);
let deleted = rng.sample(deleted_dist);
let obj = ConfigMap {
let obj = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some(item.to_string()),
resource_version: Some(gen.to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
});
Ok(if deleted {
watcher::Event::Deleted(obj)
} else {
Expand Down
27 changes: 18 additions & 9 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,25 +210,27 @@ where

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::{store, Writer};
use crate::{reflector::ObjectRef, watcher};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::api::ObjectMeta;

#[test]
fn should_allow_getting_namespaced_object_by_namespaced_ref() {
let cm = ConfigMap {
let cm = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: Some("ns".to_string()),
..ObjectMeta::default()
},
..ConfigMap::default()
};
});
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(cm.as_ref()));
}

#[test]
Expand All @@ -244,24 +246,24 @@ mod tests {
let mut cluster_cm = cm.clone();
cluster_cm.metadata.namespace = None;
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm));
store_w.apply_watcher_event(&watcher::Event::Applied(Arc::new(cm)));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
}

#[test]
fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
let cm = ConfigMap {
let cm = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
namespace: None,
..ObjectMeta::default()
},
..ConfigMap::default()
};
});
let (store, mut writer) = store();
writer.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(cm.as_ref()));
}

#[test]
Expand All @@ -276,11 +278,16 @@ mod tests {
};
#[allow(clippy::redundant_clone)] // false positive
let mut nsed_cm = cm.clone();
// Need cm to be an Arc to be wrapped by Event type
let cm = Arc::new(cm);
nsed_cm.metadata.namespace = Some("ns".to_string());
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm));
assert_eq!(
store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(),
Some(cm.as_ref())
);
}

#[test]
Expand All @@ -295,6 +302,7 @@ mod tests {
};
let mut target_cm = cm.clone();

let cm = Arc::new(cm);
let (reader, mut writer) = store::<ConfigMap>();
assert!(reader.is_empty());
writer.apply_watcher_event(&watcher::Event::Applied(cm));
Expand All @@ -304,10 +312,11 @@ mod tests {

target_cm.metadata.name = Some("obj1".to_string());
target_cm.metadata.generation = Some(1234);
let target_cm = Arc::new(target_cm);
writer.apply_watcher_event(&watcher::Event::Applied(target_cm.clone()));
assert!(!reader.is_empty());
assert_eq!(reader.len(), 2);
let found = reader.find(|k| k.metadata.generation == Some(1234));
assert_eq!(found.as_deref(), Some(&target_cm));
assert_eq!(found.as_deref(), Some(target_cm.as_ref()));
}
}
30 changes: 17 additions & 13 deletions kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,43 @@ where

#[cfg(test)]
pub(crate) mod tests {
use std::task::Poll;
use std::{sync::Arc, task::Poll};

use super::{Error, Event, EventFlatten};
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn watches_applies_uses_correct_eventflattened_stream() {
let zero = Arc::new(0);
let one = Arc::new(1);
let two = Arc::new(2);

let data = stream::iter([
Ok(Event::Applied(0)),
Ok(Event::Applied(1)),
Ok(Event::Deleted(0)),
Ok(Event::Applied(2)),
Ok(Event::Restarted(vec![1, 2])),
Ok(Event::Applied(zero.clone())),
Ok(Event::Applied(one.clone())),
Ok(Event::Deleted(zero.clone())),
Ok(Event::Applied(two.clone())),
Ok(Event::Restarted(vec![one.clone(), two.clone()])),
Err(Error::TooManyObjects),
Ok(Event::Applied(2)),
Ok(Event::Applied(two.clone())),
]);
let rx = EventFlatten::new(data, false);
pin_mut!(rx);
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 0));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 1));
// NB: no Deleted events here
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 2));
// Restart comes through, currently in reverse order
// (normally on restart they just come in alphabetical order by name)
// this is fine though, alphabetical event order has no functional meaning in watchers
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 1));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 2));
// Error passed through
assert!(matches!(
poll!(rx.next()),
Poll::Ready(Some(Err(Error::TooManyObjects)))
));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2)))));
assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 2));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
}
Loading

0 comments on commit 54c2508

Please sign in to comment.