Skip to content

Commit

Permalink
add signal handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Oct 18, 2024
1 parent 82b4ca4 commit 318996f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
50 changes: 48 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
snmalloc-rs = "0.3.6"
tokio = { version = "1.40.0", features = [
"sync",
"macros",
"parking_lot",
"rt-multi-thread",
"time",
"signal",
"sync",
] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = [
Expand Down
3 changes: 3 additions & 0 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,12 @@ async fn run() -> anyhow::Result<()> {
.into_iter()
.map(|e| (format!("{}/{}", e.topic(), e.partition()), 0))
.collect();
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
loop {
tokio::select! {
biased;
_ = tokio::signal::ctrl_c() => anyhow::bail!("exit"),
_ = sigterm.recv() => anyhow::bail!("exit"),
_ = consumer.recv() => anyhow::bail!("message received from split consumer"),
_ = select_all(&mut partition_consumers) => anyhow::bail!("parition consumer exit"),
_ = source_msg_rx.recv_many(&mut msg_buffer, msg_buffer_limit) => {
Expand Down

0 comments on commit 318996f

Please sign in to comment.