Add basic support for noise to integration tests
Based on a bug report, Balter was struggling to achieve 50 TPS and was
plateauing at a concurrency of 7. Unfortunately, recreating this has proven
tricky. This commit adds basic support for simulating noise in the mock
service and shows that Balter behaves correctly in this case.

Unfortunately, this still does not recreate #8.
byronwasti committed May 6, 2024
1 parent a01d052 commit 4ea1575
Showing 6 changed files with 83 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Justfile
@@ -14,4 +14,4 @@ publish EXECUTE='':
cargo release --exclude balter-tests --exclude mock-service --exclude examples {{EXECUTE}}

integration TEST='':
cargo test --release --features integration {{TEST}} -- --nocapture --test-threads 1
cargo test --release --features integration {{TEST}} -- --nocapture
1 change: 1 addition & 0 deletions balter-runtime/src/gossip/interchange.rs
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use std::future::Future;
use tokio::io::{AsyncRead, AsyncWrite};

#[allow(dead_code)]
#[trait_variant::make(GossipStream: Send)]
pub trait LocalGossipStream {
async fn recv_bytes(&mut self) -> Option<Result<Vec<u8>, GossipError>>;
2 changes: 2 additions & 0 deletions mock-service/Cargo.toml
@@ -18,3 +18,5 @@ tracing = "0.1.40"
serde = { version = "1.0.197", features = ["derive"] }
tower-http = { version="0.5.2", features = ["trace"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
rand_distr = "0.4.3"
rand = "0.8.5"
12 changes: 12 additions & 0 deletions mock-service/src/lib.rs
@@ -9,6 +9,7 @@ use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use lazy_static::lazy_static;
#[allow(unused)]
use metrics::{counter, gauge, histogram};
use rand_distr::{Distribution, SkewNormal};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -77,6 +78,7 @@ pub struct LatencyConfig {
pub enum LatencyKind {
Delay,
Linear(NonZeroU32),
Noise(Duration, f64),
//Exponential(NonZeroU32),
//Cutoff(NonZeroU32),
}
@@ -147,6 +149,16 @@ pub async fn mock_route(Json(config): Json<Config>) -> Result<(), StatusCode> {
histogram!(format!("mock-server.{}.latency", &config.scenario_name))
.record(latency_conf.latency.as_secs_f64());
}
LatencyKind::Noise(std, shape) => {
let skew_normal =
SkewNormal::new(latency_conf.latency.as_secs_f64(), std.as_secs_f64(), shape)
.unwrap();
let v: f64 = skew_normal.sample(&mut rand::thread_rng());

tokio::time::sleep(Duration::from_secs_f64(v)).await;
histogram!(format!("mock-server.{}.latency", &config.scenario_name))
.record(latency_conf.latency.as_secs_f64());
}
LatencyKind::Linear(latency_tps) => {
let avg_tps = state.avg_tps.load(Ordering::Relaxed);

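In the new Noise arm above, latency_conf.latency serves as the skew-normal's location, while the variant's two fields supply the scale and shape. One caveat: when the scale is large relative to the location (as in the integration test below), SkewNormal can occasionally return a negative sample, and Duration::from_secs_f64 panics on negative input. A minimal sketch of one way to clamp the sample before sleeping (an illustration, not part of this commit):

use rand_distr::{Distribution, SkewNormal};
use std::time::Duration;

// Sketch: sample a noisy latency, but never sleep for a negative duration,
// since Duration::from_secs_f64 panics on negative (or non-finite) input.
async fn noisy_sleep(base: Duration, scale: Duration, shape: f64) {
    let skew_normal = SkewNormal::new(base.as_secs_f64(), scale.as_secs_f64(), shape)
        .expect("scale must be finite and positive");
    let secs: f64 = skew_normal.sample(&mut rand::thread_rng()).max(0.0);
    tokio::time::sleep(Duration::from_secs_f64(secs)).await;
}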
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
channel="1.76.0"
channel="stable"
components = ["rustfmt", "clippy", "rust-analyzer"]
80 changes: 66 additions & 14 deletions tests/tests/isolated.rs
@@ -5,8 +5,8 @@ use utils::*;
#[cfg(feature = "integration")]
mod tests {
use super::*;

use balter::prelude::*;
use mock_service::prelude::*;
use reqwest::Client;
use std::sync::OnceLock;
use std::time::Duration;
@@ -22,14 +22,78 @@ mod tests {

let stats = scenario_1ms_delay()
.tps(10_000)
.duration(Duration::from_secs(30))
.duration(Duration::from_secs(360))
.await;

assert_eq!(stats.goal_tps, 10_000);
assert!(stats.actual_tps > 9_500.);
assert!(stats.concurrency >= 10);
}

#[scenario]
async fn scenario_1ms_delay() {
let client = Client::new();
loop {
let _ = transaction_1ms(&client).await;
}
}

#[transaction]
async fn transaction_1ms(client: &Client) -> Result<(), reqwest::Error> {
let _res = client
.get("http://0.0.0.0:3002/")
.json(&Config {
scenario_name: "tps_isolated".to_string(),
tps: None,
latency: Some(LatencyConfig {
latency: Duration::from_millis(1),
kind: LatencyKind::Delay,
}),
})
.send()
.await?;
Ok(())
}

#[tokio::test]
async fn single_instance_noisy_tps() {
init().await;

let stats = scenario_1ms_noisy_delay()
.tps(50)
.duration(Duration::from_secs(80))
.await;

assert_eq!(stats.goal_tps, 50);
assert!(stats.actual_tps > 45.);
assert!(stats.concurrency >= 10);
}

#[scenario]
async fn scenario_1ms_noisy_delay() {
let client = Client::new();
loop {
let _ = transaction_noisy_1ms(&client).await;
}
}

#[transaction]
async fn transaction_noisy_1ms(client: &Client) -> Result<(), reqwest::Error> {
let _res = client
.get("http://0.0.0.0:3002/")
.json(&Config {
scenario_name: "tps_isolated".to_string(),
tps: None,
latency: Some(LatencyConfig {
latency: Duration::from_millis(400),
kind: LatencyKind::Noise(Duration::from_millis(300), 50.),
}),
})
.send()
.await?;
Ok(())
}
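
As a rough sanity check on these parameters (an estimate based on the skew-normal mean formula and Little's law, not numbers taken from the diff): a SkewNormal with location 400ms, scale 300ms, and shape 50 has a mean of roughly 640ms, so sustaining 50 TPS requires on the order of 32 concurrent transactions, well above the concurrency of 7 described in the commit message and consistent with the stats.concurrency >= 10 assertion above.

// Back-of-envelope check (not part of the diff): the mean of
// SkewNormal(location, scale, shape) is location + scale * delta * sqrt(2/pi),
// where delta = shape / sqrt(1 + shape^2); with shape = 50, delta is nearly 1.
fn main() {
    let (location, scale, shape): (f64, f64, f64) = (0.400, 0.300, 50.0);
    let delta = shape / (1.0 + shape * shape).sqrt();
    let mean_latency = location + scale * delta * (2.0 / std::f64::consts::PI).sqrt();
    // Little's law: concurrency needed is roughly throughput times latency.
    let concurrency = 50.0 * mean_latency;
    println!("mean latency {mean_latency:.3}s, concurrency {concurrency:.0}");
    // Prints approximately: mean latency 0.639s, concurrency 32
}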

#[tokio::test]
async fn single_instance_limited_tps() {
init().await;
@@ -64,18 +128,6 @@ mod tests {

static CLIENT: OnceLock<Client> = OnceLock::new();

#[scenario]
async fn scenario_1ms_delay() {
let _ = transaction_1ms().await;
}

#[transaction]
async fn transaction_1ms() -> Result<(), reqwest::Error> {
let client = CLIENT.get_or_init(Client::new);
client.get("http://0.0.0.0:3002/delay/ms/1").send().await?;
Ok(())
}

#[scenario]
async fn scenario_1ms_limited_7000() {
let _ = transaction_1ms_limited_7000().await;
