diff --git a/balter-core/src/config.rs b/balter-core/src/config.rs index b68e425..b8c96dd 100644 --- a/balter-core/src/config.rs +++ b/balter-core/src/config.rs @@ -1,4 +1,4 @@ -use crate::BASELINE_TPS; +use crate::BASE_TPS; #[cfg(feature = "rt")] use serde::{Deserialize, Serialize}; #[allow(unused_imports)] @@ -50,7 +50,7 @@ impl ScenarioConfig { } | ScenarioConfig { latency: Some(_), .. - } => Some(BASELINE_TPS), + } => Some(BASE_TPS), ScenarioConfig { max_tps: Some(tps), .. diff --git a/balter-core/src/constants.rs b/balter-core/src/constants.rs index c653a5a..1e53cf8 100644 --- a/balter-core/src/constants.rs +++ b/balter-core/src/constants.rs @@ -1,3 +1,9 @@ use std::num::NonZeroU32; +use std::time::Duration; -pub const BASELINE_TPS: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(256) }; +pub const BASE_TPS: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(256) }; +pub const BASE_CONCURRENCY: usize = 4; +pub const BASE_INTERVAL: Duration = Duration::from_millis(200); + +pub const MIN_SAMPLE_COUNT: u64 = 256; +pub const ADJUSTABLE_SAMPLE_COUNT: u64 = 5_000; diff --git a/balter-core/src/lib.rs b/balter-core/src/lib.rs index d63d8e0..dd7b4d3 100644 --- a/balter-core/src/lib.rs +++ b/balter-core/src/lib.rs @@ -1,11 +1,9 @@ mod config; mod constants; -mod data; mod metrics; mod stats; pub use config::*; pub use constants::*; -pub use data::*; pub use metrics::*; pub use stats::*; diff --git a/balter/src/controllers.rs b/balter/src/controllers.rs index 6f10a18..80a7882 100644 --- a/balter/src/controllers.rs +++ b/balter/src/controllers.rs @@ -8,7 +8,8 @@ pub(crate) use constant::ConstantController; pub(crate) use error_rate::ErrorRateController; pub(crate) use latency::LatencyController; -use balter_core::{LatencyConfig, SampleSet, ScenarioConfig}; +use crate::data::SampleSet; +use balter_core::{LatencyConfig, ScenarioConfig}; use std::num::NonZeroU32; pub(crate) trait Controller: Send { diff --git a/balter/src/controllers/concurrency.rs b/balter/src/controllers/concurrency.rs index b8fb2d5..7e8c535 100644 --- a/balter/src/controllers/concurrency.rs +++ b/balter/src/controllers/concurrency.rs @@ -1,4 +1,4 @@ -use balter_core::SampleSet; +use crate::data::SampleSet; use std::num::NonZeroU32; #[allow(unused)] use tracing::{debug, error, trace}; @@ -174,20 +174,20 @@ struct Measurement { #[cfg(test)] mod tests { use super::*; - use balter_core::SampleData; + use crate::data::SampleData; use std::num::NonZeroU32; use std::time::Duration; pub fn generate_tps(count: usize, tps: u64) -> SampleSet { - let mut samples = SampleSet::new(count); + let mut samples = SampleSet::new(); for _ in 0..count { let success_count = tps; let elapsed = Duration::from_secs(1); samples.push(SampleData { - success_count, - error_count: 0, + success: success_count, + error: 0, elapsed, }) } diff --git a/balter/src/controllers/constant.rs b/balter/src/controllers/constant.rs index e59a7ee..95d9ba1 100644 --- a/balter/src/controllers/constant.rs +++ b/balter/src/controllers/constant.rs @@ -1,5 +1,5 @@ use crate::controllers::Controller; -use balter_core::SampleSet; +use crate::data::SampleSet; use std::num::NonZeroU32; pub(crate) struct ConstantController { diff --git a/balter/src/controllers/error_rate.rs b/balter/src/controllers/error_rate.rs index 4fa754b..df1e430 100644 --- a/balter/src/controllers/error_rate.rs +++ b/balter/src/controllers/error_rate.rs @@ -1,5 +1,6 @@ use crate::controllers::Controller; -use balter_core::{SampleSet, BASELINE_TPS}; +use crate::data::SampleSet; +use balter_core::BASE_TPS; use std::num::NonZeroU32; #[allow(unused_imports)] use tracing::{debug, error, info, instrument, trace, warn, Instrument}; @@ -18,7 +19,7 @@ impl ErrorRateController { pub fn new(name: &str, error_rate: f64) -> Self { Self { base_label: format!("balter_{name}"), - goal_tps: BASELINE_TPS, + goal_tps: BASE_TPS, error_rate, state: State::BigStep, } @@ -43,7 +44,7 @@ impl ErrorRateController { impl Controller for ErrorRateController { fn initial_tps(&self) -> NonZeroU32 { - BASELINE_TPS + BASE_TPS } fn limit(&mut self, samples: &SampleSet, stable: bool) -> NonZeroU32 { diff --git a/balter/src/controllers/latency.rs b/balter/src/controllers/latency.rs index 9d78144..76f7a7b 100644 --- a/balter/src/controllers/latency.rs +++ b/balter/src/controllers/latency.rs @@ -1,5 +1,6 @@ use crate::controllers::Controller; -use balter_core::{SampleSet, BASELINE_TPS}; +use crate::data::SampleSet; +use balter_core::BASE_TPS; use std::num::NonZeroU32; use std::time::Duration; #[allow(unused)] @@ -21,7 +22,7 @@ impl LatencyController { base_label: format!("balter_{name}"), latency, quantile, - goal_tps: BASELINE_TPS, + goal_tps: BASE_TPS, }; s.goal_tps_metric(); s @@ -36,7 +37,7 @@ impl LatencyController { impl Controller for LatencyController { fn initial_tps(&self) -> NonZeroU32 { - BASELINE_TPS + BASE_TPS } fn limit(&mut self, samples: &SampleSet, stable: bool) -> NonZeroU32 { diff --git a/balter-core/src/data.rs b/balter/src/data.rs similarity index 59% rename from balter-core/src/data.rs rename to balter/src/data.rs index 863a0fd..a0bf293 100644 --- a/balter-core/src/data.rs +++ b/balter/src/data.rs @@ -8,67 +8,43 @@ const TDIGEST_BACKLOG_SIZE: usize = 100; pub struct SampleSet { samples: VecDeque, latency: TDigest, - window_size: usize, - skip_first_n: Option, - skip_window: usize, - latency_skip_window: usize, } impl SampleSet { - pub fn new(window_size: usize) -> Self { + pub fn new() -> Self { Self { samples: VecDeque::new(), latency: default_tdigest(), - window_size, - skip_first_n: None, - skip_window: 0, - latency_skip_window: 0, } } - pub fn skip_first_n(mut self, n_to_skip: usize) -> Self { - self.skip_first_n = Some(n_to_skip); - self.skip_window = n_to_skip; - self.latency_skip_window = n_to_skip; - self - } - pub fn push(&mut self, sample: SampleData) { - if self.skip_window > 0 { - self.skip_window -= 1; - return; - } - self.samples.push_back(sample); - if self.samples.len() > self.window_size { - self.samples.pop_front(); - } + } + + pub fn len(&self) -> usize { + self.samples.len() } /// Separate Latency push method since the TDigest datastructure does not support merge, and is /// probabilistic in nature. pub fn push_latency(&mut self, latency: Duration) { - if self.latency_skip_window > 0 { - self.latency_skip_window -= 1; - return; - } - self.latency.insert(latency.as_secs_f64()); + } - // TODO: The latency measurements have no windowing effect, and are strictly cumulative. + pub fn push_latencies(&mut self, mut latencies: Vec) { + for latency in latencies.drain(..) { + self.latency.insert(latency.as_secs_f64()); + } } pub fn clear(&mut self) { self.samples.clear(); self.latency = default_tdigest(); - if let Some(skip_n) = self.skip_first_n { - self.skip_window = skip_n; - self.latency_skip_window = skip_n; - } } pub fn full(&self) -> bool { - self.samples.len() == self.window_size + self.samples.len() > 0 } pub fn mean_err(&self) -> f64 { @@ -81,6 +57,15 @@ impl SampleSet { sum / self.samples.len() as f64 } + pub fn var_tps(&self) -> f64 { + let mean = self.mean_tps(); + self.samples + .iter() + .map(|x| (x.tps() - mean).powi(2)) + .sum::() + / self.samples.len() as f64 + } + pub fn latency(&self, quantile: f64) -> Duration { let secs = self.latency.quantile(quantile); Duration::from_secs_f64(secs) @@ -94,8 +79,8 @@ fn default_tdigest() -> TDigest { #[derive(Debug, Clone)] pub struct SampleData { - pub success_count: u64, - pub error_count: u64, + pub success: u64, + pub error: u64, pub elapsed: Duration, } @@ -105,10 +90,10 @@ impl SampleData { } pub fn error_rate(&self) -> f64 { - self.error_count as f64 / self.total() as f64 + self.error as f64 / self.total() as f64 } pub fn total(&self) -> u64 { - self.success_count + self.error_count + self.success + self.error } } diff --git a/balter/src/hints.rs b/balter/src/hints.rs new file mode 100644 index 0000000..0920eb0 --- /dev/null +++ b/balter/src/hints.rs @@ -0,0 +1,9 @@ +/// User provided hints for setting autoscaling parameters. +/// +/// Balter attempts to find the optimal values for all parameters, however sometimes the control +/// loops can take a while to stabalize. These are user-provided hints (see [crate::Scenario#method.hint]) +pub enum Hint { + /// Provide the starting concurrency value. Useful for Scenarios with low TPS (which Balter can + /// take a long time to stablize on). + Concurrency(usize), +} diff --git a/balter/src/lib.rs b/balter/src/lib.rs index e7ef3bc..12ca3ba 100644 --- a/balter/src/lib.rs +++ b/balter/src/lib.rs @@ -37,14 +37,19 @@ pub mod scenario; #[doc(hidden)] pub mod transaction; +mod hints; + #[macro_use] #[doc(hidden)] pub mod macros; pub(crate) mod controllers; +pub(crate) mod data; +pub(crate) mod sampler; #[cfg(not(feature = "rt"))] pub use balter_macros::{scenario, transaction}; +pub use hints::Hint; pub use scenario::Scenario; cfg_rt! { diff --git a/balter/src/scenario/sampler.rs b/balter/src/sampler.rs similarity index 97% rename from balter/src/scenario/sampler.rs rename to balter/src/sampler.rs index feaae9f..86c66da 100644 --- a/balter/src/scenario/sampler.rs +++ b/balter/src/sampler.rs @@ -1,7 +1,7 @@ use crate::controllers::{CCOutcome, ConcurrencyController}; +use crate::data::{SampleData, SampleSet}; use crate::transaction::{TransactionData, TRANSACTION_HOOK}; use arc_swap::ArcSwap; -use balter_core::{SampleData, SampleSet}; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; use metrics_util::AtomicBucket; use std::future::Future; @@ -18,6 +18,8 @@ use tokio::time::{interval, Interval}; #[allow(unused)] use tracing::{debug, error, info, trace, warn}; +mod base_sampler; + const SAMPLE_WINDOW_SIZE: usize = 100; const SKIP_SIZE: usize = 25; @@ -44,7 +46,7 @@ where base_label: format!("balter_{name}"), sampler: Sampler::new(scenario, goal_tps), cc: ConcurrencyController::new(goal_tps), - samples: SampleSet::new(SAMPLE_WINDOW_SIZE).skip_first_n(SKIP_SIZE), + samples: SampleSet::new(), needs_clear: false, tps_limited: false, }; @@ -230,8 +232,8 @@ where let data = SampleData { elapsed, - success_count, - error_count, + success: success_count, + error: error_count, }; // TODO: We should adjust interval timing based on noise not just sample count. @@ -349,7 +351,7 @@ mod tests { let mut tps_sampler = Sampler::new(mock_trivial_scenario, NonZeroU32::new(1_000).unwrap()); tps_sampler.set_concurrency(20); - let mut samples = SampleSet::new(1); + let mut samples = SampleSet::new(); tps_sampler.sample(&mut samples).await; for _ in 0..10 { tps_sampler.sample(&mut samples).await; @@ -366,7 +368,7 @@ mod tests { let mut tps_sampler = Sampler::new(mock_noisy_scenario, NonZeroU32::new(1_000).unwrap()); tps_sampler.set_concurrency(20); - let mut samples = SampleSet::new(1); + let mut samples = SampleSet::new(); tps_sampler.sample(&mut samples).await; for _ in 0..10 { tps_sampler.sample(&mut samples).await; diff --git a/balter/src/sampler/base_sampler.rs b/balter/src/sampler/base_sampler.rs new file mode 100644 index 0000000..66f58fe --- /dev/null +++ b/balter/src/sampler/base_sampler.rs @@ -0,0 +1,364 @@ +use crate::controllers::{CCOutcome, ConcurrencyController}; +use crate::data::{SampleData, SampleSet}; +use crate::transaction::{TransactionData, TRANSACTION_HOOK}; +use arc_swap::ArcSwap; +use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; +use metrics_util::AtomicBucket; +use std::future::Future; +use std::{ + num::NonZeroU32, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::task::JoinHandle; +use tokio::time::{interval, Instant, Interval}; +#[allow(unused)] +use tracing::{debug, error, info, trace, warn}; + +const SKIP_SIZE: usize = 3; + +pub(crate) struct BaseSampler { + scenario: T, + tasks: Vec>, + timer: Timer, + task_atomics: TaskAtomics, +} + +impl BaseSampler +where + T: Fn() -> F + Send + Sync + 'static + Clone, + F: Future + Send, +{ + pub async fn new(scenario: T, tps_limit: NonZeroU32) -> Self { + Self { + scenario, + tasks: vec![], + timer: Timer::new(balter_core::BASE_INTERVAL).await, + task_atomics: TaskAtomics::new(tps_limit), + } + } + + pub async fn sample(&mut self) -> SampleSet { + let mut samples = SampleSet::new(); + let mut skip_window = SKIP_SIZE; + let mut means = vec![]; + loop { + let elapsed = self.timer.tick().await; + let provisional = self.task_atomics.collect(); + let per_sample_count = provisional.count(); + + if per_sample_count < balter_core::MIN_SAMPLE_COUNT { + self.timer.double().await; + trace!("Not enough sample count. Found {per_sample_count}."); + continue; + } + + // NOTE: We skip the first N mainly because they have tended to be the noisiest. + if skip_window > 0 { + skip_window -= 1; + trace!("Within sample skip window."); + continue; + } + + samples.push(SampleData { + success: provisional.success, + error: provisional.error, + elapsed, + }); + samples.push_latencies(provisional.latency); + + if samples.len() > 10 { + trace!("Enough samples collected."); + // NOTE: Could use a statistical review here. + means.push(samples.mean_tps()); + if is_stable(&means, 5) { + if samples.len() < 25 && per_sample_count > balter_core::ADJUSTABLE_SAMPLE_COUNT + { + trace!("Halving timer"); + self.timer.halve().await; + } + + return samples; + } else if samples.len() > 100 { + error!("Balter is unable to find a stable measurement."); + samples.clear(); + } else { + trace!("Waiting on stabilization."); + trace!("Mean measurements: {means:?}"); + } + } + } + } + + pub fn set_tps_limit(&mut self, tps_limit: NonZeroU32) { + self.task_atomics.set_tps_limit(tps_limit); + } + + pub fn set_concurrency(&mut self, concurrency: usize) { + if self.tasks.len() == concurrency { + return; + } else if self.tasks.len() > concurrency { + for handle in self.tasks.drain(concurrency..) { + handle.abort(); + } + } else { + while self.tasks.len() < concurrency { + let scenario = self.scenario.clone(); + let transaction_data = self.task_atomics.clone_to_transaction_data(); + + self.tasks.push(tokio::spawn(TRANSACTION_HOOK.scope( + transaction_data, + async move { + // NOTE: We have an outer loop just in case the user-provided + // scenario does not have a loop. + loop { + scenario().await; + } + }, + ))); + } + } + } + + pub async fn shutdown(mut self) { + self.set_concurrency(0); + } +} + +struct Timer { + interval: Interval, + last_tick: Instant, + interval_dur: Duration, +} + +impl Timer { + async fn new(interval_dur: Duration) -> Self { + let mut interval = interval(interval_dur); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + // NOTE: First tick completes instantly + let last_tick = interval.tick().await; + Self { + interval, + last_tick, + interval_dur, + } + } + + async fn tick(&mut self) -> Duration { + let next = self.interval.tick().await; + let elapsed = self.last_tick.elapsed(); + self.last_tick = next; + elapsed + } + + async fn double(&mut self) { + if self.interval_dur < Duration::from_secs(10) { + self.interval_dur *= 2; + *self = Self::new(self.interval_dur).await; + } else { + error!("Balter's Sampling interval is greater than 10s. This is likely a sign of an issue; not increasing the sampling interval.") + } + } + + async fn halve(&mut self) { + self.interval_dur *= 2; + *self = Self::new(self.interval_dur).await; + } +} + +struct TaskAtomics { + limiter: Arc>, + tps_limit: NonZeroU32, + success: Arc, + error: Arc, + latency: Arc>, +} + +impl TaskAtomics { + fn new(tps_limit: NonZeroU32) -> Self { + Self { + limiter: Arc::new(ArcSwap::new(Arc::new(rate_limiter(tps_limit)))), + tps_limit, + success: Arc::new(AtomicU64::new(0)), + error: Arc::new(AtomicU64::new(0)), + latency: Arc::new(AtomicBucket::new()), + } + } + + fn set_tps_limit(&mut self, tps_limit: NonZeroU32) { + if tps_limit != self.tps_limit { + self.tps_limit = tps_limit; + self.limiter.store(Arc::new(rate_limiter(tps_limit))); + } + } + + fn clone_to_transaction_data(&self) -> TransactionData { + TransactionData { + limiter: self.limiter.clone(), + success: self.success.clone(), + error: self.error.clone(), + latency: self.latency.clone(), + } + } + + fn collect(&self) -> ProvisionalData { + let success = self.success.swap(0, Ordering::Relaxed); + let error = self.error.swap(0, Ordering::Relaxed); + let mut latency = vec![]; + self.latency.clear_with(|dur| { + latency.extend_from_slice(dur); + }); + + ProvisionalData { + success, + error, + latency, + } + } +} + +struct ProvisionalData { + success: u64, + error: u64, + latency: Vec, +} + +impl ProvisionalData { + fn count(&self) -> u64 { + self.success + self.error + } +} + +fn rate_limiter(tps_limit: NonZeroU32) -> DefaultDirectRateLimiter { + RateLimiter::direct( + Quota::per_second(tps_limit) + // TODO: Make burst configurable + .allow_burst(NonZeroU32::new(1).unwrap()), + ) +} + +fn is_stable(values: &[f64], count: usize) -> bool { + let diffs: Vec<_> = values + .windows(2) + .map(|arr| { + // % difference + (arr[1] - arr[0]) / arr[0] + }) + .collect(); + + diffs.iter().rev().take_while(|x| **x < 0.02).count() >= count - 1 +} + +fn is_decreasing(values: &[f64], count: usize) -> bool { + values + .windows(2) + .rev() + .take(count - 1) + .map(|arr| arr[1] < arr[0]) + .all(|x| x) +} + +#[cfg(test)] +mod tests { + use super::*; + use rand_distr::{Distribution, SkewNormal}; + + macro_rules! mock_scenario { + ($m:expr, $s:expr) => { + || async { + let labels = balter_core::TransactionLabels { + success: "", + error: "", + latency: "", + }; + let mean: Duration = $m; + let std: Duration = $s; + let _ = crate::transaction::transaction_hook::<_, (), ()>(labels, async { + let normal = + SkewNormal::new(mean.as_secs_f64(), std.as_secs_f64(), 50.).unwrap(); + let v: f64 = normal.sample(&mut rand::thread_rng()).max(0.); + tokio::time::sleep(Duration::from_secs_f64(v)).await; + Ok(()) + }) + .await; + } + }; + } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_simple() { + let mut sampler = BaseSampler::new( + mock_scenario!(Duration::from_millis(1), Duration::from_millis(0)), + NonZeroU32::new(1_000).unwrap(), + ) + .await; + + sampler.set_concurrency(11); + + let samples = sampler.sample().await; + assert!(samples.mean_tps() >= 990. && samples.mean_tps() <= 1_010.); + } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_noisy() { + let mut sampler = BaseSampler::new( + mock_scenario!(Duration::from_millis(10), Duration::from_millis(5)), + NonZeroU32::new(10_000).unwrap(), + ) + .await; + + sampler.set_concurrency(210); + + let samples = sampler.sample().await; + assert!(samples.mean_tps() >= 9_000. && samples.mean_tps() <= 10_010.); + } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_slow() { + let mut sampler = BaseSampler::new( + mock_scenario!(Duration::from_millis(400), Duration::from_millis(100)), + NonZeroU32::new(50).unwrap(), + ) + .await; + + sampler.set_concurrency(100); + + let samples = sampler.sample().await; + assert!(samples.mean_tps() >= 49. && samples.mean_tps() <= 51.); + } + + #[test] + fn test_stability_simple() { + let arr = [5., 6., 10., 10., 10.]; + assert!(is_stable(&arr, 3)); + assert!(!is_stable(&arr, 4)); + } + + #[test] + fn test_stability_close_values() { + let arr = [9., 9., 9.8, 9.9, 10.]; + assert!(is_stable(&arr, 3)); + assert!(!is_stable(&arr, 4)); + } + + #[test] + #[tracing_test::traced_test] + fn test_is_decreasing_true() { + let arr = [10., 5., 8., 7., 6.]; + assert!(is_decreasing(&arr, 3)); + assert!(!is_decreasing(&arr, 4)); + } + + #[test] + fn test_is_decreasing_false() { + let arr = [10., 11., 8., 7., 9.]; + assert!(!is_decreasing(&arr, 2)); + assert!(!is_decreasing(&arr, 4)); + } +} diff --git a/balter/src/scenario.rs b/balter/src/scenario.rs index 695371e..f8c1c1b 100644 --- a/balter/src/scenario.rs +++ b/balter/src/scenario.rs @@ -1,5 +1,7 @@ //! Scenario logic and constants use crate::controllers::{CompositeController, Controller}; +use crate::hints::Hint; +use crate::sampler::ConcurrentSampler; use balter_core::{LatencyConfig, RunStatistics, ScenarioConfig}; #[cfg(feature = "rt")] use balter_runtime::runtime::{RuntimeMessage, BALTER_OUT}; @@ -13,10 +15,6 @@ use std::{ #[allow(unused_imports)] use tracing::{debug, error, info, instrument, trace, warn, Instrument}; -mod sampler; - -use sampler::ConcurrentSampler; - /// Load test scenario structure /// /// Handler for running scenarios. Not intended for manual creation, use the [`#[scenario]`](balter_macros::scenario) macro which will add these methods to functions. @@ -65,6 +63,7 @@ pub trait ConfigurableScenario: Future + Sized + Send { fn tps(self, tps: u32) -> Self; fn latency(self, latency: Duration, quantile: f64) -> Self; fn duration(self, duration: Duration) -> Self; + fn hint(self, hint: Hint) -> Self; } impl ConfigurableScenario for Scenario @@ -194,6 +193,19 @@ where self.config.duration = Some(duration); self } + + /// Apply a hint for how to run the Scenario + /// + /// By default Balter attempts to autoscale all parameters to find the optimal values for + /// various scenarios. However, this process can be slow due to the control loop processes + /// underneath (and the requirements to be adaptable to all sorts of timing + /// characteristics). + /// + /// This method allows providing hints to Balter to speed up finding optimal + /// parameters. See [Hint] for more information. + fn hint(mut self, hint: Hint) -> Self { + todo!() + } } #[cfg(feature = "rt")]