Skip to content

Commit

Permalink
Add a serving multiplexer to server both web and grpc requests on the…
Browse files Browse the repository at this point in the history
… same port.
  • Loading branch information
dinowernli committed May 4, 2024
1 parent 32b8b4a commit 9b1820e
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 25 deletions.
74 changes: 74 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ edition = "2021"
[dependencies]
async-std = {version = "1.12", features = ["attributes"]}
async-trait = "0.1.80"
axum = "0.6.12"
bytes = "1.6"
chrono = "0.4"
futures = "0.3"
hyper="0.14.25"
multiplex-tonic-hyper = "0.1"
im = "15"
prost = "0.12"
rand = "0.8"
Expand All @@ -22,6 +25,7 @@ tracing = "0.1.40"
tracing-subscriber = {version = "0.3.1", features = ["env-filter"]}
tower = "0.4.13"
pin-project = "1.1"
querystring = "1.1"

[build-dependencies]
tonic-build = "0.11"
77 changes: 77 additions & 0 deletions src/keyvalue/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use axum::routing::get;
use axum::Router;
use hyper::{Body, StatusCode};
use std::sync::Arc;
use tonic::Request;

use crate::keyvalue::keyvalue_proto::key_value_server::KeyValue;
use crate::keyvalue::keyvalue_proto::GetRequest;
use crate::keyvalue::KeyValueService;

#[derive(Clone)]
pub struct HttpHandler {
service: Arc<KeyValueService>,
}

impl HttpHandler {
pub fn new(service: Arc<KeyValueService>) -> Self {
Self { service }
}

/**
* Installs routes that allow interacting with the keyvalue store. Example queries
* include /get?key=foo.
*/
pub fn routes(self: Arc<Self>) -> Router {
let hello = move |req: hyper::Request<Body>| async move { self.handle_get(req).await };
Router::new().route("/get", get(hello))
}

fn invalid(message: String) -> hyper::Response<Body> {
hyper::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(format!("{}\n", message)))
.unwrap()
}

async fn handle_get(&self, request: hyper::Request<Body>) -> hyper::Response<Body> {
let query = match request.uri().query() {
Some(q) => q,
None => return HttpHandler::invalid("must pass query".to_string()),
};

let parsed = querystring::querify(query);
let key = match parsed
.iter()
.find(|(a, _)| a.eq(&"key"))
.map(|(_, b)| b.to_string())
{
Some(value) => value,
_ => return HttpHandler::invalid("must pass key parameter".to_string()),
};

let request = Request::new(GetRequest {
key: key.clone().as_bytes().to_vec(),
version: -1,
});

let output = match self.service.get(request).await {
Ok(p) => {
let proto = p.into_inner();
match proto.entry {
Some(e) => match String::from_utf8(e.value) {
Ok(value) => format!("{}={}, version={}", key, value, proto.version),
_ => format!("failed to parse value as utf8 for key {}", key),
},
None => format!("no value for key {}", key),
}
}
Err(status) => format!("Failed to query keyvalue store {}", status.to_string()),
};

hyper::Response::builder()
.status(StatusCode::OK)
.body(Body::from(output))
.unwrap()
}
}
6 changes: 4 additions & 2 deletions src/keyvalue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
// cluster. Users of this module are expected to run the service in their grpc
// server and/or use the generated grpc client code to make requests.

pub use crate::keyvalue::store::{MapStore};
pub use crate::keyvalue::store::{MapStore, Store};
pub use http::HttpHandler;
pub use service::KeyValueService;

pub mod grpc {
pub use crate::keyvalue::keyvalue_proto::key_value_client::KeyValueClient;
pub use crate::keyvalue::keyvalue_proto::key_value_server::KeyValueServer;
pub use crate::keyvalue::keyvalue_proto::{PutRequest};
pub use crate::keyvalue::keyvalue_proto::PutRequest;
}

#[path = "generated/keyvalue_proto.rs"]
pub(in crate::keyvalue) mod keyvalue_proto;
pub(in crate::keyvalue) mod service;
pub(in crate::keyvalue) mod store;
pub(in crate::keyvalue) mod http;
4 changes: 2 additions & 2 deletions src/keyvalue/store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
extern crate bytes;
extern crate im;

use std::collections::VecDeque;
use bytes::Bytes;
use im::HashMap;
use prost::Message;
use keyvalue_proto::{Entry, Operation, Snapshot};
use prost::Message;
use std::collections::VecDeque;

use crate::keyvalue::keyvalue_proto;
use crate::keyvalue::keyvalue_proto::operation::Op::Set;
Expand Down
65 changes: 45 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,30 @@
extern crate structopt;
extern crate tracing;

use std::error::Error;
use std::time::Duration;

use async_std::sync::{Arc, Mutex};
use axum::routing::get;
use axum::Router;
use futures::future::join5;
use futures::future::join_all;
use multiplex_tonic_hyper::MakeMultiplexer;
use rand::seq::SliceRandom;
use std::error::Error;
use std::net::SocketAddr;
use std::time::Duration;
use structopt::StructOpt;
use tokio::time::{sleep, Instant};
use tower::make::Shared;
use tracing::{debug, error, info, info_span, Instrument};
use tracing_subscriber::EnvFilter;

use raft::raft_proto;
use raft::{Diagnostics, FailureOptions, Options, RaftImpl};
use raft_proto::Server;

use crate::keyvalue::grpc::KeyValueClient;
use crate::keyvalue::grpc::KeyValueServer;
use crate::keyvalue::grpc::PutRequest;
use crate::keyvalue::{KeyValueService, MapStore};
use crate::raft::raft_proto;
use crate::raft::{Diagnostics, FailureOptions, Options, RaftImpl};
use crate::raft_proto::raft_server::RaftServer;
use crate::raft_proto::Server;

mod keyvalue;
mod raft;
Expand Down Expand Up @@ -60,6 +63,16 @@ fn server(host: &str, port: i32, name: &str) -> Server {
}
}

fn make_address(address: &Server) -> SocketAddr {
format!("[{}]:{}", address.host, address.port)
.parse()
.unwrap()
}

async fn root() -> &'static str {
"Hello, World!\n"
}

//#[instrument(skip(all,diagnostics))]
async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<Diagnostics>>) {
let server = address.name.to_string();
Expand Down Expand Up @@ -89,19 +102,31 @@ async fn run_server(address: &Server, all: &Vec<Server>, diagnostics: Arc<Mutex<
raft.start().await;
let raft_grpc = RaftServer::new(raft);

// Put it all together and serve.
info!("Created raft and key-value service");
let serve = tonic::transport::Server::builder()
.add_service(raft_grpc)
.add_service(kv_grpc)
.serve(
format!("[{}]:{}", address.host, address.port)
.parse()
.unwrap(),
)
.await;

match serve {
let kv2 = Arc::new(KeyValueService::new(
server.as_str(),
&address,
kv_store.clone(),
));
let kvhttp = Arc::new(keyvalue::HttpHandler::new(kv2));
let web_service = Router::new()
.route("/", get(root))
.nest("/keyvalue", kvhttp.routes())
.into_make_service();

let grpc_service = Shared::new(
tonic::transport::Server::builder()
.add_service(raft_grpc)
.add_service(kv_grpc)
.into_service(),
);

let multiplexer = MakeMultiplexer::new(grpc_service, web_service);
let serve = hyper::Server::bind(&make_address(&address)).serve(multiplexer);

info!("Started server");

let done = serve.await;
match done {
Ok(()) => info!("Serving terminated successfully"),
Err(message) => error!("Serving terminated unsuccessfully: {}", message),
}
Expand Down
2 changes: 1 addition & 1 deletion src/testing/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use tokio_stream::wrappers::TcpListenerStream;
use tonic::body::BoxBody;
use tonic::codegen::http::{Request, Response};
use tonic::codegen::Service;
use tonic::transport::Body;
use tonic::server::NamedService;
use tonic::transport::Body;

// A helper struct which can be used to test grpc services. Runs a real server
// which binds to an arbitrary port and provides access to the resulting port.
Expand Down

0 comments on commit 9b1820e

Please sign in to comment.