From 9b1820e9f86e1932b186ea92eb03ebd5653a714c Mon Sep 17 00:00:00 2001 From: Dino Wernli Date: Sun, 16 Apr 2023 17:18:22 +0200 Subject: [PATCH] Add a serving multiplexer to server both web and grpc requests on the same port. --- Cargo.lock | 74 +++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 4 +++ src/keyvalue/http.rs | 77 +++++++++++++++++++++++++++++++++++++++++++ src/keyvalue/mod.rs | 6 ++-- src/keyvalue/store.rs | 4 +-- src/main.rs | 65 +++++++++++++++++++++++++----------- src/testing/server.rs | 2 +- 7 files changed, 207 insertions(+), 25 deletions(-) create mode 100644 src/keyvalue/http.rs diff --git a/Cargo.lock b/Cargo.lock index 6e08f03..36e33aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,7 +289,11 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-layer", "tower-service", @@ -429,12 +433,16 @@ version = "0.1.0" dependencies = [ "async-std", "async-trait", + "axum", "bytes", "chrono", "futures", + "hyper", "im", + "multiplex-tonic-hyper", "pin-project", "prost", + "querystring", "rand", "structopt", "timer", @@ -565,6 +573,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.30" @@ -1038,6 +1055,18 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "multiplex-tonic-hyper" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "002f157801c0a446aac0949ca2ff38508e9b5ed9e10c98260eb031677b3c82fc" +dependencies = [ + "futures", + "hyper", + "pin-project", + "tower", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1286,6 +1315,12 @@ dependencies = [ "prost", ] +[[package]] +name = "querystring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9318ead08c799aad12a55a3e78b82e0b6167271ffd1f627b758891282f739187" + [[package]] name = "quote" version = "1.0.36" @@ -1417,6 +1452,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" +[[package]] +name = "ryu" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" + [[package]] name = "serde" version = "1.0.198" @@ -1437,6 +1478,39 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "serde_json" +version = "1.0.116" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 68d5bba..28368a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,12 @@ edition = "2021" [dependencies] async-std = {version = "1.12", features = ["attributes"]} async-trait = "0.1.80" +axum = "0.6.12" bytes = "1.6" chrono = "0.4" futures = "0.3" +hyper="0.14.25" +multiplex-tonic-hyper = "0.1" im = "15" prost = "0.12" rand = "0.8" @@ -22,6 +25,7 @@ tracing = "0.1.40" tracing-subscriber = {version = "0.3.1", features = ["env-filter"]} tower = "0.4.13" pin-project = "1.1" +querystring = "1.1" [build-dependencies] tonic-build = "0.11" diff --git a/src/keyvalue/http.rs b/src/keyvalue/http.rs new file mode 100644 index 0000000..26c991f --- /dev/null +++ b/src/keyvalue/http.rs @@ -0,0 +1,77 @@ +use axum::routing::get; +use axum::Router; +use hyper::{Body, StatusCode}; +use std::sync::Arc; +use tonic::Request; + +use crate::keyvalue::keyvalue_proto::key_value_server::KeyValue; +use crate::keyvalue::keyvalue_proto::GetRequest; +use crate::keyvalue::KeyValueService; + +#[derive(Clone)] +pub struct HttpHandler { + service: Arc, +} + +impl HttpHandler { + pub fn new(service: Arc) -> Self { + Self { service } + } + + /** + * Installs routes that allow interacting with the keyvalue store. Example queries + * include /get?key=foo. + */ + pub fn routes(self: Arc) -> Router { + let hello = move |req: hyper::Request| async move { self.handle_get(req).await }; + Router::new().route("/get", get(hello)) + } + + fn invalid(message: String) -> hyper::Response { + hyper::Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("{}\n", message))) + .unwrap() + } + + async fn handle_get(&self, request: hyper::Request) -> hyper::Response { + let query = match request.uri().query() { + Some(q) => q, + None => return HttpHandler::invalid("must pass query".to_string()), + }; + + let parsed = querystring::querify(query); + let key = match parsed + .iter() + .find(|(a, _)| a.eq(&"key")) + .map(|(_, b)| b.to_string()) + { + Some(value) => value, + _ => return HttpHandler::invalid("must pass key parameter".to_string()), + }; + + let request = Request::new(GetRequest { + key: key.clone().as_bytes().to_vec(), + version: -1, + }); + + let output = match self.service.get(request).await { + Ok(p) => { + let proto = p.into_inner(); + match proto.entry { + Some(e) => match String::from_utf8(e.value) { + Ok(value) => format!("{}={}, version={}", key, value, proto.version), + _ => format!("failed to parse value as utf8 for key {}", key), + }, + None => format!("no value for key {}", key), + } + } + Err(status) => format!("Failed to query keyvalue store {}", status.to_string()), + }; + + hyper::Response::builder() + .status(StatusCode::OK) + .body(Body::from(output)) + .unwrap() + } +} diff --git a/src/keyvalue/mod.rs b/src/keyvalue/mod.rs index 9202024..9303d3b 100644 --- a/src/keyvalue/mod.rs +++ b/src/keyvalue/mod.rs @@ -2,16 +2,18 @@ // cluster. Users of this module are expected to run the service in their grpc // server and/or use the generated grpc client code to make requests. -pub use crate::keyvalue::store::{MapStore}; +pub use crate::keyvalue::store::{MapStore, Store}; +pub use http::HttpHandler; pub use service::KeyValueService; pub mod grpc { pub use crate::keyvalue::keyvalue_proto::key_value_client::KeyValueClient; pub use crate::keyvalue::keyvalue_proto::key_value_server::KeyValueServer; - pub use crate::keyvalue::keyvalue_proto::{PutRequest}; + pub use crate::keyvalue::keyvalue_proto::PutRequest; } #[path = "generated/keyvalue_proto.rs"] pub(in crate::keyvalue) mod keyvalue_proto; pub(in crate::keyvalue) mod service; pub(in crate::keyvalue) mod store; +pub(in crate::keyvalue) mod http; diff --git a/src/keyvalue/store.rs b/src/keyvalue/store.rs index 9c8eb5e..fcd8a77 100644 --- a/src/keyvalue/store.rs +++ b/src/keyvalue/store.rs @@ -1,11 +1,11 @@ extern crate bytes; extern crate im; -use std::collections::VecDeque; use bytes::Bytes; use im::HashMap; -use prost::Message; use keyvalue_proto::{Entry, Operation, Snapshot}; +use prost::Message; +use std::collections::VecDeque; use crate::keyvalue::keyvalue_proto; use crate::keyvalue::keyvalue_proto::operation::Op::Set; diff --git a/src/main.rs b/src/main.rs index f26497e..87affa4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,27 +4,30 @@ extern crate structopt; extern crate tracing; -use std::error::Error; -use std::time::Duration; - use async_std::sync::{Arc, Mutex}; +use axum::routing::get; +use axum::Router; use futures::future::join5; use futures::future::join_all; +use multiplex_tonic_hyper::MakeMultiplexer; use rand::seq::SliceRandom; +use std::error::Error; +use std::net::SocketAddr; +use std::time::Duration; use structopt::StructOpt; use tokio::time::{sleep, Instant}; +use tower::make::Shared; use tracing::{debug, error, info, info_span, Instrument}; use tracing_subscriber::EnvFilter; -use raft::raft_proto; -use raft::{Diagnostics, FailureOptions, Options, RaftImpl}; -use raft_proto::Server; - use crate::keyvalue::grpc::KeyValueClient; use crate::keyvalue::grpc::KeyValueServer; use crate::keyvalue::grpc::PutRequest; use crate::keyvalue::{KeyValueService, MapStore}; +use crate::raft::raft_proto; +use crate::raft::{Diagnostics, FailureOptions, Options, RaftImpl}; use crate::raft_proto::raft_server::RaftServer; +use crate::raft_proto::Server; mod keyvalue; mod raft; @@ -60,6 +63,16 @@ fn server(host: &str, port: i32, name: &str) -> Server { } } +fn make_address(address: &Server) -> SocketAddr { + format!("[{}]:{}", address.host, address.port) + .parse() + .unwrap() +} + +async fn root() -> &'static str { + "Hello, World!\n" +} + //#[instrument(skip(all,diagnostics))] async fn run_server(address: &Server, all: &Vec, diagnostics: Arc>) { let server = address.name.to_string(); @@ -89,19 +102,31 @@ async fn run_server(address: &Server, all: &Vec, diagnostics: Arc info!("Serving terminated successfully"), Err(message) => error!("Serving terminated unsuccessfully: {}", message), } diff --git a/src/testing/server.rs b/src/testing/server.rs index dd6d439..a3b7698 100644 --- a/src/testing/server.rs +++ b/src/testing/server.rs @@ -8,8 +8,8 @@ use tokio_stream::wrappers::TcpListenerStream; use tonic::body::BoxBody; use tonic::codegen::http::{Request, Response}; use tonic::codegen::Service; -use tonic::transport::Body; use tonic::server::NamedService; +use tonic::transport::Body; // A helper struct which can be used to test grpc services. Runs a real server // which binds to an arbitrary port and provides access to the resulting port.