Skip to content

Commit

Permalink
fix: improve uv dependency install reliability with an atomic success…
Browse files Browse the repository at this point in the history
… file

* Revert solution with Mutex

* Implement valid.windmill logic

* Remove unused import

* valid.windmill -> .valid.windmill

Just like .lock

* Don't delete wheels if cancelled/failed

Now we don't clean up the requirement folder if the installation failed.

This way we can fully utilize uv's flock system.

Also, if a wheel dir was left behind but only partially filled (resulting in an invalid wheel),
we use the --reinstall flag in order to overwrite any existing content of the wheels

* Add comment

* Add --reinstall to nsjail
  • Loading branch information
pyranota authored Jan 20, 2025
1 parent 99b0193 commit 5831822
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
1 change: 1 addition & 0 deletions backend/windmill-worker/nsjail/download_deps.py.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ CMD="/usr/local/bin/uv pip install
$INDEX_URL_ARG $EXTRA_INDEX_URL_ARG $TRUSTED_HOST_ARG
--index-strategy unsafe-best-match
--system
--reinstall
"

echo $CMD
Expand Down
33 changes: 15 additions & 18 deletions backend/windmill-worker/src/python_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
};

use anyhow::anyhow;
use futures::lock::Mutex;
use itertools::Itertools;
use regex::Regex;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -36,8 +35,6 @@ use windmill_common::variables::get_secret_value_as_admin;
use windmill_queue::{append_logs, CanceledBy};

lazy_static::lazy_static! {
static ref BUSY_WITH_UV_INSTALL: Mutex<()> = Mutex::new(());

static ref PYTHON_PATH: String =
std::env::var("PYTHON_PATH").unwrap_or_else(|_| "/usr/local/bin/python3".to_string());

Expand Down Expand Up @@ -1248,6 +1245,8 @@ async fn spawn_uv_install(
"--target",
venv_p,
"--no-cache",
// If we invoke uv pip install, then we want to overwrite existing data
"--reinstall",
]
};

Expand Down Expand Up @@ -1356,7 +1355,6 @@ pub async fn handle_python_reqs(
mut no_uv_install: bool,
is_ansible: bool,
) -> error::Result<Vec<String>> {
let lock = BUSY_WITH_UV_INSTALL.lock().await;
let counter_arc = Arc::new(tokio::sync::Mutex::new(0));
// Append logs with line like this:
// [9/21] + requests==2.32.3 << (S3) | in 57ms
Expand Down Expand Up @@ -1486,10 +1484,11 @@ pub async fn handle_python_reqs(
"{py_prefix}/{}",
req.replace(' ', "").replace('/', "").replace(':', "")
);
if metadata(&venv_p).await.is_ok() {
if metadata(venv_p.clone() + "/.valid.windmill").await.is_ok() {
req_paths.push(venv_p);
in_cache.push(req.to_string());
} else {
// Either the .valid.windmill marker is missing or there is no wheel at all. Regardless of whether there is content or not, we will overwrite it with the --reinstall flag
req_with_penv.push((req.to_string(), venv_p));
}
}
Expand Down Expand Up @@ -1520,12 +1519,6 @@ pub async fn handle_python_reqs(
let pids = Arc::new(tokio::sync::Mutex::new(vec![None; total_to_install]));
let mem_peak_thread_safe = Arc::new(tokio::sync::Mutex::new(0));
{
// when we cancel the job, it has up to 1 second window before actually getting cancelled
// Thus the directory with wheel in windmill's cache cleaned only after that.
// If we manage to start new job during that period windmill might see that wanted wheel is already there (because we have not cleaned it yet)
// and write it to installed wheels, meanwhile previous job will clean that wheel.
// To fix that we create lock, which will pipeline all uv installs on worker
let _lock = lock;
let pids = pids.clone();
let mem_peak_thread_safe = mem_peak_thread_safe.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -1866,6 +1859,17 @@ pub async fn handle_python_reqs(
);

pids.lock().await.get_mut(i).and_then(|e| e.take());
// Create a file to indicate that installation was successful
let valid_path = venv_p.clone() + "/.valid.windmill";
// This is an atomic operation, meaning that it either completes and the wheel is valid,
// or it does not, and the wheel is invalid and will be reinstalled on the next run
if let Err(e) = File::create(&valid_path).await{
tracing::error!(
workspace_id = %w_id,
job_id = %job_id,
"Failed to create {}!\n{e}\n
This file needed for python jobs to function", valid_path)
};
Ok(())
}));
}
Expand All @@ -1882,13 +1886,6 @@ pub async fn handle_python_reqs(
"Env installation failed: {:?}",
e
);
if let Err(e) = fs::remove_dir_all(&venv_p) {
tracing::warn!(
workspace_id = %w_id,
"Failed to remove cache dir: {:?}",
e
);
}
} else {
req_paths.push(venv_p);
}
Expand Down

0 comments on commit 5831822

Please sign in to comment.