Skip to content

Commit

Permalink
feat: add inlining options and a new inline command
Browse files Browse the repository at this point in the history
Inlining here means downloading attachment URLs and inserting them as
attachment data, allowing you to download each attachment just once,
at the cost of extra storage space.

This is often useful in preparation for NLP tasks, where you often
want to refer back to DocumentReference clinical notes, but do not
want to constantly deal with the flakiness of an EHR server. (Or risk
losing access to that server.)

New features:
- There is a new `inline` command that can inline an existing folder
  of NDJSON.
- The `export` and `etl` commands both now accept an opt-in flag to
  inline data after performing a bulk export.
- If the server provides a Retry-After header that is a timestamp
  (rather than a number of seconds), we now parse that correctly.
- All server requests are retried at least a little bit upon errors.
  (Previously, only bulk export requests were retried - now it happens
  every time we hit the server, including Medication downloads and
  DocRef attachment downloads during NLP tasks.)

Behavior changes:
- If the server gives us a 429 error, we now log it as an error
  message, and don't log a successful progress call.
- If the server gives us a 429 error, we use the next exponential
  backoff delay instead of hard coding 60 seconds as the delay.
- If the server gives us a Retry-After header on an error message,
  we no longer unconditionally accept and use it. Rather the requested
  delay is capped by our next exponential backoff delay. That is, the
  server's Retry-After time will be used if it's LESS than our next
  backoff, but if it's longer, we'll still use our own backoff. This
  is lightly hostile, but (a) it's only done on error cases, (b) our
  backoff times are generous, and counted in minutes not seconds, and
  (c) it lets us guarantee a max delay time for callers.
- Instead of retrying on 429 and ALL 5xx errors, there's a specific
  list of error codes that we retry on. Currently it's 408, 429, 500,
  502, 503, and 504.
- Have the bulk exporter time out after 30 days, rather than one.
  We've seen Epic exports take a couple weeks.
  • Loading branch information
mikix committed Jan 28, 2025
1 parent 2395bb4 commit 7f2d0ab
Show file tree
Hide file tree
Showing 33 changed files with 1,443 additions and 229 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ jobs:
# Compare results
export OUTDIR=$DATADIR/run-output/covid_symptom__nlp_results
sudo chown -R $(id -u) $OUTDIR
sed -i 's/"generated_on": "[^"]*", //g' $OUTDIR/*.ndjson
sed -i 's/"generated_on":"[^"]*",//g' $OUTDIR/*.ndjson
diff -upr $DATADIR/expected-output $OUTDIR
echo "All Good!"
Expand Down
2 changes: 1 addition & 1 deletion cumulus_etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Turns FHIR data into de-identified & aggregated records"""

__version__ = "2.0.0"
__version__ = "2.1.0"
7 changes: 5 additions & 2 deletions cumulus_etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import rich.logging

from cumulus_etl import common, etl, export, upload_notes
from cumulus_etl import common, etl, export, inliner, upload_notes
from cumulus_etl.etl import convert, init


Expand All @@ -20,6 +20,7 @@ class Command(enum.Enum):
ETL = "etl"
EXPORT = "export"
INIT = "init"
INLINE = "inline"
UPLOAD_NOTES = "upload-notes"

# Why isn't this part of Enum directly...?
Expand Down Expand Up @@ -69,13 +70,15 @@ async def main(argv: list[str]) -> None:
run_method = export.run_export
elif subcommand == Command.INIT.value:
run_method = init.run_init
elif subcommand == Command.INLINE.value:
run_method = inliner.run_inline
else:
parser.description = "Extract, transform, and load FHIR data."
if not subcommand:
# Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.description += "\n\nother commands available:\n"
parser.description += " convert\n export\n init\n upload-notes"
parser.description += " convert\n export\n init\n inline\n upload-notes"
run_method = etl.run_etl

with tempfile.TemporaryDirectory() as tempdir:
Expand Down
72 changes: 72 additions & 0 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Helper methods for CLI parsing."""

import argparse
import itertools
import os
import socket
import tempfile
Expand Down Expand Up @@ -61,6 +62,24 @@ def add_bulk_export(parser: argparse.ArgumentParser, *, as_subgroup: bool = True
"--until", metavar="TIMESTAMP", help="end date for export from the FHIR server"
)
parser.add_argument("--resume", metavar="URL", help="polling status URL from a previous export")
parser.add_argument(
"--inline",
action="store_true",
help="attachments will be inlined after the export",
)
parser.add_argument(
"--inline-resource",
metavar="RESOURCES",
action="append",
help="only consider this resource for inlining (default is all supported inline targets: "
"DiagnosticReport and DocumentReference)",
)
parser.add_argument(
"--inline-mimetype",
metavar="MIMETYPES",
action="append",
help="only inline this attachment mimetype (default is text, HTML, and XHTML)",
)
return parser


Expand Down Expand Up @@ -176,3 +195,56 @@ def make_progress_bar() -> rich.progress.Progress:
rich.progress.TimeElapsedColumn(),
]
return rich.progress.Progress(*columns)


def expand_inline_resources(arg: Iterable[str] | None) -> set[str]:
"""
This converts a list of inline resource args into the final properly cased resource names.
If you have an arg like --inline-resource, this will process that for you.
"""
allowed = {"diagnosticreport": "DiagnosticReport", "documentreference": "DocumentReference"}

if arg is None:
return set(allowed.values())

resources = set(expand_comma_list_arg(arg))
for resource in resources:
if resource.casefold() not in allowed:
errors.fatal(f"Unsupported resource for inlining: {resource}", errors.ARGS_INVALID)

return {allowed[resource.casefold()] for resource in resources}


def expand_inline_mimetypes(arg: Iterable[str] | None) -> set[str]:
"""
This converts a list of inline mimetype args into a set of normalized mimetypes.
If you have an arg like --inline-mimetype, this will process that for you.
"""
if arg is None:
return {"text/plain", "text/html", "application/xhtml+xml"}

return set(expand_comma_list_arg(arg, casefold=True))


def expand_comma_list_arg(arg: Iterable[str] | None, casefold: bool = False) -> Iterable[str]:
"""
This converts a list of string args, splits any strings on commas, and combines results.
This is useful for CLI arguments with action="append" but you also want to allow comma
separated args. --task does this, as well as others.
An example CLI:
--task=patient --task=condition,procedure
Would give:
["patient", "condition,procedure"]
And this method would turn that into:
["patient", "condition", procedure"]
"""
if arg is None:
return []
split_args = itertools.chain.from_iterable(x.split(",") for x in arg)
if casefold:
return map(str.casefold, split_args)
return split_args
18 changes: 2 additions & 16 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ def write(self, obj: dict) -> None:
# lazily create the file, to avoid 0-line ndjson files (unless created in __init__)
self._ensure_file()

json.dump(obj, self._file)
# Specify separators for the most compact (no whitespace) representation saves disk space.
json.dump(obj, self._file, separators=(",", ":"))
self._file.write("\n")


Expand Down Expand Up @@ -316,21 +317,6 @@ def human_time_offset(seconds: int) -> str:
return f"{_pretty_float(hours)}h"


def info_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


def debug_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


def warn_mode():
logging.basicConfig()
logging.getLogger().setLevel(logging.WARN)


def print_header(name: str | None = None) -> None:
"""Prints a section break to the console, with a name for the user"""
rich.get_console().rule()
Expand Down
12 changes: 11 additions & 1 deletion cumulus_etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,30 @@
MISSING_REQUESTED_RESOURCES = 36
TOO_MANY_SMART_CREDENTIALS = 37
BAD_SMART_CREDENTIAL = 38
INLINE_TASK_FAILED = 39
INLINE_WITHOUT_FOLDER = 40


class FatalError(Exception):
"""An unrecoverable error"""


class NetworkError(FatalError):
"""An unrecoverable network error"""
"""A network error"""

def __init__(self, msg: str, response: httpx.Response):
super().__init__(msg)
self.response = response


class FatalNetworkError(NetworkError):
    """An unrecoverable network error that should not be retried"""


class TemporaryNetworkError(NetworkError):
    """A recoverable network error that could be retried"""


class FhirConnectionConfigError(FatalError):
"""We needed to connect to a FHIR server but are not configured correctly"""

Expand Down
6 changes: 6 additions & 0 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ async def etl_main(args: argparse.Namespace) -> None:
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)

inline_resources = cli_utils.expand_inline_resources(args.inline_resource)
inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype)

# Create a client to talk to a FHIR server.
# This is useful even if we aren't doing a bulk export, because some resources like DocumentReference can still
# reference external resources on the server (like the document text).
Expand All @@ -326,6 +329,9 @@ async def etl_main(args: argparse.Namespace) -> None:
since=args.since,
until=args.until,
resume=args.resume,
inline=args.inline,
inline_resources=inline_resources,
inline_mimetypes=inline_mimetypes,
)

required_resources = await check_available_resources(
Expand Down
13 changes: 5 additions & 8 deletions cumulus_etl/etl/tasks/task_factory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Finds and creates ETL tasks"""

import itertools
import sys
from collections.abc import Iterable
from typing import TypeVar

from cumulus_etl import errors
from cumulus_etl import cli_utils, errors
from cumulus_etl.etl.studies import covid_symptom, hftest
from cumulus_etl.etl.tasks import basic_tasks

Expand Down Expand Up @@ -67,13 +66,11 @@ def get_selected_tasks(
:param filter_tags: only tasks that have all the listed tags will be eligible for selection
:returns: a list of selected EtlTask subclasses, to instantiate and run
"""
names = names and set(itertools.chain.from_iterable(t.lower().split(",") for t in names))
filter_tags = filter_tags and list(
itertools.chain.from_iterable(t.lower().split(",") for t in filter_tags)
)
filter_tag_set = set(filter_tags or [])
names = set(cli_utils.expand_comma_list_arg(names, casefold=True))
filter_tags = list(cli_utils.expand_comma_list_arg(filter_tags, casefold=True))
filter_tag_set = set(filter_tags)

if names and "help" in names:
if "help" in names:
# OK, we actually are just going to print the list of all task names and be done.
_print_task_names()
raise SystemExit(errors.TASK_HELP) # not an *error* exactly, but not successful ETL either
Expand Down
6 changes: 6 additions & 0 deletions cumulus_etl/export/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ async def export_main(args: argparse.Namespace) -> None:
required_resources = {t.resource for t in selected_tasks}
using_default_tasks = not args.task and not args.task_filter

inline_resources = cli_utils.expand_inline_resources(args.inline_resource)
inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype)

fhir_root = store.Root(args.url_input)
client = fhir.create_fhir_client_for_cli(args, fhir_root, required_resources)

Expand All @@ -39,6 +42,9 @@ async def export_main(args: argparse.Namespace) -> None:
since=args.since,
until=args.until,
resume=args.resume,
inline=args.inline,
inline_resources=inline_resources,
inline_mimetypes=inline_mimetypes,
)
await loader.load_from_bulk_export(
sorted(required_resources), prefer_url_resources=using_default_tasks
Expand Down
2 changes: 2 additions & 0 deletions cumulus_etl/fhir/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
FhirUrl,
download_reference,
get_docref_note,
parse_content_type,
parse_datetime,
ref_resource,
request_attachment,
unref_resource,
)
Loading

0 comments on commit 7f2d0ab

Please sign in to comment.