Skip to content

Commit

Permalink
WIP rewrite of sampling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
byronwasti committed May 9, 2024
1 parent e9a36b9 commit 7643d72
Show file tree
Hide file tree
Showing 14 changed files with 451 additions and 67 deletions.
4 changes: 2 additions & 2 deletions balter-core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::BASELINE_TPS;
use crate::BASE_TPS;
#[cfg(feature = "rt")]
use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
Expand Down Expand Up @@ -50,7 +50,7 @@ impl ScenarioConfig {
}
| ScenarioConfig {
latency: Some(_), ..
} => Some(BASELINE_TPS),
} => Some(BASE_TPS),

ScenarioConfig {
max_tps: Some(tps), ..
Expand Down
8 changes: 7 additions & 1 deletion balter-core/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
use std::num::NonZeroU32;
use std::time::Duration;

pub const BASELINE_TPS: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(256) };
pub const BASE_TPS: NonZeroU32 = unsafe { NonZeroU32::new_unchecked(256) };
pub const BASE_CONCURRENCY: usize = 4;
pub const BASE_INTERVAL: Duration = Duration::from_millis(200);

pub const MIN_SAMPLE_COUNT: u64 = 256;
pub const ADJUSTABLE_SAMPLE_COUNT: u64 = 5_000;
2 changes: 0 additions & 2 deletions balter-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
mod config;
mod constants;
mod data;
mod metrics;
mod stats;

pub use config::*;
pub use constants::*;
pub use data::*;
pub use metrics::*;
pub use stats::*;
3 changes: 2 additions & 1 deletion balter/src/controllers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ pub(crate) use constant::ConstantController;
pub(crate) use error_rate::ErrorRateController;
pub(crate) use latency::LatencyController;

use balter_core::{LatencyConfig, SampleSet, ScenarioConfig};
use crate::data::SampleSet;
use balter_core::{LatencyConfig, ScenarioConfig};
use std::num::NonZeroU32;

pub(crate) trait Controller: Send {
Expand Down
10 changes: 5 additions & 5 deletions balter/src/controllers/concurrency.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use balter_core::SampleSet;
use crate::data::SampleSet;
use std::num::NonZeroU32;
#[allow(unused)]
use tracing::{debug, error, trace};
Expand Down Expand Up @@ -174,20 +174,20 @@ struct Measurement {
#[cfg(test)]
mod tests {
use super::*;
use balter_core::SampleData;
use crate::data::SampleData;
use std::num::NonZeroU32;
use std::time::Duration;

pub fn generate_tps(count: usize, tps: u64) -> SampleSet {
let mut samples = SampleSet::new(count);
let mut samples = SampleSet::new();

for _ in 0..count {
let success_count = tps;
let elapsed = Duration::from_secs(1);

samples.push(SampleData {
success_count,
error_count: 0,
success: success_count,
error: 0,
elapsed,
})
}
Expand Down
2 changes: 1 addition & 1 deletion balter/src/controllers/constant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::controllers::Controller;
use balter_core::SampleSet;
use crate::data::SampleSet;
use std::num::NonZeroU32;

pub(crate) struct ConstantController {
Expand Down
7 changes: 4 additions & 3 deletions balter/src/controllers/error_rate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::controllers::Controller;
use balter_core::{SampleSet, BASELINE_TPS};
use crate::data::SampleSet;
use balter_core::BASE_TPS;
use std::num::NonZeroU32;
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
Expand All @@ -18,7 +19,7 @@ impl ErrorRateController {
pub fn new(name: &str, error_rate: f64) -> Self {
Self {
base_label: format!("balter_{name}"),
goal_tps: BASELINE_TPS,
goal_tps: BASE_TPS,
error_rate,
state: State::BigStep,
}
Expand All @@ -43,7 +44,7 @@ impl ErrorRateController {

impl Controller for ErrorRateController {
fn initial_tps(&self) -> NonZeroU32 {
BASELINE_TPS
BASE_TPS
}

fn limit(&mut self, samples: &SampleSet, stable: bool) -> NonZeroU32 {
Expand Down
7 changes: 4 additions & 3 deletions balter/src/controllers/latency.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::controllers::Controller;
use balter_core::{SampleSet, BASELINE_TPS};
use crate::data::SampleSet;
use balter_core::BASE_TPS;
use std::num::NonZeroU32;
use std::time::Duration;
#[allow(unused)]
Expand All @@ -21,7 +22,7 @@ impl LatencyController {
base_label: format!("balter_{name}"),
latency,
quantile,
goal_tps: BASELINE_TPS,
goal_tps: BASE_TPS,
};
s.goal_tps_metric();
s
Expand All @@ -36,7 +37,7 @@ impl LatencyController {

impl Controller for LatencyController {
fn initial_tps(&self) -> NonZeroU32 {
BASELINE_TPS
BASE_TPS
}

fn limit(&mut self, samples: &SampleSet, stable: bool) -> NonZeroU32 {
Expand Down
63 changes: 24 additions & 39 deletions balter-core/src/data.rs → balter/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,67 +8,43 @@ const TDIGEST_BACKLOG_SIZE: usize = 100;
pub struct SampleSet {
samples: VecDeque<SampleData>,
latency: TDigest<K1>,
window_size: usize,
skip_first_n: Option<usize>,
skip_window: usize,
latency_skip_window: usize,
}

impl SampleSet {
pub fn new(window_size: usize) -> Self {
pub fn new() -> Self {
Self {
samples: VecDeque::new(),
latency: default_tdigest(),
window_size,
skip_first_n: None,
skip_window: 0,
latency_skip_window: 0,
}
}

pub fn skip_first_n(mut self, n_to_skip: usize) -> Self {
self.skip_first_n = Some(n_to_skip);
self.skip_window = n_to_skip;
self.latency_skip_window = n_to_skip;
self
}

pub fn push(&mut self, sample: SampleData) {
if self.skip_window > 0 {
self.skip_window -= 1;
return;
}

self.samples.push_back(sample);
if self.samples.len() > self.window_size {
self.samples.pop_front();
}
}

pub fn len(&self) -> usize {
self.samples.len()
}

/// Separate Latency push method since the TDigest datastructure does not support merge, and is
/// probabilistic in nature.
pub fn push_latency(&mut self, latency: Duration) {
if self.latency_skip_window > 0 {
self.latency_skip_window -= 1;
return;
}

self.latency.insert(latency.as_secs_f64());
}

// TODO: The latency measurements have no windowing effect, and are strictly cumulative.
pub fn push_latencies(&mut self, mut latencies: Vec<Duration>) {
for latency in latencies.drain(..) {
self.latency.insert(latency.as_secs_f64());
}
}

pub fn clear(&mut self) {
self.samples.clear();
self.latency = default_tdigest();
if let Some(skip_n) = self.skip_first_n {
self.skip_window = skip_n;
self.latency_skip_window = skip_n;
}
}

pub fn full(&self) -> bool {
self.samples.len() == self.window_size
self.samples.len() > 0
}

pub fn mean_err(&self) -> f64 {
Expand All @@ -81,6 +57,15 @@ impl SampleSet {
sum / self.samples.len() as f64
}

pub fn var_tps(&self) -> f64 {
let mean = self.mean_tps();
self.samples
.iter()
.map(|x| (x.tps() - mean).powi(2))
.sum::<f64>()
/ self.samples.len() as f64
}

pub fn latency(&self, quantile: f64) -> Duration {
let secs = self.latency.quantile(quantile);
Duration::from_secs_f64(secs)
Expand All @@ -94,8 +79,8 @@ fn default_tdigest() -> TDigest<K1> {

#[derive(Debug, Clone)]
pub struct SampleData {
pub success_count: u64,
pub error_count: u64,
pub success: u64,
pub error: u64,
pub elapsed: Duration,
}

Expand All @@ -105,10 +90,10 @@ impl SampleData {
}

pub fn error_rate(&self) -> f64 {
self.error_count as f64 / self.total() as f64
self.error as f64 / self.total() as f64
}

pub fn total(&self) -> u64 {
self.success_count + self.error_count
self.success + self.error
}
}
9 changes: 9 additions & 0 deletions balter/src/hints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/// User provided hints for setting autoscaling parameters.
///
/// Balter attempts to find the optimal values for all parameters, however sometimes the control
/// loops can take a while to stabalize. These are user-provided hints (see [crate::Scenario#method.hint])
pub enum Hint {
/// Provide the starting concurrency value. Useful for Scenarios with low TPS (which Balter can
/// take a long time to stablize on).
Concurrency(usize),
}
5 changes: 5 additions & 0 deletions balter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ pub mod scenario;
#[doc(hidden)]
pub mod transaction;

mod hints;

#[macro_use]
#[doc(hidden)]
pub mod macros;

pub(crate) mod controllers;
pub(crate) mod data;
pub(crate) mod sampler;

#[cfg(not(feature = "rt"))]
pub use balter_macros::{scenario, transaction};
pub use hints::Hint;
pub use scenario::Scenario;

cfg_rt! {
Expand Down
14 changes: 8 additions & 6 deletions balter/src/scenario/sampler.rs → balter/src/sampler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::controllers::{CCOutcome, ConcurrencyController};
use crate::data::{SampleData, SampleSet};
use crate::transaction::{TransactionData, TRANSACTION_HOOK};
use arc_swap::ArcSwap;
use balter_core::{SampleData, SampleSet};
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use metrics_util::AtomicBucket;
use std::future::Future;
Expand All @@ -18,6 +18,8 @@ use tokio::time::{interval, Interval};
#[allow(unused)]
use tracing::{debug, error, info, trace, warn};

mod base_sampler;

const SAMPLE_WINDOW_SIZE: usize = 100;
const SKIP_SIZE: usize = 25;

Expand All @@ -44,7 +46,7 @@ where
base_label: format!("balter_{name}"),
sampler: Sampler::new(scenario, goal_tps),
cc: ConcurrencyController::new(goal_tps),
samples: SampleSet::new(SAMPLE_WINDOW_SIZE).skip_first_n(SKIP_SIZE),
samples: SampleSet::new(),
needs_clear: false,
tps_limited: false,
};
Expand Down Expand Up @@ -230,8 +232,8 @@ where

let data = SampleData {
elapsed,
success_count,
error_count,
success: success_count,
error: error_count,
};

// TODO: We should adjust interval timing based on noise not just sample count.
Expand Down Expand Up @@ -349,7 +351,7 @@ mod tests {
let mut tps_sampler = Sampler::new(mock_trivial_scenario, NonZeroU32::new(1_000).unwrap());
tps_sampler.set_concurrency(20);

let mut samples = SampleSet::new(1);
let mut samples = SampleSet::new();
tps_sampler.sample(&mut samples).await;
for _ in 0..10 {
tps_sampler.sample(&mut samples).await;
Expand All @@ -366,7 +368,7 @@ mod tests {
let mut tps_sampler = Sampler::new(mock_noisy_scenario, NonZeroU32::new(1_000).unwrap());
tps_sampler.set_concurrency(20);

let mut samples = SampleSet::new(1);
let mut samples = SampleSet::new();
tps_sampler.sample(&mut samples).await;
for _ in 0..10 {
tps_sampler.sample(&mut samples).await;
Expand Down
Loading

0 comments on commit 7643d72

Please sign in to comment.