Skip to content

Commit

Permalink
Refactor ConcurrencyController for better stability
Browse files Browse the repository at this point in the history
The ConcurrencyController has gone through a handful of iterations, the
latest attempts to simplify the operations drastically. It now
calculates the "optimal" task count given the measured TPS ratio to the
goal TPS and the current concurrency. There needs to be some safeguards
(currently limited to increases by 100) since otherwise in wildly
underprovisioned workflows it attempts to go too high too fast.

Additionally, it uses simple slope detection to attempt to determine if
the server is unable to reach the TPS requested; if we keep increasing
the concurrency but the TPS we are hitting stays the same then there is
no point in continuing to increase concurrency. This functionality will
likely require some fine-tuning.
  • Loading branch information
byronwasti committed Feb 14, 2024
1 parent a00896a commit f7f1e58
Show file tree
Hide file tree
Showing 11 changed files with 327 additions and 19 deletions.
2 changes: 2 additions & 0 deletions balter-core/src/controllers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod concurrency;
mod sample_set;
250 changes: 250 additions & 0 deletions balter-core/src/controllers/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
use crate::controllers::sample_set::SampleSet;
use std::collections::HashMap;
use std::num::NonZeroU32;
use tracing::{debug, error, info, trace, warn};

// TODO: Does it make more sense to have this as CPU count?
const STARTING_CONCURRENCY_COUNT: usize = 4;
const WINDOW_SIZE: usize = 20;

#[derive(Debug)]
pub(crate) struct ConcurrencyController {
samples: SampleSet,
prev_measurements: HashMap<usize, f64>,
concurrency: usize,
goal_tps: NonZeroU32,
state: State,
}

impl ConcurrencyController {
pub fn new(goal_tps: NonZeroU32) -> Self {
Self {
samples: SampleSet::new(WINDOW_SIZE),
prev_measurements: HashMap::new(),
concurrency: STARTING_CONCURRENCY_COUNT,
goal_tps,
state: State::Active,
}
}

pub fn set_goal_tps(&mut self, goal_tps: NonZeroU32) {
self.goal_tps = goal_tps;
self.samples.clear();
self.prev_measurements.clear();
self.concurrency = STARTING_CONCURRENCY_COUNT;
self.state = State::Active;
}

pub fn concurrency(&self) -> usize {
self.concurrency
}

pub fn analyze(&mut self, sample: f64) -> Message {
if sample == 0. {
error!("No TPS sampled");
return Message::None;
}

self.samples.push(sample);

match self.analyze_inner() {
Some(m @ Message::AlterConcurrency(val)) => {
self.samples.clear();
self.concurrency = val;
info!("Adjusting concurrency to {}", self.concurrency);
m
}
Some(m @ Message::TpsLimited(val)) => {
warn!(
"TPS is limited to {}, at {} concurrency",
val, self.concurrency
);
m
}
Some(Message::None) | None => Message::None,
}
}

fn analyze_inner(&mut self) -> Option<Message> {
let mean_tps = self.samples.mean()?;
let measurement = Measurement {
concurrency: self.concurrency,
tps: mean_tps,
};

let goal_tps: f64 = Into::<u32>::into(self.goal_tps).into();

debug!(
"Goal TPS: {}, Measured TPS: {} at {} concurrency",
goal_tps, measurement.tps, self.concurrency
);

let error = (goal_tps - measurement.tps) / goal_tps;
if error < 0.05 {
// NOTE: We don't really care about the negative case, since we're relying on the
// RateLimiter to handle that situation.
self.state = State::Stable(0);
None
} else {
self.prev_measurements
.insert(self.concurrency, measurement.tps);

let adjustment = goal_tps / measurement.tps;
trace!(
"Adjustment: {:.2} ({:.2} / {:.2})",
adjustment,
goal_tps,
measurement.tps
);

let new_concurrency = (self.concurrency as f64 * adjustment) as usize;

let new_concurrency_step = new_concurrency - self.concurrency;

// TODO: Make this a proportion of the current concurrency so that it can scale faster
// at higher levels.
let new_concurrency = if new_concurrency_step > 50 {
self.concurrency + 100
} else {
new_concurrency
};

if new_concurrency == 0 {
error!("Error in the ConcurrencyController.");
None
} else if let Some(max_tps) = self.detect_underpowered() {
Some(Message::TpsLimited(max_tps))
} else {
Some(Message::AlterConcurrency(new_concurrency))
}
}
}

fn detect_underpowered(&self) -> Option<NonZeroU32> {
let mut data_points: Vec<Measurement> = self
.prev_measurements
.iter()
.map(|(c, t)| Measurement {
concurrency: *c,
tps: *t,
})
.collect();

data_points.sort_by_key(|f| f.concurrency);

let slopes: Vec<_> = data_points
.windows(2)
.map(|arr| {
let m1 = arr[0];
let m2 = arr[1];

let slope = (m2.tps - m1.tps) / (m2.concurrency - m1.concurrency) as f64;
let b = m2.tps - slope * m1.concurrency as f64;
trace!(
"({}, {:.2}), ({}, {:.2})",
m1.concurrency,
m1.tps,
m2.concurrency,
m2.tps
);
trace!("y = {:.2}x + {:.2}", slope, b);

slope
})
.collect();

if slopes.len() > 3 && slopes.iter().rev().take(3).all(|m| *m < 1.) {
let last = data_points[data_points.len() - 1].tps;
let max_tps = NonZeroU32::new(last as u32).unwrap();
Some(max_tps)
} else {
None
}
}
}

#[derive(Debug)]
enum State {
Active,
Stable(usize),
}

#[derive(Debug, Copy, Clone)]
pub(crate) enum Message {
None,
TpsLimited(NonZeroU32),
AlterConcurrency(usize),
}

#[derive(Debug, Copy, Clone)]
struct Measurement {
pub concurrency: usize,
pub tps: f64,
}

#[cfg(test)]
mod tests {
use super::*;
use rand_distr::{Distribution, Normal};

#[tracing_test::traced_test]
#[test]
fn scales_up() {
let mut controller = ConcurrencyController::new(NonZeroU32::new(200).unwrap());

let mut normal = Normal::new(9. * STARTING_CONCURRENCY_COUNT as f64, 2.).unwrap();
for _ in 0..50 {
let v: f64 = normal.sample(&mut rand::thread_rng());
match controller.analyze(v) {
Message::None => {}
Message::AlterConcurrency(new_val) => {
normal = Normal::new(9. * new_val as f64, 2.).unwrap();
}
Message::TpsLimited(_) => {
panic!("ConcurrencyController reports TpsLimited incorrectly");
}
}

if matches!(controller.state, State::Stable(_)) {
break;
}
}

assert!(controller.concurrency > 20);
}

#[tracing_test::traced_test]
#[test]
fn limits() {
let mut controller = ConcurrencyController::new(NonZeroU32::new(400).unwrap());

let mut normal = Normal::new(9. * STARTING_CONCURRENCY_COUNT as f64, 2.).unwrap();
let mut limited = false;
for _ in 0..100 {
let v: f64 = normal.sample(&mut rand::thread_rng());
match controller.analyze(v) {
Message::None => {}
Message::AlterConcurrency(new_val) => {
if new_val > 22 {
normal = Normal::new(9. * 22., 2.).unwrap();
} else {
normal = Normal::new(9. * new_val as f64, 2.).unwrap();
}
}
Message::TpsLimited(tps) => {
assert!(u32::from(tps) < 220);
assert!(u32::from(tps) > 170);
limited = true;
break;
}
}

if matches!(controller.state, State::Stable(_)) {
break;
}
}

assert!(limited);
assert!(controller.concurrency > 20);
}
}
46 changes: 46 additions & 0 deletions balter-core/src/controllers/sample_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::collections::VecDeque;

#[derive(Debug)]
pub(crate) struct SampleSet {
samples: VecDeque<f64>,
window_size: usize,
}

impl SampleSet {
pub fn new(window_size: usize) -> Self {
Self {
samples: VecDeque::new(),
window_size,
}
}

pub fn push(&mut self, sample: f64) {
self.samples.push_back(sample);
if self.samples.len() > self.window_size {
self.samples.pop_front();
}
}

pub fn clear(&mut self) {
self.samples.clear();
}

pub fn mean(&self) -> Option<f64> {
if self.samples.len() == self.window_size {
let sum: f64 = self.samples.iter().sum();
Some(sum / self.samples.len() as f64)
} else {
None
}
}

#[allow(unused)]
pub fn std(&self) -> Option<f64> {
let mean = self.mean()?;

let n = self.samples.len() as f64;
let v = self.samples.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (n - 1.);

Some(v.sqrt())
}
}
3 changes: 2 additions & 1 deletion balter-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ pub mod macros;
cfg_rt! {
pub mod runtime;
}
pub(crate) mod sampling;
pub(crate) mod controllers;
pub mod scenario;
pub(crate) mod tps_sampler;
#[doc(hidden)]
pub mod transaction;

Expand Down
1 change: 0 additions & 1 deletion balter-core/src/sampling.rs

This file was deleted.

2 changes: 1 addition & 1 deletion balter-core/src/scenario/direct.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::ScenarioConfig;
use crate::sampling::tps_sampler::TpsSampler;
use crate::tps_sampler::TpsSampler;
use humantime::format_duration;
use std::future::Future;
use std::num::NonZeroU32;
Expand Down
2 changes: 1 addition & 1 deletion balter-core/src/scenario/error_rate_controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::concurrency_controller::ConcurrencyController;
use crate::sampling::tps_sampler::TpsData;
use crate::tps_sampler::TpsData;
use std::collections::VecDeque;
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
Expand Down
27 changes: 16 additions & 11 deletions balter-core/src/scenario/goal_tps.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::concurrency_controller::ConcurrencyController;
use super::ScenarioConfig;
use crate::controllers::concurrency::{ConcurrencyController, Message};
#[cfg(feature = "rt")]
use crate::runtime::BALTER_OUT;
use crate::sampling::tps_sampler::TpsSampler;
use crate::tps_sampler::TpsSampler;
use std::future::Future;
use std::num::NonZeroU32;
#[allow(unused_imports)]
Expand All @@ -21,8 +21,9 @@ where
let start = Instant::now();

let goal_tps = config.goal_tps().unwrap();
let mut controller = ConcurrencyController::new(goal_tps as f64);
let mut controller = ConcurrencyController::new(NonZeroU32::new(goal_tps).unwrap());
let mut sampler = TpsSampler::new(scenario, NonZeroU32::new(goal_tps).unwrap());
sampler.set_concurrent_count(controller.concurrency());

// NOTE: This loop is time-sensitive. Any long awaits or blocking will throw off measurements
loop {
Expand All @@ -31,15 +32,19 @@ where
break;
}

controller.push(sample.tps());
sampler.set_concurrent_count(controller.concurrent_count() as usize);
match controller.analyze(sample.tps()) {
Message::AlterConcurrency(val) => {
sampler.set_concurrent_count(val);
}
Message::TpsLimited(max_tps) => {
sampler.set_concurrent_count(controller.concurrency());
sampler.set_tps_limit(max_tps);
controller.set_goal_tps(max_tps);

if let Some(max_tps) = controller.is_underpowered() {
controller.set_goal_tps(max_tps);
sampler.set_tps_limit(NonZeroU32::new(max_tps as u32).unwrap());

#[cfg(feature = "rt")]
distribute_work(&config, start.elapsed(), max_tps).await;
#[cfg(feature = "rt")]
distribute_work(&config, start.elapsed(), u32::from(max_tps) as f64).await;
}
Message::None => {}
}
}
sampler.wait_for_shutdown().await;
Expand Down
2 changes: 1 addition & 1 deletion balter-core/src/scenario/saturate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::error_rate_controller::ErrorRateController;
use super::ScenarioConfig;
#[cfg(feature = "rt")]
use crate::runtime::BALTER_OUT;
use crate::sampling::tps_sampler::TpsSampler;
use crate::tps_sampler::TpsSampler;
use std::future::Future;
use std::num::NonZeroU32;
#[allow(unused_imports)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
tps_limit,

tasks: vec![],
interval: interval(Duration::from_millis(30)),
interval: interval(Duration::from_millis(200)),
last_tick: Instant::now(),

success_count: Arc::new(AtomicU64::new(0)),
Expand Down
Loading

0 comments on commit f7f1e58

Please sign in to comment.