Skip to content

Commit

Permalink
Add default pagination to watcher (#1249)
Browse files Browse the repository at this point in the history
* Paginate initial watch list

See #1209, relatively untested prototype and doesn't include any proposed API changes

* Remove managed_fields stripper

Users can do this anyway using for_stream

* Make page size configurable

* initial clippy fixes

Signed-off-by: clux <[email protected]>

* more unrelated clippy fixes

Signed-off-by: clux <[email protected]>

* clippy fixes related to this pr

Signed-off-by: clux <[email protected]>

* align default pagination size with client-go (500)

Signed-off-by: clux <[email protected]>

* caveat the page example + rename to page_size

name matches what it's called upstream + it's shorter

Signed-off-by: clux <[email protected]>

* add a new mock test system for the runtime

Signed-off-by: clux <[email protected]>

* remove unnecessary boxing + slight rename

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
Co-authored-by: Natalie Klestrup Röijezon <[email protected]>
  • Loading branch information
clux and nightkr authored Jul 13, 2023
1 parent e99d0dc commit fa1e92e
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 46 deletions.
4 changes: 4 additions & 0 deletions examples/pod_paged.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use kube::{
};
use tracing::*;

// This example shows how to do pagination with the raw `Api` only.
// In many realistic setups that need a continual, paginated, safe list-watch,
// the `watcher` is an easier abstraction that has configurable pagination built in.

const PAGE_SIZE: u32 = 5;

#[tokio::main]
Expand Down
8 changes: 4 additions & 4 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100;

/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector`].
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector`]
/// with a [`watcher`](watcher()) or [`reflector`](reflector()) for the subobject.
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
Expand Down
11 changes: 3 additions & 8 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,13 @@ mod tests {
};
use futures::{
channel::{mpsc, oneshot},
future, poll, stream, FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
sync::Mutex,
time::Duration,
future, poll, stream, SinkExt, StreamExt, TryStreamExt,
};
use std::{cell::RefCell, collections::HashMap, sync::Mutex, time::Duration};
use tokio::{
runtime::Handle,
task::yield_now,
time::{error::Elapsed, pause, sleep, timeout, Instant},
time::{pause, sleep, timeout, Instant},
};

#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ where
{
/// Wait for the store to be populated by Kubernetes.
///
/// Note that this will _not_ await the source calling the associated [`Writer`] (such as the [`reflector`]).
/// Note that polling this will _not_ await the source of the stream that populates the [`Writer`].
/// The [`reflector`](crate::reflector()) stream must be awaited separately.
///
/// # Errors
/// Returns an error if the [`Writer`] was dropped before any value was written.
Expand Down
10 changes: 2 additions & 8 deletions kube-runtime/src/utils/delayed_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,13 @@ pub struct InitDropped;

#[cfg(test)]
mod tests {
use std::{
sync::{Arc, Mutex},
task::Poll,
};
use std::task::Poll;

use super::DelayedInit;
use futures::{pin_mut, poll};
use tracing::Level;
use tracing_subscriber::util::SubscriberInitExt;

use crate::utils::delayed_init::ReceiverState;

use super::DelayedInit;

fn setup_tracing() -> tracing::dispatcher::DefaultGuard {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
Expand Down
3 changes: 1 addition & 2 deletions kube-runtime/src/utils/event_modify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use std::task::ready;

use futures::{Stream, TryStream};
use pin_project::pin_project;
Expand Down Expand Up @@ -50,7 +49,7 @@ pub(crate) mod test {
use std::{task::Poll, vec};

use super::{Error, Event, EventModify};
use futures::{pin_mut, poll, stream, Stream, StreamExt};
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn eventmodify_modifies_innner_value_of_event() {
Expand Down
92 changes: 69 additions & 23 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ impl<K> Event<K> {
/// The internal finite state machine driving the [`watcher`]
enum State<K> {
/// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
Empty,
Empty {
continue_token: Option<String>,
objects: Vec<K>,
},
/// The initial LIST was successful, so we should move on to starting the actual watch.
InitListed { resource_version: String },
/// The watch is in progress, from this point we just return events from the server.
Expand All @@ -134,6 +137,15 @@ enum State<K> {
},
}

impl<K: Resource + Clone> Default for State<K> {
    /// Builds the starting state of the watcher state machine: [`State::Empty`]
    /// with no continue token and no objects accumulated from earlier pages.
    fn default() -> Self {
        // No page has been requested yet, so there is nothing to continue from.
        let continue_token = None;
        // Buffer that collects the listed objects; starts out empty.
        let objects = Vec::new();
        Self::Empty {
            continue_token,
            objects,
        }
    }
}

/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
Expand Down Expand Up @@ -205,6 +217,14 @@ pub struct Config {
/// Configures re-list for performance vs. consistency.
pub list_semantic: ListSemantic,

/// Maximum number of objects retrieved per list operation during resyncs.
///
/// This can reduce the memory consumption during resyncs, at the cost of requiring more
/// API roundtrips to complete.
///
/// Defaults to 500. Note that `None` represents unbounded.
pub page_size: Option<u32>,

/// Enables watch events with type "BOOKMARK".
///
/// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
Expand All @@ -220,6 +240,9 @@ impl Default for Config {
field_selector: None,
timeout: None,
list_semantic: ListSemantic::default(),
// same default page size limit as client-go
// https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
page_size: Some(500),
}
}
}
Expand Down Expand Up @@ -288,6 +311,16 @@ impl Config {
self
}

/// Limits the number of objects retrieved in each list operation during resync.
///
/// This can reduce the memory consumption during resyncs, at the cost of requiring more
/// API roundtrips to complete.
///
/// The given `page_size` is stored as `Some(page_size)` and is used as the `limit`
/// of the list requests issued by the watcher. The default configuration uses a
/// page size of 500.
///
/// This builder always sets a bound; to request unbounded lists, assign `None`
/// to the public `page_size` field directly.
#[must_use]
pub fn page_size(mut self, page_size: u32) -> Self {
// Always wraps in `Some`: this builder cannot express the unbounded (`None`) case.
self.page_size = Some(page_size);
self
}

/// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
fn to_list_params(&self) -> ListParams {
let (resource_version, version_match) = match self.list_semantic {
Expand All @@ -300,9 +333,8 @@ impl Config {
timeout: self.timeout,
version_match,
resource_version,
// It is not permissible for users to configure the continue token and limit for the watcher, as these parameters are associated with paging.
// The watcher must handle paging internally.
limit: None,
// The watcher handles pagination internally.
limit: self.page_size,
continue_token: None,
}
}
Expand Down Expand Up @@ -368,6 +400,7 @@ where
///
/// This function should be trampolined: if event == `None`
/// then the function should be called again until it returns a Some.
#[allow(clippy::too_many_lines)] // for now
async fn step_trampolined<A>(
api: &A,
wc: &Config,
Expand All @@ -378,25 +411,38 @@ where
A::Value: Resource + 'static,
{
match state {
State::Empty => match api.list(&wc.to_list_params()).await {
Ok(list) => {
if let Some(resource_version) = list.metadata.resource_version {
(Some(Ok(Event::Restarted(list.items))), State::InitListed {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::Empty)
State::Empty {
continue_token,
mut objects,
} => {
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;
match api.list(&lp).await {
Ok(list) => {
objects.extend(list.items);
if let Some(continue_token) = list.metadata.continue_ {
(None, State::Empty {
continue_token: Some(continue_token),
objects,
})
} else if let Some(resource_version) = list.metadata.resource_version {
(Some(Ok(Event::Restarted(objects))), State::InitListed {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::default())
}
}
}
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");
} else {
debug!("watch list error: {err:?}");
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch list error with 403: {err:?}");
} else {
debug!("watch list error: {err:?}");
}
(Some(Err(err).map_err(Error::InitialListFailed)), State::default())
}
(Some(Err(err).map_err(Error::InitialListFailed)), State::Empty)
}
},
}
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (None, State::Watching {
Expand Down Expand Up @@ -441,7 +487,7 @@ where
Some(Ok(WatchEvent::Error(err))) => {
// HTTP GONE, means we have desynced and need to start over and re-list :(
let new_state = if err.code == 410 {
State::Empty
State::default()
} else {
State::Watching {
resource_version,
Expand Down Expand Up @@ -543,7 +589,7 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
watcher_config: Config,
) -> impl Stream<Item = Result<Event<K>>> + Send {
futures::stream::unfold(
(api, watcher_config, State::Empty),
(api, watcher_config, State::default()),
|(api, watcher_config, state)| async {
let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
Some((event, (api, watcher_config, state)))
Expand Down Expand Up @@ -607,7 +653,7 @@ pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send +
watcher_config: Config,
) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
futures::stream::unfold(
(api, watcher_config, State::Empty),
(api, watcher_config, State::default()),
|(api, watcher_config, state)| async {
let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
Some((event, (api, watcher_config, state)))
Expand Down
4 changes: 4 additions & 0 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ futures = "0.3.17"
serde_json = "1.0.68"
serde = { version = "1.0.130", features = ["derive"] }
schemars = "0.8.6"
hyper = "0.14.27"
http = "0.2.9"
tower-test = "0.4.0"
anyhow = "1.0.71"

[dev-dependencies.k8s-openapi]
version = "0.18.0"
Expand Down
5 changes: 5 additions & 0 deletions kube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
#[doc(inline)]
pub use kube_core as core;

// Mock tests for the runtime
#[cfg(test)]
#[cfg(all(feature = "derive", feature = "runtime"))]
mod mock_tests;

// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube --lib --features=runtime,derive -- --ignored`
#[cfg(test)]
Expand Down
Loading

0 comments on commit fa1e92e

Please sign in to comment.