From 4ea157588ae837c48035c29799957cb893077143 Mon Sep 17 00:00:00 2001
From: Byron Wasti
Date: Mon, 6 May 2024 10:23:58 -0400
Subject: [PATCH] Add basic support for noise to integration tests

Based on a bug report, Balter was struggling to achieve 50 TPS and was
limiting at a concurrency of 7. Unfortunately, recreating this is
proving tricky.

This commit adds basic support for simulating noise and shows that
Balter behaves correctly in this case. Unfortunately, it still does not
recreate https://github.com/BalterLoadTesting/balter/issues/8
---
 Justfile                                 |  2 +-
 balter-runtime/src/gossip/interchange.rs |  1 +
 mock-service/Cargo.toml                  |  2 +
 mock-service/src/lib.rs                  | 12 ++++
 rust-toolchain.toml                      |  2 +-
 tests/tests/isolated.rs                  | 80 +++++++++++++++++++-----
 6 files changed, 83 insertions(+), 16 deletions(-)

diff --git a/Justfile b/Justfile
index 03a6081..9f49705 100644
--- a/Justfile
+++ b/Justfile
@@ -14,4 +14,4 @@ publish EXECUTE='':
     cargo release --exclude balter-tests --exclude mock-service --exclude examples {{EXECUTE}}
 
 integration TEST='':
-    cargo test --release --features integration {{TEST}} -- --nocapture --test-threads 1
+    cargo test --release --features integration {{TEST}} -- --nocapture
diff --git a/balter-runtime/src/gossip/interchange.rs b/balter-runtime/src/gossip/interchange.rs
index 96904f6..cafb7c7 100644
--- a/balter-runtime/src/gossip/interchange.rs
+++ b/balter-runtime/src/gossip/interchange.rs
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
 use std::future::Future;
 use tokio::io::{AsyncRead, AsyncWrite};
 
+#[allow(dead_code)]
 #[trait_variant::make(GossipStream: Send)]
 pub trait LocalGossipStream {
     async fn recv_bytes(&mut self) -> Option<Result<Bytes, GossipError>>;
diff --git a/mock-service/Cargo.toml b/mock-service/Cargo.toml
index a1dcb02..d99c059 100644
--- a/mock-service/Cargo.toml
+++ b/mock-service/Cargo.toml
@@ -18,3 +18,5 @@ tracing = "0.1.40"
 serde = { version = "1.0.197", features = ["derive"] }
 tower-http = { version="0.5.2", features = ["trace"] }
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+rand_distr = "0.4.3"
+rand = "0.8.5"
diff --git a/mock-service/src/lib.rs b/mock-service/src/lib.rs
index 5260a07..0dc486b 100644
--- a/mock-service/src/lib.rs
+++ b/mock-service/src/lib.rs
@@ -9,6 +9,7 @@ use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
 use lazy_static::lazy_static;
 #[allow(unused)]
 use metrics::{counter, gauge, histogram};
+use rand_distr::{Distribution, SkewNormal};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::net::SocketAddr;
@@ -77,6 +78,7 @@ pub struct LatencyConfig {
 pub enum LatencyKind {
     Delay,
     Linear(NonZeroU32),
+    Noise(Duration, f64),
     //Exponential(NonZeroU32),
     //Cutoff(NonZeroU32),
 }
@@ -147,6 +149,16 @@ pub async fn mock_route(Json(config): Json<Config>) -> Result<(), StatusCode> {
             histogram!(format!("mock-server.{}.latency", &config.scenario_name))
                 .record(latency_conf.latency.as_secs_f64());
         }
+        LatencyKind::Noise(std, shape) => {
+            let skew_normal =
+                SkewNormal::new(latency_conf.latency.as_secs_f64(), std.as_secs_f64(), shape)
+                    .unwrap();
+            let v: f64 = skew_normal.sample(&mut rand::thread_rng());
+
+            tokio::time::sleep(Duration::from_secs_f64(v)).await;
+            histogram!(format!("mock-server.{}.latency", &config.scenario_name))
+                .record(latency_conf.latency.as_secs_f64());
+        }
         LatencyKind::Linear(latency_tps) => {
             let avg_tps = state.avg_tps.load(Ordering::Relaxed);
 
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 0571309..d0765fc 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,3 +1,3 @@
 [toolchain]
-channel="1.76.0"
+channel="stable"
 components = ["rustfmt", "clippy", "rust-analyzer"]
diff --git a/tests/tests/isolated.rs b/tests/tests/isolated.rs
index 163aca6..de385f2 100644
--- a/tests/tests/isolated.rs
+++ b/tests/tests/isolated.rs
@@ -5,8 +5,8 @@ use utils::*;
 #[cfg(feature = "integration")]
 mod tests {
     use super::*;
-
     use balter::prelude::*;
+    use mock_service::prelude::*;
     use reqwest::Client;
     use std::sync::OnceLock;
     use std::time::Duration;
@@ -22,7 +22,7 @@ mod tests {
 
         let stats = scenario_1ms_delay()
             .tps(10_000)
-            .duration(Duration::from_secs(30))
+            .duration(Duration::from_secs(360))
             .await;
 
         assert_eq!(stats.goal_tps, 10_000);
@@ -30,6 +30,70 @@ mod tests {
         assert!(stats.concurrency >= 10);
     }
 
+    #[scenario]
+    async fn scenario_1ms_delay() {
+        let client = Client::new();
+        loop {
+            let _ = transaction_1ms(&client).await;
+        }
+    }
+
+    #[transaction]
+    async fn transaction_1ms(client: &Client) -> Result<(), reqwest::Error> {
+        let _res = client
+            .get("http://0.0.0.0:3002/")
+            .json(&Config {
+                scenario_name: "tps_isolated".to_string(),
+                tps: None,
+                latency: Some(LatencyConfig {
+                    latency: Duration::from_millis(1),
+                    kind: LatencyKind::Delay,
+                }),
+            })
+            .send()
+            .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn single_instance_noisy_tps() {
+        init().await;
+
+        let stats = scenario_1ms_noisy_delay()
+            .tps(50)
+            .duration(Duration::from_secs(80))
+            .await;
+
+        assert_eq!(stats.goal_tps, 50);
+        assert!(stats.actual_tps > 45.);
+        assert!(stats.concurrency >= 10);
+    }
+
+    #[scenario]
+    async fn scenario_1ms_noisy_delay() {
+        let client = Client::new();
+        loop {
+            let _ = transaction_noisy_1ms(&client).await;
+        }
+    }
+
+    #[transaction]
+    async fn transaction_noisy_1ms(client: &Client) -> Result<(), reqwest::Error> {
+        let _res = client
+            .get("http://0.0.0.0:3002/")
+            .json(&Config {
+                scenario_name: "tps_isolated".to_string(),
+                tps: None,
+                latency: Some(LatencyConfig {
+                    latency: Duration::from_millis(400),
+                    kind: LatencyKind::Noise(Duration::from_millis(300), 50.),
+                }),
+            })
+            .send()
+            .await?;
+        Ok(())
+    }
+
     #[tokio::test]
     async fn single_instance_limited_tps() {
         init().await;
@@ -64,18 +128,6 @@ mod tests {
 
     static CLIENT: OnceLock<Client> = OnceLock::new();
 
-    #[scenario]
-    async fn scenario_1ms_delay() {
-        let _ = transaction_1ms().await;
-    }
-
-    #[transaction]
-    async fn transaction_1ms() -> Result<(), reqwest::Error> {
-        let client = CLIENT.get_or_init(Client::new);
-        client.get("http://0.0.0.0:3002/delay/ms/1").send().await?;
-        Ok(())
-    }
-
     #[scenario]
     async fn scenario_1ms_limited_7000() {
         let _ = transaction_1ms_limited_7000().await;
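
Note (not part of the commit): below is a minimal, standalone sketch of the latency noise
model this patch adds to the mock service via `LatencyKind::Noise(std, shape)`. Latency is
drawn from a skew-normal distribution whose location is the configured base latency, whose
scale is `std`, and whose shape controls the skew. The helper name `sample_noisy_latency`,
the `main` harness, and the zero-clamp are illustrative only; the parameter values mirror
the new `single_instance_noisy_tps` test.

// Illustrative sketch only (hypothetical helper, not part of the patch).
// Dependencies assumed: rand = "0.8", rand_distr = "0.4".
use rand_distr::{Distribution, SkewNormal};
use std::time::Duration;

/// Draw a simulated request latency from a skew-normal distribution,
/// mirroring what the mock service does for `LatencyKind::Noise(std, shape)`.
fn sample_noisy_latency(base: Duration, std: Duration, shape: f64) -> Duration {
    let skew_normal = SkewNormal::new(base.as_secs_f64(), std.as_secs_f64(), shape)
        .expect("scale must be a finite positive number");
    // Clamp at zero defensively: the skew normal can, rarely, sample a
    // negative value, and Duration::from_secs_f64 panics on negatives.
    let secs = skew_normal.sample(&mut rand::thread_rng()).max(0.0);
    Duration::from_secs_f64(secs)
}

fn main() {
    // Same parameters as the new `single_instance_noisy_tps` test:
    // 400ms base latency, 300ms of noise, heavy right skew.
    for _ in 0..5 {
        let latency =
            sample_noisy_latency(Duration::from_millis(400), Duration::from_millis(300), 50.0);
        println!("simulated latency: {latency:?}");
    }
}

With shape = 50 the distribution behaves roughly like a half-normal starting at the base
latency, so most simulated latencies land between about 400 ms and 1 s, which is what makes
the measured TPS noisy.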