Skip to content

Commit

Permalink
Dockerize (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazureykis authored Jan 5, 2024
1 parent 7824734 commit 6bd84a2
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 44 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/target
config.json
config_all.json
TODO.local
Dockerfile
11 changes: 11 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM rust:1-slim-buster AS builder
WORKDIR /app
ADD . /app
RUN cargo build --release


FROM debian:buster-slim
WORKDIR /app
COPY --from=builder /app/target/release/doppler-swarm /app/doppler-swarm
ENV RUST_LOG=info
CMD ["/app/doppler-swarm"]
25 changes: 23 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ async fn main() -> crate::result::Result<()> {

let config = config::read_config()?;

let (tx, rx) = tokio::sync::watch::channel(false);

log::info!("Starting {} watchers...", config.watchers.len());

let mut handles = Vec::with_capacity(config.watchers.len());
Expand All @@ -24,8 +26,9 @@ async fn main() -> crate::result::Result<()> {
let mut startup_handles = Vec::with_capacity(config.watchers.len());

for watcher in config.watchers {
let rx = rx.clone();
let startup_handle = tokio::spawn(async move {
let fetcher = worker::Worker::new(watcher.clone());
let fetcher = worker::Worker::new(watcher.clone(), rx);
if let Err(e) = fetcher.sync_secrets().await {
let error_msg = format!("[{}] Failed to sync secrets: {}", &watcher.name, e);
log::error!("{error_msg}");
Expand All @@ -43,7 +46,7 @@ async fn main() -> crate::result::Result<()> {

for fetcher_result in fetchers {
match fetcher_result {
Ok(Ok(fetcher)) => {
Ok(Ok(mut fetcher)) => {
let handle = tokio::spawn(async move {
fetcher.run().await;
});
Expand All @@ -63,6 +66,24 @@ async fn main() -> crate::result::Result<()> {
}
}

tokio::spawn(async move {
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("Failed to create SIGINT signal handler");
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Failed to create SIGTERM signal handler");

tokio::select! {
_ = sigint.recv() => {
log::info!("Received SIGINT, shutting down...");
tx.send(true).expect("Failed to send shutdown signal");
}
_ = sigterm.recv() => {
log::info!("Received SIGTERM, shutting down...");
tx.send(true).expect("Failed to send shutdown signal");
}
}
});

futures::future::join_all(handles).await;

log::info!("Done.");
Expand Down
101 changes: 59 additions & 42 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tokio::time::timeout;
pub struct Worker {
watcher: config::Watcher,
http: reqwest::Client,
stop: tokio::sync::watch::Receiver<bool>,
wanna_stop: bool,
}

pub fn should_update_docker_service(
Expand All @@ -29,21 +31,28 @@ pub fn should_update_docker_service(
}

impl Worker {
pub fn new(watcher: config::Watcher) -> Self {
pub fn new(watcher: config::Watcher, stop: tokio::sync::watch::Receiver<bool>) -> Self {
let http = reqwest::ClientBuilder::new()
.use_rustls_tls()
.connect_timeout(std::time::Duration::from_secs(10))
.build()
.expect("Cannot build http client");

Self { watcher, http }
Self {
watcher,
http,
stop,
wanna_stop: false,
}
}

pub async fn run(&self) {
loop {
pub async fn run(&mut self) {
while !self.wanna_stop {
if let Err(e) = self.watch_for_updates().await {
log::warn!("{e}");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if !self.wanna_stop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}
Expand Down Expand Up @@ -74,7 +83,7 @@ impl Worker {
Ok(())
}

pub async fn watch_for_updates(&self) -> crate::result::Result<()> {
pub async fn watch_for_updates(&mut self) -> crate::result::Result<()> {
let response = self
.http
.get("https://api.doppler.com/v3/configs/config/secrets/watch?include_dynamic_secrets=false&include_managed_secrets=false")
Expand All @@ -87,47 +96,55 @@ impl Worker {
let mut buf: Vec<u8> = Vec::with_capacity(1024);

loop {
// Doppler sends ping event every 30 seconds.
// If we don't receive any events for 60 seconds, we assume that the connection is dead.
match timeout(std::time::Duration::from_secs(60), stream.next()).await {
Ok(Some(Ok(item))) => {
buf.extend_from_slice(&item);
if !buf.ends_with(b"\n\n") {
continue;
}

let buf_copy: Bytes = Bytes::copy_from_slice(&buf);
buf.clear();
match parse_watch_event(&buf_copy) {
Ok(WatchEvent::SecretsUpdate) => {
self.sync_secrets().await?;
}
Ok(WatchEvent::Ping) => {
log::debug!("[{}] Received event: Ping", &self.watcher.name);
tokio::select! {
_ = self.stop.changed() => {
self.wanna_stop = *self.stop.borrow();
return Ok(());
}
// Doppler sends ping event every 30 seconds.
// If we don't receive any events for 60 seconds, we assume that the connection is dead.
resp = timeout(std::time::Duration::from_secs(60), stream.next()) => {
match resp {
Ok(Some(Ok(item))) => {
buf.extend_from_slice(&item);
if !buf.ends_with(b"\n\n") {
continue;
}

let buf_copy: Bytes = Bytes::copy_from_slice(&buf);
buf.clear();
match parse_watch_event(&buf_copy) {
Ok(WatchEvent::SecretsUpdate) => {
self.sync_secrets().await?;
}
Ok(WatchEvent::Ping) => {
log::debug!("[{}] Received event: Ping", &self.watcher.name);
}
Ok(WatchEvent::Connected) => {
log::info!("[{}] Received event: Connected", &self.watcher.name);
}
Err(e) => {
return Err(e);
}
}
}
Ok(WatchEvent::Connected) => {
log::info!("[{}] Received event: Connected", &self.watcher.name);
Ok(Some(Err(e))) => {
return Err(format!(
"[{}] Failed to read watch stream: {}",
&self.watcher.name, e
)
.into())
}
Err(e) => {
return Err(e);
Ok(None) => return Err("Watch stream ended unexpectedly".into()),
Err(_) => {
return Err(format!(
"[{}] Watch stream timed out after 60 seconds",
&self.watcher.name
)
.into());
}
}
}
Ok(Some(Err(e))) => {
return Err(format!(
"[{}] Failed to read watch stream: {}",
&self.watcher.name, e
)
.into())
}
Ok(None) => return Err("Watch stream ended unexpectedly".into()),
Err(_) => {
return Err(format!(
"[{}] Watch stream timed out after 60 seconds",
&self.watcher.name
)
.into());
}
}
}
}
Expand Down

0 comments on commit 6bd84a2

Please sign in to comment.