Skip to content

Commit

Permalink
feat(backend): implement inline script
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Nov 18, 2024
1 parent fc6e402 commit 2e59d52
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Down migration: drop the inline_script table created by the matching .up.sql.
-- IF EXISTS keeps the rollback idempotent if the table was never created.
-- NOTE(review): the 'inlinescript' value added to the JOB_KIND enum by the up
-- migration is NOT reverted here — Postgres cannot drop enum values, so the
-- label remains after rollback. Confirm this is acceptable.
DROP TABLE IF EXISTS inline_script;
13 changes: 13 additions & 0 deletions backend/migrations/20241118072014_create_inline_script.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Up migration: support for scripts embedded inline in a flow.

-- New job kind for jobs pushed from inline-script flow steps. The lowercase
-- spelling matches the serde tag used by FlowModuleValue ("inlinescript").
ALTER TYPE JOB_KIND ADD VALUE IF NOT EXISTS 'inlinescript';

-- One row per inline script, owned by a flow. A flow module references a row
-- via (flow path, content hash); see FlowModuleValue::InlineScript.
CREATE TABLE inline_script (
    workspace_id VARCHAR(50) NOT NULL REFERENCES workspace(id),
    flow VARCHAR(255) NOT NULL, -- path of the owning flow
    lock TEXT, -- dependency lockfile, when resolved (nullable)
    path TEXT, -- optional script path/label (nullable)
    hash BIGINT NOT NULL, -- script hash; lookup key used by resolve_module
    content TEXT NOT NULL, -- script source code
    PRIMARY KEY (flow, hash),
    -- Inline scripts are deleted together with their owning flow.
    FOREIGN KEY (flow, workspace_id) REFERENCES flow (path, workspace_id) ON DELETE CASCADE
);
60 changes: 54 additions & 6 deletions backend/windmill-api/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use windmill_common::HUB_BASE_URL;
use windmill_common::{
db::UserDB,
error::{self, to_anyhow, Error, JsonResult, Result},
flows::{Flow, FlowWithStarred, ListFlowQuery, ListableFlow, NewFlow},
flows::{resolve, Flow, FlowWithStarred, ListFlowQuery, ListableFlow, NewFlow},
jobs::JobPayload,
schedule::Schedule,
scripts::Schema,
Expand Down Expand Up @@ -98,7 +98,7 @@ async fn list_search_flows(
let n = 3;
let mut tx = user_db.begin(&authed).await?;

let rows = sqlx::query_as::<_, SearchFlow>(
let mut rows = sqlx::query_as::<_, SearchFlow>(
"SELECT flow.path, flow_version.value
FROM flow
LEFT JOIN flow_version ON flow_version.id = flow.versions[array_upper(flow.versions, 1)]
Expand All @@ -110,6 +110,9 @@ async fn list_search_flows(
.await?
.into_iter()
.collect::<Vec<_>>();
for row in &mut rows {
resolve(&mut *tx, &row.path, &w_id, &mut row.value.0).await;
}
tx.commit().await?;
Ok(Json(rows))
}
Expand Down Expand Up @@ -578,11 +581,12 @@ async fn get_flow_version(
WHERE flow.path = $1 AND flow.workspace_id = $2 AND flow_version.id = $3",
)
.bind(path)
.bind(w_id)
.bind(&w_id)
.bind(version)
.fetch_optional(&mut *tx)
.await?;

let flow = resolve_flow(&mut *tx, path, &w_id, flow).await;
tx.commit().await?;

let flow = not_found_if_none(flow, "Flow version", version.to_string())?;
Expand Down Expand Up @@ -942,7 +946,7 @@ async fn get_flow_by_path(
WHERE flow.path = $1 AND flow.workspace_id = $2"
)
.bind(path)
.bind(w_id)
.bind(&w_id)
.bind(&authed.username)
.fetch_optional(&mut *tx)
.await?
Expand All @@ -954,10 +958,11 @@ async fn get_flow_by_path(
WHERE flow.path = $1 AND flow.workspace_id = $2"
)
.bind(path)
.bind(w_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?
};
let flow_o = resolve_flow_with_starred(&mut *tx, path, &w_id, flow_o).await;
tx.commit().await?;

let flow = not_found_if_none(flow_o, "Flow", path)?;
Expand Down Expand Up @@ -1004,9 +1009,10 @@ async fn get_flow_by_path_w_draft(
WHERE flow.path = $1 AND flow.workspace_id = $2",
)
.bind(path)
.bind(w_id)
.bind(&w_id)
.fetch_optional(&mut *tx)
.await?;
let flow_o = resolve_flow_with_draft(&mut *tx, path, &w_id, flow_o).await;
tx.commit().await?;

let flow = not_found_if_none(flow_o, "Flow", path)?;
Expand Down Expand Up @@ -1485,3 +1491,45 @@ mod tests {
assert_eq!(Some(81 * SECOND), retry.max_interval());
}
}

/// Resolve the inline modules of an optional [`Flow`] in place.
///
/// `None` passes through untouched; otherwise the flow's raw JSON `value`
/// is rewritten by [`resolve`] and the flow is returned.
async fn resolve_flow(
    conn: &mut sqlx::PgConnection,
    path: &str,
    workspace_id: &str,
    flow: Option<Flow>
) -> Option<Flow> {
    let mut flow = flow?;
    resolve(conn, path, workspace_id, &mut flow.value).await;
    Some(flow)
}

/// Resolve the inline modules of an optional [`FlowWithStarred`] in place.
///
/// `None` passes through untouched; otherwise the inner flow's raw JSON
/// `value` is rewritten by [`resolve`] and the wrapper is returned.
async fn resolve_flow_with_starred(
    conn: &mut sqlx::PgConnection,
    path: &str,
    workspace_id: &str,
    flow: Option<FlowWithStarred>
) -> Option<FlowWithStarred> {
    let mut starred = flow?;
    resolve(conn, path, workspace_id, &mut starred.flow.value).await;
    Some(starred)
}

/// Resolve the inline modules of an optional [`FlowWDraft`] in place.
///
/// `None` passes through untouched; otherwise the draft view's raw JSON
/// `value` is rewritten by [`resolve`] and the wrapper is returned.
async fn resolve_flow_with_draft(
    conn: &mut sqlx::PgConnection,
    path: &str,
    workspace_id: &str,
    flow: Option<FlowWDraft>
) -> Option<FlowWDraft> {
    let mut draft = flow?;
    resolve(conn, path, workspace_id, &mut draft.value).await;
    Some(draft)
}
96 changes: 95 additions & 1 deletion backend/windmill-common/src/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@ use std::{

use rand::Rng;
use serde::{Deserialize, Serialize, Serializer};
use sqlx::types::Json;
use sqlx::types::JsonRawValue;

use crate::{
error::Error,
more_serde::{default_empty_string, default_id, default_null, default_true, is_default},
scripts::{Schema, ScriptHash, ScriptLang},
};
use crate::worker::to_raw_value;

#[derive(Serialize, Deserialize, sqlx::FromRow)]
pub struct Flow {
pub workspace_id: String,
pub path: String,
pub summary: String,
pub description: String,
pub value: sqlx::types::Json<Box<serde_json::value::RawValue>>,
pub value: Json<Box<JsonRawValue>>,
pub edited_by: String,
pub edited_at: chrono::DateTime<chrono::Utc>,
pub archived: bool,
Expand Down Expand Up @@ -480,6 +483,23 @@ pub enum FlowModuleValue {
#[serde(skip_serializing_if = "Option::is_none")]
is_trigger: Option<bool>,
},
InlineScript {
#[serde(default)]
#[serde(alias = "input_transform", serialize_with = "ordered_map")]
input_transforms: HashMap<String, InputTransform>,
hash: ScriptHash, // reference to `inline_script` table on (flow.path, hash)
#[serde(skip_serializing_if = "is_none_or_empty")]
tag: Option<String>,
language: ScriptLang,
#[serde(skip_serializing_if = "Option::is_none")]
custom_concurrency_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
concurrent_limit: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
concurrency_time_window_s: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
is_trigger: Option<bool>,
},
Identity,
}

Expand Down Expand Up @@ -583,6 +603,20 @@ impl<'de> Deserialize<'de> for FlowModuleValue {
concurrency_time_window_s: untagged.concurrency_time_window_s,
is_trigger: untagged.is_trigger,
}),
"inlinescript" => Ok(FlowModuleValue::InlineScript {
input_transforms: untagged.input_transforms.unwrap_or_default(),
hash: untagged
.hash
.ok_or_else(|| serde::de::Error::missing_field("hash"))?,
tag: untagged.tag,
language: untagged
.language
.ok_or_else(|| serde::de::Error::missing_field("language"))?,
custom_concurrency_key: untagged.custom_concurrency_key,
concurrent_limit: untagged.concurrent_limit,
concurrency_time_window_s: untagged.concurrency_time_window_s,
is_trigger: untagged.is_trigger,
}),
"identity" => Ok(FlowModuleValue::Identity),
other => Err(serde::de::Error::unknown_variant(
other,
Expand Down Expand Up @@ -678,3 +712,63 @@ pub async fn has_failure_module<'c>(flow: sqlx::types::Uuid, db: &sqlx::Pool<sql
})
.map(|v| v.unwrap_or(false))
}

/// Resolve the loadable modules of a flow's raw JSON value, recursively.
///
/// `value` is expected to hold a serialized [`FlowValue`]. When it does not
/// parse as one, it is left untouched. Otherwise each top-level module is
/// resolved via [`resolve_module`] and the (possibly rewritten) flow is
/// serialized back into `value`.
pub async fn resolve(
    conn: &mut sqlx::PgConnection,
    flow_path: &str,
    workspace_id: &str,
    value: &mut Box<JsonRawValue>
) {
    let mut flow = match serde_json::from_str::<FlowValue>(value.get()) {
        Ok(flow) => flow,
        // Not a FlowValue: leave the raw JSON exactly as it was.
        Err(_) => return,
    };
    for module in flow.modules.iter_mut() {
        resolve_module(conn, flow_path, workspace_id, &mut module.value).await;
    }
    *value = to_raw_value(&flow);
}

/// Resolve loadable module values recursively.
///
/// Deserializes `value` as a [`FlowModuleValue`]. If it does not parse, the
/// raw JSON is left untouched. An `InlineScript` node is replaced by an
/// equivalent `RawScript` whose `content`/`lock`/`path` are loaded from the
/// `inline_script` table keyed on (hash, flow path, workspace). Container
/// nodes (loops, branches) are walked recursively; all other variants are
/// re-serialized as-is.
///
/// NOTE(review): round-tripping through `FlowModuleValue` may normalize the
/// JSON (key order, unknown fields) — confirm the serde model is lossless.
pub async fn resolve_module(
    conn: &mut sqlx::PgConnection,
    flow_path: &str,
    workspace_id: &str,
    value: &mut Box<JsonRawValue>
) {
    use FlowModuleValue::*;

    // Unparseable module values are deliberately left untouched.
    let Ok(mut val) = serde_json::from_str::<FlowModuleValue>(value.get()) else { return };
    match &mut val {
        InlineScript { .. } => {
            // Take ownership of the variant's fields by swapping `Identity`
            // into `val`; the inner pattern cannot fail because we just
            // matched `InlineScript`, hence `unreachable!()`.
            let InlineScript {
                input_transforms, hash, tag, language, custom_concurrency_key, concurrent_limit,
                concurrency_time_window_s, is_trigger
            } = std::mem::replace(&mut val, Identity) else { unreachable!() };
            // On lookup failure (e.g. no matching row) return early WITHOUT
            // rewriting `value`: the original inline-script JSON stays in
            // place, and the `Identity` left in `val` is discarded.
            let Ok((content, lock, path)) = sqlx::query!(
                "SELECT content, lock, path FROM inline_script WHERE hash = $1 AND flow = $2 AND \
                workspace_id = $3",
                hash.0, flow_path, workspace_id
            )
            .fetch_one(conn)
            .await
            .map(|record| (record.content, record.lock, record.path)) else { return };
            // Rebuild the module as a fully materialized RawScript, carrying
            // over all scheduling/concurrency settings unchanged.
            val = RawScript {
                input_transforms, content, lock, path, tag, language, custom_concurrency_key,
                concurrent_limit, concurrency_time_window_s, is_trigger
            };
        },
        ForloopFlow { modules, .. } | WhileloopFlow { modules, .. } => {
            // Box::pin: async recursion requires a heap-allocated future.
            for module in modules {
                Box::pin(resolve_module(conn, flow_path, workspace_id, &mut module.value)).await;
            }
        },
        BranchOne { branches, .. } | BranchAll { branches, .. } => {
            for branch in branches {
                for module in &mut branch.modules {
                    Box::pin(resolve_module(conn, flow_path, workspace_id, &mut module.value)).await;
                }
            }
        }
        _ => {}
    }
    // Serialize the (possibly rewritten) module back into the raw JSON slot.
    *value = to_raw_value(&val);
}
11 changes: 11 additions & 0 deletions backend/windmill-common/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub enum JobKind {
AppDependencies,
Noop,
DeploymentCallback,
InlineScript,
}

#[derive(sqlx::FromRow, Debug, Serialize, Clone)]
Expand Down Expand Up @@ -263,6 +264,16 @@ pub enum JobPayload {
priority: Option<i16>,
apply_preprocessor: bool,
},
InlineScript {
flow: String,
hash: ScriptHash,
language: ScriptLang,
custom_concurrency_key: Option<String>,
concurrent_limit: Option<i32>,
concurrency_time_window_s: Option<i32>,
cache_ttl: Option<i32>,
dedicated_worker: Option<bool>,
},
Code(RawCode),
Dependencies {
path: String,
Expand Down
26 changes: 26 additions & 0 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3299,6 +3299,31 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
priority,
)
}
// TODO(uael): apply_preprocessor ?
JobPayload::InlineScript {
flow,
hash,
language,
custom_concurrency_key,
concurrent_limit,
concurrency_time_window_s,
cache_ttl,
dedicated_worker
} => (
Some(hash.0),
Some(flow),
None,
JobKind::InlineScript,
None,
None,
Some(language),
custom_concurrency_key,
concurrent_limit,
concurrency_time_window_s,
cache_ttl,
dedicated_worker,
None,
),
JobPayload::ScriptHub { path } => {
if path == "hub/7771/slack" || path == "hub/7836/slack" {
permissioned_as = SUPERADMIN_NOTIFICATION_EMAIL.to_string();
Expand Down Expand Up @@ -4060,6 +4085,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
JobKind::FlowDependencies => "jobs.run.flow_dependencies",
JobKind::AppDependencies => "jobs.run.app_dependencies",
JobKind::DeploymentCallback => "jobs.run.deployment_callback",
JobKind::InlineScript => "jobs.run.inline_script",
};

let audit_author = if format!("u/{user}") != permissioned_as && user != permissioned_as {
Expand Down
39 changes: 39 additions & 0 deletions backend/windmill-worker/src/dedicated_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,45 @@ async fn spawn_dedicated_workers_for_flow(
workers.push(dedi_w);
}
}
FlowModuleValue::InlineScript { hash, language, .. } => {
let spawn = sqlx::query!(
"SELECT content, lock, path FROM inline_script \
WHERE flow = $1 AND hash = $2 AND workspace_id = $3",
path, hash.0, w_id
)
.fetch_one(db)
.await
.map(|record| SpawnWorker::RawScript {
path: record.path.unwrap_or_else(|| "".to_string()),
content: record.content,
lock: record.lock,
lang: language.clone(),
});
match spawn {
Ok(spawn) => {
if let Some(dedi_w) = spawn_dedicated_worker(
spawn,
w_id,
killpill_tx.clone(),
killpill_rx,
db,
worker_dir,
base_internal_url,
worker_name,
job_completed_tx,
Some(module.id.clone()),
)
.await
{
workers.push(dedi_w);
}
},
Err(err) => tracing::error!(
"failed to get script for module: {:?}, err: {:?}",
module, err
)
}
},
FlowModuleValue::Flow { .. } => (),
FlowModuleValue::Identity => (),
}
Expand Down
Loading

0 comments on commit 2e59d52

Please sign in to comment.