Add new TPS sampling mechanism
Sampling TPS turns out to be a trickier problem than expected. Various
strategies tried previously ran into issues with noise and with the
overhead of spawning new tokio tasks. The new strategy spawns long-lived
worker tasks up front and then samples the TPS at a regular interval via
atomic counters.

Everything operates independently, with no per-sample task-creation
overhead; the heavy lifting is done by atomics.
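
To illustrate the approach (a minimal, hypothetical sketch rather than code from this commit): worker tasks increment a shared atomic counter, and the sampler swaps the counter back to zero on each interval tick to compute TPS over the elapsed window. The worker count, sleep duration, and sample interval below are arbitrary.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let count = Arc::new(AtomicU64::new(0));

    // A fixed set of long-lived workers; no per-transaction task creation.
    for _ in 0..4 {
        let count = count.clone();
        tokio::spawn(async move {
            loop {
                // Stand-in for running one transaction of the scenario.
                tokio::time::sleep(Duration::from_millis(1)).await;
                count.fetch_add(1, Ordering::Relaxed);
            }
        });
    }

    // Sample at a regular interval by swapping the counter back to zero.
    let mut interval = tokio::time::interval(Duration::from_millis(100));
    interval.tick().await; // the first tick completes immediately
    let mut last_tick = Instant::now();
    for _ in 0..5 {
        interval.tick().await;
        let elapsed = last_tick.elapsed();
        last_tick = Instant::now();
        let total = count.swap(0, Ordering::Relaxed);
        println!("tps: {:.1}", total as f64 / elapsed.as_secs_f64());
    }
}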
byronwasti committed Feb 6, 2024
1 parent 6fb9e49 commit 6c81994
Showing 13 changed files with 264 additions and 308 deletions.
1 change: 1 addition & 0 deletions balter-core/Cargo.toml
@@ -13,6 +13,7 @@ categories = ["development-tools", "concurrency"]
publish = true

[dependencies]
+arc-swap = "1.6.0"
balter-macros = { version = "0.1", path = "../balter-macros" }
governor = "0.6.0"
humantime = "2.1.0"
1 change: 1 addition & 0 deletions balter-core/src/lib.rs
@@ -8,6 +8,7 @@ pub mod macros;
cfg_rt! {
pub mod runtime;
}
+pub(crate) mod sampling;
pub mod scenario;
#[doc(hidden)]
pub mod transaction;
1 change: 1 addition & 0 deletions balter-core/src/sampling.rs
@@ -0,0 +1 @@
pub(crate) mod tps_sampler;
221 changes: 221 additions & 0 deletions balter-core/src/sampling/tps_sampler.rs
@@ -0,0 +1,221 @@
use crate::{
scenario::BoxedFut,
transaction::{TransactionData, TRANSACTION_HOOK},
};
use arc_swap::ArcSwap;
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use std::{
num::NonZeroU32,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use tokio::task::JoinHandle;
use tokio::time::{interval, Interval};
#[allow(unused)]
use tracing::{debug, error, info, trace};

#[derive(Debug, Copy, Clone)]
pub(crate) struct TpsData {
pub success_count: u64,
pub error_count: u64,
pub elapsed: Duration,
}

impl TpsData {
#[allow(unused)]
pub fn new() -> Self {
Self {
success_count: 0,
error_count: 0,
elapsed: Duration::new(0, 0),
}
}

pub fn tps(&self) -> f64 {
self.total() as f64 / self.elapsed.as_nanos() as f64 * 1e9
}

pub fn error_rate(&self) -> f64 {
self.error_count as f64 / self.total() as f64
}

pub fn total(&self) -> u64 {
self.success_count + self.error_count
}
}

pub(crate) struct TpsSampler {
scenario: fn() -> BoxedFut,
concurrent_count: Arc<AtomicUsize>,
limiter: Arc<ArcSwap<DefaultDirectRateLimiter>>,
tps_limit: NonZeroU32,

tasks: Vec<JoinHandle<()>>,
interval: Interval,
last_tick: Instant,

success_count: Arc<AtomicU64>,
error_count: Arc<AtomicU64>,
}

impl TpsSampler {
pub(crate) fn new(scenario: fn() -> BoxedFut, tps_limit: NonZeroU32) -> Self {
let limiter: DefaultDirectRateLimiter = rate_limiter(tps_limit);
let limiter: Arc<DefaultDirectRateLimiter> = Arc::new(limiter);
let limiter: Arc<ArcSwap<DefaultDirectRateLimiter>> = Arc::new(ArcSwap::new(limiter));
let mut new_self = Self {
scenario,
concurrent_count: Arc::new(AtomicUsize::new(1)),
limiter,
tps_limit,

tasks: vec![],
interval: interval(Duration::from_millis(10)),
last_tick: Instant::now(),

success_count: Arc::new(AtomicU64::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
};
new_self.populate_jobs();
new_self
}

pub(crate) async fn sample_tps(&mut self) -> TpsData {
self.interval.tick().await;
let elapsed = self.last_tick.elapsed();
self.last_tick = Instant::now();

let success_count = self.success_count.swap(0, Ordering::Relaxed);
let error_count = self.error_count.swap(0, Ordering::Relaxed);

TpsData {
elapsed,
success_count,
error_count,
}
}

/// NOTE: Panics when concurrent_count=0
pub(crate) fn set_concurrent_count(&mut self, concurrent_count: usize) {
if concurrent_count != 0 {
self.concurrent_count
.store(concurrent_count, Ordering::Relaxed);
self.populate_jobs();
} else {
panic!("Concurrent count is not allowed to be set to 0.");
}
}

pub(crate) fn set_tps_limit(&mut self, tps_limit: NonZeroU32) {
if tps_limit != self.tps_limit {
self.tps_limit = tps_limit;
self.limiter.store(Arc::new(rate_limiter(tps_limit)));
}
}

pub(crate) async fn wait_for_shutdown(mut self) {
self.concurrent_count.store(0, Ordering::Relaxed);
for task in self.tasks.drain(..) {
// TODO: Timeout in case a scenario loops indefinitely
task.await.expect("Task unexpectedly failed.");
}
}

fn populate_jobs(&mut self) {
let concurrent_count = self.concurrent_count.load(Ordering::Relaxed);

if self.tasks.len() > concurrent_count {
// TODO: Clean up the tasks cleanly + timeout/abort in case a scenario loops
// indefinitely
self.tasks.truncate(concurrent_count);
} else {
while self.tasks.len() < concurrent_count {
let scenario = self.scenario;
let concurrent_count = self.concurrent_count.clone();
let id = self.tasks.len();
let transaction_data = TransactionData {
// TODO: Use ArcSwap here
limiter: self.limiter.clone(),
success: self.success_count.clone(),
error: self.error_count.clone(),
};

trace!("Spawning a new task with id {id}.");
self.tasks.push(tokio::spawn(TRANSACTION_HOOK.scope(
transaction_data,
async move {
while id < concurrent_count.load(Ordering::Relaxed) {
scenario().await;
}
},
)));
}
}
}
}

fn rate_limiter(tps_limit: NonZeroU32) -> DefaultDirectRateLimiter {
RateLimiter::direct(
Quota::per_second(tps_limit)
// TODO: Make burst configurable
.allow_burst(NonZeroU32::new(1).unwrap()),
)
}

#[cfg(test)]
mod tests {
use super::*;
use rand_distr::{Distribution, Normal};

fn mock_trivial_scenario() -> BoxedFut {
Box::pin(async move {
let _ = crate::transaction::transaction_hook::<_, (), ()>(async { Ok(()) }).await;
})
}

fn mock_noisy_scenario() -> BoxedFut {
Box::pin(async move {
let _ = crate::transaction::transaction_hook::<_, (), ()>(async {
let normal = Normal::new(100., 25.).unwrap();
let v: f64 = normal.sample(&mut rand::thread_rng());
tokio::time::sleep(Duration::from_micros(v.floor() as u64)).await;
Ok(())
})
.await;
})
}

#[tracing_test::traced_test]
#[tokio::test]
#[ntest::timeout(300)]
async fn test_simple_case() {
let mut tps_sampler =
TpsSampler::new(mock_trivial_scenario, NonZeroU32::new(1_000).unwrap());
tps_sampler.set_concurrent_count(20);

let _sample = tps_sampler.sample_tps().await;
for _ in 0..10 {
let sample = tps_sampler.sample_tps().await;
info!("tps: {}", sample.tps());
assert!((1_000. - sample.tps()).abs() < 150.);
}
}

#[tracing_test::traced_test]
#[tokio::test]
#[ntest::timeout(300)]
async fn test_noisy_case() {
let mut tps_sampler = TpsSampler::new(mock_noisy_scenario, NonZeroU32::new(1_000).unwrap());
tps_sampler.set_concurrent_count(20);

let _sample = tps_sampler.sample_tps().await;
for _ in 0..10 {
let sample = tps_sampler.sample_tps().await;
info!("tps: {}", sample.tps());
assert!((1_000. - sample.tps()).abs() < 150.);
}
}
}
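
For orientation, the controllers elsewhere in the crate drive the sampler roughly along these lines. This is a hypothetical sketch, not code from this commit: `my_scenario` is a placeholder, and the back-off decision stands in for the real logic in the concurrency and error-rate controllers.

fn my_scenario() -> BoxedFut {
    Box::pin(async {
        // ... run one transaction of the user scenario ...
    })
}

async fn drive() {
    let mut sampler = TpsSampler::new(my_scenario, NonZeroU32::new(500).unwrap());
    sampler.set_concurrent_count(10);

    // Discard the first sample; the first interval tick completes immediately.
    let _ = sampler.sample_tps().await;

    for _ in 0..100 {
        let sample = sampler.sample_tps().await;
        if sample.error_rate() > 0.05 {
            // Stand-in adjustment; the real controllers decide how to react.
            sampler.set_tps_limit(NonZeroU32::new(250).unwrap());
        }
    }
    sampler.wait_for_shutdown().await;
}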
2 changes: 0 additions & 2 deletions balter-core/src/scenario.rs
@@ -11,12 +11,10 @@ use std::{
time::Duration,
};

-mod batch_size_controller;
mod concurrency_controller;
mod error_rate_controller;
mod goal_tps;
mod saturate;
-mod tps_sampler;

/// The default error rate used for `.saturate()`
pub const DEFAULT_SATURATE_ERROR_RATE: f64 = 0.03;
94 changes: 0 additions & 94 deletions balter-core/src/scenario/batch_size_controller.rs

This file was deleted.

10 changes: 5 additions & 5 deletions balter-core/src/scenario/concurrency_controller.rs
@@ -87,7 +87,7 @@ impl ConcurrencyController
if error.abs() < 0.05 {
if self.state != ConcurrencyControllerState::Stable {
self.state = ConcurrencyControllerState::Stable;
-debug!(
+info!(
"Concurrency controller is stable. Goal: {:.2}, acheiving: {:.2} at concurrency {}",
self.goal_tps,
mean,
@@ -174,7 +174,7 @@ impl ConcurrencyController
}
} else {
trace!("I");
debug!("Concurrency controller found contradiction; resetting");
trace!("Concurrency controller found contradiction; resetting");
self.state = ConcurrencyControllerState::Reset;
}
} else {
@@ -232,7 +232,7 @@ impl ConcurrencyController
self.set_underpowered(cur_measurements);
} else {
trace!("R");
-debug!(
+trace!(
"Concurrency controller found contradiction; resetting"
);
self.state = ConcurrencyControllerState::Reset;
@@ -253,7 +253,7 @@ impl ConcurrencyController
}

if self.concurrent_count != cur_measurements.concurrent_count {
debug!("Adjusting concurrency count to {}", self.concurrent_count);
trace!("Adjusting concurrency count to {}", self.concurrent_count);
self.samples.clear();
self.previous_measured_values.push(cur_measurements);
}
Expand All @@ -279,7 +279,7 @@ impl ConcurrencyController {
} else {
*underpowered_count += 1;
self.reset();
-trace!(
+info!(
"Server may be underpowered. Sampling more measurements. {:?}",
self.underpowered_counter
);
3 changes: 2 additions & 1 deletion balter-core/src/scenario/error_rate_controller.rs
@@ -1,4 +1,5 @@
-use super::{concurrency_controller::ConcurrencyController, tps_sampler::TpsData};
+use super::concurrency_controller::ConcurrencyController;
+use crate::sampling::tps_sampler::TpsData;
use std::collections::VecDeque;
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
