Skip to content

Commit

Permalink
chore: error handling around retry, wasms
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Nov 14, 2023
1 parent 75cf9a3 commit 433b46a
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 25 deletions.
10 changes: 9 additions & 1 deletion Cross.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@ passthrough = [
"RUSTFLAGS",
]

# When running `cross` with Nix, first enter a shell via `nix-shell -p gcc rustup`.
#
# Then, run
#
# `cross build -p homestar-runtime --target x86_64-unknown-linux-musl`
# or
# `cross build -p homestar-runtime --target aarch64-unknown-linux-musl`

[target.x86_64-unknown-linux-musl]
image = "burntsushi/cross:x86_64-unknown-linux-musl"

[target.aarch64-unknown-linux-musl]
image = "burntsushi/cross:aarch64-unknown-linux-gnu"
image = "burntsushi/cross:aarch64-unknown-linux-musl"

[target.x86_64-apple-darwin]
image = "freeznet/x86_64-apple-darwin-cross:11.3"
Expand Down
1 change: 1 addition & 0 deletions homestar-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl<'a> TaskScheduler<'a> {
.into_iter()
.map(|(_, rsc)| rsc.to_owned())
.collect();

let fetched = fetch_fn(resources_to_fetch).await?;

match resume {
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl Default for Network {
rpc_max_connections: 10,
rpc_port: 3030,
rpc_server_timeout: Duration::new(120, 0),
transport_connection_timeout: Duration::new(20, 0),
transport_connection_timeout: Duration::new(60, 0),
webserver_host: Uri::from_static("127.0.0.1"),
webserver_port: 1337,
webserver_timeout: Duration::new(120, 0),
Expand Down
51 changes: 37 additions & 14 deletions homestar-runtime/src/tasks/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use anyhow::Result;
use fnv::FnvHashSet;
use indexmap::IndexMap;
use std::sync::Arc;
use tracing::{info, warn};

pub(crate) struct Fetch;

Expand All @@ -29,27 +30,49 @@ impl Fetch {
) -> Result<IndexMap<Resource, Vec<u8>>> {
use futures::{stream::FuturesUnordered, TryStreamExt};
let settings = settings.as_ref();
let retries = settings.retries;
let tasks = FuturesUnordered::new();
for rsc in resources.iter() {
tracing::info!(rsc = rsc.to_string(), "Fetching resource");
let task = tryhard::retry_fn(|| async { Self::fetch(rsc.clone(), ipfs.clone()).await })
.with_config(
tryhard::RetryFutureConfig::new(settings.retries)
.exponential_backoff(settings.retry_initial_delay)
.max_delay(settings.retry_max_delay),
let task = tryhard::retry_fn(|| async {
info!(
rsc = rsc.to_string(),
"attempting to fetch resource from IPFS"
);
Self::fetch(rsc.clone(), ipfs.clone()).await
})
.retries(retries)
.exponential_backoff(settings.retry_initial_delay)
.max_delay(settings.retry_max_delay)
.on_retry(|attempts, next_delay, error| {
let err = error.to_string();
async move {
if attempts < retries {
warn!(
err = err,
attempts = attempts,
"retrying fetch after error @ {}ms",
next_delay.map(|d| d.as_millis()).unwrap_or(0)
);
} else {
warn!(err = err, attempts = attempts, "maxed out # of retries");
}
}
});
tasks.push(task);
}

tasks.try_collect::<Vec<_>>().await?.into_iter().try_fold(
IndexMap::default(),
|mut acc, res| {
let answer = res.1?;
acc.insert(res.0, answer);
info!("fetching necessary resources from IPFS");
if let Ok(vec) = tasks.try_collect::<Vec<_>>().await {
vec.into_iter()
.try_fold(IndexMap::default(), |mut acc, res| {
let answer = res.1?;
acc.insert(res.0, answer);

Ok::<_, anyhow::Error>(acc)
},
)
Ok::<_, anyhow::Error>(acc)
})
} else {
Err(anyhow::anyhow!("Failed to fetch resources from IPFS"))
}
}

/// Gather resources via URLs, leveraging an exponential backoff.
Expand Down
21 changes: 15 additions & 6 deletions homestar-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,19 @@ where
where
F: FnOnce(FnvHashSet<Resource>) -> BoxFuture<'a, Result<IndexMap<Resource, Vec<u8>>>>,
{
let scheduler_ctx = TaskScheduler::init(
match TaskScheduler::init(
self.graph.clone(), // Arc'ed
&mut self.db.conn()?,
fetch_fn,
)
.await?;

self.run_queue(scheduler_ctx.scheduler, running_tasks).await
.await
{
Ok(ctx) => self.run_queue(ctx.scheduler, running_tasks).await,
Err(err) => {
error!(err=?err, "error initializing scheduler");
Err(anyhow!("error initializing scheduler"))
}
}
}

#[allow(unused_mut)]
Expand Down Expand Up @@ -185,7 +190,7 @@ where
info!(
workflow_cid = workflow_cid.to_string(),
cid = cid.to_string(),
"resolving cid"
"attempting to resolve cid in workflow"
);

if let Some(result) = linkmap.read().await.get(&cid) {
Expand Down Expand Up @@ -369,7 +374,11 @@ where
receipt_meta,
additional_meta,
)),
Err(e) => Err(anyhow!("cannot execute wasm module: {e}")),
Err(err) => Err(
anyhow!("cannot execute wasm module: {err}"))
.with_context(|| {
format!("not able to run fn {fun} for promised cid: {instruction_ptr}, in workflow {workflow_cid}")
}),
}
});
handles.push(handle);
Expand Down
2 changes: 1 addition & 1 deletion homestar-runtime/src/workflow/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Settings {
impl Default for Settings {
fn default() -> Self {
Self {
retries: 10,
retries: 3,
retry_max_delay: Duration::new(60, 0),
retry_initial_delay: Duration::from_millis(500),
p2p_timeout: Duration::new(5, 0),
Expand Down
2 changes: 1 addition & 1 deletion homestar-wasm/src/wasmtime/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Error {
#[error(transparent)]
WasmRuntime(#[from] anyhow::Error),
/// Failure to find Wasm function for execution.
#[error("Wasm function {0} not found")]
#[error("Wasm function {0} not found in given Wasm component/resource")]
WasmFunctionNotFound(String),
/// [Wat] as Wasm component error.
///
Expand Down
2 changes: 1 addition & 1 deletion homestar-wasm/src/wasmtime/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<T> Env<T> {
},
Input::Deferred(await_promise) => {
bail!(Error::ResolvePromise(ResolveError::UnresolvedCid(format!(
"deferred task not yet resolved for {}: {}",
"deferred task/instruction not yet resolved or exists for promise: {}: {}",
await_promise.result(),
await_promise.instruction_cid()
))))
Expand Down

0 comments on commit 433b46a

Please sign in to comment.