Skip to content

Commit

Permalink
Refactor Scenario runner to be generic rather than Pin<Box<>>
Browse files Browse the repository at this point in the history
Upon doing some benchmarks with Pin<Box<impl Future>>, I found that it
is _slow_. As in, about an order of magnitude slower than a regular
Future. It was originally a shim to get things working, and replacing
with generics was always the plan. The types were just annoying at the
time.

This gets the generic Future integration done, where Pin<Box<T>> is only
kept around for the outermost Scenario which is used by the distributed
runtime. Unfortunately, the distributed runtime functionality is broken
and will have a followup commit to actually fix it.

I haven't done a ton of benchmarking yet, but so far it seems there are
some *major* performance wins with this change which is exciting.
  • Loading branch information
byronwasti committed Feb 8, 2024
1 parent 2323a73 commit 86afddc
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 107 deletions.
1 change: 1 addition & 0 deletions balter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tokio-tungstenite = { version = "0.21.0", optional = true }
tower = { version = "0.4.13", optional = true }
tower-http = { version = "0.5.0", features = ["trace"], optional = true }
url = { version = "2.5.0", optional = true }
async-trait = "0.1.77"

[dev-dependencies]
rand = { version = "0.8.5", features = ["small_rng"] }
Expand Down
1 change: 1 addition & 0 deletions balter-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use balter_macros::{scenario, transaction};
pub use scenario::Scenario;

pub mod prelude {
pub use crate::scenario::ConfigurableScenario;
cfg_rt! {
pub use crate::runtime::{distributed_slice, BalterRuntime};
}
Expand Down
7 changes: 4 additions & 3 deletions balter-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! involves spinning up an API server and a gossip protocol task.
use crate::{
gossip::{gossip_task, GossipData},
scenario::{Scenario, ScenarioConfig},
scenario::{ScenarioConfig, DistributedScenario},
server::server_task,
};
use async_channel::{bounded, Receiver, Sender};
Expand All @@ -29,7 +29,7 @@ lazy_static! {
/// function pointer.
#[doc(hidden)]
#[distributed_slice]
pub static BALTER_SCENARIOS: [(&'static str, fn() -> Scenario)];
pub static BALTER_SCENARIOS: [(&'static str, fn() -> Box<dyn DistributedScenario<Output=()>>)];

const DEFAULT_PORT: u16 = 7621;

Expand Down Expand Up @@ -137,9 +137,10 @@ async fn run(scenarios: HashMap<&'static str, usize>, balter: &BalterRuntime) ->
if let Some(idx) = scenarios.get(config.name.as_str()) {
info!("Running scenario {}.", &config.name);
let scenario = BALTER_SCENARIOS[*idx];
let fut = scenario.1().set_config(config);
tokio::spawn(
async move {
scenario.1().set_config(config).await;
fut.await;
}
.in_current_span(),
);
Expand Down
42 changes: 20 additions & 22 deletions balter-core/src/sampling/tps_sampler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::{
scenario::BoxedFut,
transaction::{TransactionData, TRANSACTION_HOOK},
};
use crate::transaction::{TransactionData, TRANSACTION_HOOK};
use arc_swap::ArcSwap;
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use std::future::Future;
use std::{
num::NonZeroU32,
sync::{
Expand Down Expand Up @@ -47,8 +45,8 @@ impl TpsData {
}
}

pub(crate) struct TpsSampler {
scenario: fn() -> BoxedFut,
pub(crate) struct TpsSampler<T> {
scenario: T,
concurrent_count: Arc<AtomicUsize>,
limiter: Arc<ArcSwap<DefaultDirectRateLimiter>>,
tps_limit: NonZeroU32,
Expand All @@ -61,8 +59,12 @@ pub(crate) struct TpsSampler {
error_count: Arc<AtomicU64>,
}

impl TpsSampler {
pub(crate) fn new(scenario: fn() -> BoxedFut, tps_limit: NonZeroU32) -> Self {
impl<T, F> TpsSampler<T>
where
T: Fn() -> F + Send + Sync + 'static + Clone,
F: Future<Output = ()> + Send,
{
pub(crate) fn new(scenario: T, tps_limit: NonZeroU32) -> Self {
let limiter: DefaultDirectRateLimiter = rate_limiter(tps_limit);
let limiter: Arc<DefaultDirectRateLimiter> = Arc::new(limiter);
let limiter: Arc<ArcSwap<DefaultDirectRateLimiter>> = Arc::new(ArcSwap::new(limiter));
Expand Down Expand Up @@ -131,7 +133,7 @@ impl TpsSampler {
self.tasks.truncate(concurrent_count);
} else {
while self.tasks.len() < concurrent_count {
let scenario = self.scenario;
let scenario = self.scenario.clone();
let concurrent_count = self.concurrent_count.clone();
let id = self.tasks.len();
let transaction_data = TransactionData {
Expand Down Expand Up @@ -168,22 +170,18 @@ mod tests {
use super::*;
use rand_distr::{Distribution, Normal};

fn mock_trivial_scenario() -> BoxedFut {
Box::pin(async move {
let _ = crate::transaction::transaction_hook::<_, (), ()>(async { Ok(()) }).await;
})
async fn mock_trivial_scenario() {
let _ = crate::transaction::transaction_hook::<_, (), ()>(async { Ok(()) }).await;
}

fn mock_noisy_scenario() -> BoxedFut {
Box::pin(async move {
let _ = crate::transaction::transaction_hook::<_, (), ()>(async {
let normal = Normal::new(100., 25.).unwrap();
let v: f64 = normal.sample(&mut rand::thread_rng());
tokio::time::sleep(Duration::from_micros(v.floor() as u64)).await;
Ok(())
})
.await;
async fn mock_noisy_scenario() {
let _ = crate::transaction::transaction_hook::<_, (), ()>(async {
let normal = Normal::new(100., 25.).unwrap();
let v: f64 = normal.sample(&mut rand::thread_rng());
tokio::time::sleep(Duration::from_micros(v.floor() as u64)).await;
Ok(())
})
.await;
}

#[tracing_test::traced_test]
Expand Down
127 changes: 81 additions & 46 deletions balter-core/src/scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ pub const DEFAULT_SATURATE_ERROR_RATE: f64 = 0.03;
/// The default error rate used for `.overload()`
pub const DEFAULT_OVERLOAD_ERROR_RATE: f64 = 0.80;

// TODO: We should _not_ need to use a Boxed future! Every single function call for any load
// testing is boxed which *sucks*. Unfortunately I haven't figured out how to appease the Type
// system.
pub(crate) type BoxedFut = Pin<Box<dyn Future<Output = ()> + Send>>;

// TODO: Have a separate builder
#[doc(hidden)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "rt", cfg_eval::cfg_eval, serde_as)]
#[cfg_attr(feature = "rt", derive(Serialize, Deserialize))]
pub(crate) struct ScenarioConfig {
pub struct ScenarioConfig {
pub name: String,
#[cfg_attr(feature = "rt", serde_as(as = "DurationSeconds"))]
pub duration: Duration,
Expand Down Expand Up @@ -84,9 +80,10 @@ impl ScenarioConfig {
}
}

#[doc(hidden)]
#[derive(Default, Clone, Copy, Debug)]
#[cfg_attr(feature = "rt", derive(Serialize, Deserialize))]
pub(crate) enum ScenarioKind {
pub enum ScenarioKind {
#[default]
Once,
Tps(u32),
Expand All @@ -98,22 +95,83 @@ pub(crate) enum ScenarioKind {
///
/// Handler for running scenarios. Not intended for manual creation, use the [`#[scenario]`](balter_macros::scenario) macro which will add these methods to functions.
#[pin_project::pin_project]
pub struct Scenario {
fut: fn() -> BoxedFut,
pub struct Scenario<T> {
func: T,
runner_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
config: ScenarioConfig,
}

impl Scenario {
impl<T> Scenario<T> {
#[doc(hidden)]
pub fn new(name: &str, fut: fn() -> BoxedFut) -> Self {
pub fn new(name: &str, func: T) -> Self {
Self {
fut,
func,
runner_fut: None,
config: ScenarioConfig::new(name),
}
}
}

impl<T, F> Future for Scenario<T>
where
T: Fn() -> F + Send + 'static + Clone + Sync,
F: Future<Output = ()> + Send,
{
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.runner_fut.is_none() {
let func = self.func.clone();
let config = self.config.clone();
self.runner_fut = Some(Box::pin(async move { run_scenario(func, config).await }));
}

if let Some(runner) = &mut self.runner_fut {
runner.as_mut().poll(cx)
} else {
unreachable!()
}
}
}

#[doc(hidden)]
pub trait DistributedScenario: Future + Send {
/// The strange type-signature is to facilitate treating this as a trait object for the
/// distributed functionality.
fn set_config(
&self,
config: ScenarioConfig,
) -> Pin<Box<dyn DistributedScenario<Output = Self::Output>>>;
}

impl<T, F> DistributedScenario for Scenario<T>
where
T: Fn() -> F + Send + 'static + Clone + Sync,
F: Future<Output = ()> + Send,
{
#[allow(unused)]
fn set_config(
&self,
_config: ScenarioConfig,
) -> Pin<Box<dyn DistributedScenario<Output = Self::Output>>> {
unimplemented!()
}
}

pub trait ConfigurableScenario<T: Send>: Future<Output = T> + Sized + Send {
fn saturate(self) -> Self;
fn overload(self) -> Self;
fn error_rate(self, error_rate: f64) -> Self;
fn tps(self, tps: u32) -> Self;
fn direct(self, tps_limit: u32, concurrency: usize) -> Self;
fn duration(self, duration: Duration) -> Self;
}

impl<T, F> ConfigurableScenario<()> for Scenario<T>
where
T: Fn() -> F + Send + 'static + Clone + Sync,
F: Future<Output = ()> + Send,
{
/// Run the scenario increasing TPS until an error rate of 3% is reached.
///
/// NOTE: Must supply a `.duration()` as well
Expand All @@ -135,7 +193,7 @@ impl Scenario {
/// async fn my_scenario() {
/// }
/// ```
pub fn saturate(mut self) -> Self {
fn saturate(mut self) -> Self {
self.config.kind = ScenarioKind::Saturate(DEFAULT_SATURATE_ERROR_RATE);
self
}
Expand All @@ -161,7 +219,7 @@ impl Scenario {
/// async fn my_scenario() {
/// }
/// ```
pub fn overload(mut self) -> Self {
fn overload(mut self) -> Self {
self.config.kind = ScenarioKind::Saturate(DEFAULT_OVERLOAD_ERROR_RATE);
self
}
Expand All @@ -187,7 +245,7 @@ impl Scenario {
/// async fn my_scenario() {
/// }
/// ```
pub fn error_rate(mut self, error_rate: f64) -> Self {
fn error_rate(mut self, error_rate: f64) -> Self {
self.config.kind = ScenarioKind::Saturate(error_rate);
self
}
Expand All @@ -213,15 +271,15 @@ impl Scenario {
/// async fn my_scenario() {
/// }
/// ```
pub fn tps(mut self, tps: u32) -> Self {
fn tps(mut self, tps: u32) -> Self {
self.config.kind = ScenarioKind::Tps(tps);
self
}

/// Run the scenario with direct control over TPS and concurrency.
/// No automatic controls will limit or change any values. This is intended
/// for development testing or advanced ussage.
pub fn direct(mut self, tps_limit: u32, concurrency: usize) -> Self {
fn direct(mut self, tps_limit: u32, concurrency: usize) -> Self {
self.config.kind = ScenarioKind::Direct(tps_limit, concurrency);
self
}
Expand All @@ -247,40 +305,17 @@ impl Scenario {
/// async fn my_scenario() {
/// }
/// ```
pub fn duration(mut self, duration: Duration) -> Self {
fn duration(mut self, duration: Duration) -> Self {
self.config.duration = duration;
self
}

#[cfg(feature = "rt")]
pub(crate) fn set_config(mut self, config: ScenarioConfig) -> Self {
self.config = config;
self
}
}

impl Future for Scenario {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO: Surely there is a cleaner way to do this...
if self.runner_fut.is_none() {
let fut = self.fut;
let config = self.config.clone();
// TODO: There must be a way to run this future without boxing it. I feel like I'm
// missing something really simple here.
self.runner_fut = Some(Box::pin(async move { run_scenario(fut, config).await }));
}

if let Some(runner) = &mut self.runner_fut {
runner.as_mut().poll(cx)
} else {
unreachable!()
}
}
}

async fn run_scenario(scenario: fn() -> BoxedFut, config: ScenarioConfig) {
async fn run_scenario<T, F>(scenario: T, config: ScenarioConfig)
where
T: Fn() -> F + Send + Sync + 'static + Clone,
F: Future<Output = ()> + Send,
{
match config.kind {
ScenarioKind::Once => scenario().await,
ScenarioKind::Tps(_) => goal_tps::run_tps(scenario, config).await,
Expand Down
11 changes: 8 additions & 3 deletions balter-core/src/scenario/direct.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use super::{BoxedFut, ScenarioConfig};
use super::ScenarioConfig;
use crate::sampling::tps_sampler::TpsSampler;
use humantime::format_duration;
use std::future::Future;
use std::num::NonZeroU32;
#[allow(unused_imports)]
use std::time::{Duration, Instant};
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};

#[instrument(name="scenario", skip_all, fields(name=config.name))]
pub(crate) async fn run_direct(scenario: fn() -> BoxedFut, config: ScenarioConfig) {
pub(crate) async fn run_direct<T, F>(scenario: T, config: ScenarioConfig)
where
T: Fn() -> F + Send + Sync + 'static + Clone,
F: Future<Output = ()> + Send,
{
info!("Running {} with config {:?}", config.name, &config);

let start = Instant::now();
Expand All @@ -22,7 +27,7 @@ pub(crate) async fn run_direct(scenario: fn() -> BoxedFut, config: ScenarioConfi
let sample = sampler.sample_tps().await;

info!(
"Sample: {}TPS, {}/{} ({}), {}",
"Sample: {:.2}TPS, {}/{} ({}), {}",
sample.tps(),
sample.success_count,
sample.error_count,
Expand Down
9 changes: 7 additions & 2 deletions balter-core/src/scenario/goal_tps.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use super::concurrency_controller::ConcurrencyController;
use super::{BoxedFut, ScenarioConfig};
use super::ScenarioConfig;
#[cfg(feature = "rt")]
use crate::runtime::BALTER_OUT;
use crate::sampling::tps_sampler::TpsSampler;
use std::future::Future;
use std::num::NonZeroU32;
#[allow(unused_imports)]
use std::time::{Duration, Instant};
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};

#[instrument(name="scenario", skip_all, fields(name=config.name))]
pub(crate) async fn run_tps(scenario: fn() -> BoxedFut, config: ScenarioConfig) {
pub(crate) async fn run_tps<T, F>(scenario: T, config: ScenarioConfig)
where
T: Fn() -> F + Send + Sync + 'static + Clone,
F: Future<Output = ()> + Send,
{
info!("Running {} with config {:?}", config.name, &config);

let start = Instant::now();
Expand Down
Loading

0 comments on commit 86afddc

Please sign in to comment.