-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpipe.rs
132 lines (113 loc) · 3.67 KB
/
pipe.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
use eventuals::*;
use std::sync::Arc;
use tokio::{
sync::{Mutex, Notify},
test, time,
};
struct NotifyOnDrop {
notify: Arc<Notify>,
}
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
self.notify.notify_one();
}
}
impl NotifyOnDrop {
fn new() -> (Arc<Notify>, Self) {
let notify = Arc::new(Notify::new());
let on_drop = Self {
notify: notify.clone(),
};
(notify, on_drop)
}
}
#[test]
async fn produces_side_effect() {
let (mut handle_writer, handle) = Eventual::new();
let (mut writer, eventual) = Eventual::new();
let _pipe = eventual.pipe(move |v| {
handle_writer.write(v);
});
writer.write(1);
assert_eq!(Ok(1), handle.subscribe().next().await);
}
#[test]
async fn produces_async_side_effect() {
let (handle_writer, handle) = Eventual::new();
let (mut writer, eventual) = Eventual::new();
let handle_writer = Arc::new(Mutex::new(handle_writer));
let _pipe = eventual.pipe_async(move |v| {
let handle_writer = handle_writer.clone();
async move {
handle_writer.lock().await.write(v);
}
});
writer.write(1);
assert_eq!(Ok(1), handle.subscribe().next().await);
}
#[test]
async fn stops_after_drop_handle() {
let (mut writer, eventual) = Eventual::new();
let (notify, notify_on_drop) = NotifyOnDrop::new();
let pipe = eventual.pipe(move |v| {
if v == 2 {
panic!();
}
// Notifies if it either passed the panic,
// or will never be called again.
notify_on_drop.notify.notify_one();
});
// This test passing depends on the notifies. In part this is because
// the pipe is in a spawned task. If we want to remove the first notify so
// that pipe stops _immediately_ we may have to have pipe check a weak
// reference to the reader each time it acts. Or, use some version of
// select! that prefers cancellation over writing in spawn.
writer.write(1);
notify.notified().await;
drop(pipe);
notify.notified().await;
// We know this can't panic, because we have been notified that the
// closure has been dropped and can't be called again. Unfortunately
// I can't think of a good way to verify it didn't panic. But, surely
// it doesn't.
writer.write(2);
}
#[test]
async fn forever_cleans_up_when_writer_closed() {
let (mut writer, eventual) = Eventual::new();
let (mut acker, ack) = Eventual::new();
let mut ack = ack.subscribe();
let (notify, notify_on_drop) = NotifyOnDrop::new();
eventual
.pipe(move |v| {
acker.write(v);
let _keep = ¬ify_on_drop;
})
.forever();
writer.write(1);
drop(writer);
// If this is notified it means that forever() cleans itself up when the writer stops.
notify.notified().await;
// This ensures that the last value was in fact passed through to pipe so that
// a stale value is not the last observed.
assert_eq!(ack.next().await, Ok(1));
}
#[test]
async fn forever_keeps_handle_alive() {
let (mut writer, reader) = Eventual::<u8>::new();
let (mut notifier, notify) = Eventual::<()>::new();
let mut notify = notify.subscribe();
reader
.pipe(move |_| {
notifier.write(());
})
.forever();
time::sleep(time::Duration::from_millis(100)).await;
writer.write(2);
// Check that the pipe handle hasn't been dropped. If it were to drop, then
// the notifier would also be closed.
assert_eq!(notify.next().await, Ok(()));
drop(writer);
// Now the pipe handle and the notifier should be dropped.
assert_eq!(notify.next().await, Err(Closed));
}