From 2a43757c1a963d7cdda359d33a1c1369b49d1f79 Mon Sep 17 00:00:00 2001
From: Benjamin Wingfield
Date: Thu, 15 Feb 2024 16:03:40 +0000
Subject: [PATCH 1/2] remove samplesheet package

---
 pgscatalog_utils/samplesheet/Config.py |   7 -
 pgscatalog_utils/samplesheet/check.py  | 370 ------------------------
 pyproject.toml                         |   1 -
 3 files changed, 378 deletions(-)
 delete mode 100644 pgscatalog_utils/samplesheet/Config.py
 delete mode 100755 pgscatalog_utils/samplesheet/check.py

diff --git a/pgscatalog_utils/samplesheet/Config.py b/pgscatalog_utils/samplesheet/Config.py
deleted file mode 100644
index 1f4bddb..0000000
--- a/pgscatalog_utils/samplesheet/Config.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from dataclasses import dataclass
-
-
-@dataclass
-class Config:
-    input_path: str
-    output_path: str
diff --git a/pgscatalog_utils/samplesheet/check.py b/pgscatalog_utils/samplesheet/check.py
deleted file mode 100755
index b1ff16b..0000000
--- a/pgscatalog_utils/samplesheet/check.py
+++ /dev/null
@@ -1,370 +0,0 @@
-import argparse
-import logging
-import math
-import pathlib
-from pathlib import Path
-
-import pandas as pd
-
-from pgscatalog_utils import config
-from pgscatalog_utils.samplesheet.Config import Config
-
-logger = logging.getLogger(__name__)
-
-
-def _parse_args(args=None) -> argparse.Namespace:
-    d: (
-        str
-    ) = "Convert pgscatalog/pgsc_calc samplesheet file to JSON and check its contents."
-    e: str = "Example usage: python check.py <FILE_IN> <FILE_OUT>"
-
-    parser: argparse.ArgumentParser = argparse.ArgumentParser(description=d, epilog=e)
-    parser.add_argument("FILE_IN", help="Input samplesheet file.")
-    parser.add_argument(
-        "-v",
-        "--verbose",
-        dest="verbose",
-        action="store_true",
-        help="Extra logging information",
-    )
-    parser.add_argument("FILE_OUT", help="Output file.")
-    return parser.parse_args(args)
-
-
-def _truncate_chrom(chrom):
-    match chrom:
-        case _ if chrom.isdigit():
-            return int(chrom)
-        case _ if chrom.startswith("chr"):
-            logger.critical("Please remove chr prefix from samplesheet chromosome column e.g. chr1 -> 1, chrX -> X")
-            raise ValueError("chr prefix detected")
-        case _:
-            return chrom
-
-
-def _check_colnames(df: pd.DataFrame):
-    mandatory: list[str] = ["sampleset", "path_prefix", "chrom", "format"]
-    optional: list[str] = ["vcf_genotype_field"]
-
-    if not set(mandatory) == set(df.columns):
-        if set(mandatory + optional) == set(df.columns):
-            # this is fine
-            return
-        else:
-            logger.critical("Samplesheet has invalid header row")
-            logger.critical(f"Column names must only include: {mandatory}")
-            [
-                logger.critical(f"Invalid column name: {col}")
-                for col in df
-                if col not in mandatory
-            ]
-            raise Exception
-
-
-def _check_unique_paths(df: pd.DataFrame):
-    """Each row in a samplesheet should have a unique path"""
-    duplicated: pd.Series = df["path_prefix"].duplicated()
-    for idx, duplicate in duplicated.items():
-        if duplicate:
-            bad_record = df.iloc[:idx]
-            logger.critical(f"Duplicated path found in samplesheet:\n{bad_record}")
-
-
-def _check_empty_paths(df: pd.DataFrame):
-    """Paths are mandatory"""
-    empty_paths: pd.Series = df["path_prefix"].isnull()
-    for idx, empty in empty_paths.items():
-        if empty:
-            logger.critical(f"Empty path found in samplesheet:\n {df.iloc[[idx]]}")
-            raise Exception
-
-
-def _read_samplesheet(path: str) -> pd.DataFrame:
-    csv: pd.DataFrame = pd.read_csv(path, sep=",", header=0, converters={"chrom": str})
-    csv["chrom"] = csv["chrom"].apply(_truncate_chrom)
-    return csv
-
-
-def _check_paths(df: pd.DataFrame) -> None:
-    _check_empty_paths(df)
-    _check_unique_paths(df)
-
-
-def _get_chrom_list(df: pd.DataFrame) -> dict[str, list[str | None]]:
-    chrom_dict = {}
-    for idx, row in df.iterrows():
-        key = row["sampleset"]
-        value = row["chrom"]
-        try:
-            if math.isnan(value):
-                value = None
-        except TypeError:
-            pass
-        chroms = chrom_dict.get(key, [])
-        chroms.append(value)
-        chrom_dict.update({key: chroms})
-
-    return chrom_dict
-
-
-def _check_chrom_duplicates(sampleset: str, chrom_list: dict) -> None:
-    seen = set()
-    duplicate_chromosomes: list[str] = [
-        str(x) for x in chrom_list if x in seen or seen.add(x)
-    ]
-    if duplicate_chromosomes:
-        logger.critical(f"Duplicate chromosomes detected in sampleset {sampleset}")
-        logger.critical(f"Duplicate chromosomes: {duplicate_chromosomes}")
-        raise Exception
-
-
-def _check_multiple_missing_chrom(sampleset: str, chrom_list: dict) -> None:
-    for chrom in chrom_list:
-        if chrom is None and len(chrom_list) != 1:
-            logger.critical(
-                f"Sampleset {sampleset} has rows with multiple missing chromosomes"
-            )
-            logger.critical(
-                "If you have file with multiple chromosomes, delete the duplicate rows"
-            )
-            logger.critical(
-                "If your data are split per chromosome, then chromosomes must be set for all rows"
-            )
-            raise Exception
-
-
-def _check_chrom(df: pd.DataFrame) -> None:
-    # get a list of chroms per sampleset and check them for some basic errors
-    chroms: dict = _get_chrom_list(df)
-
-    for sampleset, chrom_list in chroms.items():
-        _check_chrom_duplicates(sampleset, chrom_list)
-        _check_multiple_missing_chrom(sampleset, chrom_list)
-
-
-def _check_format(df: pd.DataFrame):
-    """Make sure the file format is a valid choice"""
-    for idx, row in df.iterrows():
-        valid_formats: list[str] = ["vcf", "pfile", "bfile"]
-        if row["format"] not in valid_formats:
-            logger.critical(
-                f"Invalid format: {row['format']} must be one of {valid_formats}"
-            )
-            logger.critical(f"\n{df.iloc[[idx]]}")
-            raise Exception
-
-
-def _setup_paths(df: pd.DataFrame) -> pd.DataFrame:
-    """Add suffix to path prefixes depending on file format / type"""
type""" - paths: list[pd.Series] = [] - for idx, row in df.iterrows(): - suffix: list[str] - match row["format"]: - case "vcf": - logger.info("Setting VCF input") - suffix = [".vcf.gz"] - case "bfile": - logger.info("Setting plink1 binary fileset (bfile) input") - suffix = [".bed", ".bim", ".fam"] - case "pfile": - logger.info("Setting plink2 binary fileset (pfile) input") - suffix = [".pgen", ".pvar", ".psam"] - case _: - raise Exception - - resolved_paths: list[str] = _resolve_paths( - [row["path_prefix"] + x for x in suffix], row["format"] - ) - paths.append(pd.Series(data=[resolved_paths], index=[idx])) - - df["path"] = pd.concat(paths) - return df - - -def _resolve_compressed_variant_path(path: str) -> pathlib.Path: - # .bim.zst | .bim -> OK - # .pvar.zst | .pvar -> OK - # anything else not OK - zstd_ext: str = ".zst" - compressed_path: pathlib.Path = pathlib.Path(path + zstd_ext).resolve() - uncompressed_path: pathlib.Path = pathlib.Path(path).resolve() - - # prefer compressed data - if compressed_path.exists(): - logger.info(f"Found compressed variant information file {compressed_path.name}") - return compressed_path - elif uncompressed_path.exists(): - logger.info( - f"Couldn't find compressed variant information file, trying {uncompressed_path.name}" - ) - return uncompressed_path - else: - logger.critical(f"{compressed_path} doesn't exist") - logger.critical(f"{uncompressed_path} doesn't exist") - logger.critical( - "Couldn't find variant information files, please check samplesheet path_prefix and try again" - ) - raise Exception - - -def _resolve_paths(path_list: list[str], filetype: str) -> list[str]: - resolved_list: list[str] = [] - - # always resolve the input samplesheet - base_dir: Path = Path(Config.input_path).resolve().parent - if (path := Path(Config.input_path)).is_symlink(): - logger.info( - f"Input file {path} is symlinked, resolving to absolute path {path.resolve()}" - ) - - for path in path_list: - if path.startswith("https://") | path.startswith("s3://"): - logger.info("Remote path detected, skipping resolve") - resolved_list.append(str(path)) - continue - elif path.startswith("http://"): - logger.critical("HTTP download is insecure! Did you mean https:// ?") - raise Exception("Insecure path detected") - else: - p: Path = Path(path) - if not p.is_absolute(): - logger.warning( - "Relative path detected in samplesheet. Set absolute paths to silence this warning." 
- ) - logger.warning( - "Assuming input samplesheet is a symlinked file in a nextflow working directory" - ) - logger.warning( - "Following symlink and attempting to resolve path relative to input file" - ) - logger.warning(f"Resolving paths relative to: {base_dir}") - resolved = _resolve_filetypes( - path=str(base_dir.joinpath(path)), filetype=filetype - ) - else: - logger.info("Absolute path detected") - resolved = _resolve_filetypes(filetype=filetype, path=str(p)) - - if resolved.exists(): - logger.info(f"{resolved} exists") - resolved_list.append(str(resolved)) - else: - logger.critical( - f"{resolved} doesn't exist, please check samplesheet path_prefix and try again" - ) - logger.critical( - "If you're 100% sure this file exists and you're confused by this error, please check https://pgsc-calc.readthedocs.io/en/latest/how-to/mount.html" - ) - raise FileNotFoundError - - return resolved_list - - -def _resolve_filetypes(filetype: str, path: str) -> Path: - match filetype: - case "pfile" | "bfile": - if path.endswith(".bim") or path.endswith(".pvar"): - resolved = _resolve_compressed_variant_path(path) - else: - # bed / pgen | fam / psam - resolved = pathlib.Path(path).resolve() - case "vcf": - resolved = pathlib.Path(path).resolve() - case _: - logger.critical(f"Unsupported filetype {filetype}") - raise Exception - - return resolved - - -def _check_genotype_field(df: pd.DataFrame) -> pd.DataFrame: - df["vcf_import_dosage"] = False # (dosage off by default) - if "vcf_genotype_field" in df.columns: - logger.debug("vcf_genotype_field detected") - for index, row in df.iterrows(): - if row["vcf_genotype_field"] not in ["GT", "DS"]: - missing: bool # missing dosage is OK - try: - missing = math.isnan(row["vcf_genotype_field"]) - except TypeError: - missing = False - - if not missing: - logger.critical( - f"Invalid entry in vcf_genotype_field: {row['vcf_genotype_field']}" - ) - logger.critical(f"\n {row}") - raise Exception - - df.loc[df["vcf_genotype_field"] == "DS", "vcf_import_dosage"] = True - else: - logger.info("no vcf_genotype_field detected") - - return df - - -def _check_reserved_names(df: pd.DataFrame): - if any(df["sampleset"] == "reference"): - logger.critical( - "Samplesets must not be named 'reference', please rename in the sample sheet" - ) - raise Exception - - # Check whether reference contains reserved tokens from nextflow channels - badnames = [x for x in df["sampleset"] if ("." in x or "_" in x)] - if len(badnames) > 0: - logger.critical( - "Samplesets must not contain any reserved characters ( '_' , '.'), " - "please rename the following samples in the sample sheet: {}".format( - badnames - ) - ) - raise Exception - - -def _check_one_sampleset(df: pd.DataFrame): - samplesets = set(df["sampleset"].to_list()) - if len(samplesets) > 1: - logger.critical(f"Multiple samplesets defined in the samplesheet {samplesets}") - sampleset_error = """ Only one sampleset per samplesheet is supported - Your genomic data should _only_ be split by chromosome - pgsc_calc works best with cohorts - Individual VCFs should be merged into a multi-sample VCF - If you want to process multiple cohorts, please run pgsc_calc multiple times with different samplesheets. 
""" - [logger.critical(x.strip()) for x in sampleset_error.split("\n")] - raise Exception("Multiple samplesets") - - -def check_samplesheet() -> None: - """ - This function checks that the samplesheet follows the following structure: - sampleset,vcf_path,bfile_path,chrom,chunk - cineca_synthetic_subset,cineca_synthetic_subset.vcf.gz,,22, - """ - args = _parse_args() - config.set_logging_level(args.verbose) - - Config.input_path = args.FILE_IN - Config.output_path = args.FILE_OUT - - df = _read_samplesheet(Config.input_path) - - # check df for errors - _check_one_sampleset(df) - _check_reserved_names(df) - _check_colnames(df) - _check_paths(df) - _check_chrom(df) - _check_format(df) - - # add information to df - df = _setup_paths(df) - df = _check_genotype_field(df) # dosages - - logger.info("Samplesheet checks complete") - (df.drop(["path_prefix"], axis=1).to_json(Config.output_path, orient="records")) - logger.info(f"JSON file successfully written to {Config.output_path}") - - -if __name__ == "__main__": - check_samplesheet() diff --git a/pyproject.toml b/pyproject.toml index 94d18a6..098283f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,6 @@ aggregate_scores = "pgscatalog_utils.aggregate.aggregate_scores:aggregate_scores validate_scorefiles = "pgscatalog_utils.validate.validate_scorefile:validate_scorefile" relabel_ids = "pgscatalog_utils.relabel.relabel_ids:relabel_ids" ancestry_analysis = "pgscatalog_utils.ancestry.ancestry_analysis:ancestry_analysis" -samplesheet_to_json = "pgscatalog_utils.samplesheet.check:check_samplesheet" [tool.poetry.dependencies] python = "^3.10" From 43db0a03b6b2b6cb48b22835a1317a3578689acd Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Thu, 15 Feb 2024 16:46:59 +0000 Subject: [PATCH 2/2] delete samplesheet tests --- tests/test_samplesheet.py | 101 -------------------------------------- 1 file changed, 101 deletions(-) delete mode 100644 tests/test_samplesheet.py diff --git a/tests/test_samplesheet.py b/tests/test_samplesheet.py deleted file mode 100644 index 80d77db..0000000 --- a/tests/test_samplesheet.py +++ /dev/null @@ -1,101 +0,0 @@ -import json -import os -from pathlib import Path -from unittest.mock import patch - -import pandas as pd -import pytest - -from pgscatalog_utils.samplesheet.check import check_samplesheet - - -@pytest.fixture -def existing_vcf_prefix(tmp_path): - vcf_path = tmp_path / "test.vcf.gz" - _touch(vcf_path) - return str(vcf_path.parent.joinpath(Path(vcf_path.stem).stem)) - - -@pytest.fixture -def samplesheet_df(existing_vcf_prefix): - return pd.DataFrame( - {"path_prefix": [existing_vcf_prefix], "format": ["vcf"], "sampleset": ["test"], "chrom": [None]}) - - -@pytest.fixture -def good_samplesheet(samplesheet_df, tmp_path): - path = tmp_path / "good_samplesheet.csv" - samplesheet_df.to_csv(path, index=False) - return str(path) - - -@pytest.fixture -def bad_samplesheet(samplesheet_df, tmp_path): - path = tmp_path / "bad_samplesheet.csv" - bad_df = samplesheet_df.copy() - bad_df['path_prefix'] = 'bad_path' # path doesn't exist - bad_df.to_csv(path, index=False) - return str(path) - - -@pytest.fixture -def multi_samplesets(samplesheet_df, tmp_path): - path = tmp_path / "multi_samplesets.csv" - multi_samplesets = pd.concat([samplesheet_df, samplesheet_df], ignore_index=True) - multi_samplesets.loc[multi_samplesets.index == 1, 'sampleset'] = 'a_different_name' - multi_samplesets.to_csv(path, index=False) - return str(path) - - -@pytest.fixture -def vcf_dosage(samplesheet_df, tmp_path): - path = tmp_path / 
"vcf_dosage.csv" - dosage_samplesheet = samplesheet_df.copy() - dosage_samplesheet["vcf_genotype_field"] = ["DS"] - dosage_samplesheet.to_csv(path, index=False) - return str(path) - - -def _touch(fname): - if os.path.exists(fname): - os.utime(fname, None) - else: - open(fname, 'a').close() - - -def test_good_samplesheet(good_samplesheet, tmp_path): - out_path = str(tmp_path / "out.json") - args = ['samplesheet_to_json', good_samplesheet, out_path] - with patch('sys.argv', args): - check_samplesheet() - - assert os.path.exists(out_path), "No file written" - - -def test_bad_samplesheet(bad_samplesheet, tmp_path): - out_path = str(tmp_path / "out.json") - args = ['samplesheet_to_json', bad_samplesheet, out_path] - with patch('sys.argv', args): - with pytest.raises(FileNotFoundError): - check_samplesheet() - - -def test_multi_samplesets(multi_samplesets, tmp_path): - out_path = str(tmp_path / "out.json") - args = ['samplesheet_to_json', multi_samplesets, out_path] - with patch('sys.argv', args): - with pytest.raises(Exception, match="Multiple samplesets"): - check_samplesheet() - - -def test_dosage_samplesheet(vcf_dosage, tmp_path): - out_path = str(tmp_path / "out.json") - args = ['samplesheet_to_json', vcf_dosage, out_path] - with patch('sys.argv', args): - check_samplesheet() - - assert os.path.exists(out_path), "Missing output file" - - with open(out_path, 'r') as f: - converted = json.loads(f.read()) - assert converted[0]['vcf_import_dosage'], "Not importing dosage correctly"