-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcached_network.rs
115 lines (102 loc) · 3.45 KB
/
cached_network.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::StreamExt;
use rand::Rng;
use std::env;
use std::time::Duration;
use tokio::task::spawn_blocking;
use tracing::instrument;
use tracing_durations_export::DurationsLayerBuilder;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
/// Simulates a network call to `api` for the item `id`.
///
/// Sleeps asynchronously for a randomly chosen 5–9 ms to imitate latency, then
/// returns the fake response body `"{api} {id}"`.
#[instrument]
async fn make_network_request(api: &str, id: usize) -> String {
    let latency = Duration::from_millis(rand::thread_rng().gen_range(5..10));
    tokio::time::sleep(latency).await;
    format!("{} {}", api, id)
}
/// Simulates a cache lookup for the item `id`.
///
/// Sleeps asynchronously for a randomly chosen 1–2 ms, then reports a hit or a
/// miss with equal probability. A hit yields `Some("cached({id})")`; a miss
/// yields `None`.
#[instrument]
async fn read_cache(id: usize) -> Option<String> {
    let lookup_delay = Duration::from_millis(rand::thread_rng().gen_range(1..3));
    tokio::time::sleep(lookup_delay).await;
    // There's a 50% chance that a cache entry exists.
    let is_hit = rand::thread_rng().gen_bool(0.5);
    is_hit.then(|| format!("cached({id})"))
}
/// Turns raw cache contents into the final result string.
///
/// Stands in for CPU-intensive work: it blocks the calling thread for a
/// randomly chosen 2–5 ms before returning `"from_cache({data})"`. Because it
/// blocks, async callers should run it off the executor threads.
#[instrument(skip_all)]
fn parse_cache(data: &str) -> String {
    let busy_for = Duration::from_millis(rand::thread_rng().gen_range(2..6));
    std::thread::sleep(busy_for);
    format!("from_cache({})", data)
}
/// Turns a raw network response into the final result string.
///
/// Stands in for CPU-intensive work: it blocks the calling thread for a
/// randomly chosen 3–7 ms before returning `"from_network({data})"`. Because
/// it blocks, async callers should run it off the executor threads.
#[instrument(skip_all)]
fn parse_network(data: &str) -> String {
    let busy_for = Duration::from_millis(rand::thread_rng().gen_range(3..8));
    std::thread::sleep(busy_for);
    format!("from_network({})", data)
}
/// Fetches the item `id`, preferring the cache over the network.
///
/// Consults `read_cache` first. On a hit the cached payload is handed to
/// `parse_cache`; on a miss the item is requested from `api` via
/// `make_network_request` and the response is handed to `parse_network`.
/// Either parser runs through `spawn_blocking`, and its output is returned.
///
/// # Panics
///
/// Panics with "executor died" if the blocking task cannot be joined.
#[instrument]
async fn cached_network_request(api: &str, id: usize) -> String {
    // Both arms produce a handle to a blocking parse task, so a single await
    // at the end covers the hit and the miss path alike.
    let parse_task = match read_cache(id).await {
        Some(cached) => spawn_blocking(move || parse_cache(&cached)),
        None => {
            let response = make_network_request(api, id).await;
            spawn_blocking(move || parse_network(&response))
        }
    };
    parse_task.await.expect("executor died")
}
/// Runs four demo workloads so their tracing spans can be recorded.
///
/// If the `TRACING_DURATION_EXPORT` environment variable is set, its value is
/// passed to the durations layer as the output file location; otherwise the
/// subscriber is installed without that layer.
///
/// The workloads, separated by 5 ms pauses, are:
/// 1. uncached requests, one item at a time;
/// 2. uncached requests, up to 4 in flight at once;
/// 3. cached requests, one item at a time;
/// 4. cached requests, up to 3 in flight at once.
///
/// # Panics
///
/// Panics if the durations layer cannot be built for the configured location,
/// or if a blocking parse task cannot be joined.
#[tokio::main]
async fn main() {
    // `_guard` is bound to a name (not `_`) so it lives until `main` returns.
    // NOTE(review): presumably the guard finalizes the durations file on drop — confirm
    // against the tracing-durations-export documentation.
    let (duration_layer, _guard) = if let Ok(location) = env::var("TRACING_DURATION_EXPORT") {
        let (layer, guard) = DurationsLayerBuilder::default()
            .durations_file(location)
            .build()
            // The message names the variable that is actually read above.
            .expect("Couldn't create the durations file from TRACING_DURATION_EXPORT");
        (Some(layer), Some(guard))
    } else {
        (None, None)
    };
    // The layer is an `Option`, so the same subscriber setup serves both cases.
    tracing_subscriber::registry().with(duration_layer).init();

    // Sequential, uncached: `then` finishes each item's request and parse
    // before the next item starts.
    futures::stream::iter(0..4)
        .then(|id| make_network_request("https://example.org/uncached", id))
        .then(|data| async {
            spawn_blocking(move || parse_network(&data))
                .await
                .expect("the executor is broken")
        })
        .collect::<Vec<String>>()
        .await;

    // Spacer between workloads.
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Parallel, uncached: up to 4 request-and-parse futures in flight at once.
    futures::stream::iter(0..4)
        .map(|id| async move {
            let data = make_network_request("https://example.org/uncached", id).await;
            spawn_blocking(move || parse_network(&data))
                .await
                .expect("the executor is broken")
        })
        .buffer_unordered(4)
        .collect::<Vec<String>>()
        .await;

    // Spacer between workloads.
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Sequential, cached: one cache-or-network lookup at a time.
    futures::stream::iter(0..4)
        .then(|id| cached_network_request("https://example.net/cached", id))
        .collect::<Vec<String>>()
        .await;

    // Spacer between workloads.
    tokio::time::sleep(Duration::from_millis(5)).await;

    // Parallel, cached: up to 3 lookups in flight at once.
    futures::stream::iter(0..4)
        .map(|id| cached_network_request("https://example.net/cached", id))
        .buffer_unordered(3)
        .collect::<Vec<String>>()
        .await;
}