From 433b46a15ed1483c60dd1489d2fe039bd5934ccc Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 14 Nov 2023 12:09:07 -0500 Subject: [PATCH] chore: error handling around retry, wasms --- Cross.toml | 10 ++++- homestar-runtime/src/scheduler.rs | 1 + homestar-runtime/src/settings.rs | 2 +- homestar-runtime/src/tasks/fetch.rs | 51 ++++++++++++++++------- homestar-runtime/src/worker.rs | 21 +++++++--- homestar-runtime/src/workflow/settings.rs | 2 +- homestar-wasm/src/wasmtime/error.rs | 2 +- homestar-wasm/src/wasmtime/world.rs | 2 +- 8 files changed, 66 insertions(+), 25 deletions(-) diff --git a/Cross.toml b/Cross.toml index 688d1ac7..ff92d71f 100644 --- a/Cross.toml +++ b/Cross.toml @@ -6,11 +6,19 @@ passthrough = [ "RUSTFLAGS", ] +# When running `cross` with nix, do this within `nix-shell -p gcc rustup`. +# +# Then, run +# +# `cross build -p homestar-runtime --target x86_64-unknown-linux-musl` +# or +# `cross build -p homestar-runtime --target aarch64-unknown-linux-musl` + [target.x86_64-unknown-linux-musl] image = "burntsushi/cross:x86_64-unknown-linux-musl" [target.aarch64-unknown-linux-musl] -image = "burntsushi/cross:aarch64-unknown-linux-gnu" +image = "burntsushi/cross:aarch64-unknown-linux-musl" [target.x86_64-apple-darwin] image = "freeznet/x86_64-apple-darwin-cross:11.3" diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index df3f48bf..0dcc2f1e 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -171,6 +171,7 @@ impl<'a> TaskScheduler<'a> { .into_iter() .map(|(_, rsc)| rsc.to_owned()) .collect(); + let fetched = fetch_fn(resources_to_fetch).await?; match resume { diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index 2111a585..d29f8704 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -294,7 +294,7 @@ impl Default for Network { rpc_max_connections: 10, rpc_port: 3030, rpc_server_timeout: Duration::new(120, 0), - transport_connection_timeout: Duration::new(20, 0), + transport_connection_timeout: Duration::new(60, 0), webserver_host: Uri::from_static("127.0.0.1"), webserver_port: 1337, webserver_timeout: Duration::new(120, 0), diff --git a/homestar-runtime/src/tasks/fetch.rs b/homestar-runtime/src/tasks/fetch.rs index bf99f0be..79319e99 100644 --- a/homestar-runtime/src/tasks/fetch.rs +++ b/homestar-runtime/src/tasks/fetch.rs @@ -10,6 +10,7 @@ use anyhow::Result; use fnv::FnvHashSet; use indexmap::IndexMap; use std::sync::Arc; +use tracing::{info, warn}; pub(crate) struct Fetch; @@ -29,27 +30,49 @@ impl Fetch { ) -> Result>> { use futures::{stream::FuturesUnordered, TryStreamExt}; let settings = settings.as_ref(); + let retries = settings.retries; let tasks = FuturesUnordered::new(); for rsc in resources.iter() { - tracing::info!(rsc = rsc.to_string(), "Fetching resource"); - let task = tryhard::retry_fn(|| async { Self::fetch(rsc.clone(), ipfs.clone()).await }) - .with_config( - tryhard::RetryFutureConfig::new(settings.retries) - .exponential_backoff(settings.retry_initial_delay) - .max_delay(settings.retry_max_delay), + let task = tryhard::retry_fn(|| async { + info!( + rsc = rsc.to_string(), + "attempting to fetch resource from IPFS" ); + Self::fetch(rsc.clone(), ipfs.clone()).await + }) + .retries(retries) + .exponential_backoff(settings.retry_initial_delay) + .max_delay(settings.retry_max_delay) + .on_retry(|attempts, next_delay, error| { + let err = error.to_string(); + async move { + if attempts < retries { + warn!( + err = err, + attempts = attempts, + "retrying fetch after error @ {}ms", + next_delay.map(|d| d.as_millis()).unwrap_or(0) + ); + } else { + warn!(err = err, attempts = attempts, "maxed out # of retries"); + } + } + }); tasks.push(task); } - tasks.try_collect::>().await?.into_iter().try_fold( - IndexMap::default(), - |mut acc, res| { - let answer = res.1?; - acc.insert(res.0, answer); + info!("fetching necessary resources from IPFS"); + if let Ok(vec) = tasks.try_collect::>().await { + vec.into_iter() + .try_fold(IndexMap::default(), |mut acc, res| { + let answer = res.1?; + acc.insert(res.0, answer); - Ok::<_, anyhow::Error>(acc) - }, - ) + Ok::<_, anyhow::Error>(acc) + }) + } else { + Err(anyhow::anyhow!("Failed to fetch resources from IPFS")) + } } /// Gather resources via URLs, leveraging an exponential backoff. diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 0f4b26e7..15a73e1a 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -147,14 +147,19 @@ where where F: FnOnce(FnvHashSet) -> BoxFuture<'a, Result>>>, { - let scheduler_ctx = TaskScheduler::init( + match TaskScheduler::init( self.graph.clone(), // Arc'ed &mut self.db.conn()?, fetch_fn, ) - .await?; - - self.run_queue(scheduler_ctx.scheduler, running_tasks).await + .await + { + Ok(ctx) => self.run_queue(ctx.scheduler, running_tasks).await, + Err(err) => { + error!(err=?err, "error initializing scheduler"); + Err(anyhow!("error initializing scheduler")) + } + } } #[allow(unused_mut)] @@ -185,7 +190,7 @@ where info!( workflow_cid = workflow_cid.to_string(), cid = cid.to_string(), - "resolving cid" + "attempting to resolve cid in workflow" ); if let Some(result) = linkmap.read().await.get(&cid) { @@ -369,7 +374,11 @@ where receipt_meta, additional_meta, )), - Err(e) => Err(anyhow!("cannot execute wasm module: {e}")), + Err(err) => Err( + anyhow!("cannot execute wasm module: {err}")) + .with_context(|| { + format!("not able to run fn {fun} for promised cid: {instruction_ptr}, in workflow {workflow_cid}") + }), } }); handles.push(handle); diff --git a/homestar-runtime/src/workflow/settings.rs b/homestar-runtime/src/workflow/settings.rs index cba18188..b2dbd834 100644 --- a/homestar-runtime/src/workflow/settings.rs +++ b/homestar-runtime/src/workflow/settings.rs @@ -18,7 +18,7 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { - retries: 10, + retries: 3, retry_max_delay: Duration::new(60, 0), retry_initial_delay: Duration::from_millis(500), p2p_timeout: Duration::new(5, 0), diff --git a/homestar-wasm/src/wasmtime/error.rs b/homestar-wasm/src/wasmtime/error.rs index 7e327579..a11147d8 100644 --- a/homestar-wasm/src/wasmtime/error.rs +++ b/homestar-wasm/src/wasmtime/error.rs @@ -40,7 +40,7 @@ pub enum Error { #[error(transparent)] WasmRuntime(#[from] anyhow::Error), /// Failure to find Wasm function for execution. - #[error("Wasm function {0} not found")] + #[error("Wasm function {0} not found in given Wasm component/resource")] WasmFunctionNotFound(String), /// [Wat] as Wasm component error. /// diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index 18f95e41..6338ff12 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -121,7 +121,7 @@ impl Env { }, Input::Deferred(await_promise) => { bail!(Error::ResolvePromise(ResolveError::UnresolvedCid(format!( - "deferred task not yet resolved for {}: {}", + "deferred task/instruction not yet resolved or exists for promise: {}: {}", await_promise.result(), await_promise.instruction_cid() ))))