Skip to content

Commit

Permalink
Switch to a separate executor for RPC calls to avoid tokio hangs
Browse files Browse the repository at this point in the history
See the comment in the commit for more info on why we have to do
this.
  • Loading branch information
TheBlueMatt committed Jan 14, 2025
1 parent 04aaa24 commit f7eb75b
Showing 1 changed file with 84 additions and 35 deletions.
119 changes: 84 additions & 35 deletions src/bitcoind_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient;
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
use serde_json;
use std::collections::HashMap;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::runtime::{self, Runtime};

pub struct BitcoindClient {
pub(crate) bitcoind_rpc_client: Arc<RpcClient>,
network: Network,
Expand All @@ -38,7 +41,8 @@ pub struct BitcoindClient {
rpc_user: String,
rpc_password: String,
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>,
handle: tokio::runtime::Handle,
main_runtime_handle: runtime::Handle,
inner_runtime: Arc<Runtime>,
logger: Arc<FilesystemLogger>,
}

Expand Down Expand Up @@ -66,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
impl BitcoindClient {
pub(crate) async fn new(
host: String, port: u16, rpc_user: String, rpc_password: String, network: Network,
handle: tokio::runtime::Handle, logger: Arc<FilesystemLogger>,
handle: runtime::Handle, logger: Arc<FilesystemLogger>,
) -> std::io::Result<Self> {
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
let rpc_credentials =
Expand Down Expand Up @@ -95,6 +99,16 @@ impl BitcoindClient {
fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE));
fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE));

let mut builder = runtime::Builder::new_multi_thread();
let runtime =
builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap();
let inner_runtime =
Arc::new(runtime);
// Tokio will panic if we drop a runtime while in another runtime. Because the entire
// application runs inside a tokio runtime, we have to ensure this runtime is never
// `drop`'d, which we do by leaking an Arc reference.
std::mem::forget(Arc::clone(&inner_runtime));

let client = Self {
bitcoind_rpc_client: Arc::new(bitcoind_rpc_client),
host,
Expand All @@ -103,7 +117,8 @@ impl BitcoindClient {
rpc_password,
network,
fees: Arc::new(fees),
handle: handle.clone(),
main_runtime_handle: handle.clone(),
inner_runtime,
logger,
};
BitcoindClient::poll_for_fee_estimates(
Expand Down Expand Up @@ -226,10 +241,42 @@ impl BitcoindClient {
});
}

fn run_future_in_blocking_context<F: Future + Send + 'static>(&self, future: F) -> F::Output
where
F::Output: Send + 'static,
{
// Tokio deliberately makes it nigh impossible to block on a future in a sync context that
// is running in an async task (which makes it really hard to interact with sync code that
// has callbacks in an async project).
//
// Reading the docs, it *seems* like
// `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
// trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
// the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
// into a `block_in_place` call, and the inner future requires I/O (which of course it
// does, its a future!), the whole thing will come to a grinding halt as no other thread is
// allowed to poll I/O until the blocked one finishes.
//
// This is, of course, nuts, and an almost trivial performance penalty of occasional
// additional wakeups would solve this, but tokio refuses to do so because any performance
// penalty at all would be too much (tokio issue #4730).
//
// Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
// run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
// blocking too many threads on the main runtime). We want to block on that `future` being
// run on the other runtime's threads, but tokio only provides `block_on` to do so, which
// runs the `future` itself on the current thread, panicing if this thread is already a
// part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
// have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
// `JoinHandle` on the main runtime.
tokio::task::block_in_place(move || {
self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap()
})
}

pub fn get_new_rpc_client(&self) -> RpcClient {
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
let rpc_credentials =
base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone()));
let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password));
RpcClient::new(&rpc_credentials, http_endpoint)
}

Expand Down Expand Up @@ -273,22 +320,28 @@ impl BitcoindClient {
.unwrap();
}

pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx {
pub fn sign_raw_transaction_with_wallet(
&self, tx_hex: String,
) -> impl Future<Output = SignedTx> {
let tx_hex_json = serde_json::json!(tx_hex);
self.bitcoind_rpc_client
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
.await
.unwrap()
let rpc_client = self.get_new_rpc_client();
async move {
rpc_client
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
.await
.unwrap()
}
}

pub async fn get_new_address(&self) -> Address {
pub fn get_new_address(&self) -> impl Future<Output = Address> {
let addr_args = vec![serde_json::json!("LDK output address")];
let addr = self
.bitcoind_rpc_client
.call_method::<NewAddress>("getnewaddress", &addr_args)
.await
.unwrap();
Address::from_str(addr.0.as_str()).unwrap().require_network(self.network).unwrap()
let network = self.network;
let rpc_client = self.get_new_rpc_client();
async move {
let addr =
rpc_client.call_method::<NewAddress>("getnewaddress", &addr_args).await.unwrap();
Address::from_str(addr.0.as_str()).unwrap().require_network(network).unwrap()
}
}

pub async fn get_blockchain_info(&self) -> BlockchainInfo {
Expand All @@ -298,11 +351,11 @@ impl BitcoindClient {
.unwrap()
}

pub async fn list_unspent(&self) -> ListUnspentResponse {
self.bitcoind_rpc_client
.call_method::<ListUnspentResponse>("listunspent", &vec![])
.await
.unwrap()
pub fn list_unspent(&self) -> impl Future<Output = ListUnspentResponse> {
let rpc_client = self.get_new_rpc_client();
async move {
rpc_client.call_method::<ListUnspentResponse>("listunspent", &vec![]).await.unwrap()
}
}
}

Expand All @@ -324,7 +377,7 @@ impl BroadcasterInterface for BitcoindClient {
let txn = txs.iter().map(|tx| encode::serialize_hex(tx)).collect::<Vec<_>>();
let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client);
let logger = Arc::clone(&self.logger);
self.handle.spawn(async move {
self.main_runtime_handle.spawn(async move {
let res = if txn.len() == 1 {
let tx_json = serde_json::json!(txn[0]);
bitcoind_rpc_client
Expand Down Expand Up @@ -355,17 +408,15 @@ impl BroadcasterInterface for BitcoindClient {

impl ChangeDestinationSource for BitcoindClient {
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
tokio::task::block_in_place(move || {
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
})
let future = self.get_new_address();
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
}
}

impl WalletSource for BitcoindClient {
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
let utxos = tokio::task::block_in_place(move || {
self.handle.block_on(async move { self.list_unspent().await }).0
});
let future = self.list_unspent();
let utxos = self.run_future_in_blocking_context(async move { future.await.0 });
Ok(utxos
.into_iter()
.filter_map(|utxo| {
Expand Down Expand Up @@ -398,18 +449,16 @@ impl WalletSource for BitcoindClient {
}

fn get_change_script(&self) -> Result<ScriptBuf, ()> {
tokio::task::block_in_place(move || {
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
})
let future = self.get_new_address();
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
}

fn sign_psbt(&self, tx: Psbt) -> Result<Transaction, ()> {
let mut tx_bytes = Vec::new();
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
let tx_hex = hex_utils::hex_str(&tx_bytes);
let signed_tx = tokio::task::block_in_place(move || {
self.handle.block_on(async move { self.sign_raw_transaction_with_wallet(tx_hex).await })
});
let future = self.sign_raw_transaction_with_wallet(tx_hex);
let signed_tx = self.run_future_in_blocking_context(async move { future.await });
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
}
Expand Down

0 comments on commit f7eb75b

Please sign in to comment.