-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy patheventual.rs
150 lines (131 loc) · 4.44 KB
/
eventual.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use eventuals::*;
use futures::poll;
use std::{sync::Arc, task::Poll, time::Duration};
use tokio::{join, test, time::sleep};
// Checks that a subscriber observes `Err(Closed)` when the writer is
// dropped without ever having written a value.
#[test]
async fn dropped_writer_closes() {
    let (writer, source) = Eventual::<u32>::new();
    let mut reader = source.subscribe();

    // Drop the writer before anything has been written.
    drop(writer);

    assert_eq!(reader.next().await, Err(Closed));
}
// Checks that a reader created before the write still receives the
// value that is written afterwards.
#[test]
async fn can_observe_value_written_after_subscribe() {
    let (mut writer, source) = Eventual::new();
    let mut reader = source.subscribe();

    // The write happens only after the subscription exists.
    writer.write(5);

    assert_eq!(reader.next().await, Ok(5));
}
// Checks that an eventual built with `from_value` hands its value to a
// later subscriber, and that the following read reports `Closed`.
#[test]
async fn can_observe_value_written_before_subscribe() {
    let source = Eventual::from_value(5);
    let mut reader = source.subscribe();

    // First read yields the pre-existing value ...
    assert_eq!(reader.next().await, Ok(5));
    // ... and the next read reports that the eventual is closed.
    assert_eq!(reader.next().await, Err(Closed));
}
// Checks that when two values are written before the reader is polled,
// the reader sees only the latest one.
#[test]
async fn only_most_recent_value_is_observed() {
    let (mut writer, source) = Eventual::new();
    let mut reader = source.subscribe();

    // Two back-to-back writes with no read in between.
    writer.write(5);
    writer.write(10);

    // Only the second write is expected to be observed.
    assert_eq!(reader.next().await, Ok(10));
}
// Checks that dropping one subscriber does not disturb another one, and
// that `subscriber_count` tracks subscribe/drop along the way.
#[test]
async fn drop_doesnt_interfere() {
    let (mut writer, source) = Eventual::<u32>::new();
    assert_eq!(source.subscriber_count(), 0);

    // Each subscription is expected to bump the count by one.
    let mut dropped_early = source.subscribe();
    assert_eq!(source.subscriber_count(), 1);
    let mut survivor = source.subscribe();
    assert_eq!(source.subscriber_count(), 2);

    writer.write(5);
    writer.write(10);
    assert_eq!(dropped_early.next().await, Ok(10));

    // Get rid of the first subscriber; the count must go back down.
    drop(dropped_early);
    assert_eq!(source.subscriber_count(), 1);

    writer.write(1);
    // This is the heart of the test: the remaining subscriber still
    // receives values after its sibling has been dropped.
    assert_eq!(survivor.next().await, Ok(1));

    drop(survivor);
    // Confirms that the assertion above passed while drop really does
    // unregister subscribers.
    assert_eq!(source.subscriber_count(), 0);
}
// Two spawned tasks exchange values through a pair of eventuals:
// the "echo" task answers every value it reads from `a` with `value + 1`
// on `b`, while the "driver" task seeds `a` and answers each value from
// `b` with `value + 1` on `a`.
#[test]
async fn can_message_pass() {
    let (mut writer_a, eventual_a) = Eventual::<u32>::new();
    let (mut writer_b, eventual_b) = Eventual::<u32>::new();

    let mut reader_a = eventual_a.subscribe();
    let mut reader_b = eventual_b.subscribe();

    // Echo task: runs until `a` closes and returns the sum of everything
    // it received on `a`.
    let echo_task = tokio::spawn(async move {
        let mut total = 0;
        while let Ok(received) = reader_a.next().await {
            total += received;
            writer_b.write(received + 1);
        }
        total
    });

    // Driver task: kicks off the exchange, performs three round trips,
    // then closes `a` by dropping its writer.
    let driver_task = tokio::spawn(async move {
        writer_a.write(0);

        let first_reply = reader_b.next().await.unwrap();
        writer_a.write(first_reply + 1);

        let second_reply = reader_b.next().await.unwrap();
        writer_a.write(second_reply + 1);

        assert_eq!(reader_b.next().await, Ok(5));

        // Dropping the writer ends the echo task's loop; when that task
        // finishes, its `writer_b` is dropped too and `b` closes.
        drop(writer_a);
        assert_eq!(reader_b.next().await, Err(Closed));
    });

    let (driver_result, echo_result) = join!(driver_task, echo_task);

    // The driver must not have panicked on any of its assertions.
    assert!(driver_result.is_ok());
    // The values seen on `a` are expected to be 0, 2 and 4.
    assert_eq!(echo_result.unwrap(), 6);
}
// Ensures that dropping the tail of a long chain of mapped eventuals
// releases every link "immediately", all the way back to the source.
#[test]
async fn chained_eventuals_drop() {
    // Number of `map` stages stacked on top of the source; each adds 1.
    const CHAIN_LEN: u32 = 25;

    let (mut writer, source) = Eventual::new();
    let source = Arc::new(source);

    // Build the chain by repeatedly subscribing to the current tail and
    // mapping it into a new eventual.
    let mut tail = source.clone();
    for _ in 0..CHAIN_LEN {
        tail = Arc::new(tail.subscribe().map(|v: u32| async move { v + 1 }));
    }
    let mapped = tail;

    // Only the first link subscribes to the source directly, and nothing
    // subscribes to the final link yet.
    assert_eq!(source.subscriber_count(), 1);
    assert_eq!(mapped.subscriber_count(), 0);

    // A value pushed in at the source comes out incremented once per link.
    writer.write(5);
    assert_eq!(mapped.value().await, Ok(5 + CHAIN_LEN));
    assert_eq!(source.subscriber_count(), 1);

    drop(mapped);
    // Dropping doesn't happen on the same thread, but it should still
    // happen before we write a value, so yield briefly to let it finish.
    sleep(Duration::from_millis(1)).await;
    assert_eq!(source.subscriber_count(), 0);
}
// Exercises cloned readers: the assertions below pin down what a clone
// observes relative to the reader it was cloned from.
#[test]
async fn clone_reader() {
    let (mut writer, source) = Eventual::new();
    writer.write(99);

    let mut original = source.subscribe();
    let mut duplicate = original.clone();

    // The duplicate receives the value that was written before it existed.
    assert_eq!(duplicate.next().await, Ok(99));

    // A clone taken from a reader that has already consumed the value has
    // nothing new to read, so a single poll must come back pending.
    let polled_once = poll!(duplicate.clone().next());
    assert_eq!(Poll::Pending, polled_once);

    drop(writer);

    // `original` has not consumed 99 yet, so neither has a fresh clone of it.
    assert_eq!(original.clone().next().await, Ok(99));
    assert_eq!(original.next().await, Ok(99));
    // Once `original` has consumed the value, a clone of it only sees the
    // close caused by dropping the writer.
    assert_eq!(original.clone().next().await, Err(Closed));
}
// Checks that `init_with` supplies an initial value to subscribers and
// that later writes are still delivered afterwards.
#[test]
async fn init_with_has_value() {
    let (mut writer, source) = Eventual::new();
    let mut reader = source.init_with(5).subscribe();

    // Nothing has been written yet, so the first value is the initial one.
    assert_eq!(reader.next().await, Ok(5));

    // A write on the underlying eventual comes through next.
    writer.write(10);
    assert_eq!(reader.next().await, Ok(10));
}
// TODO: Test that closed is received twice in a row rather than getting stuck.