Skip to content

Commit

Permalink
feat: make dedupe faster at I/O level
Browse files Browse the repository at this point in the history
  • Loading branch information
alpha-ulrich committed Jan 3, 2025
1 parent e3a6026 commit d3850c3
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 36 deletions.
60 changes: 34 additions & 26 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use tokio::sync::broadcast;
pub trait Key: Send + Sync + Eq + Hash + Clone {}
impl<A: Send + Sync + Eq + Hash + Clone> Key for A {}

pub trait Value: Send + Sync + Clone {}
impl<A: Send + Sync + Clone> Value for A {}
pub trait Value: Send + Sync {}
impl<A: Send + Sync> Value for A {}

///
/// Allows deduplication of async operations based on a key.
Expand All @@ -25,36 +25,36 @@ pub struct Dedupe<Key, Value> {
/// Represents the current state of the operation.
enum State<Value> {
/// Means that the operation has been executed and the result is stored.
Ready(Value),
Ready(Arc<Value>),

/// Means that the operation is in progress and the result can be sent via
/// the stored sender whenever it's available in the future.
Pending(Weak<broadcast::Sender<Value>>),
Pending(Weak<broadcast::Sender<Arc<Value>>>),
}

/// Represents the next steps
enum Step<Value> {
/// The operation has been executed and the result must be returned.
Return(Value),
Return(Arc<Value>),

/// The operation is in progress and the result must be awaited on the
/// receiver.
Await(broadcast::Receiver<Value>),
Await(broadcast::Receiver<Arc<Value>>),

/// The operation needs to be executed and the result needs to be sent to
/// the provided sender.
Init(Arc<broadcast::Sender<Value>>),
Init(Arc<broadcast::Sender<Arc<Value>>>),
}

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<V>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
Fut: Future<Output = Arc<V>>,
{
loop {
let value = match self.step(key) {
Expand Down Expand Up @@ -123,10 +123,10 @@ impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
}

impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Result<V, E>
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<Result<V, E>>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = Result<V, E>>,
Fut: Future<Output = Arc<Result<V, E>>>,
{
self.0.dedupe(key, or_else).await
}
Expand All @@ -147,29 +147,31 @@ mod tests {
#[tokio::test]
async fn test_no_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
let actual = cache.dedupe(&1, || Box::pin(async { 1 })).await;
assert_eq!(actual, 1);
let actual = cache.dedupe(&1, || Box::pin(async { Arc::new(1) })).await;
assert_eq!(*actual, 1);
}

#[tokio::test]
async fn test_with_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
cache.dedupe(&1, || Box::pin(async { 1 })).await;
cache.dedupe(&1, || Box::pin(async { Arc::new(1) })).await;

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 1);
let actual = cache.dedupe(&1, || Box::pin(async { Arc::new(2) })).await;
assert_eq!(*actual, 1);
}

#[tokio::test]
async fn test_with_multi_get() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));

for i in 0..100 {
cache.dedupe(&1, || Box::pin(async move { i })).await;
cache
.dedupe(&1, || Box::pin(async move { Arc::new(i) }))
.await;
}

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 0);
let actual = cache.dedupe(&1, || Box::pin(async { Arc::new(2) })).await;
assert_eq!(*actual, 0);
}

#[tokio::test]
Expand All @@ -179,24 +181,24 @@ mod tests {
let a = cache.dedupe(&1, || {
Box::pin(async move {
sleep(Duration::from_millis(1)).await;
1
Arc::new(1)
})
});
let b = cache.dedupe(&1, || {
Box::pin(async move {
sleep(Duration::from_millis(1)).await;
2
Arc::new(2)
})
});
let (a, b) = join!(a, b);

assert_eq!(a, b);
}

async fn compute_value(counter: Arc<AtomicUsize>) -> String {
async fn compute_value(counter: Arc<AtomicUsize>) -> Arc<String> {
counter.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(1)).await;
format!("value_{}", counter.load(Ordering::SeqCst))
Arc::new(format!("value_{}", counter.load(Ordering::SeqCst)))
}

#[tokio::test(worker_threads = 16, flavor = "multi_thread")]
Expand Down Expand Up @@ -237,6 +239,7 @@ mod tests {

let task = cache.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
Arc::new(())
});

// drops the task since the underlying sleep timeout is higher than the
Expand All @@ -249,6 +252,7 @@ mod tests {
cache
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
Arc::new(())
})
.await;
}
Expand All @@ -263,7 +267,7 @@ mod tests {
cache_1
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
100
Arc::new(100)
})
.await
});
Expand All @@ -272,7 +276,7 @@ mod tests {
cache_2
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
200
Arc::new(200)
})
.await
});
Expand All @@ -283,7 +287,7 @@ mod tests {
task_1.abort();

let actual = task_2.await.unwrap();
assert_eq!(actual, 200)
assert_eq!(*actual, 200)
}

// TODO: This is a failing test
Expand Down Expand Up @@ -313,6 +317,7 @@ mod tests {
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
status_1.lock().unwrap().call_1 = true;
Arc::new(())

Check warning on line 320 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L320

Added line #L320 was not covered by tests
})
.await
});
Expand All @@ -326,6 +331,7 @@ mod tests {
.dedupe(&1, move || async move {
sleep(Duration::from_millis(120)).await;
status_2.lock().unwrap().call_2 = true;
Arc::new(())

Check warning on line 334 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L334

Added line #L334 was not covered by tests
})
.await
});
Expand Down Expand Up @@ -368,6 +374,7 @@ mod tests {
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
status_1.lock().unwrap().call_1 = true;
Arc::new(())

Check warning on line 377 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L377

Added line #L377 was not covered by tests
})
.await
});
Expand All @@ -378,6 +385,7 @@ mod tests {
.dedupe(&1, move || async move {
sleep(Duration::from_millis(150)).await;
status_2.lock().unwrap().call_2 = true;
Arc::new(())

Check warning on line 388 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L388

Added line #L388 was not covered by tests
})
.await
});
Expand Down
12 changes: 11 additions & 1 deletion src/core/helpers/value.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::hash::{Hash, Hasher};
use std::{

Check warning on line 1 in src/core/helpers/value.rs

View workflow job for this annotation

GitHub Actions / Run Formatter and Lint Check

Diff in /home/runner/work/tailcall/tailcall/src/core/helpers/value.rs
hash::{Hash, Hasher},
sync::Arc,
};

use async_graphql_value::ConstValue;

Expand Down Expand Up @@ -36,3 +39,10 @@ pub fn hash<H: Hasher>(const_value: &ConstValue, state: &mut H) {
}
}
}

pub fn arc_result_to_result<T: Clone, E: Clone>(arc_result: Arc<Result<T, E>>) -> Result<T, E> {
match &*arc_result {
Ok(t) => Ok(t.clone()),
Err(e) => Err(e.clone()),
}
}
8 changes: 5 additions & 3 deletions src/core/ir/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::eval_io::eval_io;
use super::model::{Cache, CacheKey, Map, IR};
use super::{Error, EvalContext, ResolverContextLike, TypedValue};
use crate::core::auth::verify::{AuthVerifier, Verify};
use crate::core::helpers::value::arc_result_to_result;
use crate::core::json::{JsonLike, JsonObjectLike};
use crate::core::merge_right::MergeRight;
use crate::core::serde_value_ext::ValueExt;
Expand Down Expand Up @@ -43,15 +44,16 @@ impl IR {

expr.eval(ctx).await
}
IR::IO(io) => eval_io(io, ctx).await,
IR::IO(io) => arc_result_to_result(eval_io(io, ctx).await),
IR::Cache(Cache { max_age, io }) => {
let io = io.deref();
let key = io.cache_key(ctx);
if let Some(key) = key {
if let Some(val) = ctx.request_ctx.runtime.cache.get(&key).await? {
Ok(val)
} else {
let val = eval_io(io, ctx).await?;
let result = arc_result_to_result(eval_io(io, ctx).await);
let val = result?;
ctx.request_ctx
.runtime
.cache
Expand All @@ -60,7 +62,7 @@ impl IR {
Ok(val)
}
} else {
eval_io(io, ctx).await
arc_result_to_result(eval_io(io, ctx).await)

Check warning on line 65 in src/core/ir/eval.rs

View check run for this annotation

Codecov / codecov/patch

src/core/ir/eval.rs#L65

Added line #L65 was not covered by tests
}
}
IR::Map(Map { input, map }) => {
Expand Down
20 changes: 16 additions & 4 deletions src/core/ir/eval_io.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use async_graphql_value::ConstValue;

use super::eval_http::{
Expand All @@ -14,7 +16,7 @@ use crate::core::grpc::data_loader::GrpcDataLoader;
use crate::core::http::DataLoaderRequest;
use crate::core::ir::Error;

pub async fn eval_io<Ctx>(io: &IO, ctx: &mut EvalContext<'_, Ctx>) -> Result<ConstValue, Error>
pub async fn eval_io<Ctx>(io: &IO, ctx: &mut EvalContext<'_, Ctx>) -> Arc<Result<ConstValue, Error>>
where
Ctx: ResolverContextLike + Sync,
{
Expand All @@ -23,20 +25,20 @@ where
let dedupe = io.dedupe();

if !dedupe || !ctx.is_query() {
return eval_io_inner(io, ctx).await;
return eval_io_inner_arc(io, ctx).await;
}
if let Some(key) = io.cache_key(ctx) {
ctx.request_ctx
.cache
.dedupe(&key, || async {
ctx.request_ctx
.dedupe_handler
.dedupe(&key, || eval_io_inner(io, ctx))
.dedupe(&key, || eval_io_inner_arc(io, ctx))
.await
})
.await
} else {
eval_io_inner(io, ctx).await
eval_io_inner_arc(io, ctx).await

Check warning on line 41 in src/core/ir/eval_io.rs

View check run for this annotation

Codecov / codecov/patch

src/core/ir/eval_io.rs#L41

Added line #L41 was not covered by tests
}
}

Expand Down Expand Up @@ -116,3 +118,13 @@ where
}
}
}

async fn eval_io_inner_arc<Ctx>(
io: &IO,
ctx: &mut EvalContext<'_, Ctx>,
) -> Arc<Result<ConstValue, Error>>
where
Ctx: ResolverContextLike + Sync,
{
return Arc::new(eval_io_inner(io, ctx).await);
}
5 changes: 3 additions & 2 deletions src/core/jit/graphql_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tailcall_hasher::TailcallHasher;
use super::{AnyResponse, BatchResponse, Response};
use crate::core::app_context::AppContext;
use crate::core::async_graphql_hyper::OperationId;
use crate::core::helpers::value::arc_result_to_result;
use crate::core::http::RequestContext;
use crate::core::jit::{self, ConstValueExecutor, OPHash, Pos, Positioned};

Expand Down Expand Up @@ -53,12 +54,12 @@ impl JITExecutor {
.dedupe(&self.operation_id, || {
Box::pin(async move {
let resp = self.exec(exec, jit_request).await;
Ok(resp)
Arc::new(Ok(resp))
})
})
.await;

out.unwrap_or_default()
arc_result_to_result(out).unwrap_or_default()
}

#[inline(always)]
Expand Down

0 comments on commit d3850c3

Please sign in to comment.