-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmap.rs
102 lines (89 loc) · 3.03 KB
/
map.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use eventuals::*;
use lazy_static::lazy_static;
use std::{
sync::{
atomic::{AtomicU32, Ordering::SeqCst},
Arc, Mutex,
},
time::{Duration, Instant},
};
use tokio::test;
use tokio::{select, time::sleep};
#[test]
async fn basic() {
let (mut writer, eventual) = Eventual::<u32>::new();
writer.write(5);
// Format the value and save it in an Arc<String> for
let format_value = |v| async move { Arc::new(format!("{}", v)) };
let mut mapped = eventual.map(format_value).subscribe();
assert_eq!(&mapped.next().await.ok().unwrap().as_str(), &"5");
writer.write(10);
assert_eq!(&mapped.next().await.ok().unwrap().as_str(), &"10");
writer.write(10); // Same value, de-duplicated.
drop(writer);
assert_eq!(mapped.next().await, Err(Closed))
}
#[test]
async fn with_retry_works_eventually() {
let (mut writer, nums) = Eventual::new();
writer.write(1);
lazy_static! {
static ref LOCK: Mutex<()> = Mutex::new(());
static ref TRIES: AtomicU32 = AtomicU32::new(0);
}
// In case this test is run more than once or concurrently for some reason, these
// need to be here to ensure on the test is run consistently.
let _lock = LOCK.lock().unwrap();
TRIES.store(0, SeqCst);
let start = Instant::now();
let inviolable = nums.subscribe().map_with_retry(
// Attempt 5 times on the same value before succeeding.
move |n| async move {
assert_eq!(n, 1);
let attempt = TRIES.fetch_add(1, SeqCst);
if attempt < 4 {
Err(attempt)
} else {
Ok("ok")
}
},
// Sleep 1ms between failures.
|_| sleep(Duration::from_millis(1)),
);
// Assert that this eventually works
assert_eq!(inviolable.value().await.unwrap(), "ok");
let end = Instant::now();
// Verify the sleeps. In practice this ends up much
// larger than 5ms.
assert!(end - start >= Duration::from_millis(5));
}
#[test]
async fn with_retry_gets_new_value() {
let (mut writer, nums) = Eventual::<u32>::new();
writer.write(1);
let inviolable = nums.map_with_retry(
move |n| async move {
match n {
1 => Err(()),
_ => Ok(()),
}
},
// Sleep "forever". In practice this could be a short sleep
// but we want to show that if a new value is available it
// is used rather than reconstructing the pipeline.
|_| sleep(Duration::from_secs(1000000000000)),
);
// Demonstrate that inviolable does not produce a value. At this point retry
// should be waiting for either a new value or the new eventual but gets
// neither.
select! {
_ = inviolable.value() => {
panic!("Nooooooooo!");
}
_ = sleep(Duration::from_millis(10)) => {}
};
// Show that when a new value is written we don't have to wait indefinitely
// for the new eventual.
writer.write(2);
assert_eq!(inviolable.value().await.unwrap(), ());
}