Skip to content

Commit

Permalink
Don't use RPC batching with bitcoind
Browse files Browse the repository at this point in the history
This actually hurts performance because the batched response has to be
bueffered, as @TheBlueMatt explains at romanz#373 (comment)

Instead, send multiple RPC requests in parallel using a thread pool.

Also see romanz#374
  • Loading branch information
shesek committed Mar 9, 2021
1 parent a33e97e commit ee8bcfd
Showing 1 changed file with 12 additions and 20 deletions.
32 changes: 12 additions & 20 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bitcoin::util::hash::BitcoinHash;
use bitcoin::{BlockHash, Txid};
use glob;
use hex;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde_json::{from_str, from_value, Value};

#[cfg(not(feature = "liquid"))]
Expand Down Expand Up @@ -378,26 +379,16 @@ impl Daemon {
Ok(result)
}

fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
let id = self.message_id.next();
let reqs = params_list
.iter()
.map(|params| json!({"method": method, "params": params, "id": id}))
.collect();
let mut results = vec![];
let mut replies = self.call_jsonrpc(method, &reqs)?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
}
return Ok(results);
}
bail!("non-array replies: {:?}", replies);
let req = json!({"method": method, "params": params, "id": id});
let reply = self.call_jsonrpc(method, &req)?;
parse_jsonrpc_reply(reply, method, id)
}

fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
loop {
match self.handle_request_batch(method, params_list) {
match self.handle_request(method, &params) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("reconnecting to bitcoind: {}", msg);
self.signal.wait(Duration::from_secs(3), false)?;
Expand All @@ -411,13 +402,14 @@ impl Daemon {
}

fn request(&self, method: &str, params: Value) -> Result<Value> {
let mut values = self.retry_request_batch(method, &[params])?;
assert_eq!(values.len(), 1);
Ok(values.remove(0))
self.retry_request(method, &params)
}

fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
self.retry_request_batch(method, params_list)
params_list
.par_iter()
.map(|params| self.retry_request(method, params))
.collect::<Result<Vec<_>>>()
}

// bitcoind JSONRPC API:
Expand Down

0 comments on commit ee8bcfd

Please sign in to comment.