Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make dedupe faster at I/O level #3237

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
pub trait Key: Send + Sync + Eq + Hash + Clone {}
impl<A: Send + Sync + Eq + Hash + Clone> Key for A {}

pub trait Value: Send + Sync + Clone {}
impl<A: Send + Sync + Clone> Value for A {}
pub trait Value: Send + Sync {}
impl<A: Send + Sync> Value for A {}

///
/// Allows deduplication of async operations based on a key.
Expand All @@ -25,36 +25,36 @@
/// Represents the current state of the operation.
enum State<Value> {
/// Means that the operation has been executed and the result is stored.
Ready(Value),
Ready(Arc<Value>),

/// Means that the operation is in progress and the result can be sent via
/// the stored sender whenever it's available in the future.
Pending(Weak<broadcast::Sender<Value>>),
Pending(Weak<broadcast::Sender<Arc<Value>>>),
}

/// Represents the next steps
enum Step<Value> {
/// The operation has been executed and the result must be returned.
Return(Value),
Return(Arc<Value>),

/// The operation is in progress and the result must be awaited on the
/// receiver.
Await(broadcast::Receiver<Value>),
Await(broadcast::Receiver<Arc<Value>>),

/// The operation needs to be executed and the result needs to be sent to
/// the provided sender.
Init(Arc<broadcast::Sender<Value>>),
Init(Arc<broadcast::Sender<Arc<Value>>>),
}

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<V>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
Fut: Future<Output = Arc<V>>,
{
loop {
let value = match self.step(key) {
Expand Down Expand Up @@ -123,10 +123,10 @@
}

impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Result<V, E>
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<Result<V, E>>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = Result<V, E>>,
Fut: Future<Output = Arc<Result<V, E>>>,
{
self.0.dedupe(key, or_else).await
}
Expand All @@ -147,29 +147,31 @@
#[tokio::test]
async fn test_no_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
let actual = cache.dedupe(&1, || Box::pin(async { 1 })).await;
assert_eq!(actual, 1);
let actual = cache.dedupe(&1, || Box::pin(async { Arc::new(1) })).await;
assert_eq!(*actual, 1);
}

#[tokio::test]
async fn test_with_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
cache.dedupe(&1, || Box::pin(async { 1 })).await;
cache.dedupe(&1, || Box::pin(async { Arc::new(1) })).await;

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 1);
let actual = cache.dedupe(&1, || Box::pin(async { Arc::new(2) })).await;
assert_eq!(*actual, 1);
}

#[tokio::test]
async fn test_with_multi_get() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));

for i in 0..100 {
cache.dedupe(&1, || Box::pin(async move { i })).await;
cache
.dedupe(&1, || Box::pin(async move { Arc::new(i) }))
.await;
}

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 0);
let actual = cache.dedupe(&1, || Box::pin(async { Arc::new(2) })).await;
assert_eq!(*actual, 0);
}

#[tokio::test]
Expand All @@ -179,24 +181,24 @@
let a = cache.dedupe(&1, || {
Box::pin(async move {
sleep(Duration::from_millis(1)).await;
1
Arc::new(1)
})
});
let b = cache.dedupe(&1, || {
Box::pin(async move {
sleep(Duration::from_millis(1)).await;
2
Arc::new(2)
})
});
let (a, b) = join!(a, b);

assert_eq!(a, b);
}

async fn compute_value(counter: Arc<AtomicUsize>) -> String {
async fn compute_value(counter: Arc<AtomicUsize>) -> Arc<String> {
counter.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(1)).await;
format!("value_{}", counter.load(Ordering::SeqCst))
Arc::new(format!("value_{}", counter.load(Ordering::SeqCst)))
}

#[tokio::test(worker_threads = 16, flavor = "multi_thread")]
Expand Down Expand Up @@ -237,6 +239,7 @@

let task = cache.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
Arc::new(())
});

// drops the task since the underlying sleep timeout is higher than the
Expand All @@ -249,6 +252,7 @@
cache
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
Arc::new(())
})
.await;
}
Expand All @@ -263,7 +267,7 @@
cache_1
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
100
Arc::new(100)
})
.await
});
Expand All @@ -272,7 +276,7 @@
cache_2
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
200
Arc::new(200)
})
.await
});
Expand All @@ -283,7 +287,7 @@
task_1.abort();

let actual = task_2.await.unwrap();
assert_eq!(actual, 200)
assert_eq!(*actual, 200)
}

// TODO: This is a failing test
Expand Down Expand Up @@ -313,6 +317,7 @@
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
status_1.lock().unwrap().call_1 = true;
Arc::new(())

Check warning on line 320 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L320

Added line #L320 was not covered by tests
})
.await
});
Expand All @@ -326,6 +331,7 @@
.dedupe(&1, move || async move {
sleep(Duration::from_millis(120)).await;
status_2.lock().unwrap().call_2 = true;
Arc::new(())

Check warning on line 334 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L334

Added line #L334 was not covered by tests
})
.await
});
Expand Down Expand Up @@ -368,6 +374,7 @@
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
status_1.lock().unwrap().call_1 = true;
Arc::new(())

Check warning on line 377 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L377

Added line #L377 was not covered by tests
})
.await
});
Expand All @@ -378,6 +385,7 @@
.dedupe(&1, move || async move {
sleep(Duration::from_millis(150)).await;
status_2.lock().unwrap().call_2 = true;
Arc::new(())

Check warning on line 388 in src/core/data_loader/dedupe.rs

View check run for this annotation

Codecov / codecov/patch

src/core/data_loader/dedupe.rs#L388

Added line #L388 was not covered by tests
})
.await
});
Expand Down
12 changes: 11 additions & 1 deletion src/core/helpers/value.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::hash::{Hash, Hasher};
use std::{

Check warning on line 1 in src/core/helpers/value.rs

View workflow job for this annotation

GitHub Actions / Run Formatter and Lint Check

Diff in /home/runner/work/tailcall/tailcall/src/core/helpers/value.rs
hash::{Hash, Hasher},
sync::Arc,
};

use async_graphql_value::ConstValue;

Expand Down Expand Up @@ -36,3 +39,10 @@
}
}
}

pub fn arc_result_to_result<T: Clone, E: Clone>(arc_result: Arc<Result<T, E>>) -> Result<T, E> {
match &*arc_result {
Ok(t) => Ok(t.clone()),
Err(e) => Err(e.clone()),
}
}
8 changes: 5 additions & 3 deletions src/core/ir/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use super::model::{Cache, CacheKey, Map, IR};
use super::{Error, EvalContext, ResolverContextLike, TypedValue};
use crate::core::auth::verify::{AuthVerifier, Verify};
use crate::core::helpers::value::arc_result_to_result;
use crate::core::json::{JsonLike, JsonObjectLike};
use crate::core::merge_right::MergeRight;
use crate::core::serde_value_ext::ValueExt;
Expand Down Expand Up @@ -43,15 +44,16 @@

expr.eval(ctx).await
}
IR::IO(io) => eval_io(io, ctx).await,
IR::IO(io) => arc_result_to_result(eval_io(io, ctx).await),
IR::Cache(Cache { max_age, io }) => {
let io = io.deref();
let key = io.cache_key(ctx);
if let Some(key) = key {
if let Some(val) = ctx.request_ctx.runtime.cache.get(&key).await? {
Ok(val)
} else {
let val = eval_io(io, ctx).await?;
let result = arc_result_to_result(eval_io(io, ctx).await);
let val = result?;
ctx.request_ctx
.runtime
.cache
Expand All @@ -60,7 +62,7 @@
Ok(val)
}
} else {
eval_io(io, ctx).await
arc_result_to_result(eval_io(io, ctx).await)

Check warning on line 65 in src/core/ir/eval.rs

View check run for this annotation

Codecov / codecov/patch

src/core/ir/eval.rs#L65

Added line #L65 was not covered by tests
}
}
IR::Map(Map { input, map }) => {
Expand Down
20 changes: 16 additions & 4 deletions src/core/ir/eval_io.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use async_graphql_value::ConstValue;

use super::eval_http::{
Expand All @@ -14,7 +16,7 @@
use crate::core::http::DataLoaderRequest;
use crate::core::ir::Error;

pub async fn eval_io<Ctx>(io: &IO, ctx: &mut EvalContext<'_, Ctx>) -> Result<ConstValue, Error>
pub async fn eval_io<Ctx>(io: &IO, ctx: &mut EvalContext<'_, Ctx>) -> Arc<Result<ConstValue, Error>>
where
Ctx: ResolverContextLike + Sync,
{
Expand All @@ -23,20 +25,20 @@
let dedupe = io.dedupe();

if !dedupe || !ctx.is_query() {
return eval_io_inner(io, ctx).await;
return eval_io_inner_arc(io, ctx).await;
}
if let Some(key) = io.cache_key(ctx) {
ctx.request_ctx
.cache
.dedupe(&key, || async {
ctx.request_ctx
.dedupe_handler
.dedupe(&key, || eval_io_inner(io, ctx))
.dedupe(&key, || eval_io_inner_arc(io, ctx))
.await
})
.await
} else {
eval_io_inner(io, ctx).await
eval_io_inner_arc(io, ctx).await

Check warning on line 41 in src/core/ir/eval_io.rs

View check run for this annotation

Codecov / codecov/patch

src/core/ir/eval_io.rs#L41

Added line #L41 was not covered by tests
}
}

Expand Down Expand Up @@ -116,3 +118,13 @@
}
}
}

async fn eval_io_inner_arc<Ctx>(
io: &IO,
ctx: &mut EvalContext<'_, Ctx>,
) -> Arc<Result<ConstValue, Error>>
where
Ctx: ResolverContextLike + Sync,
{
return Arc::new(eval_io_inner(io, ctx).await);
}
5 changes: 3 additions & 2 deletions src/core/jit/graphql_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tailcall_hasher::TailcallHasher;
use super::{AnyResponse, BatchResponse, Response};
use crate::core::app_context::AppContext;
use crate::core::async_graphql_hyper::OperationId;
use crate::core::helpers::value::arc_result_to_result;
use crate::core::http::RequestContext;
use crate::core::jit::{self, ConstValueExecutor, OPHash, Pos, Positioned};

Expand Down Expand Up @@ -53,12 +54,12 @@ impl JITExecutor {
.dedupe(&self.operation_id, || {
Box::pin(async move {
let resp = self.exec(exec, jit_request).await;
Ok(resp)
Arc::new(Ok(resp))
})
})
.await;

out.unwrap_or_default()
arc_result_to_result(out).unwrap_or_default()
}

#[inline(always)]
Expand Down
Loading