Skip to content

Commit

Permalink
jsonrpc: add the backbone of a JSONRPC 2 server
Browse files Browse the repository at this point in the history
This implements a JSONRPC2 server over an Unix domain socket, using
jsonrpc-core for the parsing and request handling, and mio for the
non-blocking io interface.

A simple (and wrong) "stop" call is implemented for demonstration and
testing (just use `netcat -U`) purpose.

Signed-off-by: Antoine Poinsot <[email protected]>
  • Loading branch information
darosior committed Nov 17, 2020
1 parent 770038a commit 333d7e8
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 1 deletion.
87 changes: 87 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ chrono = "0.4"

# DB stuff
rusqlite = { version = "0.24.1", features = ["bundled"] }

# For the JSONRPC API
jsonrpc-core = "15.1.0"
jsonrpc-derive = "15.1.0"
mio = { version = "0.7.5", features = ["default", "os-poll", "os-util", "uds"] }
8 changes: 8 additions & 0 deletions src/jsonrpc/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use jsonrpc_derive::rpc;

#[rpc(server)]
pub trait RpcApi {
/// Stops the daemon
#[rpc(name = "stop")]
fn stop(&self) -> jsonrpc_core::Result<()>;
}
148 changes: 148 additions & 0 deletions src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
mod api;
use self::api::RpcApi;

use std::{
io::{self, Read},
path::PathBuf,
process,
time::Duration,
};

use mio::{
net::{UnixListener, UnixStream},
Events, Interest, Poll, Token,
};

const JSONRPC_SERVER: Token = Token(0);

pub struct RpcImpl;
impl RpcApi for RpcImpl {
fn stop(&self) -> jsonrpc_core::Result<()> {
// FIXME: of course, this is Bad :TM:
process::exit(0);
}
}

/// Set up the poller used to listen on the Unix Domain Socket for JSONRPC messages
pub fn jsonrpcapi_setup(socket_path: PathBuf) -> Result<(Poll, UnixListener), io::Error> {
// FIXME: permissions! (umask before binding ?)
let mut listener = UnixListener::bind(&socket_path)?;
let poll = Poll::new()?;
poll.registry()
.register(&mut listener, JSONRPC_SERVER, Interest::READABLE)?;

Ok((poll, listener))
}

// Remove trailing newlines from utf-8 byte stream
fn trimmed(mut vec: Vec<u8>, bytes_read: usize) -> Vec<u8> {
vec.truncate(bytes_read);

// Until there is some whatever-newline character, pop.
while let Some(byte) = vec.last() {
// Of course, we assume utf-8
if byte < &0x0a || byte > &0x0d {
break;
}
vec.pop();
}

vec
}

// Returns an error only on a fatal one, and None on recoverable ones.
fn read_bytes_from_stream(mut stream: UnixStream) -> Result<Option<Vec<u8>>, io::Error> {
let mut buf = vec![0; 512];
let mut bytes_read = 0;

loop {
match stream.read(&mut buf) {
Ok(0) => return Ok(Some(trimmed(buf, bytes_read))),
Ok(n) => {
bytes_read += n;
if bytes_read == buf.len() {
buf.resize(bytes_read * 2, 0);
}
}
Err(err) => {
match err.kind() {
io::ErrorKind::WouldBlock => {
if bytes_read == 0 {
// We can't read it just yet, but it's fine.
return Ok(None);
}
return Ok(Some(trimmed(buf, bytes_read)));
}
io::ErrorKind::Interrupted => {
// Try again on interruption.
continue;
}
// Now that's actually bad
_ => return Err(err),
}
}
}
}
}

// Try to parse and interpret bytes from the stream
fn handle_byte_stream(
jsonrpc_io: &jsonrpc_core::IoHandler,
stream: UnixStream,
) -> Result<(), io::Error> {
if let Some(bytes) = read_bytes_from_stream(stream)? {
match String::from_utf8(bytes) {
Ok(string) => {
log::trace!("JSONRPC server: got '{}'", &string);
// FIXME: couldn't we just spawn it in a thread or handle the future?
jsonrpc_io.handle_request_sync(&string);
}
Err(e) => {
log::error!(
"JSONRPC server: error interpreting request: '{}'",
e.to_string()
);
}
}
}

Ok(())
}

/// The main event loop for the JSONRPC interface
pub fn jsonrpcapi_loop(mut poller: Poll, listener: UnixListener) -> Result<(), io::Error> {
let mut events = Events::with_capacity(16);
let mut jsonrpc_io = jsonrpc_core::IoHandler::new();
jsonrpc_io.extend_with(RpcImpl.to_delegate());

loop {
poller.poll(&mut events, Some(Duration::from_millis(10000)))?;

for event in &events {
// FIXME: remove, was just out of curiosity
if event.is_error() {
log::error!("Got error polling the JSONRPC socket: {:?}", event.token());
}

// A connection was established; loop to process all the messages
if event.token() == JSONRPC_SERVER && event.is_readable() {
loop {
match listener.accept() {
Ok((stream, _)) => {
handle_byte_stream(&jsonrpc_io, stream)?;
}
Err(e) => {
// Ok; next time then!
if e.kind() == io::ErrorKind::WouldBlock {
break;
}

// This one is not expected!
return Err(e);
}
}
}
}
}
}
}
15 changes: 14 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
mod bitcoind;
mod config;
mod database;
mod jsonrpc;
mod revaultd;

use crate::{
bitcoind::actions::{bitcoind_main_loop, setup_bitcoind},
config::parse_config,
database::actions::setup_db,
jsonrpc::{jsonrpcapi_loop, jsonrpcapi_setup},
revaultd::RevaultD,
};

use std::path::PathBuf;
use std::{env, process};
use std::{env, process, thread};

use daemonize_simple::Daemonize;

Expand Down Expand Up @@ -44,6 +46,17 @@ fn daemon_main(mut revaultd: RevaultD) {
revaultd.bitcoind_config.network
);

let (poller, listener) = jsonrpcapi_setup(revaultd.rpc_socket_file()).unwrap_or_else(|e| {
log::error!("Error setting up the JSONRPC server: {}", e.to_string());
process::exit(1)
});
let _jsonrpc_thread = thread::spawn(move || {
jsonrpcapi_loop(poller, listener).unwrap_or_else(|e| {
log::error!("Error in JSONRPC server event loop: {}", e.to_string());
process::exit(1)
})
});

// We poll bitcoind until we die
bitcoind_main_loop(&mut revaultd, &bitcoind).unwrap_or_else(|e| {
log::error!("Error in bitcoind main loop: {}", e.to_string());
Expand Down
4 changes: 4 additions & 0 deletions src/revaultd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ impl RevaultD {
self.file_from_datadir(&format!("revaultd-watchonly-wallet-{}", wallet_id))
}

pub fn rpc_socket_file(&self) -> PathBuf {
self.file_from_datadir("revaultd_rpc")
}

pub fn deposit_address(&mut self) -> Address {
self.vault_address(self.current_unused_index)
}
Expand Down

0 comments on commit 333d7e8

Please sign in to comment.