Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add dedupe benchmark #3071

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
50 changes: 50 additions & 0 deletions benches/dedupe_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use criterion::Criterion;
use futures_util::future::join_all;
use tailcall::core::data_loader::DedupeResult;
use tokio::runtime::Builder;

pub fn benchmark_dedupe(c: &mut Criterion) {
c.bench_function("dedupe concurrent access with thread pool", |b| {
b.iter(|| {
// Create a Tokio runtime with a thread pool of 5 threads
let rt = Builder::new_multi_thread()
.worker_threads(5)
.enable_all()
.build()
.unwrap();

rt.block_on(async {
let cache = Arc::new(DedupeResult::<u64, String, ()>::new(false));
let key = 1;
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();

for _ in 0..1000 {
let cache = cache.clone();

Check warning on line 26 in benches/dedupe_bench.rs

View workflow job for this annotation

GitHub Actions / Run Formatter and Lint Check

Diff in /home/runner/work/tailcall/tailcall/benches/dedupe_bench.rs

Check warning on line 26 in benches/dedupe_bench.rs

View workflow job for this annotation

GitHub Actions / Run Formatter and Lint Check

Diff in /home/runner/work/tailcall/tailcall/benches/dedupe_bench.rs
let counter = counter.clone();
let handle = tokio::spawn(async move {

cache
.dedupe(&key, || Box::pin(compute_value(counter)))
.await
});
handles.push(handle);
}

let results = join_all(handles).await;
let all_ok = results.into_iter().all(|r| r.unwrap().is_ok());

assert!(all_ok);
});
});
});
}

/// Simulates an expensive async computation for the dedupe benchmark.
///
/// Bumps the shared invocation counter, sleeps 100µs to mimic I/O
/// latency, then returns a value derived from the counter's current
/// reading at completion time.
async fn compute_value(counter: Arc<AtomicUsize>) -> Result<String, ()> {
    counter.fetch_add(1, Ordering::SeqCst);
    let simulated_latency = tokio::time::Duration::from_micros(100);
    tokio::time::sleep(simulated_latency).await;
    let observed = counter.load(Ordering::SeqCst);
    Ok(format!("value_{observed}"))
}
2 changes: 2 additions & 0 deletions benches/tailcall_benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};

mod bench_synth;
mod data_loader_bench;
mod dedupe_bench;
mod from_json_bench;
mod handle_request_bench;
mod http_execute_bench;
Expand All @@ -22,6 +23,7 @@ fn all_benchmarks(c: &mut Criterion) {
from_json_bench::benchmark_from_json_method(c);
bench_synth::bench_synth_nested(c);
bench_synth::bench_synth_nested_borrow(c);
dedupe_bench::benchmark_dedupe(c);
}

criterion_group! {
Expand Down
4 changes: 2 additions & 2 deletions ci-benchmark/benchmark.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ schema @server(port: 8000, hostname: "0.0.0.0") {
query: Query
}

type Query @cache(maxAge: 30000) {
posts: [Post] @http(url: "http://jsonplaceholder.typicode.com/posts")
type Query {
posts: [Post] @http(url: "http://jsonplaceholder.typicode.com/posts", dedupe: true)
}

type User {
Expand Down
2 changes: 1 addition & 1 deletion ci-benchmark/nginx-benchmark.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ schema
}

type Query {
posts: [Post] @http(url: "http://jsonplaceholder.typicode.com/posts")
posts: [Post] @http(url: "http://jsonplaceholder.typicode.com/posts", dedupe: true)
}

type User {
Expand Down
70 changes: 44 additions & 26 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Weak};
use std::thread::ThreadId;

use dashmap::DashMap;
use futures_util::Future;
use tokio::sync::broadcast;

Expand All @@ -15,7 +17,7 @@ impl<A: Send + Sync + Clone> Value for A {}
/// Allows deduplication of async operations based on a key.
pub struct Dedupe<Key, Value> {
/// Cache storage for the operations.
cache: Arc<Mutex<HashMap<Key, State<Value>>>>,
cache: Arc<DashMap<std::thread::ThreadId, HashMap<Key, State<Value>>>>,
/// Initial size of the multi-producer, multi-consumer channel.
size: usize,
/// When enabled allows the operations to be cached forever.
Expand Down Expand Up @@ -48,16 +50,17 @@ enum Step<Value> {

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
Self { cache: Arc::new(DashMap::new()), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
{
let thread_id = std::thread::current().id();
loop {
let value = match self.step(key) {
let value = match self.step(key, &thread_id) {
Step::Return(value) => value,
Step::Await(mut rx) => match rx.recv().await {
Ok(value) => value,
Expand All @@ -71,12 +74,20 @@ impl<K: Key, V: Value> Dedupe<K, V> {
},
Step::Init(tx) => {
let value = or_else().await;
let mut guard = self.cache.lock().unwrap();
if self.persist {
guard.insert(key.to_owned(), State::Ready(value.clone()));
} else {
guard.remove(key);
let mut is_empty = false;
if let Some(mut inner_map) = self.cache.get_mut(&thread_id) {
if self.persist {
inner_map.insert(key.to_owned(), State::Ready(value.clone()));
} else {
inner_map.remove(key);
}
is_empty = inner_map.is_empty();
}

if is_empty {
self.cache.remove(&thread_id);
}

let _ = tx.send(value.clone());
value
}
Expand All @@ -86,30 +97,36 @@ impl<K: Key, V: Value> Dedupe<K, V> {
}
}

fn step(&self, key: &K) -> Step<V> {
let mut this = self.cache.lock().unwrap();

if let Some(state) = this.get(key) {
match state {
State::Ready(value) => return Step::Return(value.clone()),
State::Pending(tx) => {
// We can upgrade from Weak to Arc only in case when
// original tx is still alive
// otherwise we will create in the code below
if let Some(tx) = tx.upgrade() {
return Step::Await(tx.subscribe());
/// Decides how the caller should proceed for `key` on the current thread:
/// return an already-cached value, await an in-flight computation, or
/// register a fresh one.
///
/// NOTE(review): the cache is sharded by `ThreadId`, so tasks observed on
/// different worker threads will not dedupe with each other — confirm this
/// is the intended semantics.
fn step(&self, key: &K, thread_id: &ThreadId) -> Step<V> {
// Fast path: Try read-only access first
if let Some(inner_data) = self.cache.get(thread_id) {
if let Some(state) = inner_data.get(key) {
match state {
State::Ready(value) => return Step::Return(value.clone()),
State::Pending(tx) => {
// Upgrading the Weak sender succeeds only while the original
// computation is still alive; if it has been dropped, fall
// through and re-initialize below.
if let Some(tx) = tx.upgrade() {
return Step::Await(tx.subscribe());
}
}
}
}
}

// Slow path: no usable entry for this key — create a pending one.
self.initialize_cache(thread_id, key)
}

/// Registers a new `Pending` entry for `key` under this thread's shard and
/// hands the broadcast sender to the caller via `Step::Init` so it can run
/// the computation and fan the result out to subscribers.
fn initialize_cache(&self, thread_id: &std::thread::ThreadId, key: &K) -> Step<V> {
let (tx, _) = broadcast::channel(self.size);
let tx = Arc::new(tx);
// Store a Weak version of tx and pass the actual tx to further handling,
// so we can tell whether tx is still alive and able to handle the request.
// Only a single `strong` reference to tx should exist, so its liveness
// tells us whether the execution is still running and a response will come.

// Shard already exists for this thread: just insert the pending entry.
if let Some(mut inner_data) = self.cache.get_mut(thread_id) {
inner_data.insert(key.to_owned(), State::Pending(Arc::downgrade(&tx)));
return Step::Init(tx);
}

// First entry for this thread: create its shard map.
let mut local_map = HashMap::default(); // empty map; allocates lazily on first insert
local_map.insert(key.to_owned(), State::Pending(Arc::downgrade(&tx)));
self.cache.insert(thread_id.to_owned(), local_map);
Step::Init(tx)
}
}
Expand All @@ -136,6 +153,7 @@ impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
mod tests {
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::Duration;

use assert_eq;
Expand Down
Loading