feat: add inlining options and a new inline command #374

Merged: 1 commit, Feb 4, 2025

2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -113,7 +113,7 @@ jobs:
# Compare results
export OUTDIR=$DATADIR/run-output/covid_symptom__nlp_results
sudo chown -R $(id -u) $OUTDIR
sed -i 's/"generated_on": "[^"]*", //g' $OUTDIR/*.ndjson
sed -i 's/"generated_on":"[^"]*",//g' $OUTDIR/*.ndjson
Contributor Author:
Adjusted to remove the expected whitespace, because this PR also switches the ETL to a more condensed NDJSON output mode that strips whitespace (in an attempt to reduce bloat a bit when inlining).
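
For reference, the compact mode comes from passing explicit separators to json.dump, which drops the spaces the old sed pattern matched. A minimal sketch of the difference (standard-library behavior only, not code from this PR):

```python
import json

record = {"generated_on": "2024-01-01T00:00:00Z", "id": "x"}

# Default separators include spaces, which the old pattern expected.
print(json.dumps(record))
# {"generated_on": "2024-01-01T00:00:00Z", "id": "x"}

# With separators=(",", ":") no whitespace is emitted, hence the updated pattern.
print(json.dumps(record, separators=(",", ":")))
# {"generated_on":"2024-01-01T00:00:00Z","id":"x"}
```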

diff -upr $DATADIR/expected-output $OUTDIR

echo "All Good!"
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
@@ -1,3 +1,3 @@
"""Turns FHIR data into de-identified & aggregated records"""

__version__ = "2.0.0"
__version__ = "2.1.0"
7 changes: 5 additions & 2 deletions cumulus_etl/cli.py
@@ -9,7 +9,7 @@

import rich.logging

from cumulus_etl import common, etl, export, upload_notes
from cumulus_etl import common, etl, export, inliner, upload_notes
from cumulus_etl.etl import convert, init


@@ -20,6 +20,7 @@ class Command(enum.Enum):
ETL = "etl"
EXPORT = "export"
INIT = "init"
INLINE = "inline"
UPLOAD_NOTES = "upload-notes"

# Why isn't this part of Enum directly...?
@@ -69,13 +70,15 @@ async def main(argv: list[str]) -> None:
run_method = export.run_export
elif subcommand == Command.INIT.value:
run_method = init.run_init
elif subcommand == Command.INLINE.value:
run_method = inliner.run_inline
else:
parser.description = "Extract, transform, and load FHIR data."
if not subcommand:
# Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.description += "\n\nother commands available:\n"
parser.description += " convert\n export\n init\n upload-notes"
parser.description += " convert\n export\n init\n inline\n upload-notes"
run_method = etl.run_etl

with tempfile.TemporaryDirectory() as tempdir:
72 changes: 72 additions & 0 deletions cumulus_etl/cli_utils.py
@@ -1,6 +1,7 @@
"""Helper methods for CLI parsing."""

import argparse
import itertools
import os
import socket
import tempfile
@@ -61,6 +62,24 @@ def add_bulk_export(parser: argparse.ArgumentParser, *, as_subgroup: bool = True
"--until", metavar="TIMESTAMP", help="end date for export from the FHIR server"
)
parser.add_argument("--resume", metavar="URL", help="polling status URL from a previous export")
parser.add_argument(
"--inline",
action="store_true",
help="attachments will be inlined after the export",
)
parser.add_argument(
"--inline-resource",
metavar="RESOURCES",
action="append",
help="only consider this resource for inlining (default is all supported inline targets: "
"DiagnosticReport and DocumentReference)",
)
parser.add_argument(
"--inline-mimetype",
metavar="MIMETYPES",
action="append",
help="only inline this attachment mimetype (default is text, HTML, and XHTML)",
)
return parser
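
For illustration, the repeated flags accumulate into plain lists via action="append"; comma splitting and normalization happen later in the helpers below. A standalone sketch mirroring just the three new flags (not the real parser):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--inline", action="store_true")
parser.add_argument("--inline-resource", metavar="RESOURCES", action="append")
parser.add_argument("--inline-mimetype", metavar="MIMETYPES", action="append")

args = parser.parse_args(
    ["--inline", "--inline-resource=DocumentReference", "--inline-mimetype=text/plain,text/html"]
)
print(args.inline)           # True
print(args.inline_resource)  # ['DocumentReference']
print(args.inline_mimetype)  # ['text/plain,text/html'] (commas are split later)
```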


@@ -176,3 +195,56 @@ def make_progress_bar() -> rich.progress.Progress:
rich.progress.TimeElapsedColumn(),
]
return rich.progress.Progress(*columns)


def expand_inline_resources(arg: Iterable[str] | None) -> set[str]:
"""
This converts a list of inline resource args into the final properly cased resource names.

If you have an arg like --inline-resource, this will process that for you.
"""
allowed = {"diagnosticreport": "DiagnosticReport", "documentreference": "DocumentReference"}

if arg is None:
return set(allowed.values())

resources = set(expand_comma_list_arg(arg))
for resource in resources:
if resource.casefold() not in allowed:
errors.fatal(f"Unsupported resource for inlining: {resource}", errors.ARGS_INVALID)

return {allowed[resource.casefold()] for resource in resources}


def expand_inline_mimetypes(arg: Iterable[str] | None) -> set[str]:
"""
This converts a list of inline mimetype args into a set of normalized mimetypes.

If you have an arg like --inline-mimetype, this will process that for you.
"""
if arg is None:
return {"text/plain", "text/html", "application/xhtml+xml"}

return set(expand_comma_list_arg(arg, casefold=True))


def expand_comma_list_arg(arg: Iterable[str] | None, casefold: bool = False) -> Iterable[str]:
"""
This takes a list of string args, splits each string on commas, and combines the results.

This is useful for CLI arguments that use action="append" but should also accept
comma-separated values. --task works this way, as do others.

An example CLI:
--task=patient --task=condition,procedure
Would give:
["patient", "condition,procedure"]
And this method would turn that into:
["patient", "condition", "procedure"]
"""
if arg is None:
return []
split_args = itertools.chain.from_iterable(x.split(",") for x in arg)
if casefold:
return map(str.casefold, split_args)
return split_args
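
Taken together, these helpers flatten any mix of repeated and comma-separated values into normalized sets. A short usage sketch, assuming they are imported from cumulus_etl.cli_utils:

```python
from cumulus_etl import cli_utils

# Comma-separated and repeated values are flattened (and optionally casefolded).
print(list(cli_utils.expand_comma_list_arg(["patient", "condition,procedure"], casefold=True)))
# ['patient', 'condition', 'procedure']

# Resource names come back in canonical FHIR casing, regardless of input case.
print(sorted(cli_utils.expand_inline_resources(["documentreference,DiagnosticReport"])))
# ['DiagnosticReport', 'DocumentReference']

# Omitting the flag falls back to the documented defaults.
print(sorted(cli_utils.expand_inline_mimetypes(None)))
# ['application/xhtml+xml', 'text/html', 'text/plain']
```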
18 changes: 2 additions & 16 deletions cumulus_etl/common.py
@@ -232,7 +232,8 @@ def write(self, obj: dict) -> None:
# lazily create the file, to avoid 0-line ndjson files (unless created in __init__)
self._ensure_file()

json.dump(obj, self._file)
# Specify separators for the most compact (no whitespace) representation, which saves disk space.
json.dump(obj, self._file, separators=(",", ":"))
self._file.write("\n")


@@ -316,21 +317,6 @@ def human_time_offset(seconds: int) -> str:
return f"{_pretty_float(hours)}h"


def info_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


def debug_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


def warn_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.WARN)


def print_header(name: str | None = None) -> None:
"""Prints a section break to the console, with a name for the user"""
rich.get_console().rule()
12 changes: 11 additions & 1 deletion cumulus_etl/errors.py
@@ -37,20 +37,30 @@
MISSING_REQUESTED_RESOURCES = 36
TOO_MANY_SMART_CREDENTIALS = 37
BAD_SMART_CREDENTIAL = 38
INLINE_TASK_FAILED = 39
INLINE_WITHOUT_FOLDER = 40


class FatalError(Exception):
"""An unrecoverable error"""


class NetworkError(FatalError):
"""An unrecoverable network error"""
"""A network error"""

def __init__(self, msg: str, response: httpx.Response):
super().__init__(msg)
self.response = response


class FatalNetworkError(NetworkError):
"""An unrecoverable network error that should not be retried"""


class TemporaryNetworkError(NetworkError):
"""An recoverable network error that could be retried"""


class FhirConnectionConfigError(FatalError):
"""We needed to connect to a FHIR server but are not configured correctly"""

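The split into fatal vs. temporary network errors lets callers decide what to retry. A minimal sketch of one way a caller might use the distinction (not code from this PR; the request callable is hypothetical):

```python
import asyncio

from cumulus_etl import errors


async def fetch_with_retries(request, attempts: int = 3):
    """Retry transient failures, surface fatal ones immediately (illustrative only)."""
    for attempt in range(attempts):
        try:
            return await request()  # hypothetical awaitable raising the errors above
        except errors.FatalNetworkError:
            raise  # never worth retrying
        except errors.TemporaryNetworkError:
            if attempt + 1 == attempts:
                raise  # out of retries
            await asyncio.sleep(2**attempt)  # simple exponential backoff
```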
6 changes: 6 additions & 0 deletions cumulus_etl/etl/cli.py
@@ -309,6 +309,9 @@ async def etl_main(args: argparse.Namespace) -> None:
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

inline_resources = cli_utils.expand_inline_resources(args.inline_resource)
inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype)

# Create a client to talk to a FHIR server.
# This is useful even if we aren't doing a bulk export, because some resources like DocumentReference can still
# reference external resources on the server (like the document text).
@@ -326,6 +329,9 @@ async def etl_main(args: argparse.Namespace) -> None:
since=args.since,
until=args.until,
resume=args.resume,
inline=args.inline,
inline_resources=inline_resources,
inline_mimetypes=inline_mimetypes,
)

required_resources = await check_available_resources(
13 changes: 5 additions & 8 deletions cumulus_etl/etl/tasks/task_factory.py
@@ -1,11 +1,10 @@
"""Finds and creates ETL tasks"""

import itertools
import sys
from collections.abc import Iterable
from typing import TypeVar

from cumulus_etl import errors
from cumulus_etl import cli_utils, errors
from cumulus_etl.etl.studies import covid_symptom, hftest
from cumulus_etl.etl.tasks import basic_tasks

@@ -67,13 +66,11 @@ def get_selected_tasks(
:param filter_tags: only tasks that have all the listed tags will be eligible for selection
:returns: a list of selected EtlTask subclasses, to instantiate and run
"""
names = names and set(itertools.chain.from_iterable(t.lower().split(",") for t in names))
filter_tags = filter_tags and list(
itertools.chain.from_iterable(t.lower().split(",") for t in filter_tags)
)
filter_tag_set = set(filter_tags or [])
names = set(cli_utils.expand_comma_list_arg(names, casefold=True))
filter_tags = list(cli_utils.expand_comma_list_arg(filter_tags, casefold=True))
filter_tag_set = set(filter_tags)

if names and "help" in names:
if "help" in names:
# OK, we actually are just going to print the list of all task names and be done.
_print_task_names()
raise SystemExit(errors.TASK_HELP) # not an *error* exactly, but not successful ETL either
6 changes: 6 additions & 0 deletions cumulus_etl/export/cli.py
@@ -26,6 +26,9 @@ async def export_main(args: argparse.Namespace) -> None:
required_resources = {t.resource for t in selected_tasks}
using_default_tasks = not args.task and not args.task_filter

inline_resources = cli_utils.expand_inline_resources(args.inline_resource)
inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype)

fhir_root = store.Root(args.url_input)
client = fhir.create_fhir_client_for_cli(args, fhir_root, required_resources)

@@ -39,6 +42,9 @@ async def export_main(args: argparse.Namespace) -> None:
since=args.since,
until=args.until,
resume=args.resume,
inline=args.inline,
inline_resources=inline_resources,
inline_mimetypes=inline_mimetypes,
)
await loader.load_from_bulk_export(
sorted(required_resources), prefer_url_resources=using_default_tasks
2 changes: 2 additions & 0 deletions cumulus_etl/fhir/__init__.py
@@ -5,7 +5,9 @@
FhirUrl,
download_reference,
get_docref_note,
parse_content_type,
parse_datetime,
ref_resource,
request_attachment,
unref_resource,
)