From d92556222afca46a220e8e1e0150ebbba759a5e6 Mon Sep 17 00:00:00 2001 From: Pavel Lazureykis Date: Fri, 5 Jan 2024 14:50:15 -0500 Subject: [PATCH] Parallel startup (#7) --- src/main.rs | 52 ++++++++++++++++++++++++++++++++++++--------------- src/worker.rs | 6 +++--- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/src/main.rs b/src/main.rs index b149c35..d6d03c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,6 @@ +use crate::worker::Worker; +use tokio::task::JoinError; + mod config; mod docker; mod error; @@ -15,29 +18,48 @@ async fn main() -> crate::result::Result<()> { log::info!("Starting {} watchers...", config.watchers.len()); - let mut handles = vec![]; + let mut handles = Vec::with_capacity(config.watchers.len()); { - let mut fetchers = Vec::with_capacity(config.watchers.len()); + let mut startup_handles = Vec::with_capacity(config.watchers.len()); for watcher in config.watchers { - let fetcher = worker::Worker::new(watcher.clone()); - if let Err(e) = fetcher.sync_secrets().await { - return Err(error::Error::from(format!( - "[{}] Failed to sync secrets: {}", - &watcher.name, e - ))); - } + let startup_handle = tokio::spawn(async move { + let fetcher = worker::Worker::new(watcher.clone()); + if let Err(e) = fetcher.sync_secrets().await { + let error_msg = format!("[{}] Failed to sync secrets: {}", &watcher.name, e); + log::error!("{error_msg}"); + return Err(error_msg.into()); + } + + Ok(fetcher) + }); - fetchers.push(fetcher); + startup_handles.push(startup_handle); } - for fetcher in fetchers { - let handle = tokio::spawn(async move { - fetcher.run().await; - }); + let fetchers: Vec, JoinError>> = + futures::future::join_all(startup_handles).await; + + for fetcher_result in fetchers { + match fetcher_result { + Ok(Ok(fetcher)) => { + let handle = tokio::spawn(async move { + fetcher.run().await; + }); - handles.push(handle); + handles.push(handle); + } + Ok(Err(e)) => { + log::error!("Failed to start watcher: {e}"); + return Err(e); + } + _ => { + let error_msg = format!("Failed to start watcher: {:?}", fetcher_result); + log::error!("{error_msg}"); + return Err(error_msg.into()); + } + } } } diff --git a/src/worker.rs b/src/worker.rs index 27597a1..3579b02 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,11 +1,11 @@ -use futures::StreamExt; - use crate::{ config, secrets::fetch_secrets, watch::{parse_watch_event, WatchEvent}, }; +use futures::StreamExt; +#[derive(Debug, Clone)] pub struct Worker { watcher: config::Watcher, http: reqwest::Client, @@ -35,7 +35,7 @@ impl Worker { pub async fn run(&self) { loop { if let Err(e) = self.watch_for_updates().await { - log::error!( + log::warn!( "[{}] Failed to watch for updates: {}", &self.watcher.name, e