Skip to content

Commit

Permalink
dedupe: Use Arc to reduce clones and improve memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
palash25 committed Nov 28, 2024
1 parent ceefed9 commit e80e77e
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,33 @@ pub struct Dedupe<Key, Value> {
/// Represents the current state of the operation.
enum State<Value> {
/// Means that the operation has been executed and the result is stored.
Ready(Value),
Ready(Arc<Value>),

/// Means that the operation is in progress and the result can be sent via
/// the stored sender whenever it's available in the future.
Pending(Weak<broadcast::Sender<Value>>),
Pending(Weak<broadcast::Sender<Arc<Value>>>),
}

/// Represents the next steps
enum Step<Value> {
/// The operation has been executed and the result must be returned.
Return(Value),
Return(Arc<Value>),

/// The operation is in progress and the result must be awaited on the
/// receiver.
Await(broadcast::Receiver<Value>),
Await(broadcast::Receiver<Arc<Value>>),

/// The operation needs to be executed and the result needs to be sent to
/// the provided sender.
Init(Arc<broadcast::Sender<Value>>),
Init(Arc<broadcast::Sender<Arc<Value>>>),
}

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<V>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
Expand All @@ -71,14 +71,15 @@ impl<K: Key, V: Value> Dedupe<K, V> {
},
Step::Init(tx) => {
let value = or_else().await;
let value_arc = Arc::new(value);
let mut guard = self.cache.lock().unwrap();
if self.persist {
guard.insert(key.to_owned(), State::Ready(value.clone()));
guard.insert(key.to_owned(), State::Ready(Arc::clone(&value_arc)));
} else {
guard.remove(key);
}
let _ = tx.send(value.clone());
value
let _ = tx.send(Arc::clone(&value_arc));
value_arc
}
};

Expand All @@ -91,7 +92,7 @@ impl<K: Key, V: Value> Dedupe<K, V> {

if let Some(state) = this.get(key) {
match state {
State::Ready(value) => return Step::Return(value.clone()),
State::Ready(value) => return Step::Return(Arc::clone(value)),
State::Pending(tx) => {
// We can upgrade from Weak to Arc only in case when
// original tx is still alive
Expand Down Expand Up @@ -120,15 +121,17 @@ impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub fn new(persist: bool) -> Self {
Self(Dedupe::new(1, persist))
}
}

impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Result<V, E>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = Result<V, E>>,
{
self.0.dedupe(key, or_else).await
let result = self.0.dedupe(key, or_else).await;
match Arc::try_unwrap(result) {
Ok(result) => result,
Err(arc) => (*arc).clone(),
}
}
}

Expand All @@ -148,7 +151,7 @@ mod tests {
async fn test_no_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
let actual = cache.dedupe(&1, || Box::pin(async { 1 })).await;
assert_eq!(actual, 1);
assert_eq!(*actual, 1);
}

#[tokio::test]
Expand All @@ -157,7 +160,7 @@ mod tests {
cache.dedupe(&1, || Box::pin(async { 1 })).await;

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 1);
assert_eq!(*actual, 1);
}

#[tokio::test]
Expand All @@ -169,7 +172,7 @@ mod tests {
}

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 0);
assert_eq!(*actual, 0);
}

#[tokio::test]
Expand All @@ -190,7 +193,7 @@ mod tests {
});
let (a, b) = join!(a, b);

assert_eq!(a, b);
assert_eq!(*a, *b);
}

async fn compute_value(counter: Arc<AtomicUsize>) -> String {
Expand Down Expand Up @@ -283,7 +286,7 @@ mod tests {
task_1.abort();

let actual = task_2.await.unwrap();
assert_eq!(actual, 200)
assert_eq!(*actual, 200)
}

// TODO: This is a failing test
Expand All @@ -308,7 +311,7 @@ mod tests {
let status_2 = status.clone();

// Task 1 completed in 100ms
let task_1 = tokio::spawn(async move {
let task_1 = tokio::task::spawn(async move {
cache_1
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -363,7 +366,7 @@ mod tests {
let status_2 = status.clone();

// Task 1 completed in 100ms
let task_1 = tokio::spawn(async move {
let task_1 = tokio::task::spawn(async move {
cache_1
.dedupe(&1, move || async move {
sleep(Duration::from_millis(100)).await;
Expand Down

0 comments on commit e80e77e

Please sign in to comment.