Skip to content

Commit

Permalink
Merge pull request #730 from opensafely-core/fixes-from-backfill
Browse files Browse the repository at this point in the history
fixes from backfill
  • Loading branch information
bloodearnest authored May 8, 2024
2 parents fb9b0ce + 4e5b29b commit 1a18b85
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
12 changes: 11 additions & 1 deletion jobrunner/cli/manifests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from jobrunner.models import Job


def main(workspaces=None):
def main():
conn = database.get_connection()

workspaces = [
w["workspace"] for w in conn.execute("SELECT DISTINCT(workspace) FROM job;")
]
Expand All @@ -18,6 +19,11 @@ def main(workspaces=None):
for i, workspace in enumerate(workspaces):
print(f"workspace {i+1}/{n_workspaces}: {workspace}")

workspace_dir = local.get_high_privacy_workspace(workspace)
if not workspace_dir.exists():
print(f" - workspace {workspace} is archived")
continue

level4_dir = local.get_medium_privacy_workspace(workspace)

sentinel = level4_dir / ".manifest-backfill"
Expand Down Expand Up @@ -62,6 +68,10 @@ def write_manifest(workspace):

abspath = workspace_dir / output

if not abspath.exists():
print(f" - {output}, {level}: old output no longer on disk")
continue

# use presence of message file to detect excluded files
message_file = level4_dir / (output + ".txt")
excluded = message_file.exists()
Expand Down
9 changes: 6 additions & 3 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,11 @@ def persist_outputs(job_definition, outputs, job_metadata):
# if it previously had a message, delete it
delete_files_from_directory(medium_privacy_dir, [message_file])

# Update manifest with file metdata
manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)
new_outputs = {}

for filename, level in outputs.items():
abspath = workspace_dir / filename
manifest["outputs"][filename] = get_output_metadata(
new_outputs[filename] = get_output_metadata(
abspath,
level,
job_id=job_definition.id,
Expand All @@ -557,6 +556,10 @@ def persist_outputs(job_definition, outputs, job_metadata):
message=excluded_job_msgs.get(filename),
csv_counts=csv_metadata.get(filename),
)

# Update manifest with file metdata
manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)
manifest["outputs"].update(**new_outputs)
write_manifest_file(medium_privacy_dir, manifest)

return excluded_job_msgs
Expand Down
6 changes: 6 additions & 0 deletions jobrunner/lib/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ def get_connection(filename=None):
conn.row_factory = sqlite3.Row
cache[filename] = conn

# use WAL to enable other processes (e.g. operational tasks) to read the DB.
# job-runner should be the only active writer, which means if we need
# some other process to write the db (e.g. a backfill), then we should
# stop job-runner.
conn.execute("PRAGMA journal_mode=WAL")

return cache[filename]


Expand Down

0 comments on commit 1a18b85

Please sign in to comment.