Chunking SQL export queries, coverage (#249)
* Chunking SQL export queries

* Clean up extra files, docstring tweaks
dogversioning authored Jun 17, 2024
1 parent 5db1c2a commit 23f6bc9
Showing 28 changed files with 831 additions and 316 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
@@ -0,0 +1,2 @@
[run]
omit = cumulus_library/schema/*
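
This new config excludes cumulus_library/schema/ from coverage measurement. As a rough sketch of what the omit rule does, assuming the coverage package is installed locally (the import target below is only illustrative), the same rule can be exercised outside CI:

import coverage

# Coverage() honors the [run] omit rule above, so files under
# cumulus_library/schema/ never appear in the report.
cov = coverage.Coverage(config_file=".coveragerc")
cov.start()
import cumulus_library  # anything imported or executed here is measured
cov.stop()
cov.xml_report(outfile="coverage.xml")  # same artifact the CI step uploads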
16 changes: 15 additions & 1 deletion .github/workflows/ci.yaml
@@ -29,9 +29,23 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install ".[test]"
- name: Create mock AWS credentials
run: |
mkdir ~/.aws && touch ~/.aws/credentials
echo -e "[test]\naws_access_key_id = test\naws_secret_access_key = test" > ~/.aws/credentials
- name: Test with pytest
run: |
python -m pytest
python -m pytest --cov-report xml --cov=cumulus_library tests
- name: Generate coverage report
uses: orgoro/[email protected]
with:
coverageFile: coverage.xml
token: ${{ secrets.GITHUB_TOKEN }}
thresholdAll: .9
thresholdNew: 1
thresholdModified: .95


lint:
runs-on: ubuntu-22.04
steps:
1 change: 1 addition & 0 deletions .gitignore
@@ -9,6 +9,7 @@ output.sql
*generated.md
MRCONSO.RRF
*.zip
coverage.xml

# Byte-compiled / optimized / DLL files
__pycache__/
37 changes: 30 additions & 7 deletions cumulus_library/actions/exporter.py
@@ -1,6 +1,7 @@
import csv
import pathlib

import pyarrow
from pyarrow import csv, parquet
from rich.progress import track

from cumulus_library import base_utils, databases, study_parser
@@ -25,12 +26,25 @@ def reset_counts_exports(
file.unlink()


def _write_chunk(writer, chunk, schema):
writer.write(
pyarrow.Table.from_pandas(
chunk.sort_values(
by=list(chunk.columns), ascending=False, na_position="first"
),
preserve_index=False,
schema=schema,
)
)


def export_study(
manifest_parser: study_parser.StudyManifestParser,
db: databases.DatabaseBackend,
schema_name: str,
data_path: pathlib.Path,
archive: bool,
chunksize: int = 1000000,
) -> list:
"""Exports csvs/parquet extracts of tables listed in export_list
:param db: A database backend
@@ -56,13 +70,22 @@
description=f"Exporting {manifest_parser.get_study_prefix()} data...",
):
query = f"SELECT * FROM {table}"
dataframe = db.execute_as_pandas(query)
dataframe_chunks, db_schema = db.execute_as_pandas(query, chunksize=chunksize)
path.mkdir(parents=True, exist_ok=True)
dataframe = dataframe.sort_values(
by=list(dataframe.columns), ascending=False, na_position="first"
)
dataframe.to_csv(f"{path}/{table}.csv", index=False, quoting=csv.QUOTE_MINIMAL)
dataframe.to_parquet(f"{path}/{table}.parquet", index=False)
schema = pyarrow.schema(db.col_pyarrow_types_from_sql(db_schema))
with parquet.ParquetWriter(f"{path}/{table}.parquet", schema) as p_writer:
with csv.CSVWriter(
f"{path}/{table}.csv",
schema,
write_options=csv.WriteOptions(
# Note that this quoting style is not exactly csv.QUOTE_MINIMAL
# https://github.com/apache/arrow/issues/42032
quoting_style="needed"
),
) as c_writer:
for chunk in dataframe_chunks:
_write_chunk(p_writer, chunk, schema) # pragma: no cover
_write_chunk(c_writer, chunk, schema) # pragma: no cover
queries.append(query)
if archive:
base_utils.zip_dir(path, data_path, manifest_parser.get_study_prefix())
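
The net effect of the exporter changes: execute_as_pandas now yields pandas chunks plus a schema description, and one ParquetWriter/CSVWriter pair per table appends those chunks, so memory use is bounded by chunksize instead of the full table. Because _write_chunk sorts each chunk independently, row order is deterministic within a chunk but not across the whole file. A minimal self-contained sketch of the same writer pattern, substituting an in-memory chunk source for the library's database backend:

import pandas
import pyarrow
from pyarrow import csv, parquet

def stream_export(chunks, schema: pyarrow.Schema, stem: str) -> None:
    # Open each writer once so the csv header is written a single time,
    # then append every chunk to both output files.
    with parquet.ParquetWriter(f"{stem}.parquet", schema) as p_writer:
        with csv.CSVWriter(f"{stem}.csv", schema) as c_writer:
            for chunk in chunks:
                table = pyarrow.Table.from_pandas(
                    chunk, schema=schema, preserve_index=False
                )
                p_writer.write(table)
                c_writer.write(table)

# Illustrative stand-in for db.execute_as_pandas(query, chunksize=...)
schema = pyarrow.schema([("id", pyarrow.int64()), ("name", pyarrow.string())])
chunks = iter(
    [
        pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]}),
        pandas.DataFrame({"id": [3], "name": ["c"]}),
    ]
)
stream_export(chunks, schema, "example_table")

Holding a single open writer per format is what keeps the CSV a single coherent file across chunks; reopening a writer per chunk would repeat the header row.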
98 changes: 22 additions & 76 deletions cumulus_library/cli.py
@@ -82,11 +82,12 @@ def clean_study(
:param stats_clean: If true, removes previous stats runs
:keyword prefix: If True, does a search by string prefix in place of study name
"""
if targets is None or targets == ["all"]:
if targets is None:
sys.exit(
"Explicit targets for cleaning not provided. "
"Provide one or more explicit study prefixes to remove."
)

for target in targets:
if prefix:
manifest_parser = study_parser.StudyManifestParser()
@@ -207,25 +208,6 @@ def run_matching_table_builder(
config=config,
)

def clean_and_build_all(
self, study_dict: dict, config: base_utils.StudyConfig
) -> None:
"""Builds tables for all studies.
NOTE: By design, this method will always exclude the `template` study dir,
since 99% of the time you don't need a live copy in the database.
:param study_dict: A dict of paths
:param config: A StudyConfig object containing optional params
"""
study_dict = dict(study_dict)
study_dict.pop("template")
for precursor_study in ["vocab", "core"]:
self.clean_and_build_study(study_dict[precursor_study], config=config)
study_dict.pop(precursor_study)
for key in study_dict:
self.clean_and_build_study(study_dict[key], config=config)

### Data exporters
def export_study(
self, target: pathlib.Path, data_path: pathlib.Path, archive: bool
@@ -241,11 +223,6 @@
manifest_parser, self.db, self.schema_name, data_path, archive
)

def export_all(self, study_dict: dict, data_path: pathlib.Path, archive: bool):
"""Exports all defined count tables to disk"""
for key in study_dict.keys():
self.export_study(study_dict[key], data_path, archive)

def generate_study_sql(
self,
target: pathlib.Path,
@@ -296,24 +273,6 @@ def get_abs_path(path: str) -> pathlib.Path:
return pathlib.Path(pathlib.Path.cwd(), path)


def create_template(path: str) -> None:
"""Creates a manifest in target dir if one doesn't exist"""
abs_path = get_abs_path(path)
manifest_path = pathlib.Path(abs_path, "manifest.toml")
if manifest_path.exists():
sys.exit(f"A manifest.toml already exists at {abs_path}, skipping creation")
abs_path.mkdir(parents=True, exist_ok=True)

copy_lists = [
["studies/template/manifest.toml", "manifest.toml"],
[".sqlfluff", ".sqlfluff"],
]
for source, dest in copy_lists:
source_path = pathlib.Path(pathlib.Path(__file__).resolve().parents[0], source)
dest_path = pathlib.Path(abs_path, dest)
dest_path.write_bytes(source_path.read_bytes())


def get_study_dict(alt_dir_paths: list) -> dict[str, pathlib.Path] | None:
"""Gets valid study targets from ./studies/, and any pip installed studies
@@ -362,10 +321,8 @@ def get_studies_by_manifest_path(path: pathlib.Path) -> dict[str, pathlib.Path]:
def run_cli(args: dict):
"""Controls which library tasks are run based on CLI arguments"""
console = rich.console.Console()
if args["action"] == "create":
create_template(args["create_dir"])

elif args["action"] == "upload":
if args["action"] == "upload":
try:
uploader.upload_files(args)
except requests.RequestException as e:
@@ -387,7 +344,7 @@ def run_cli(args: dict):
runner.verbose = True
console.print("[italic] Connecting to database...")
runner.cursor.execute("SHOW DATABASES")
study_dict = get_study_dict(args["study_dir"])
study_dict = get_study_dict(args.get("study_dir"))
if "prefix" not in args.keys():
if args.get("target"):
for target in args["target"]:
@@ -406,19 +363,16 @@
prefix=args["prefix"],
)
elif args["action"] == "build":
if "all" in args["target"]:
runner.clean_and_build_all(study_dict, config=config)
else:
for target in args["target"]:
if args["builder"]:
runner.run_matching_table_builder(
study_dict[target], args["builder"], config=config
)
else:
runner.clean_and_build_study(
study_dict[target],
config=config,
)
for target in args["target"]:
if args["builder"]:
runner.run_matching_table_builder(
study_dict[target], args["builder"], config=config
)
else:
runner.clean_and_build_study(
study_dict[target],
config=config,
)

elif args["action"] == "export":
if args["archive"]:
@@ -429,20 +383,17 @@
"set[/italic], primarily dates, on a per patient level.\n\n"
"[bold]By doing this, you are assuming the responsibility for "
"meeting your organization's security requirements for "
"storing this data in a secure manager.[/bold]\n\n"
"storing this data in a secure manner.[/bold]\n\n"
"Type Y to proceed, or any other value to quit.\n"
)
console.print(warning_text)
response = input()
if response.lower() != "y":
sys.exit()
if "all" in args["target"]:
runner.export_all(study_dict, args["data_path"], args["archive"])
else:
for target in args["target"]:
runner.export_study(
study_dict[target], args["data_path"], args["archive"]
)
for target in args["target"]:
runner.export_study(
study_dict[target], args["data_path"], args["archive"]
)

elif args["action"] == "import":
for archive in args["archive_path"]:
@@ -475,11 +426,6 @@ def main(cli_args=None):
if args["action"] is None:
parser.print_usage()
sys.exit(1)
if args.get("target"):
for target in args["target"]:
if target == "all":
args["target"] = ["all"]
break

arg_env_pairs = (
("data_path", "CUMULUS_LIBRARY_DATA_PATH"),
@@ -493,7 +439,7 @@
("umls_key", "UMLS_API_KEY"),
("url", "CUMULUS_AGGREGATOR_URL"),
("user", "CUMULUS_AGGREGATOR_USER"),
("workgroup", "CUMULUS_LIBRARY_WORKGROUP"),
("work_group", "CUMULUS_LIBRARY_WORKGROUP"),
)
read_env_vars = []
for pair in arg_env_pairs:
@@ -541,8 +487,8 @@


def main_cli(): # called by the generated wrapper scripts
main()
main() # pragma: no cover


if __name__ == "__main__":
main()
main() # pragma: no cover
1 change: 1 addition & 0 deletions cumulus_library/cli_parser.py
@@ -12,6 +12,7 @@ def add_aws_config(parser: argparse.ArgumentParser) -> None:
aws.add_argument(
"--workgroup",
default="cumulus",
dest="work_group",
help="Cumulus Athena workgroup (default: cumulus)",
)
aws.add_argument(
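
The new dest keyword only renames the attribute argparse stores the value under: the flag is still --workgroup on the command line, but the parsed args now expose work_group, matching the updated ("work_group", "CUMULUS_LIBRARY_WORKGROUP") pairing in cli.py. A standalone illustration of that behavior (not the library's actual parser wiring):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--workgroup",
    default="cumulus",
    dest="work_group",  # stored as args.work_group rather than args.workgroup
)
args = vars(parser.parse_args(["--workgroup", "my_athena_workgroup"]))
print(args["work_group"])  # -> my_athena_workgroup; no "workgroup" key exists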
Diffs for the remaining 22 changed files not shown.
