From 86afddc1e6e0565235dba38d80bd7e23a7fd9cff Mon Sep 17 00:00:00 2001 From: Byron Wasti Date: Sat, 3 Feb 2024 17:01:31 -0500 Subject: [PATCH] Refactor Scenario runner to be generic rather than Pin<Box<dyn Future>> Upon doing some benchmarks with Pin<Box<dyn Future>>, I found that it is _slow_. As in, about an order of magnitude slower than a regular Future. It was originally a shim to get things working, and replacing with generics was always the plan. The types were just annoying at the time. This gets the generic Future integration done, where Pin<Box<dyn Future>> is only kept around for the outermost Scenario which is used by the distributed runtime. Unfortunately, the distributed runtime functionality is broken and will have a followup commit to actually fix it. I haven't done a ton of benchmarking yet, but so far it seems there are some *major* performance wins with this change which is exciting. --- balter-core/Cargo.toml | 1 + balter-core/src/lib.rs | 1 + balter-core/src/runtime.rs | 7 +- balter-core/src/sampling/tps_sampler.rs | 42 +++--- balter-core/src/scenario.rs | 127 +++++++++++------- balter-core/src/scenario/direct.rs | 11 +- balter-core/src/scenario/goal_tps.rs | 9 +- balter-core/src/scenario/saturate.rs | 9 +- balter-macros/src/lib.rs | 36 ++--- examples/basic-examples/examples/basic-tps.rs | 2 +- 10 files changed, 138 insertions(+), 107 deletions(-) diff --git a/balter-core/Cargo.toml b/balter-core/Cargo.toml index a63b18f..1138939 100644 --- a/balter-core/Cargo.toml +++ b/balter-core/Cargo.toml @@ -38,6 +38,7 @@ tokio-tungstenite = { version = "0.21.0", optional = true } tower = { version = "0.4.13", optional = true } tower-http = { version = "0.5.0", features = ["trace"], optional = true } url = { version = "2.5.0", optional = true } +async-trait = "0.1.77" [dev-dependencies] rand = { version = "0.8.5", features = ["small_rng"] } diff --git a/balter-core/src/lib.rs b/balter-core/src/lib.rs index a17b7c3..a14da36 100644 --- a/balter-core/src/lib.rs +++ b/balter-core/src/lib.rs @@ -33,6 +33,7 
@@ pub use balter_macros::{scenario, transaction}; pub use scenario::Scenario; pub mod prelude { + pub use crate::scenario::ConfigurableScenario; cfg_rt! { pub use crate::runtime::{distributed_slice, BalterRuntime}; } diff --git a/balter-core/src/runtime.rs b/balter-core/src/runtime.rs index 518f885..88f756f 100644 --- a/balter-core/src/runtime.rs +++ b/balter-core/src/runtime.rs @@ -4,7 +4,7 @@ //! involves spinning up an API server and a gossip protocol task. use crate::{ gossip::{gossip_task, GossipData}, - scenario::{Scenario, ScenarioConfig}, + scenario::{ScenarioConfig, DistributedScenario}, server::server_task, }; use async_channel::{bounded, Receiver, Sender}; @@ -29,7 +29,7 @@ lazy_static! { /// function pointer. #[doc(hidden)] #[distributed_slice] -pub static BALTER_SCENARIOS: [(&'static str, fn() -> Scenario)]; +pub static BALTER_SCENARIOS: [(&'static str, fn() -> Box>)]; const DEFAULT_PORT: u16 = 7621; @@ -137,9 +137,10 @@ async fn run(scenarios: HashMap<&'static str, usize>, balter: &BalterRuntime) -> if let Some(idx) = scenarios.get(config.name.as_str()) { info!("Running scenario {}.", &config.name); let scenario = BALTER_SCENARIOS[*idx]; + let fut = scenario.1().set_config(config); tokio::spawn( async move { - scenario.1().set_config(config).await; + fut.await; } .in_current_span(), ); diff --git a/balter-core/src/sampling/tps_sampler.rs b/balter-core/src/sampling/tps_sampler.rs index 87a182c..0af642f 100644 --- a/balter-core/src/sampling/tps_sampler.rs +++ b/balter-core/src/sampling/tps_sampler.rs @@ -1,9 +1,7 @@ -use crate::{ - scenario::BoxedFut, - transaction::{TransactionData, TRANSACTION_HOOK}, -}; +use crate::transaction::{TransactionData, TRANSACTION_HOOK}; use arc_swap::ArcSwap; use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; +use std::future::Future; use std::{ num::NonZeroU32, sync::{ @@ -47,8 +45,8 @@ impl TpsData { } } -pub(crate) struct TpsSampler { - scenario: fn() -> BoxedFut, +pub(crate) struct TpsSampler { + scenario: 
T, concurrent_count: Arc, limiter: Arc>, tps_limit: NonZeroU32, @@ -61,8 +59,12 @@ pub(crate) struct TpsSampler { error_count: Arc, } -impl TpsSampler { - pub(crate) fn new(scenario: fn() -> BoxedFut, tps_limit: NonZeroU32) -> Self { +impl TpsSampler +where + T: Fn() -> F + Send + Sync + 'static + Clone, + F: Future + Send, +{ + pub(crate) fn new(scenario: T, tps_limit: NonZeroU32) -> Self { let limiter: DefaultDirectRateLimiter = rate_limiter(tps_limit); let limiter: Arc = Arc::new(limiter); let limiter: Arc> = Arc::new(ArcSwap::new(limiter)); @@ -131,7 +133,7 @@ impl TpsSampler { self.tasks.truncate(concurrent_count); } else { while self.tasks.len() < concurrent_count { - let scenario = self.scenario; + let scenario = self.scenario.clone(); let concurrent_count = self.concurrent_count.clone(); let id = self.tasks.len(); let transaction_data = TransactionData { @@ -168,22 +170,18 @@ mod tests { use super::*; use rand_distr::{Distribution, Normal}; - fn mock_trivial_scenario() -> BoxedFut { - Box::pin(async move { - let _ = crate::transaction::transaction_hook::<_, (), ()>(async { Ok(()) }).await; - }) + async fn mock_trivial_scenario() { + let _ = crate::transaction::transaction_hook::<_, (), ()>(async { Ok(()) }).await; } - fn mock_noisy_scenario() -> BoxedFut { - Box::pin(async move { - let _ = crate::transaction::transaction_hook::<_, (), ()>(async { - let normal = Normal::new(100., 25.).unwrap(); - let v: f64 = normal.sample(&mut rand::thread_rng()); - tokio::time::sleep(Duration::from_micros(v.floor() as u64)).await; - Ok(()) - }) - .await; + async fn mock_noisy_scenario() { + let _ = crate::transaction::transaction_hook::<_, (), ()>(async { + let normal = Normal::new(100., 25.).unwrap(); + let v: f64 = normal.sample(&mut rand::thread_rng()); + tokio::time::sleep(Duration::from_micros(v.floor() as u64)).await; + Ok(()) }) + .await; } #[tracing_test::traced_test] diff --git a/balter-core/src/scenario.rs b/balter-core/src/scenario.rs index 64902e5..5a6b440 
100644 --- a/balter-core/src/scenario.rs +++ b/balter-core/src/scenario.rs @@ -23,16 +23,12 @@ pub const DEFAULT_SATURATE_ERROR_RATE: f64 = 0.03; /// The default error rate used for `.overload()` pub const DEFAULT_OVERLOAD_ERROR_RATE: f64 = 0.80; -// TODO: We should _not_ need to use a Boxed future! Every single function call for any load -// testing is boxed which *sucks*. Unfortunately I haven't figured out how to appease the Type -// system. -pub(crate) type BoxedFut = Pin + Send>>; - // TODO: Have a separate builder +#[doc(hidden)] #[derive(Clone, Debug)] #[cfg_attr(feature = "rt", cfg_eval::cfg_eval, serde_as)] #[cfg_attr(feature = "rt", derive(Serialize, Deserialize))] -pub(crate) struct ScenarioConfig { +pub struct ScenarioConfig { pub name: String, #[cfg_attr(feature = "rt", serde_as(as = "DurationSeconds"))] pub duration: Duration, @@ -84,9 +80,10 @@ impl ScenarioConfig { } } +#[doc(hidden)] #[derive(Default, Clone, Copy, Debug)] #[cfg_attr(feature = "rt", derive(Serialize, Deserialize))] -pub(crate) enum ScenarioKind { +pub enum ScenarioKind { #[default] Once, Tps(u32), @@ -98,22 +95,83 @@ pub(crate) enum ScenarioKind { /// /// Handler for running scenarios. Not intended for manual creation, use the [`#[scenario]`](balter_macros::scenario) macro which will add these methods to functions. 
#[pin_project::pin_project] -pub struct Scenario { - fut: fn() -> BoxedFut, +pub struct Scenario { + func: T, runner_fut: Option + Send>>>, config: ScenarioConfig, } -impl Scenario { +impl Scenario { #[doc(hidden)] - pub fn new(name: &str, fut: fn() -> BoxedFut) -> Self { + pub fn new(name: &str, func: T) -> Self { Self { - fut, + func, runner_fut: None, config: ScenarioConfig::new(name), } } +} + +impl Future for Scenario +where + T: Fn() -> F + Send + 'static + Clone + Sync, + F: Future + Send, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.runner_fut.is_none() { + let func = self.func.clone(); + let config = self.config.clone(); + self.runner_fut = Some(Box::pin(async move { run_scenario(func, config).await })); + } + + if let Some(runner) = &mut self.runner_fut { + runner.as_mut().poll(cx) + } else { + unreachable!() + } + } +} +#[doc(hidden)] +pub trait DistributedScenario: Future + Send { + /// The strange type-signature is to facilitate treating this as a trait object for the + /// distributed functionality. + fn set_config( + &self, + config: ScenarioConfig, + ) -> Pin>>; +} + +impl DistributedScenario for Scenario +where + T: Fn() -> F + Send + 'static + Clone + Sync, + F: Future + Send, +{ + #[allow(unused)] + fn set_config( + &self, + _config: ScenarioConfig, + ) -> Pin>> { + unimplemented!() + } +} + +pub trait ConfigurableScenario: Future + Sized + Send { + fn saturate(self) -> Self; + fn overload(self) -> Self; + fn error_rate(self, error_rate: f64) -> Self; + fn tps(self, tps: u32) -> Self; + fn direct(self, tps_limit: u32, concurrency: usize) -> Self; + fn duration(self, duration: Duration) -> Self; +} + +impl ConfigurableScenario<()> for Scenario +where + T: Fn() -> F + Send + 'static + Clone + Sync, + F: Future + Send, +{ /// Run the scenario increasing TPS until an error rate of 3% is reached. 
/// /// NOTE: Must supply a `.duration()` as well @@ -135,7 +193,7 @@ impl Scenario { /// async fn my_scenario() { /// } /// ``` - pub fn saturate(mut self) -> Self { + fn saturate(mut self) -> Self { self.config.kind = ScenarioKind::Saturate(DEFAULT_SATURATE_ERROR_RATE); self } @@ -161,7 +219,7 @@ impl Scenario { /// async fn my_scenario() { /// } /// ``` - pub fn overload(mut self) -> Self { + fn overload(mut self) -> Self { self.config.kind = ScenarioKind::Saturate(DEFAULT_OVERLOAD_ERROR_RATE); self } @@ -187,7 +245,7 @@ impl Scenario { /// async fn my_scenario() { /// } /// ``` - pub fn error_rate(mut self, error_rate: f64) -> Self { + fn error_rate(mut self, error_rate: f64) -> Self { self.config.kind = ScenarioKind::Saturate(error_rate); self } @@ -213,7 +271,7 @@ impl Scenario { /// async fn my_scenario() { /// } /// ``` - pub fn tps(mut self, tps: u32) -> Self { + fn tps(mut self, tps: u32) -> Self { self.config.kind = ScenarioKind::Tps(tps); self } @@ -221,7 +279,7 @@ impl Scenario { /// Run the scenario with direct control over TPS and concurrency. /// No automatic controls will limit or change any values. This is intended /// for development testing or advanced ussage. - pub fn direct(mut self, tps_limit: u32, concurrency: usize) -> Self { + fn direct(mut self, tps_limit: u32, concurrency: usize) -> Self { self.config.kind = ScenarioKind::Direct(tps_limit, concurrency); self } @@ -247,40 +305,17 @@ impl Scenario { /// async fn my_scenario() { /// } /// ``` - pub fn duration(mut self, duration: Duration) -> Self { + fn duration(mut self, duration: Duration) -> Self { self.config.duration = duration; self } - - #[cfg(feature = "rt")] - pub(crate) fn set_config(mut self, config: ScenarioConfig) -> Self { - self.config = config; - self - } -} - -impl Future for Scenario { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // TODO: Surely there is a cleaner way to do this... 
- if self.runner_fut.is_none() { - let fut = self.fut; - let config = self.config.clone(); - // TODO: There must be a way to run this future without boxing it. I feel like I'm - // missing something really simple here. - self.runner_fut = Some(Box::pin(async move { run_scenario(fut, config).await })); - } - - if let Some(runner) = &mut self.runner_fut { - runner.as_mut().poll(cx) - } else { - unreachable!() - } - } } -async fn run_scenario(scenario: fn() -> BoxedFut, config: ScenarioConfig) { +async fn run_scenario(scenario: T, config: ScenarioConfig) +where + T: Fn() -> F + Send + Sync + 'static + Clone, + F: Future + Send, +{ match config.kind { ScenarioKind::Once => scenario().await, ScenarioKind::Tps(_) => goal_tps::run_tps(scenario, config).await, diff --git a/balter-core/src/scenario/direct.rs b/balter-core/src/scenario/direct.rs index fcaa0d1..1fa9dfe 100644 --- a/balter-core/src/scenario/direct.rs +++ b/balter-core/src/scenario/direct.rs @@ -1,6 +1,7 @@ -use super::{BoxedFut, ScenarioConfig}; +use super::ScenarioConfig; use crate::sampling::tps_sampler::TpsSampler; use humantime::format_duration; +use std::future::Future; use std::num::NonZeroU32; #[allow(unused_imports)] use std::time::{Duration, Instant}; @@ -8,7 +9,11 @@ use std::time::{Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn, Instrument}; #[instrument(name="scenario", skip_all, fields(name=config.name))] -pub(crate) async fn run_direct(scenario: fn() -> BoxedFut, config: ScenarioConfig) { +pub(crate) async fn run_direct(scenario: T, config: ScenarioConfig) +where + T: Fn() -> F + Send + Sync + 'static + Clone, + F: Future + Send, +{ info!("Running {} with config {:?}", config.name, &config); let start = Instant::now(); @@ -22,7 +27,7 @@ pub(crate) async fn run_direct(scenario: fn() -> BoxedFut, config: ScenarioConfi let sample = sampler.sample_tps().await; info!( - "Sample: {}TPS, {}/{} ({}), {}", + "Sample: {:.2}TPS, {}/{} ({}), {}", sample.tps(), 
sample.success_count, sample.error_count, diff --git a/balter-core/src/scenario/goal_tps.rs b/balter-core/src/scenario/goal_tps.rs index 5c96230..753c10c 100644 --- a/balter-core/src/scenario/goal_tps.rs +++ b/balter-core/src/scenario/goal_tps.rs @@ -1,8 +1,9 @@ use super::concurrency_controller::ConcurrencyController; -use super::{BoxedFut, ScenarioConfig}; +use super::ScenarioConfig; #[cfg(feature = "rt")] use crate::runtime::BALTER_OUT; use crate::sampling::tps_sampler::TpsSampler; +use std::future::Future; use std::num::NonZeroU32; #[allow(unused_imports)] use std::time::{Duration, Instant}; @@ -10,7 +11,11 @@ use std::time::{Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn, Instrument}; #[instrument(name="scenario", skip_all, fields(name=config.name))] -pub(crate) async fn run_tps(scenario: fn() -> BoxedFut, config: ScenarioConfig) { +pub(crate) async fn run_tps(scenario: T, config: ScenarioConfig) +where + T: Fn() -> F + Send + Sync + 'static + Clone, + F: Future + Send, +{ info!("Running {} with config {:?}", config.name, &config); let start = Instant::now(); diff --git a/balter-core/src/scenario/saturate.rs b/balter-core/src/scenario/saturate.rs index 9dc24eb..06de489 100644 --- a/balter-core/src/scenario/saturate.rs +++ b/balter-core/src/scenario/saturate.rs @@ -1,8 +1,9 @@ use super::error_rate_controller::ErrorRateController; -use super::{BoxedFut, ScenarioConfig}; +use super::ScenarioConfig; #[cfg(feature = "rt")] use crate::runtime::BALTER_OUT; use crate::sampling::tps_sampler::TpsSampler; +use std::future::Future; use std::num::NonZeroU32; #[allow(unused_imports)] use std::time::{Duration, Instant}; @@ -10,7 +11,11 @@ use std::time::{Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn, Instrument}; #[instrument(name="scenario", skip_all, fields(name=config.name))] -pub(crate) async fn run_saturate(scenario: fn() -> BoxedFut, config: ScenarioConfig) { +pub(crate) async fn run_saturate(scenario: T, 
config: ScenarioConfig) +where + T: Fn() -> F + Send + Sync + 'static + Clone, + F: Future + Send, +{ info!("Running {} with config {:?}", config.name, &config); let start = Instant::now(); diff --git a/balter-macros/src/lib.rs b/balter-macros/src/lib.rs index 6aa313c..02b06e8 100644 --- a/balter-macros/src/lib.rs +++ b/balter-macros/src/lib.rs @@ -82,8 +82,8 @@ pub fn scenario_linkme(attr: TokenStream, item: TokenStream) -> TokenStream { scenario_internal(attr, item, true).into() } -fn scenario_internal(_attr: TokenStream, item: TokenStream, linkme: bool) -> TokenStream2 { - let input = syn::parse::(item).unwrap(); +fn scenario_internal(_attr: TokenStream, item: TokenStream, _linkme: bool) -> TokenStream2 { + let input = syn::parse::(item).expect("Macro only works on fn() items"); let ItemFn { attrs, @@ -102,38 +102,15 @@ fn scenario_internal(_attr: TokenStream, item: TokenStream, linkme: bool) -> Tok scen_sig.asyncness = None; scen_sig.output = syn::parse( quote! { - -> ::balter::scenario::Scenario + -> impl ::balter::scenario::ConfigurableScenario<()> } .into(), ) - .unwrap(); - - let mut inter_sig = sig.clone(); - let inter_name = Ident::new(&format!("__balter_inter_{}", sig.ident), Span::call_site()); - inter_sig.ident = inter_name.clone(); - inter_sig.asyncness = None; - inter_sig.output = syn::parse( - quote! { - -> ::std::pin::Pin + Send>> - } - .into(), - ) - .unwrap(); - - let static_name = Ident::new( - &format!("__BALTER_{}", sig.ident.to_string().to_ascii_uppercase()), - Span::call_site(), - ); + .expect("Scenario signature is invalid"); let res = quote! 
{ #(#attrs)* #vis #scen_sig { - ::balter::scenario::Scenario::new(stringify!(#scen_name), #inter_name) - } - - #(#attrs)* #vis #inter_sig { - Box::pin(async { - #new_name().await - }) + ::balter::scenario::Scenario::new(stringify!(#scen_name), #new_name) } #(#attrs)* #vis #new_sig { @@ -141,6 +118,7 @@ fn scenario_internal(_attr: TokenStream, item: TokenStream, linkme: bool) -> Tok } }; + /* TODO: Uncomment and fix the linkme functionality for the distributed runtime if linkme { let mut linkme = quote! { #[::balter::runtime::distributed_slice(::balter::runtime::BALTER_SCENARIOS)] @@ -152,4 +130,6 @@ fn scenario_internal(_attr: TokenStream, item: TokenStream, linkme: bool) -> Tok } else { res } + */ + res } diff --git a/examples/basic-examples/examples/basic-tps.rs b/examples/basic-examples/examples/basic-tps.rs index e1f2ff3..33559d7 100644 --- a/examples/basic-examples/examples/basic-tps.rs +++ b/examples/basic-examples/examples/basic-tps.rs @@ -10,7 +10,7 @@ async fn main() { .init(); scenario_a() - .tps(20_000) + .tps(5_000) .duration(Duration::from_secs(120)) .await; }