-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.rs
96 lines (89 loc) · 3.46 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! Example HTTP, WebSocket and TCP JSON-RPC server.
use std::{sync::Arc, time::Duration};
use futures_util::{SinkExt, TryStreamExt};
use jsonrpc_core::{MetaIoHandler, Params};
use jsonrpc_utils::{
axum_utils::jsonrpc_router,
pub_sub::{add_pub_sub, PublishMsg},
stream::{serve_stream_sink, StreamMsg, StreamServerConfig},
};
use tokio::net::TcpListener;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError};
#[tokio::main]
async fn main() {
let mut rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2);
rpc.add_method("sleep", |params: Params| async move {
let (x,): (u64,) = params.parse()?;
tokio::time::sleep(Duration::from_secs(x)).await;
Ok(x.into())
});
rpc.add_method("@ping", |_| async move { Ok("pong".into()) });
rpc.add_method("value", |params: Params| async move {
let x: Option<u64> = params.parse()?;
Ok(x.unwrap_or_default().into())
});
rpc.add_method("add", |params: Params| async move {
let ((x, y), z): ((i32, i32), i32) = params.parse()?;
let sum = x + y + z;
Ok(sum.into())
});
add_pub_sub(
&mut rpc,
"subscribe",
"subscription",
"unsubscribe",
|params: Params| {
let (interval,): (u64,) = params.parse()?;
if interval > 0 {
Ok(async_stream::stream! {
for i in 0..10 {
tokio::time::sleep(Duration::from_secs(interval)).await;
yield PublishMsg::result(&i);
}
yield PublishMsg::error(&jsonrpc_core::Error {
code: jsonrpc_core::ErrorCode::ServerError(-32000),
message: "ended".into(),
data: None,
});
})
} else {
Err(jsonrpc_core::Error::invalid_params("invalid interval"))
}
},
);
let rpc = Arc::new(rpc);
let stream_config = StreamServerConfig::default()
.with_channel_size(4)
.with_pipeline_size(4);
// HTTP and WS server.
let ws_config = stream_config.clone().with_keep_alive(true);
let app = jsonrpc_router("/rpc", rpc.clone(), ws_config);
// You can use additional tower-http middlewares to add e.g. CORS.
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
});
// TCP server with line delimited json codec.
//
// You can also use other transports (e.g. TLS, unix socket) and codecs
// (e.g. netstring, JSON splitter).
let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap();
let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024);
while let Ok((s, _)) = listener.accept().await {
let rpc = rpc.clone();
let stream_config = stream_config.clone();
let codec = codec.clone();
tokio::spawn(async move {
let (r, w) = s.into_split();
let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str);
let w = FramedWrite::new(w, codec).with(|msg| async move {
Ok::<_, LinesCodecError>(match msg {
StreamMsg::Str(msg) => msg,
_ => "".into(),
})
});
tokio::pin!(w);
drop(serve_stream_sink(&rpc, w, r, stream_config).await);
});
}
}