Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(3028): Use Arc to reduce clones and improve memory usage in dedupe #3179

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,33 @@ pub struct Dedupe<Key, Value> {
/// Represents the current state of the operation.
enum State<Value> {
/// Means that the operation has been executed and the result is stored.
Ready(Value),
Ready(Arc<Value>),

/// Means that the operation is in progress and the result can be sent via
/// the stored sender whenever it's available in the future.
Pending(Weak<broadcast::Sender<Value>>),
Pending(Weak<broadcast::Sender<Arc<Value>>>),
}

/// Represents the next steps
enum Step<Value> {
/// The operation has been executed and the result must be returned.
Return(Value),
Return(Arc<Value>),

/// The operation is in progress and the result must be awaited on the
/// receiver.
Await(broadcast::Receiver<Value>),
Await(broadcast::Receiver<Arc<Value>>),

/// The operation needs to be executed and the result needs to be sent to
/// the provided sender.
Init(Arc<broadcast::Sender<Value>>),
Init(Arc<broadcast::Sender<Arc<Value>>>),
}

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<V>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
Expand All @@ -70,14 +70,14 @@ impl<K: Key, V: Value> Dedupe<K, V> {
}
},
Step::Init(tx) => {
let value = or_else().await;
let value = Arc::new(or_else().await);
let mut guard = self.cache.lock().unwrap();
if self.persist {
guard.insert(key.to_owned(), State::Ready(value.clone()));
guard.insert(key.to_owned(), State::Ready(Arc::clone(&value)));
} else {
guard.remove(key);
}
let _ = tx.send(value.clone());
let _ = tx.send(Arc::clone(&value));
value
}
};
Expand All @@ -91,7 +91,7 @@ impl<K: Key, V: Value> Dedupe<K, V> {

if let Some(state) = this.get(key) {
match state {
State::Ready(value) => return Step::Return(value.clone()),
State::Ready(value) => return Step::Return(Arc::clone(value)),
State::Pending(tx) => {
// We can upgrade from Weak to Arc only in case when
// original tx is still alive
Expand Down Expand Up @@ -120,15 +120,17 @@ impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub fn new(persist: bool) -> Self {
Self(Dedupe::new(1, persist))
}
}

impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Result<V, E>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = Result<V, E>>,
{
self.0.dedupe(key, or_else).await
let result = self.0.dedupe(key, or_else).await;
match Arc::try_unwrap(result) {
Ok(result) => result,
Err(arc) => (*arc).clone(),
}
}
}

Expand All @@ -148,7 +150,7 @@ mod tests {
async fn test_no_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
let actual = cache.dedupe(&1, || Box::pin(async { 1 })).await;
assert_eq!(actual, 1);
assert_eq!(*actual, 1);
}

#[tokio::test]
Expand All @@ -157,7 +159,7 @@ mod tests {
cache.dedupe(&1, || Box::pin(async { 1 })).await;

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 1);
assert_eq!(*actual, 1);
}

#[tokio::test]
Expand All @@ -169,7 +171,7 @@ mod tests {
}

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 0);
assert_eq!(*actual, 0);
}

#[tokio::test]
Expand All @@ -190,7 +192,7 @@ mod tests {
});
let (a, b) = join!(a, b);

assert_eq!(a, b);
assert_eq!(*a, *b);
}

async fn compute_value(counter: Arc<AtomicUsize>) -> String {
Expand Down Expand Up @@ -283,7 +285,7 @@ mod tests {
task_1.abort();

let actual = task_2.await.unwrap();
assert_eq!(actual, 200)
assert_eq!(*actual, 200)
}

// TODO: This is a failing test
Expand Down
Loading