Skip to content

Commit

Permalink
main loop listen also to block notify from zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
RCasatta committed Oct 22, 2024
1 parent 1e37f20 commit 7587165
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom {
}

fn run_server(config: Arc<Config>) -> Result<()> {
let signal = Waiter::start();
let (block_hash_notify, signal) = Waiter::start();
let metrics = Metrics::new(config.monitoring_addr);
metrics.start();

if let Some(zmq_addr) = config.zmq_addr.as_ref() {
zmq::start(&format!("tcp://{zmq_addr}"), None);
zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify));
}

let daemon = Arc::new(Daemon::new(
Expand Down
2 changes: 1 addition & 1 deletion src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
util::has_prevout,
};

let signal = Waiter::start();
let signal = Waiter::start().1;
let config = Config::from_args();
let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config));

Expand Down
3 changes: 1 addition & 2 deletions src/new_index/zmq.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::mpsc::Sender;

use bitcoin::{hashes::Hash, BlockHash};
use crossbeam_channel::Sender;

use crate::util::spawn_thread;

Expand Down
44 changes: 33 additions & 11 deletions src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crossbeam_channel::{self as channel, after, select};
use bitcoin::BlockHash;
use crossbeam_channel::{self as channel, after, select, Sender};
use std::thread;
use std::time::{Duration, Instant};

Expand All @@ -9,6 +10,7 @@ use crate::errors::*;
#[derive(Clone)] // so multiple threads could wait on signals
pub struct Waiter {
receiver: channel::Receiver<i32>,
zmq_receiver: channel::Receiver<BlockHash>,
}

fn notify(signals: &[i32]) -> channel::Receiver<i32> {
Expand All @@ -25,34 +27,54 @@ fn notify(signals: &[i32]) -> channel::Receiver<i32> {
}

impl Waiter {
pub fn start() -> Waiter {
Waiter {
receiver: notify(&[
SIGINT, SIGTERM,
SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`)
]),
}
pub fn start() -> (Sender<BlockHash>, Waiter) {
let (block_hash_notify, block_hash_receive) = channel::bounded(1);

(
block_hash_notify,
Waiter {
receiver: notify(&[
SIGINT, SIGTERM,
SIGUSR1, // allow external triggering (e.g. via bitcoind `blocknotify`)
]),
zmq_receiver: block_hash_receive,
},
)
}

pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> {
pub fn wait(&self, duration: Duration, accept_block_notification: bool) -> Result<()> {
let start = Instant::now();
select! {
recv(self.receiver) -> msg => {
match msg {
Ok(sig) if sig == SIGUSR1 => {
trace!("notified via SIGUSR1");
if accept_sigusr {
if accept_block_notification {
Ok(())
} else {
let wait_more = duration.saturating_sub(start.elapsed());
self.wait(wait_more, accept_sigusr)
self.wait(wait_more, accept_block_notification)
}
}
Ok(sig) => bail!(ErrorKind::Interrupt(sig)),
Err(_) => bail!("signal hook channel disconnected"),
}
},
recv(self.zmq_receiver) -> msg => {
match msg {
Ok(_) => {
if accept_block_notification {
Ok(())
} else {
let wait_more = duration.saturating_sub(start.elapsed());
self.wait(wait_more, accept_block_notification)
}
}
Err(_) => bail!("signal hook channel disconnected"),
}
},
recv(after(duration)) -> _ => Ok(()),

}
}
}
2 changes: 1 addition & 1 deletion tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl TestRunner {
//tor_proxy: Option<std::net::SocketAddr>,
});

let signal = Waiter::start();
let signal = Waiter::start().1;
let metrics = Metrics::new(rand_available_addr());
metrics.start();

Expand Down

0 comments on commit 7587165

Please sign in to comment.