diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..7da8766 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +/target +config.json +config_all.json +TODO.local +Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..94c7710 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM rust:1-slim-buster AS builder +WORKDIR /app +ADD . /app +RUN cargo build --release + + +FROM debian:buster-slim +WORKDIR /app +COPY --from=builder /app/target/release/doppler-swarm /app/doppler-swarm +ENV RUST_LOG=info +CMD ["/app/doppler-swarm"] diff --git a/src/main.rs b/src/main.rs index d6d03c9..32014e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,8 @@ async fn main() -> crate::result::Result<()> { let config = config::read_config()?; + let (tx, rx) = tokio::sync::watch::channel(false); + log::info!("Starting {} watchers...", config.watchers.len()); let mut handles = Vec::with_capacity(config.watchers.len()); @@ -24,8 +26,9 @@ async fn main() -> crate::result::Result<()> { let mut startup_handles = Vec::with_capacity(config.watchers.len()); for watcher in config.watchers { + let rx = rx.clone(); let startup_handle = tokio::spawn(async move { - let fetcher = worker::Worker::new(watcher.clone()); + let fetcher = worker::Worker::new(watcher.clone(), rx); if let Err(e) = fetcher.sync_secrets().await { let error_msg = format!("[{}] Failed to sync secrets: {}", &watcher.name, e); log::error!("{error_msg}"); @@ -43,7 +46,7 @@ async fn main() -> crate::result::Result<()> { for fetcher_result in fetchers { match fetcher_result { - Ok(Ok(fetcher)) => { + Ok(Ok(mut fetcher)) => { let handle = tokio::spawn(async move { fetcher.run().await; }); @@ -63,6 +66,24 @@ async fn main() -> crate::result::Result<()> { } } + tokio::spawn(async move { + let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) + .expect("Failed to create SIGINT signal handler"); + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("Failed to create SIGTERM signal handler"); + + tokio::select! { + _ = sigint.recv() => { + log::info!("Received SIGINT, shutting down..."); + tx.send(true).expect("Failed to send shutdown signal"); + } + _ = sigterm.recv() => { + log::info!("Received SIGTERM, shutting down..."); + tx.send(true).expect("Failed to send shutdown signal"); + } + } + }); + futures::future::join_all(handles).await; log::info!("Done."); diff --git a/src/worker.rs b/src/worker.rs index 14b306c..c24cac2 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -11,6 +11,8 @@ use tokio::time::timeout; pub struct Worker { watcher: config::Watcher, http: reqwest::Client, + stop: tokio::sync::watch::Receiver, + wanna_stop: bool, } pub fn should_update_docker_service( @@ -29,21 +31,28 @@ pub fn should_update_docker_service( } impl Worker { - pub fn new(watcher: config::Watcher) -> Self { + pub fn new(watcher: config::Watcher, stop: tokio::sync::watch::Receiver) -> Self { let http = reqwest::ClientBuilder::new() .use_rustls_tls() .connect_timeout(std::time::Duration::from_secs(10)) .build() .expect("Cannot build http client"); - Self { watcher, http } + Self { + watcher, + http, + stop, + wanna_stop: false, + } } - pub async fn run(&self) { - loop { + pub async fn run(&mut self) { + while !self.wanna_stop { if let Err(e) = self.watch_for_updates().await { log::warn!("{e}"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + if !self.wanna_stop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } } } } @@ -74,7 +83,7 @@ impl Worker { Ok(()) } - pub async fn watch_for_updates(&self) -> crate::result::Result<()> { + pub async fn watch_for_updates(&mut self) -> crate::result::Result<()> { let response = self .http .get("https://api.doppler.com/v3/configs/config/secrets/watch?include_dynamic_secrets=false&include_managed_secrets=false") @@ -87,47 +96,55 @@ impl Worker { let mut buf: Vec = Vec::with_capacity(1024); loop { - // Doppler sends ping event every 30 seconds. - // If we don't receive any events for 60 seconds, we assume that the connection is dead. - match timeout(std::time::Duration::from_secs(60), stream.next()).await { - Ok(Some(Ok(item))) => { - buf.extend_from_slice(&item); - if !buf.ends_with(b"\n\n") { - continue; - } - - let buf_copy: Bytes = Bytes::copy_from_slice(&buf); - buf.clear(); - match parse_watch_event(&buf_copy) { - Ok(WatchEvent::SecretsUpdate) => { - self.sync_secrets().await?; - } - Ok(WatchEvent::Ping) => { - log::debug!("[{}] Received event: Ping", &self.watcher.name); + tokio::select! { + _ = self.stop.changed() => { + self.wanna_stop = *self.stop.borrow(); + return Ok(()); + } + // Doppler sends ping event every 30 seconds. + // If we don't receive any events for 60 seconds, we assume that the connection is dead. + resp = timeout(std::time::Duration::from_secs(60), stream.next()) => { + match resp { + Ok(Some(Ok(item))) => { + buf.extend_from_slice(&item); + if !buf.ends_with(b"\n\n") { + continue; + } + + let buf_copy: Bytes = Bytes::copy_from_slice(&buf); + buf.clear(); + match parse_watch_event(&buf_copy) { + Ok(WatchEvent::SecretsUpdate) => { + self.sync_secrets().await?; + } + Ok(WatchEvent::Ping) => { + log::debug!("[{}] Received event: Ping", &self.watcher.name); + } + Ok(WatchEvent::Connected) => { + log::info!("[{}] Received event: Connected", &self.watcher.name); + } + Err(e) => { + return Err(e); + } + } } - Ok(WatchEvent::Connected) => { - log::info!("[{}] Received event: Connected", &self.watcher.name); + Ok(Some(Err(e))) => { + return Err(format!( + "[{}] Failed to read watch stream: {}", + &self.watcher.name, e + ) + .into()) } - Err(e) => { - return Err(e); + Ok(None) => return Err("Watch stream ended unexpectedly".into()), + Err(_) => { + return Err(format!( + "[{}] Watch stream timed out after 60 seconds", + &self.watcher.name + ) + .into()); } } } - Ok(Some(Err(e))) => { - return Err(format!( - "[{}] Failed to read watch stream: {}", - &self.watcher.name, e - ) - .into()) - } - Ok(None) => return Err("Watch stream ended unexpectedly".into()), - Err(_) => { - return Err(format!( - "[{}] Watch stream timed out after 60 seconds", - &self.watcher.name - ) - .into()); - } } } }