Skip to content

Commit

Permalink
Add a serving multiplexer to server both web and grpc requests on the…
Browse files Browse the repository at this point in the history
… same port.
  • Loading branch information
dinowernli committed May 26, 2024
1 parent 32b8b4a commit 45a9575
Show file tree
Hide file tree
Showing 11 changed files with 820 additions and 56 deletions.
518 changes: 502 additions & 16 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ edition = "2021"
[dependencies]
async-std = {version = "1.12", features = ["attributes"]}
async-trait = "0.1.80"
axum = "0.6.12"
bytes = "1.6"
chrono = "0.4"
futures = "0.3"
hyper="0.14.25"
multiplex-tonic-hyper = "0.1"
im = "15"
prost = "0.12"
rand = "0.8"
reqwest = "0.12"
structopt = "0.3"
timer = "0.2"
tokio = { version = "1.37", features = ["macros", "rt-multi-thread"] }
Expand All @@ -22,6 +26,7 @@ tracing = "0.1.40"
tracing-subscriber = {version = "0.3.1", features = ["env-filter"]}
tower = "0.4.13"
pin-project = "1.1"
querystring = "1.1"

[build-dependencies]
tonic-build = "0.11"
180 changes: 180 additions & 0 deletions src/keyvalue/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use axum::routing::get;
use axum::Router;
use hyper::{Body, StatusCode};
use std::sync::Arc;
use tonic::Request;

use crate::keyvalue::keyvalue_proto::key_value_server::KeyValue;
use crate::keyvalue::keyvalue_proto::GetRequest;

#[derive(Clone)]
pub struct HttpHandler {
service: Arc<dyn KeyValue>,
}

impl HttpHandler {
pub fn new(service: Arc<dyn KeyValue>) -> Self {
Self { service }
}

/**
* Installs routes that allow interacting with the keyvalue store. Example queries
* include /get?key=foo.
*/
pub fn routes(self: Arc<Self>) -> Router {
let hello = move |req: hyper::Request<Body>| async move { self.handle_get(req).await };
Router::new().route("/get", get(hello))
}

fn make_response(code: StatusCode, message: String) -> hyper::Response<Body> {
hyper::Response::builder()
.status(code)
.body(Body::from(format!("{}\n", message)))
.unwrap()
}

fn invalid(message: String) -> hyper::Response<Body> {
Self::make_response(StatusCode::BAD_REQUEST, message)
}

async fn handle_get(&self, request: hyper::Request<Body>) -> hyper::Response<Body> {
let query = match request.uri().query() {
Some(q) => q,
None => return HttpHandler::invalid("must pass query".to_string()),
};

let parsed = querystring::querify(query);
let key = match parsed
.iter()
.find(|(a, _)| a.eq(&"key"))
.map(|(_, b)| b.to_string())
{
Some(value) => value,
_ => return HttpHandler::invalid("must pass key parameter".to_string()),
};

let request = Request::new(GetRequest {
key: key.clone().as_bytes().to_vec(),
version: -1,
});

match self.service.get(request).await {
Ok(p) => {
let proto = p.into_inner();
match proto.entry {
Some(e) => match String::from_utf8(e.value) {
Ok(value) => HttpHandler::make_response(
StatusCode::OK,
format!("{}={}, version={}", key, value, proto.version),
),
_ => HttpHandler::make_response(
StatusCode::BAD_REQUEST,
format!("failed to parse value as utf8 for key {}", key),
),
},
None => HttpHandler::make_response(
StatusCode::NOT_FOUND,
format!("no value for key {}", key),
),
}
}
Err(status) => HttpHandler::make_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to query keyvalue store {}", status.to_string()),
),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::keyvalue::grpc::PutRequest;
use crate::keyvalue::keyvalue_proto::{Entry, GetResponse, PutResponse};
use crate::root;
use crate::testing::TestHttpServer;
use tonic::{Response, Status};

struct FakeKeyValue {}

#[tonic::async_trait]
impl KeyValue for FakeKeyValue {
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
let key = request.into_inner().key.clone();
let key_string = String::from_utf8(key.clone());
let entry = match key_string.expect("utf8").as_str() {
"foo" => Some(Entry {
key: key.clone(),
value: "foo-value".to_string().into_bytes(),
}),
"bar" => Some(Entry {
key: key.clone(),
value: "bar-value".to_string().into_bytes(),
}),
_ => None,
};

Ok(Response::new(GetResponse { entry, version: 0 }))
}

async fn put(&self, _: Request<PutRequest>) -> Result<Response<PutResponse>, Status> {
// We don't need any writes for now.
panic!("Not implemented");
}
}

async fn make_server() -> TestHttpServer {
let kv = Arc::new(FakeKeyValue {});
let http = Arc::new(HttpHandler {
service: kv.clone(),
});
let web_service = Router::new()
.route("/", get(root))
.nest("/keyvalue", http.routes())
.into_make_service();

TestHttpServer::run(web_service).await
}

async fn send_request(server: &TestHttpServer, path: &str) -> reqwest::Response {
let port = server.port().expect("port");
let uri = format!("http://{}:{}{}", "127.0.0.1", port, path);
reqwest::get(uri.as_str()).await.expect("request")
}

#[tokio::test]
async fn test_returns_value() {
let server = make_server().await;
let response = send_request(&server, "/keyvalue/get?key=foo").await;

let status = response.status().clone();
let text = response.text().await.expect("text");

assert_eq!(reqwest::StatusCode::OK, status);
assert_eq!("foo=foo-value, version=0", text.trim());
}

#[tokio::test]
async fn test_not_found() {
let server = make_server().await;
let response = send_request(&server, "/keyvalue/get?key=not-a-rea-key").await;
let status = response.status();
assert_eq!(reqwest::StatusCode::NOT_FOUND, status);
}

#[tokio::test]
async fn test_bad_path() {
let server = make_server().await;
let response = send_request(&server, "/INVALID/get?key=not-a-rea-key").await;
let status = response.status();
assert_eq!(reqwest::StatusCode::NOT_FOUND, status);
}

#[tokio::test]
async fn test_root_path() {
let server = make_server().await;
let response = send_request(&server, "/").await;
let status = response.status();
assert_eq!(reqwest::StatusCode::OK, status);
}
}
6 changes: 4 additions & 2 deletions src/keyvalue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
// cluster. Users of this module are expected to run the service in their grpc
// server and/or use the generated grpc client code to make requests.

pub use crate::keyvalue::store::{MapStore};
pub use crate::keyvalue::store::{MapStore, Store};
pub use http::HttpHandler;
pub use service::KeyValueService;

pub mod grpc {
pub use crate::keyvalue::keyvalue_proto::key_value_client::KeyValueClient;
pub use crate::keyvalue::keyvalue_proto::key_value_server::KeyValueServer;
pub use crate::keyvalue::keyvalue_proto::{PutRequest};
pub use crate::keyvalue::keyvalue_proto::PutRequest;
}

pub(in crate::keyvalue) mod http;
#[path = "generated/keyvalue_proto.rs"]
pub(in crate::keyvalue) mod keyvalue_proto;
pub(in crate::keyvalue) mod service;
Expand Down
6 changes: 3 additions & 3 deletions src/keyvalue/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod tests {
use crate::keyvalue::store::MapStore;
use crate::raft::raft_proto::EntryId;
use crate::raft::StateMachine;
use crate::testing::TestServer;
use crate::testing::TestRpcServer;

use super::*;

Expand Down Expand Up @@ -176,7 +176,7 @@ mod tests {
async fn test_get() {
let service = create_service();
let store = service.store.clone();
let server = TestServer::run(KeyValueServer::new(service)).await;
let server = TestRpcServer::run(KeyValueServer::new(service)).await;

store
.lock()
Expand Down Expand Up @@ -204,7 +204,7 @@ mod tests {
async fn test_put() {
let service = create_service();
let store = service.store.clone();
let server = TestServer::run(KeyValueServer::new(service)).await;
let server = TestRpcServer::run(KeyValueServer::new(service)).await;

let request = PutRequest {
key: "foo".as_bytes().to_vec(),
Expand Down
4 changes: 2 additions & 2 deletions src/keyvalue/store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
extern crate bytes;
extern crate im;

use std::collections::VecDeque;
use bytes::Bytes;
use im::HashMap;
use prost::Message;
use keyvalue_proto::{Entry, Operation, Snapshot};
use prost::Message;
use std::collections::VecDeque;

use crate::keyvalue::keyvalue_proto;
use crate::keyvalue::keyvalue_proto::operation::Op::Set;
Expand Down
66 changes: 44 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,30 @@
extern crate structopt;
extern crate tracing;

use std::error::Error;
use std::time::Duration;

use async_std::sync::{Arc, Mutex};
use axum::routing::get;
use axum::Router;
use futures::future::join5;
use futures::future::join_all;
use multiplex_tonic_hyper::MakeMultiplexer;
use rand::seq::SliceRandom;
use std::error::Error;
use std::net::SocketAddr;
use std::time::Duration;
use structopt::StructOpt;
use tokio::time::{sleep, Instant};
use tower::make::Shared;
use tracing::{debug, error, info, info_span, Instrument};
use tracing_subscriber::EnvFilter;

use raft::raft_proto;
use raft::{Diagnostics, FailureOptions, Options, RaftImpl};
use raft_proto::Server;

use crate::keyvalue::grpc::KeyValueClient;
use crate::keyvalue::grpc::KeyValueServer;
use crate::keyvalue::grpc::PutRequest;
use crate::keyvalue::{KeyValueService, MapStore};
use crate::raft::raft_proto;
use crate::raft::{Diagnostics, FailureOptions, Options, RaftImpl};
use crate::raft_proto::raft_server::RaftServer;
use crate::raft_proto::Server;

mod keyvalue;
mod raft;
Expand Down Expand Up @@ -60,6 +63,16 @@ fn server(host: &str, port: i32, name: &str) -> Server {
}
}

fn make_address(address: &Server) -> SocketAddr {
format!("[{}]:{}", address.host, address.port)
.parse()
.unwrap()
}

async fn root() -> &'static str {
"Hello, World!\n"
}

//#[instrument(skip(all,diagnostics))]
async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<Diagnostics>>) {
let server = address.name.to_string();
Expand All @@ -74,8 +87,8 @@ async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<
let server_diagnostics = diagnostics.lock().await.get_server(&address);

// Set up the grpc service for the key-value store.
let keyvalue = KeyValueService::new(server.as_str(), &address, kv_store.clone());
let kv_grpc = KeyValueServer::new(keyvalue);
let kv1 = KeyValueService::new(server.as_str(), &address, kv_store.clone());
let kv_grpc = KeyValueServer::new(kv1);

// Set up the grpc service for the raft participant.
let raft = RaftImpl::new(
Expand All @@ -89,19 +102,28 @@ async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<
raft.start().await;
let raft_grpc = RaftServer::new(raft);

// Put it all together and serve.
info!("Created raft and key-value service");
let serve = tonic::transport::Server::builder()
.add_service(raft_grpc)
.add_service(kv_grpc)
.serve(
format!("[{}]:{}", address.host, address.port)
.parse()
.unwrap(),
)
.await;

match serve {
// Set up the webservice serving the contents of the kvstore.
// TODO(dino): See if we can reuse/share the "kv1" instance above.
let kv2 = KeyValueService::new(server.as_str(), &address, kv_store.clone());
let kv_http = Arc::new(keyvalue::HttpHandler::new(Arc::new(kv2)));
let web_service = Router::new()
.route("/", get(root))
.nest("/keyvalue", kv_http.routes())
.into_make_service();

// Put the pieces together to serve on a single port.
let grpc_service = Shared::new(
tonic::transport::Server::builder()
.add_service(raft_grpc)
.add_service(kv_grpc)
.into_service(),
);
let multiplexer = MakeMultiplexer::new(grpc_service, web_service);
let serve = hyper::Server::bind(&make_address(&address)).serve(multiplexer);

info!("Started server (http, grpc) on port {}", address.port);

match serve.await {
Ok(()) => info!("Serving terminated successfully"),
Err(message) => error!("Serving terminated unsuccessfully: {}", message),
}
Expand Down
Loading

0 comments on commit 45a9575

Please sign in to comment.