From d678c5e31425e7a59a61950ea8594b2d6c046bf9 Mon Sep 17 00:00:00 2001 From: bloodearnest Date: Wed, 8 May 2024 15:47:14 +0100 Subject: [PATCH 1/2] Backport production tweaks to backfill scripts --- jobrunner/cli/manifests.py | 12 +++++++++++- jobrunner/executors/local.py | 9 ++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/jobrunner/cli/manifests.py b/jobrunner/cli/manifests.py index 76e644c7..95c0993a 100644 --- a/jobrunner/cli/manifests.py +++ b/jobrunner/cli/manifests.py @@ -8,8 +8,9 @@ from jobrunner.models import Job -def main(workspaces=None): +def main(): conn = database.get_connection() + workspaces = [ w["workspace"] for w in conn.execute("SELECT DISTINCT(workspace) FROM job;") ] @@ -18,6 +19,11 @@ def main(workspaces=None): for i, workspace in enumerate(workspaces): print(f"workspace {i+1}/{n_workspaces}: {workspace}") + workspace_dir = local.get_high_privacy_workspace(workspace) + if not workspace_dir.exists(): + print(f" - workspace is archived") + continue + level4_dir = local.get_medium_privacy_workspace(workspace) sentinel = level4_dir / ".manifest-backfill" @@ -62,6 +68,10 @@ def write_manifest(workspace): abspath = workspace_dir / output + if not abspath.exists(): + print(f" - {output}, {level}: old output no longer on disk") + continue + # use presence of message file to detect excluded files message_file = level4_dir / (output + ".txt") excluded = message_file.exists() diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py index 598cab2e..d9212064 100644 --- a/jobrunner/executors/local.py +++ b/jobrunner/executors/local.py @@ -541,12 +541,11 @@ def persist_outputs(job_definition, outputs, job_metadata): # if it previously had a message, delete it delete_files_from_directory(medium_privacy_dir, [message_file]) - # Update manifest with file metdata - manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace) + new_outputs = {} for filename, level in outputs.items(): abspath = workspace_dir 
/ filename - manifest["outputs"][filename] = get_output_metadata( + new_outputs[filename] = get_output_metadata( abspath, level, job_id=job_definition.id, @@ -557,6 +556,10 @@ def persist_outputs(job_definition, outputs, job_metadata): message=excluded_job_msgs.get(filename), csv_counts=csv_metadata.get(filename), ) + + # Update manifest with file metadata + manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace) + manifest["outputs"].update(**new_outputs) write_manifest_file(medium_privacy_dir, manifest) return excluded_job_msgs From 4e5b29b6afc19a0c380d60c6c00c2cb5c7d0f0c6 Mon Sep 17 00:00:00 2001 From: bloodearnest Date: Wed, 8 May 2024 15:47:41 +0100 Subject: [PATCH 2/2] Enforce wal mode We've already enabled this in production, but this ensures dev is in WAL mode as well --- jobrunner/cli/manifests.py | 2 +- jobrunner/lib/database.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/jobrunner/cli/manifests.py b/jobrunner/cli/manifests.py index 95c0993a..e0b2ed85 100644 --- a/jobrunner/cli/manifests.py +++ b/jobrunner/cli/manifests.py @@ -21,7 +21,7 @@ def main(): workspace_dir = local.get_high_privacy_workspace(workspace) if not workspace_dir.exists(): - print(f" - workspace is archived") + print(f" - workspace {workspace} is archived") continue level4_dir = local.get_medium_privacy_workspace(workspace) diff --git a/jobrunner/lib/database.py b/jobrunner/lib/database.py index 8703b620..45f81a5b 100644 --- a/jobrunner/lib/database.py +++ b/jobrunner/lib/database.py @@ -177,6 +177,12 @@ def get_connection(filename=None): conn.row_factory = sqlite3.Row cache[filename] = conn + # use WAL to enable other processes (e.g. operational tasks) to read the DB. + # job-runner should be the only active writer, which means if we need + # some other process to write the db (e.g. a backfill), then we should + # stop job-runner. + conn.execute("PRAGMA journal_mode=WAL") + return cache[filename]