From a348af9630b602e5bde6893adf9fa67b1666712d Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Tue, 22 Oct 2024 14:20:04 +0200 Subject: [PATCH] main loop listen also to block notify from zmq --- src/bin/electrs.rs | 4 +-- src/bin/tx-fingerprint-stats.rs | 2 +- src/new_index/zmq.rs | 3 +-- src/signal.rs | 44 ++++++++++++++++++++++++--------- tests/common.rs | 2 +- 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index b98badacc..9a7b4a25b 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -41,12 +41,12 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom { } fn run_server(config: Arc) -> Result<()> { - let signal = Waiter::start(); + let (block_hash_notify, signal) = Waiter::start(); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); if let Some(zmq_addr) = config.zmq_addr.as_ref() { - zmq::start(&format!("tcp://{zmq_addr}"), None); + zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify)); } let daemon = Arc::new(Daemon::new( diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index afe980f8c..2587262fe 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -21,7 +21,7 @@ fn main() { util::has_prevout, }; - let signal = Waiter::start(); + let signal = Waiter::start().1; let config = Config::from_args(); let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config)); diff --git a/src/new_index/zmq.rs b/src/new_index/zmq.rs index 5315b4264..5c35c9b21 100644 --- a/src/new_index/zmq.rs +++ b/src/new_index/zmq.rs @@ -1,6 +1,5 @@ -use std::sync::mpsc::Sender; - use bitcoin::{hashes::Hash, BlockHash}; +use crossbeam_channel::Sender; use crate::util::spawn_thread; diff --git a/src/signal.rs b/src/signal.rs index a36688181..16e3e57d0 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,4 +1,5 @@ -use crossbeam_channel::{self as channel, after, select}; +use bitcoin::BlockHash; +use crossbeam_channel::{self as channel, after, select, Sender}; use std::thread; use std::time::{Duration, Instant}; @@ -9,6 +10,7 @@ use crate::errors::*; #[derive(Clone)] // so multiple threads could wait on signals pub struct Waiter { receiver: channel::Receiver, + zmq_receiver: channel::Receiver, } fn notify(signals: &[i32]) -> channel::Receiver { @@ -25,34 +27,54 @@ fn notify(signals: &[i32]) -> channel::Receiver { } impl Waiter { - pub fn start() -> Waiter { - Waiter { - receiver: notify(&[ - SIGINT, SIGTERM, - SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) - ]), - } + pub fn start() -> (Sender, Waiter) { + let (block_hash_notify, block_hash_receive) = channel::bounded(1); + + ( + block_hash_notify, + Waiter { + receiver: notify(&[ + SIGINT, SIGTERM, + SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`) + ]), + zmq_receiver: block_hash_receive, + }, + ) } - pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> { + pub fn wait(&self, duration: Duration, accept_block_notification: bool) -> Result<()> { let start = Instant::now(); select! { recv(self.receiver) -> msg => { match msg { Ok(sig) if sig == SIGUSR1 => { trace!("notified via SIGUSR1"); - if accept_sigusr { + if accept_block_notification { Ok(()) } else { let wait_more = duration.saturating_sub(start.elapsed()); - self.wait(wait_more, accept_sigusr) + self.wait(wait_more, accept_block_notification) } } Ok(sig) => bail!(ErrorKind::Interrupt(sig)), Err(_) => bail!("signal hook channel disconnected"), } }, + recv(self.zmq_receiver) -> msg => { + match msg { + Ok(_) => { + if accept_block_notification { + Ok(()) + } else { + let wait_more = duration.saturating_sub(start.elapsed()); + self.wait(wait_more, accept_block_notification) + } + } + Err(_) => bail!("signal hook channel disconnected"), + } + }, recv(after(duration)) -> _ => Ok(()), + } } } diff --git a/tests/common.rs b/tests/common.rs index 66c7b6f79..95bfa55b5 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -123,7 +123,7 @@ impl TestRunner { //tor_proxy: Option, }); - let signal = Waiter::start(); + let signal = Waiter::start().1; let metrics = Metrics::new(rand_available_addr()); metrics.start();