-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.rs
157 lines (141 loc) · 6.79 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use rxrust::observable;
use rxrust::observable::{ObservableExt, ObservableItem};
/// Binary entry point: writes a fixed greeting line to stdout.
///
/// The rxrust operator demonstrations in this file live in the `tests`
/// module and are exercised through the test harness, not from here.
fn main() {
    let greeting = "Hello Reactive world!";
    println!("{}", greeting);
}
// Demonstrations of individual rxrust operators.
//
// Every test builds a small observable pipeline and prints what it emits;
// none of them assert on the output, so run with `cargo test -- --nocapture`
// to actually see the printed values.
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use rxrust::ops::throttle::ThrottleEdge;
    use rxrust::prelude::FuturesLocalSchedulerPool;

    use super::*;

    #[test]
    fn iter_observable() {
        // `from_iter` turns an iterator into an observable that emits the
        // iterator's items in order. `on_complete` registers a callback that
        // prints a message once the stream has finished.
        observable::from_iter(0..10)
            .on_complete(|| println!("Stream completed"))
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn filter_observable() {
        // `filter` forwards only the items for which the predicate returns
        // true. The predicate receives a reference to the item; here it keeps
        // the strictly positive numbers and drops -6 and -8.
        observable::from_iter(vec![1, 2, 3, -6, 4, 5, -8])
            .filter(|n| *n > 0)
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn map_observable() {
        // `map` applies a function to every item and emits the result.
        // Here each word is converted to upper case.
        observable::from_iter(vec!["hello", "transforming", "reactive", "world", "from", "rust"])
            .map(|word| word.to_uppercase())
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn flatmap_observable() {
        // `flat_map` maps every item to an inner observable and flattens the
        // inner emissions into a single stream. Each word becomes a one-item
        // observable whose item is the word wrapped in square brackets.
        observable::from_iter(vec!["hello", "composition", "reactive", "world"])
            .flat_map(|word| {
                observable::from_iter(vec![word]).map(|new_word| format!("[{}]", new_word))
            })
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn merge_observable() {
        // `merge` combines two observables into one stream that carries the
        // items of both sources: one source emits "hello", the other "world".
        observable::from_iter(vec!["hello"])
            .merge(observable::from_iter(vec!["world"]))
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn zip_observable() {
        // `zip` pairs the items of two observables position by position and
        // emits them as tuples. Both sources hold a single item here, so the
        // subscriber receives tuples of the form (left, right).
        observable::from_iter(vec!["hello"])
            .zip(observable::from_iter(vec!["world"]))
            .subscribe(|(w, w1)| println!("Zip result {}-{}", w, w1));
    }

    #[test]
    fn throttler_observable() {
        // `throttle` emits a value from the source, then ignores further
        // source values for the duration returned by the selector closure,
        // and then repeats. The selector ignores the item it is given and
        // always returns the same 1 ms window.
        // NOTE(review): `ThrottleEdge::all()` presumably enables both the
        // leading and the trailing edge of each window - confirm against the
        // rxrust documentation.
        let mut scheduler_pool = FuturesLocalSchedulerPool::new();
        let duration = Duration::from_millis(1);
        observable::from_iter(1..1000)
            .throttle(|_| duration, ThrottleEdge::all(), scheduler_pool.spawner())
            .subscribe(|n| println!("{}", n));
        // Drive the local pool so the scheduled throttle work gets to run.
        scheduler_pool.run();
    }

    #[test]
    fn take_observable() {
        // `take` emits at most the given number of items from the start of
        // the stream and then completes; here only the first three words.
        observable::from_iter(vec!["hello", "reactive", "world", "from", "rust"])
            .take(3)
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn skip_observable() {
        // `skip` discards the given number of items from the start of the
        // stream and emits everything after them; here the first word is
        // dropped and the remaining four are printed.
        observable::from_iter(vec!["hello", "reactive", "world", "from", "rust"])
            .skip(1)
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn take_while_observable() {
        // `take_while` emits items for as long as the predicate holds and
        // stops at the first item that fails it. Items are taken while their
        // length is at most 5, so the stream ends at "reactive".
        observable::from_iter(vec!["hello", "reactive", "world", "from", "rust"])
            .take_while(|w| w.len() <= 5)
            .subscribe(|n| println!("{}", n));
    }

    #[test]
    fn future_observable() {
        // `from_future` wraps a future in an observable that emits the
        // future's output. The future used here is already resolved to the
        // string "hello future world".
        let mut scheduler_pool = FuturesLocalSchedulerPool::new();
        observable::from_future(std::future::ready("hello future world"), scheduler_pool.spawner())
            .on_complete(|| println!("Complete in thread {:?}", thread::current().name()))
            .subscribe(|n| println!("{}", n));
        // Nothing is emitted until the pool polls the spawned future.
        scheduler_pool.run();
    }

    #[test]
    fn subscribe_on_observable() {
        // `subscribe_on` performs the subscription on the given scheduler
        // instead of inline. The completion callback prints the name of the
        // thread it ran on.
        let mut scheduler_pool = FuturesLocalSchedulerPool::new();
        observable::from_iter(vec!["hello", "reactive", "world", "from", "rust"])
            .subscribe_on(scheduler_pool.spawner())
            .on_complete(|| println!("Complete in thread {:?}", thread::current().name()))
            .subscribe(|n| println!("{}", n));
        // Run the pool so the deferred subscription actually happens.
        scheduler_pool.run();
    }

    #[test]
    fn delay_observable() {
        // `delay` postpones the emissions of the source by the given
        // duration, using the supplied scheduler; the delay here is 500 ms.
        let mut scheduler_pool = FuturesLocalSchedulerPool::new();
        observable::from_iter(vec!["hello", "reactive", "world", "from", "rust"])
            .delay(Duration::from_millis(500), scheduler_pool.spawner())
            .on_complete(|| println!("Complete in thread {:?}", thread::current().name()))
            .subscribe(|n| println!("{}", n));
        // Block until the delayed work scheduled on the pool has finished.
        scheduler_pool.run();
    }
}