Skip to content

Commit

Permalink
backend: switch to script_entrypoint_override column
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Feb 3, 2025
1 parent c9bc516 commit 10d84fb
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 48 deletions.
2 changes: 1 addition & 1 deletion backend/migrations/20250201124743_v2_job_queue_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ BEGIN
NEW.__args = jsonb_set(
coalesce(NEW.__args, '{}'::JSONB),
'{_ENTRYPOINT_OVERRIDE}',
job.script_entrypoint_override
to_jsonb(job.script_entrypoint_override)
);
END IF;
RETURN NEW;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ SELECT
j.flow_step_id,
j.cache_ttl,
j.priority,
NULL::TEXT AS logs
NULL::TEXT AS logs,
j.script_entrypoint_override
FROM v2_job_queue q
JOIN v2_job j USING (id)
LEFT JOIN v2_job_runtime r USING (id)
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2795,6 +2795,7 @@ impl<'a> From<UnifiedJob> for Job {
permissioned_as: uj.permissioned_as,
is_flow_step: uj.is_flow_step,
language: uj.language,
script_entrypoint_override: None,
same_worker: false,
pre_run_error: None,
email: uj.email,
Expand Down
2 changes: 2 additions & 0 deletions backend/windmill-common/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct QueuedJob {
pub script_hash: Option<ScriptHash>,
#[serde(skip_serializing_if = "Option::is_none")]
pub script_path: Option<String>,
pub script_entrypoint_override: Option<String>,
pub args: Option<Json<HashMap<String, Box<RawValue>>>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub logs: Option<String>,
Expand Down Expand Up @@ -170,6 +171,7 @@ impl Default for QueuedJob {
flow_status: None,
is_flow_step: false,
language: None,
script_entrypoint_override: None,
same_worker: false,
pre_run_error: None,
email: "".to_string(),
Expand Down
5 changes: 3 additions & 2 deletions backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ fn format_pull_query(peek: String) -> String {
flow_innermost_root_job AS root_job, flow_step_id, flow_step_id IS NOT NULL AS is_flow_step,
same_worker, pre_run_error, visible_to_owner, tag, concurrent_limit,
concurrency_time_window_s, timeout, cache_ttl, priority, raw_code, raw_lock,
raw_flow
raw_flow, script_entrypoint_override
FROM v2_job
WHERE id = (SELECT id FROM peek)
) SELECT id, workspace_id, parent_job, created_by, created_at, started_at, scheduled_for,
Expand All @@ -141,7 +141,8 @@ fn format_pull_query(peek: String) -> String {
same_worker, pre_run_error, email, visible_to_owner, mem_peak,
root_job, flow_leaf_jobs as leaf_jobs, tag, concurrent_limit, concurrency_time_window_s,
timeout, flow_step_id, cache_ttl, priority,
raw_code, raw_lock, raw_flow
raw_code, raw_lock, raw_flow,
script_entrypoint_override
FROM q, r, j
LEFT JOIN v2_job_status f USING (id)",
peek
Expand Down
15 changes: 13 additions & 2 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3731,15 +3731,25 @@ pub async fn push<'c, 'd>(

let raw_flow = raw_flow.map(Json);

let script_entrypoint_override: Option<String> = match args.args.get(ENTRYPOINT_OVERRIDE) {
Some(x) => Some(x.clone()),
None => args
.extra
.as_mut()
.map(|extra| extra.remove(ENTRYPOINT_OVERRIDE))
.flatten(),
}
.and_then(|x| serde_json::from_str(x.get()).ok());

sqlx::query!(
"INSERT INTO v2_job (id, workspace_id, raw_code, raw_lock, raw_flow, tag, parent_job,
created_by, permissioned_as, runnable_id, runnable_path, args, kind, trigger,
script_lang, same_worker, pre_run_error, permissioned_as_email, visible_to_owner,
flow_innermost_root_job, concurrent_limit, concurrency_time_window_s, timeout, flow_step_id,
cache_ttl, priority, trigger_kind)
cache_ttl, priority, trigger_kind, script_entrypoint_override)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18,
$19, $20, $21, $22, $23, $24, $25, $26,
CASE WHEN $14::VARCHAR IS NOT NULL THEN 'schedule'::job_trigger_kind END)",
CASE WHEN $14::VARCHAR IS NOT NULL THEN 'schedule'::job_trigger_kind END, $27)",
job_id,
workspace_id,
raw_code,
Expand Down Expand Up @@ -3770,6 +3780,7 @@ pub async fn push<'c, 'd>(
flow_step_id,
cache_ttl,
final_priority,
script_entrypoint_override,
)
.execute(&mut *tx)
.warn_after_seconds(1)
Expand Down
18 changes: 10 additions & 8 deletions backend/windmill-worker/src/bun_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use crate::common::build_envs_map;

use crate::{
common::{
create_args_and_out_file, get_main_override, get_reserved_variables, parse_npm_config,
read_file, read_file_content, read_result, start_child_process, write_file_binary,
OccupancyMetrics,
create_args_and_out_file, get_reserved_variables, parse_npm_config, read_file,
read_file_content, read_result, start_child_process, write_file_binary, OccupancyMetrics,
},
handle_child::handle_child,
AuthedClientBackgroundTask, BUNFIG_INSTALL_SCOPES, BUN_BUNDLE_CACHE_DIR, BUN_CACHE_DIR,
Expand Down Expand Up @@ -875,7 +874,7 @@ pub async fn handle_bun_job(
if codebase.is_some() {
annotation.nodejs = true
}
let (main_override, apply_preprocessor) = match get_main_override(job.args.as_ref()) {
let (main_override, apply_preprocessor) = match job.script_entrypoint_override.as_deref() {
Some(main_override) => {
if main_override == PREPROCESSOR_FAKE_ENTRYPOINT {
(None, true)
Expand Down Expand Up @@ -1051,9 +1050,12 @@ pub async fn handle_bun_job(
return Ok(()) as error::Result<()>;
}
// let mut start = Instant::now();
let args =
windmill_parser_ts::parse_deno_signature(inner_content, true, main_override.clone())?
.args;
let args = windmill_parser_ts::parse_deno_signature(
inner_content,
true,
main_override.map(ToString::to_string),
)?
.args;

let pre_args = if apply_preprocessor {
Some(
Expand Down Expand Up @@ -1087,7 +1089,7 @@ pub async fn handle_bun_job(
let spread = args.into_iter().map(|x| x.name).join(",");
// logs.push_str(format!("infer args: {:?}\n", start.elapsed().as_micros()).as_str());
// we cannot use Bun.read and Bun.write because it results in an EBADF error on cloud
let main_name = main_override.unwrap_or("main".to_string());
let main_name = main_override.unwrap_or("main");

let main_import = if codebase.is_some() || has_bundle_cache {
"./main.js"
Expand Down
10 changes: 0 additions & 10 deletions backend/windmill-worker/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use sqlx::types::Json;
use sqlx::{Pool, Postgres};
use tokio::process::Command;
use tokio::{fs::File, io::AsyncReadExt};
use windmill_common::jobs::ENTRYPOINT_OVERRIDE;

#[cfg(feature = "parquet")]
use windmill_common::s3_helpers::{
Expand Down Expand Up @@ -440,15 +439,6 @@ pub async fn build_envs_map(context: Vec<ContextualVariable>) -> HashMap<String,
r
}

pub fn get_main_override(args: Option<&Json<HashMap<String, Box<RawValue>>>>) -> Option<String> {
return args
.map(|x| {
x.0.get(ENTRYPOINT_OVERRIDE)
.map(|x| x.get().to_string().replace("\"", ""))
})
.flatten();
}

pub fn sizeof_val(v: &serde_json::Value) -> usize {
std::mem::size_of::<serde_json::Value>()
+ match v {
Expand Down
17 changes: 10 additions & 7 deletions backend/windmill-worker/src/deno_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use windmill_queue::{append_logs, CanceledBy};

use crate::{
common::{
create_args_and_out_file, get_main_override, get_reserved_variables, parse_npm_config,
read_file, read_result, start_child_process, OccupancyMetrics,
create_args_and_out_file, get_reserved_variables, parse_npm_config, read_file, read_result,
start_child_process, OccupancyMetrics,
},
handle_child::handle_child,
AuthedClientBackgroundTask, DENO_CACHE_DIR, DENO_PATH, DISABLE_NSJAIL, HOME_ENV,
Expand Down Expand Up @@ -197,7 +197,7 @@ pub async fn handle_deno_job(
let logs1 = "\n\n--- DENO CODE EXECUTION ---\n".to_string();
append_logs(&job.id, &job.workspace_id, logs1, db).await;

let (main_override, apply_preprocessor) = match get_main_override(job.args.as_ref()) {
let (main_override, apply_preprocessor) = match job.script_entrypoint_override.as_deref() {
Some(main_override) => {
if main_override == PREPROCESSOR_FAKE_ENTRYPOINT {
(None, true)
Expand All @@ -212,9 +212,12 @@ pub async fn handle_deno_job(

let write_wrapper_f = async {
// let mut start = Instant::now();
let args =
windmill_parser_ts::parse_deno_signature(inner_content, true, main_override.clone())?
.args;
let args = windmill_parser_ts::parse_deno_signature(
inner_content,
true,
main_override.map(ToString::to_string),
)?
.args;

let pre_args = if apply_preprocessor {
Some(
Expand Down Expand Up @@ -246,7 +249,7 @@ pub async fn handle_deno_job(
.join("\n ");

let spread = args.into_iter().map(|x| x.name).join(",");
let main_name = main_override.unwrap_or("main".to_string());
let main_name = main_override.unwrap_or("main");
// logs.push_str(format!("infer args: {:?}\n", start.elapsed().as_micros()).as_str());
let (preprocessor_import, preprocessor) = if let Some(pre_args) = pre_args {
let pre_spread = pre_args.into_iter().map(|x| x.name).join(",");
Expand Down
9 changes: 6 additions & 3 deletions backend/windmill-worker/src/php_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,14 @@ pub async fn handle_php_job(

let _ = write_file(job_dir, "main.php", inner_content)?;

let main_override = get_main_override(job.args.as_ref());
let main_override = job.script_entrypoint_override.as_deref();

let write_wrapper_f = async {
let args =
windmill_parser_php::parse_php_signature(inner_content, main_override.clone())?.args;
let args = windmill_parser_php::parse_php_signature(
inner_content,
main_override.map(ToString::to_string),
)?
.args;

let args_to_include = args
.iter()
Expand Down
29 changes: 15 additions & 14 deletions backend/windmill-worker/src/python_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::anyhow;
use itertools::Itertools;
use regex::Regex;
use serde_json::value::RawValue;
use sqlx::{types::Json, Pool, Postgres};
use sqlx::{Pool, Postgres};
use tokio::{
fs::{metadata, DirBuilder, File},
io::AsyncReadExt,
Expand All @@ -35,8 +35,8 @@ use windmill_common::{
#[cfg(feature = "enterprise")]
use windmill_common::variables::get_secret_value_as_admin;

use windmill_queue::{append_logs, CanceledBy};
use std::env::var;
use windmill_queue::{append_logs, CanceledBy};

lazy_static::lazy_static! {
static ref PYTHON_PATH: String =
Expand Down Expand Up @@ -84,8 +84,8 @@ use windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS;

use crate::{
common::{
create_args_and_out_file, get_main_override, get_reserved_variables, read_file,
read_result, start_child_process, OccupancyMetrics,
create_args_and_out_file, get_reserved_variables, read_file, read_result,
start_child_process, OccupancyMetrics,
},
handle_child::handle_child,
AuthedClientBackgroundTask, DISABLE_NSJAIL, DISABLE_NUSER, HOME_ENV, INSTANCE_PYTHON_VERSION,
Expand Down Expand Up @@ -902,8 +902,6 @@ pub async fn handle_python_job(
tracing::debug!("Finished deps postinstall stage");
}



if no_uv {
append_logs(
&job.id,
Expand Down Expand Up @@ -937,9 +935,9 @@ pub async fn handle_python_job(
pre_spread,
) = prepare_wrapper(
job_dir,
job.script_entrypoint_override.as_deref(),
inner_content,
&script_path,
job.args.as_ref(),
false,
)
.await?;
Expand Down Expand Up @@ -1057,7 +1055,7 @@ except BaseException as e:
// Usefull if certain wheels needs to be preinstalled before execution.
let global_site_packages_path = py_version.to_cache_dir() + "/global-site-packages";
let additional_python_paths_folders = {
let mut paths= additional_python_paths.clone();
let mut paths = additional_python_paths.clone();
if std::fs::metadata(&global_site_packages_path).is_ok() {
// We want global_site_packages_path to be included in additonal_python_paths_folders, but
// we don't want it to be included in global_site_packages_path.
Expand Down Expand Up @@ -1207,9 +1205,9 @@ mount {{

async fn prepare_wrapper(
job_dir: &str,
job_script_entrypoint_override: Option<&str>,
inner_content: &str,
script_path: &str,
args: Option<&Json<HashMap<String, Box<RawValue>>>>,
skip_preprocessor: bool,
) -> error::Result<(
&'static str,
Expand All @@ -1223,7 +1221,7 @@ async fn prepare_wrapper(
Option<String>,
Option<String>,
)> {
let (main_override, apply_preprocessor) = match get_main_override(args) {
let (main_override, apply_preprocessor) = match job_script_entrypoint_override {
Some(main_override) => {
if !skip_preprocessor && main_override == PREPROCESSOR_FAKE_ENTRYPOINT {
(None, true)
Expand Down Expand Up @@ -1272,7 +1270,10 @@ async fn prepare_wrapper(
let _ = write_file(job_dir, "loader.py", RELATIVE_PYTHON_LOADER)?;
}

let sig = windmill_parser_py::parse_python_signature(inner_content, main_override.clone())?;
let sig = windmill_parser_py::parse_python_signature(
inner_content,
main_override.map(ToString::to_string),
)?;

let pre_sig = if apply_preprocessor {
Some(windmill_parser_py::parse_python_signature(
Expand Down Expand Up @@ -1389,7 +1390,7 @@ async fn prepare_wrapper(
last,
transforms,
spread,
main_override,
main_override.map(ToString::to_string),
pre_spread,
))
}
Expand Down Expand Up @@ -1721,7 +1722,7 @@ async fn spawn_uv_install(
// Track https://github.com/astral-sh/uv/issues/6715
if let Some(cert_path) = INDEX_CERT.as_ref() {
// Once merged --cert can be used instead
//
//
// command_args.extend(["--cert", cert_path]);
envs.push(("SSL_CERT_FILE", cert_path));
}
Expand Down Expand Up @@ -2458,7 +2459,7 @@ pub async fn start_worker(
spread,
_,
_,
) = prepare_wrapper(job_dir, inner_content, script_path, _args.as_ref(), true).await?;
) = prepare_wrapper(job_dir, None, inner_content, script_path, true).await?;

{
let indented_transforms = transforms
Expand Down

0 comments on commit 10d84fb

Please sign in to comment.