Skip to content

Commit

Permalink
Parallel startup (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazureykis authored Jan 5, 2024
1 parent 60b996c commit d925562
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
52 changes: 37 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use crate::worker::Worker;
use tokio::task::JoinError;

mod config;
mod docker;
mod error;
Expand All @@ -15,29 +18,48 @@ async fn main() -> crate::result::Result<()> {

log::info!("Starting {} watchers...", config.watchers.len());

let mut handles = vec![];
let mut handles = Vec::with_capacity(config.watchers.len());

{
let mut fetchers = Vec::with_capacity(config.watchers.len());
let mut startup_handles = Vec::with_capacity(config.watchers.len());

for watcher in config.watchers {
let fetcher = worker::Worker::new(watcher.clone());
if let Err(e) = fetcher.sync_secrets().await {
return Err(error::Error::from(format!(
"[{}] Failed to sync secrets: {}",
&watcher.name, e
)));
}
let startup_handle = tokio::spawn(async move {
let fetcher = worker::Worker::new(watcher.clone());
if let Err(e) = fetcher.sync_secrets().await {
let error_msg = format!("[{}] Failed to sync secrets: {}", &watcher.name, e);
log::error!("{error_msg}");
return Err(error_msg.into());
}

Ok(fetcher)
});

fetchers.push(fetcher);
startup_handles.push(startup_handle);
}

for fetcher in fetchers {
let handle = tokio::spawn(async move {
fetcher.run().await;
});
let fetchers: Vec<Result<crate::result::Result<Worker>, JoinError>> =
futures::future::join_all(startup_handles).await;

for fetcher_result in fetchers {
match fetcher_result {
Ok(Ok(fetcher)) => {
let handle = tokio::spawn(async move {
fetcher.run().await;
});

handles.push(handle);
handles.push(handle);
}
Ok(Err(e)) => {
log::error!("Failed to start watcher: {e}");
return Err(e);
}
_ => {
let error_msg = format!("Failed to start watcher: {:?}", fetcher_result);
log::error!("{error_msg}");
return Err(error_msg.into());
}
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use futures::StreamExt;

use crate::{
config,
secrets::fetch_secrets,
watch::{parse_watch_event, WatchEvent},
};
use futures::StreamExt;

#[derive(Debug, Clone)]
pub struct Worker {
watcher: config::Watcher,
http: reqwest::Client,
Expand Down Expand Up @@ -35,7 +35,7 @@ impl Worker {
pub async fn run(&self) {
loop {
if let Err(e) = self.watch_for_updates().await {
log::error!(
log::warn!(
"[{}] Failed to watch for updates: {}",
&self.watcher.name,
e
Expand Down

0 comments on commit d925562

Please sign in to comment.