Merge pull request #727 from opensafely-core/limit-csv-rows
Prevent large CSV files from being copied to level 4
rebkwok authored May 2, 2024
2 parents ba22eb5 + 18b46a7 commit fb9b0ce
Showing 6 changed files with 128 additions and 9 deletions.
1 change: 1 addition & 0 deletions jobrunner/config.py
@@ -145,6 +145,7 @@ def database_urls_from_env(env):
os.environ.get("LEVEL4_MAX_FILESIZE", 16 * 1024 * 1024)
) # 16mb

LEVEL4_MAX_CSV_ROWS = int(os.environ.get("LEVEL4_MAX_CSV_ROWS", 5000))

LEVEL4_FILE_TYPES = pipeline.constants.LEVEL4_FILE_TYPES

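The new setting follows the same pattern as LEVEL4_MAX_FILESIZE above: read from the environment, falling back to a hard-coded default. A minimal standalone sketch of that override behaviour (the variable name and the 5000-row default come from the diff; everything else is illustrative):

import os

# With no environment override, the 5000-row default applies.
os.environ.pop("LEVEL4_MAX_CSV_ROWS", None)
assert int(os.environ.get("LEVEL4_MAX_CSV_ROWS", 5000)) == 5000

# A deployment can lower (or raise) the limit via the environment.
os.environ["LEVEL4_MAX_CSV_ROWS"] = "100"
assert int(os.environ.get("LEVEL4_MAX_CSV_ROWS", 5000)) == 100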
57 changes: 49 additions & 8 deletions jobrunner/executors/local.py
@@ -511,15 +511,16 @@ def persist_outputs(job_definition, outputs, job_metadata):
if level == "moderately_sensitive"
]

csv_metadata = {}
# check any L4 files are valid
for filename in l4_files:
ok, job_msg, file_msg = check_l4_file(
ok, job_msg, file_msg, csv_counts = check_l4_file(
job_definition, filename, sizes[filename], workspace_dir
)
if not ok:
excluded_job_msgs[filename] = job_msg
excluded_file_msgs[filename] = file_msg

csv_metadata[filename] = csv_counts
medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)

# local run currently does not have a level 4 directory, so exit early
@@ -554,20 +555,28 @@ def persist_outputs(job_definition, outputs, job_metadata):
commit=job_definition.study.commit,
excluded=filename in excluded_file_msgs,
message=excluded_job_msgs.get(filename),
csv_counts=csv_metadata.get(filename),
)

write_manifest_file(medium_privacy_dir, manifest)

return excluded_job_msgs


def get_output_metadata(
abspath, level, job_id, job_request, action, commit, excluded, message=None
abspath,
level,
job_id,
job_request,
action,
commit,
excluded,
message=None,
csv_counts=None,
):
stat = abspath.stat()
with abspath.open("rb") as fp:
content_hash = file_digest(fp, "sha256").hexdigest()

csv_counts = csv_counts or {}
return {
"level": level,
"job_id": job_id,
@@ -579,6 +588,8 @@ def get_output_metadata(
"content_hash": content_hash,
"excluded": excluded,
"message": message,
"row_count": csv_counts.get("rows"),
"col_count": csv_counts.get("cols"),
}
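With the two new keys, a manifest entry for a moderately_sensitive CSV might look like the sketch below. Only the fields visible in this hunk are shown (the collapsed part of the hunk contains more), and all values are illustrative:

{
    "level": "moderately_sensitive",
    "job_id": "abcd1234",  # illustrative
    "content_hash": "e3b0c442...",  # sha256 of the file contents
    "excluded": False,
    "message": None,
    "row_count": 2,  # data rows, header not counted; None for non-CSV files
    "col_count": 3,
}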


@@ -604,7 +615,7 @@ def get_output_metadata(
Level 4 files should be aggregate information easily viewable by output checkers.
See available list of file types here: https://docs.opensafely.org/releasing-files/#allowed-file-types
See available list of file types here: https://docs.opensafely.org/requesting-file-release/#allowed-file-types.
"""

PATIENT_ID = """
@@ -622,13 +633,26 @@ def get_output_metadata(
"""

MAX_CSV_ROWS_MSG = """
The file:
{filename}
contained {row_count} rows, which is above the limit for moderately_sensitive files of
{limit} rows.
As such, it has *not* been copied to Level 4 storage. Please contact tech-support for
further assistance.
"""


def check_l4_file(job_definition, filename, size, workspace_dir):
def mb(b):
return round(b / (1024 * 1024), 2)

job_msgs = []
file_msgs = []
csv_counts = {"rows": None, "cols": None}

suffix = Path(filename).suffix
if suffix not in config.LEVEL4_FILE_TYPES:
@@ -643,12 +667,29 @@ def mb(b):
with actual_file.open() as f:
reader = csv.DictReader(f)
headers = reader.fieldnames
first_row = next(reader, None)
if first_row:
csv_counts["cols"] = len(first_row)
csv_counts["rows"] = sum(1 for _ in reader) + 1
else:
csv_counts["cols"] = csv_counts["rows"] = 0
except Exception:
pass
else:
if headers and "patient_id" in headers:
job_msgs.append("File has patient_id column")
file_msgs.append(PATIENT_ID.format(filename=filename))
if csv_counts["rows"] > job_definition.level4_max_csv_rows:
job_msgs.append(
f"File row count ({csv_counts['rows']}) exceeds maximum allowed rows ({job_definition.level4_max_csv_rows})"
)
file_msgs.append(
MAX_CSV_ROWS_MSG.format(
filename=filename,
row_count=csv_counts["rows"],
limit=job_definition.level4_max_csv_rows,
)
)

if size > job_definition.level4_max_filesize:
job_msgs.append(
@@ -663,9 +704,9 @@ def mb(b):
)

if job_msgs:
return False, ",".join(job_msgs), "\n\n".join(file_msgs)
return False, ",".join(job_msgs), "\n\n".join(file_msgs), csv_counts
else:
return True, None, None
return True, None, None, csv_counts
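Note that csv.DictReader consumes the header line while building fieldnames, so the counts above cover data rows only, with the column count taken from the first data row. A standalone sketch of the same counting semantics, using nothing beyond the standard library:

import csv
import io

def count_csv(text):
    # Mirrors the counting in check_l4_file: the header row is excluded
    # from the row count; the column count comes from the first data row.
    reader = csv.DictReader(io.StringIO(text))
    first_row = next(reader, None)
    if first_row is None:
        return {"rows": 0, "cols": 0}
    return {"rows": sum(1 for _ in reader) + 1, "cols": len(first_row)}

assert count_csv("foo,bar\n") == {"rows": 0, "cols": 0}
assert count_csv("foo,bar\n1,2\n3,4\n") == {"rows": 2, "cols": 2}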


def find_matching_outputs(job_definition):
3 changes: 2 additions & 1 deletion jobrunner/job_executor.py
@@ -30,12 +30,13 @@ class JobDefinition:
str, str
] # the files that the job should produce (globs mapped to privacy levels)
allow_database_access: bool # whether this job should have access to the database
level4_max_csv_rows: int
level4_max_filesize: int
# our internal name for the database this job uses (actual connection details are
# passed in `env`)
database_name: str = None
cpu_count: str = None # number of CPUs to be allocated
memory_limit: str = None # memory limit to apply
level4_max_filesize: int = 16 * 1024 * 1024
level4_file_types: list = field(default_factory=lambda: [".csv"])
# if a job has been cancelled, the name of the canceller - either "user" or "admin"
cancelled: str = None
1 change: 1 addition & 0 deletions jobrunner/run.py
@@ -553,6 +553,7 @@ def job_to_job_definition(job):
cpu_count=config.DEFAULT_JOB_CPU_COUNT,
memory_limit=config.DEFAULT_JOB_MEMORY_LIMIT,
level4_max_filesize=config.LEVEL4_MAX_FILESIZE,
level4_max_csv_rows=config.LEVEL4_MAX_CSV_ROWS,
level4_file_types=config.LEVEL4_FILE_TYPES,
cancelled=job_definition_cancelled,
)
73 changes: 73 additions & 0 deletions tests/test_local_executor.py
@@ -45,6 +45,8 @@ def job_definition(request, test_repo):
"**/*": "medium",
},
allow_database_access=False,
level4_max_filesize=16 * 1024 * 1024,
level4_max_csv_rows=5000,
)


@@ -324,6 +326,7 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
"touch",
"/workspace/output/output.csv",
"/workspace/output/summary.csv",
"/workspace/output/summary.txt",
]
job_definition.inputs = ["output/input.csv"]
job_definition.output_spec = {
@@ -360,6 +363,7 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
assert results.outputs == {
"output/output.csv": "highly_sensitive",
"output/summary.csv": "moderately_sensitive",
"output/summary.txt": "moderately_sensitive",
}
assert results.unmatched_patterns == []

@@ -383,6 +387,13 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
metadata["content_hash"]
== "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)
assert metadata["row_count"] == 0
assert metadata["col_count"] == 0

txt_metadata = manifest["outputs"]["output/summary.txt"]
assert txt_metadata["excluded"] is False
assert txt_metadata["row_count"] is None
assert txt_metadata["col_count"] is None


@pytest.mark.needs_docker
@@ -681,6 +692,68 @@ def test_finalize_patient_id_header(
manifest["outputs"]["output/output.csv"]["message"]
== "File has patient_id column"
)
assert manifest["outputs"]["output/output.csv"]["row_count"] == 1
assert manifest["outputs"]["output/output.csv"]["col_count"] == 3


@pytest.mark.needs_docker
def test_finalize_csv_max_rows(docker_cleanup, job_definition, tmp_work_dir, local_run):
rows = "1,2\n" * 11
job_definition.args = [
"sh",
"-c",
f"echo 'foo,bar\n{rows}' > /workspace/output/output.csv",
]
job_definition.output_spec = {
"output/output.csv": "moderately_sensitive",
}
job_definition.level4_max_csv_rows = 10

api = local.LocalDockerAPI()

status = api.prepare(job_definition)
assert status.state == ExecutorState.PREPARED
status = api.execute(job_definition)
assert status.state == ExecutorState.EXECUTING

status = wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED

result = api.get_results(job_definition)

assert result.exit_code == 0
assert result.level4_excluded_files == {
"output/output.csv": "File row count (11) exceeds maximum allowed rows (10)",
}

log_file = local.get_log_dir(job_definition) / "logs.txt"
log = log_file.read_text()
assert "Invalid moderately_sensitive outputs:" in log
assert (
"output/output.csv - File row count (11) exceeds maximum allowed rows (10)"
in log
)

if not local_run:
level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)

message_file = level4_dir / "output/output.csv.txt"
txt = message_file.read_text()
assert "output/output.csv" in txt
assert "contained 11 rows" in txt

manifest = local.read_manifest_file(level4_dir, job_definition)

assert manifest["outputs"]["output/output.csv"]["excluded"]
assert (
manifest["outputs"]["output/output.csv"]["message"]
== "File row count (11) exceeds maximum allowed rows (10)"
)

assert manifest["outputs"]["output/output.csv"]["row_count"] == 11
assert manifest["outputs"]["output/output.csv"]["col_count"] == 2


@pytest.mark.needs_docker
2 changes: 2 additions & 0 deletions tests/test_logging_executor.py
@@ -58,5 +58,7 @@ def create_job_definition(job_id="a-job-id"):
inputs=[],
output_spec={},
allow_database_access=False,
level4_max_csv_rows=None,
level4_max_filesize=None,
)
return job_definition
