crud-bench tokio runtime improvement (#15)
Co-authored-by: Tobie Morgan Hitchcock <[email protected]>
emmanuel-keller and tobiemh authored Sep 26, 2024
1 parent 62edb05 commit 4ee55e2
Showing 13 changed files with 219 additions and 178 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/crud-bench.yml
@@ -32,7 +32,7 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt

- name: Checkout sources
uses: actions/checkout@v4

@@ -54,7 +54,7 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt

- name: Checkout sources
uses: actions/checkout@v4

@@ -74,7 +74,7 @@
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Checkout sources
uses: actions/checkout@v4

@@ -90,11 +90,11 @@ jobs:

build:
name: Build crud-bench
runs-on: [runner-amd64-2xlarge-private]
runs-on: [ runner-amd64-2xlarge-private ]
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Checkout sources
uses: actions/checkout@v4

@@ -120,7 +120,7 @@
benchmark:
name: Benchmark ${{ matrix.description }}
needs: build
runs-on: [runner-amd64-2xlarge-private]
runs-on: [ runner-amd64-2xlarge-private ]
strategy:
fail-fast: false
matrix:
@@ -158,7 +158,7 @@ jobs:
steps:
- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Checkout sources
uses: actions/checkout@v4

@@ -179,6 +179,6 @@ jobs:

- name: Run benchmarks (100,000 samples / 32 threads)
run: ${{ github.workspace }}/artifacts/crud-bench -d ${{ matrix.database }} -s 100000 -t 32

- name: Run benchmarks (250,000 samples / 128 threads)
run: ${{ github.workspace }}/artifacts/crud-bench -d ${{ matrix.database }} -s 250000 -t 128
3 changes: 2 additions & 1 deletion crud-bench/Cargo.toml
@@ -26,11 +26,12 @@ codegen-units = 1
[dependencies]
anyhow = "1.0.88"
clap = { version = "4.5.17", features = ["derive"] }
dashmap = "6.1.0"
env_logger = "0.11.5"
log = "0.4.22"
num_cpus = "1.16.0"
mongodb = { version = "2.8.2", optional = true }
rand = { version = "0.8.5", features = ["small_rng"] }
rayon = "1.10.0"
redb = { version = "2.1.3", optional = true }
redis = { version = "0.24.0", features = ["tokio-comp"], optional = true }
rocksdb = { git = "https://github.com/surrealdb/rust-rocksdb", features = ["lz4", "snappy"], optional = true }
5 changes: 4 additions & 1 deletion crud-bench/README.md
@@ -2,9 +2,11 @@

## Purpose

The goal of this benchmark is for developers working on features in SurrealDB to assess their impact on CRUD performance.
The goal of this benchmark is for developers working on features in SurrealDB to assess their impact on CRUD
performance.

E.g.:

- Testing a new operator
- Work on indexes
- Work on query planner and execution plan
@@ -39,6 +41,7 @@ Options:
-d, --database <DATABASE> Database [possible values: dry, surrealdb, surrealdb-memory, surrealdb-rocksdb, surrealdb-surrealkv, mongodb, postgresql]
-s, --samples <SAMPLES> Number of samples
-t, --threads <THREADS> Number of concurrent threads
-w, --workers <WORKERS> Number of workers for the client async runtime (tokio). By default the number of logical CPUs.
-h, --help Print help
```

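The new `-w`/`--workers` option sizes the client-side tokio runtime instead of building one runtime per benchmark thread. A minimal sketch of how such an option could be wired up, assuming a hypothetical `workers: Option<usize>` field on the CLI arguments (the actual `main.rs` wiring is not part of this diff); `num_cpus::get()` supplies the documented default of one worker per logical CPU:

```rust
use clap::Parser;
use tokio::runtime::Builder;

/// Hypothetical subset of the crud-bench CLI arguments.
#[derive(Parser)]
struct Args {
    /// Number of workers for the client async runtime (tokio).
    #[arg(short = 'w', long)]
    workers: Option<usize>,
}

fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    // Default to one worker per logical CPU, as documented above.
    let workers = args.workers.unwrap_or_else(num_cpus::get);
    // Build a single multi-threaded runtime shared by all benchmark tasks.
    let runtime = Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all() // I/O and time drivers
        .build()?;
    runtime.block_on(async {
        // The benchmark itself would run here.
        Ok::<(), anyhow::Error>(())
    })
}
```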
194 changes: 111 additions & 83 deletions crud-bench/src/benchmark.rs
@@ -1,21 +1,20 @@
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::io;
use std::io::Write;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use crate::Args;
use anyhow::{bail, Result};
use log::{error, info, warn};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use rayon::scope;
use serde::{Deserialize, Serialize};
use tokio::runtime::Builder;
use tokio::task;
use tokio::time::sleep;

use crate::Args;

pub(crate) struct Benchmark {
threads: usize,
samples: i32,
@@ -51,8 +50,8 @@ impl Benchmark {

pub(crate) async fn wait_for_client<C, P>(&self, engine: &P) -> Result<C>
where
C: BenchmarkClient + Send,
P: BenchmarkEngine<C> + Send + Sync,
C: BenchmarkClient,
P: BenchmarkEngine<C>,
{
sleep(Duration::from_secs(2)).await;
let start = SystemTime::now();
@@ -69,30 +68,30 @@

pub(crate) async fn run<C, P>(&self, engine: P) -> Result<BenchmarkResult>
where
C: BenchmarkClient + Send,
P: BenchmarkEngine<C> + Send + Sync,
C: BenchmarkClient,
P: BenchmarkEngine<C>,
{
// Start the client
self.wait_for_client(&engine).await?.startup().await?;

// Run the "creates" benchmark
info!("Start creates benchmark");
let creates = self.run_operation(&engine, BenchmarkOperation::Create)?;
let creates = self.run_operation(&engine, BenchmarkOperation::Create).await?;
info!("Creates benchmark done");

// Run the "reads" benchmark
info!("Start reads benchmark");
let reads = self.run_operation(&engine, BenchmarkOperation::Read)?;
let reads = self.run_operation(&engine, BenchmarkOperation::Read).await?;
info!("Reads benchmark done");

// Run the "updates" benchmark
info!("Start updates benchmark");
let updates = self.run_operation(&engine, BenchmarkOperation::Update)?;
let updates = self.run_operation(&engine, BenchmarkOperation::Update).await?;
info!("Reads benchmark done");

// Run the "deletes" benchmark
info!("Start deletes benchmark");
let deletes = self.run_operation(&engine, BenchmarkOperation::Delete)?;
let deletes = self.run_operation(&engine, BenchmarkOperation::Delete).await?;
info!("Deletes benchmark done");

Ok(BenchmarkResult {
@@ -103,81 +102,110 @@ impl Benchmark {
})
}

fn run_operation<C, P>(&self, engine: &P, operation: BenchmarkOperation) -> Result<Duration>
async fn run_operation<C, P>(
&self,
engine: &P,
operation: BenchmarkOperation,
) -> Result<Duration>
where
C: BenchmarkClient + Send,
C: BenchmarkClient,
P: BenchmarkEngine<C> + Send + Sync,
{
let error = Arc::new(AtomicBool::new(false));
let time = Instant::now();
let percent = Arc::new(AtomicU8::new(0));
print!("\r{operation:?} 0%");
scope(|s| {
let current = Arc::new(AtomicI32::new(0));
for thread_number in 0..self.threads {
let current = current.clone();
let error = error.clone();
let percent = percent.clone();
s.spawn(move |_| {
let mut record_provider = RecordProvider::default();
let runtime = Builder::new_multi_thread()
.thread_stack_size(10 * 1024 * 1024) // Set stack size to 10MiB
.worker_threads(4) // Set the number of worker threads
.enable_all() // Enables all runtime features, including I/O and time
.build()
.expect("Failed to create a runtime");
if let Err(e) = runtime.block_on(async {
info!("Thread #{thread_number}/{operation:?} starts");
let mut client = engine.create_client(self.endpoint.to_owned()).await?;
while !error.load(Ordering::Relaxed) {
let sample = current.fetch_add(1, Ordering::Relaxed);
if sample >= self.samples {
break;
}
// Calculate and print out the percents
{
let new_percent = if sample == 0 {
0u8
} else {
(sample * 20 / self.samples) as u8
};
let old_percent = percent.load(Ordering::Relaxed);
if new_percent > old_percent {
percent.store(new_percent, Ordering::Relaxed);
print!("\r{operation:?} {}%", new_percent * 5);
io::stdout().flush()?;
}
}
match operation {
BenchmarkOperation::Read => client.read(sample).await?,
BenchmarkOperation::Create => {
let record = record_provider.sample();
client.create(sample, record).await?;
}
BenchmarkOperation::Update => {
let record = record_provider.sample();
client.update(sample, record).await?;
}
BenchmarkOperation::Delete => client.delete(sample).await?,
}
}
client.shutdown().await?;
info!("Thread #{thread_number}/{operation:?} ends");
Ok::<(), anyhow::Error>(())
}) {
error!("{}", e);
error.store(true, Ordering::Relaxed);
}
});

let current = Arc::new(AtomicI32::new(0));

let mut futures = Vec::with_capacity(self.threads);

// start the threads
for thread_number in 0..self.threads {
let current = current.clone();
let error = error.clone();
let percent = percent.clone();
let samples = self.samples;
let client = engine.create_client(self.endpoint.clone()).await?;
let f = task::spawn(async move {
info!("Thread #{thread_number}/{operation:?} starts");
if let Err(e) =
Self::operation_loop(client, samples, &error, &current, &percent, operation)
.await
{
error!("{e}");
error.store(true, Ordering::Relaxed);
}
info!("Thread #{thread_number}/{operation:?} ends");
});
futures.push(f);
}

// Wait for threads to be done
for f in futures {
if let Err(e) = f.await {
{
error!("{e}");
error.store(true, Ordering::Relaxed);
}
}
});
println!("\r{operation:?} 100%");
io::stdout().flush()?;
}

if error.load(Ordering::Relaxed) {
bail!("Benchmark error");
}
println!("\r{operation:?} 100%");
io::stdout().flush()?;
Ok(time.elapsed())
}

async fn operation_loop<C>(
mut client: C,
samples: i32,
error: &AtomicBool,
current: &AtomicI32,
percent: &AtomicU8,
operation: BenchmarkOperation,
) -> Result<()>
where
C: BenchmarkClient,
{
let mut record_provider = RecordProvider::default();
while !error.load(Ordering::Relaxed) {
let sample = current.fetch_add(1, Ordering::Relaxed);
if sample >= samples {
break;
}
// Calculate and print out the percents
{
let new_percent = if sample == 0 {
0u8
} else {
(sample * 20 / samples) as u8
};
let old_percent = percent.load(Ordering::Relaxed);
if new_percent > old_percent {
percent.store(new_percent, Ordering::Relaxed);
print!("\r{operation:?} {}%", new_percent * 5);
io::stdout().flush()?;
}
}
match operation {
BenchmarkOperation::Read => client.read(sample).await?,
BenchmarkOperation::Create => {
let record = record_provider.sample();
client.create(sample, record).await?;
}
BenchmarkOperation::Update => {
let record = record_provider.sample();
client.update(sample, record).await?;
}
BenchmarkOperation::Delete => client.delete(sample).await?,
}
}
client.shutdown().await?;
Ok::<(), anyhow::Error>(())
}
}

#[derive(Clone, Copy, Debug)]
@@ -222,28 +250,28 @@ impl RecordProvider {
}
}

pub(crate) trait BenchmarkEngine<C>
pub(crate) trait BenchmarkEngine<C>: Send + Sync
where
C: BenchmarkClient,
{
async fn create_client(&self, endpoint: Option<String>) -> Result<C>;
fn create_client(&self, endpoint: Option<String>) -> impl Future<Output = Result<C>> + Send;
}

pub(crate) trait BenchmarkClient {
pub(crate) trait BenchmarkClient: Send + 'static {
/// Initialise the store at startup
async fn startup(&mut self) -> Result<()> {
Ok(())
}
/// Cleanup the store at shutdown
async fn shutdown(&mut self) -> Result<()> {
Ok(())
fn shutdown(&mut self) -> impl Future<Output = Result<()>> + Send {
async { Ok(()) }
}
/// Create a record at a key
async fn create(&mut self, key: i32, record: &Record) -> Result<()>;
fn create(&mut self, key: i32, record: &Record) -> impl Future<Output = Result<()>> + Send;
/// Read a record at a key
async fn read(&mut self, key: i32) -> Result<()>;
fn read(&mut self, key: i32) -> impl Future<Output = Result<()>> + Send;
/// Update a record at a key
async fn update(&mut self, key: i32, record: &Record) -> Result<()>;
fn update(&mut self, key: i32, record: &Record) -> impl Future<Output = Result<()>> + Send;
/// Delete a record at a key
async fn delete(&mut self, key: i32) -> Result<()>;
fn delete(&mut self, key: i32) -> impl Future<Output = Result<()>> + Send;
}
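The trait changes above replace `async fn` signatures with explicit `impl Future<Output = …> + Send` return types, so every client future is guaranteed to be `Send`, which is what `tokio::task::spawn` requires on a multi-threaded runtime. A self-contained sketch of the same pattern, with illustrative names that are not taken from the crate:

```rust
use std::future::Future;

use anyhow::Result;
use tokio::task;

// Declaring the method as `impl Future + Send` (rather than a plain `async fn`)
// forces every implementation to return a `Send` future, so it can be spawned
// onto a multi-threaded tokio runtime.
trait Client: Send + 'static {
    fn read(&mut self, key: i32) -> impl Future<Output = Result<()>> + Send;
}

struct NoopClient;

impl Client for NoopClient {
    // Mirrors the style of the `shutdown` default body in the diff:
    // return an async block from a function with an `impl Future` return type.
    fn read(&mut self, _key: i32) -> impl Future<Output = Result<()>> + Send {
        async { Ok(()) }
    }
}

async fn run<C: Client>(mut client: C) -> Result<()> {
    // The spawned future is `Send + 'static`, so it may move between worker threads.
    let handle = task::spawn(async move { client.read(0).await });
    handle.await??;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    run(NoopClient).await
}
```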