From 73fa2b7d15bbc95f983cc014a04174b1d3624c6b Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 3 May 2023 19:22:27 +0200 Subject: [PATCH 01/32] feat: generic ingest function based on irods pythonclient --- cubi_tk/sodar/__init__.py | 5 + cubi_tk/sodar/ingest.py | 366 +++++++++++++++++++++++++++++++++++++ tests/test_sodar_ingest.py | 31 ++++ 3 files changed, 402 insertions(+) create mode 100644 cubi_tk/sodar/ingest.py create mode 100644 tests/test_sodar_ingest.py diff --git a/cubi_tk/sodar/__init__.py b/cubi_tk/sodar/__init__.py index 95321569..699914e5 100644 --- a/cubi_tk/sodar/__init__.py +++ b/cubi_tk/sodar/__init__.py @@ -37,6 +37,9 @@ Upload external files to SODAR (defaults for fastq files). +``ingest`` + Upload arbitrary files to SODAR + ``check-remote`` Check if or which local files with md5 sums are already deposited in iRODs/Sodar @@ -54,6 +57,7 @@ from .check_remote import setup_argparse as setup_argparse_check_remote from .download_sheet import setup_argparse as setup_argparse_download_sheet from .ingest_fastq import setup_argparse as setup_argparse_ingest_fastq +from .ingest import setup_argparse as setup_argparse_ingest from .lz_create import setup_argparse as setup_argparse_lz_create from .lz_list import setup_argparse as setup_argparse_lz_list from .lz_move import setup_argparse as setup_argparse_lz_move @@ -87,6 +91,7 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None: "ingest-fastq", help="Upload external files to SODAR (defaults for fastq)" ) ) + setup_argparse_ingest(subparsers.add_parser("ingest", help="Upload arbitrary files to SODAR")) setup_argparse_check_remote( subparsers.add_parser( "check-remote", help="Compare local files with md5 sum against SODAR/iRODS" diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py new file mode 100644 index 00000000..d48bb2d4 --- /dev/null +++ b/cubi_tk/sodar/ingest.py @@ -0,0 +1,366 @@ +"""``cubi-tk sodar ingest``: add arbitrary files to SODAR""" + +import argparse +import os 
+import sys +from pathlib import Path +import getpass +import typing +import attr + +from irods.session import iRODSSession +from logzero import logger +from sodar_cli import api +from ..common import load_toml_config, is_uuid, compute_md5_checksum, sizeof_fmt +from ..snappy.itransfer_common import TransferJob +from tqdm import tqdm + + +@attr.s(frozen=True, auto_attribs=True) +class Config: + """Configuration for the ingest command.""" + + config: str = attr.field(default=None) + sodar_server_url: str = attr.field(default=None) + sodar_api_token: str = attr.field(default=None, repr=lambda value: "***") # type: ignore + + +class SodarIngest: + """Implementation of sodar ingest command.""" + + def __init__(self, args): + # Command line arguments. + self.args = args + + # Path to iRODS environment file + self.irods_env_path = Path(Path.home(), ".irods", "irods_environment.json") + if not self.irods_env_path.exists(): + logger.error("iRODS environment file is missing.") + sys.exit(1) + + # iRODS environment + self.irods_env = None + + def _init_irods(self): + """Connect to iRODS.""" + irods_auth_path = self.irods_env_path.parent.joinpath(".irodsA") + if irods_auth_path.exists(): + try: + session = iRODSSession(irods_env_file=self.irods_env_path) + session.server_version # check for outdated .irodsA file + except Exception as e: + logger.error("iRODS connection failed: %s", self.get_irods_error(e)) + logger.error("Are you logged in? try 'iinit'") + sys.exit(1) + finally: + session.cleanup() + else: + # Query user for password. 
+ logger.info("iRODS authentication file not found.") + password = getpass.getpass(prompt="Please enter SODAR password:") + try: + session = iRODSSession(irods_env_file=self.irods_env_path, password=password) + session.server_version # check for exceptions + except Exception as e: + logger.error("iRODS connection failed: %s", self.get_irods_error(e)) + sys.exit(1) + finally: + session.cleanup() + + return session + + @classmethod + def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--hidden-cmd", dest="sodar_cmd", default=cls.run, help=argparse.SUPPRESS + ) + group_sodar = parser.add_argument_group("SODAR-related") + group_sodar.add_argument( + "--sodar-url", + default=os.environ.get("SODAR_URL", "https://sodar.bihealth.org/"), + help="URL to SODAR, defaults to SODAR_URL environment variable or fallback to https://sodar.bihealth.org/", + ) + group_sodar.add_argument( + "--sodar-api-token", + default=os.environ.get("SODAR_API_TOKEN", None), + help="SODAR API token. Defaults to SODAR_API_TOKEN environment variable.", + ) + parser.add_argument( + "-r", + "--recursive", + default=False, + action="store_true", + help="Recursively match files in subdirectories. Creates iRODS sub-collections to match directory structure.", + ) + parser.add_argument( + "-c", + "--checksums", + default=False, + action="store_true", + help="Compute missing md5 checksums for source files.", + ) + parser.add_argument( + "-K", + "--remote-checksums", + default=False, + action="store_true", + help="Trigger checksum computation on the iRODS side.", + ) + parser.add_argument( + "-y", + default=False, + action="store_true", + help="Don't ask for permission prior to transfer.", + ) + parser.add_argument( + "--collection", type=str, help="Target iRODS collection. Skips manual selection input." 
+ ) + parser.add_argument( + "sources", help="One or multiple files/directories to ingest.", nargs="+" + ) + parser.add_argument("destination", help="UUID or iRODS path of SODAR landing zone.") + + @classmethod + def get_irods_error(cls, e: Exception): + """Return logger friendly iRODS exception.""" + es = str(e) + return es if es != "None" else e.__class__.__name__ + + @classmethod + def run( + cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser + ) -> typing.Optional[int]: + """Entry point into the command.""" + return cls(args).execute() + + def execute(self): + """Execute ingest.""" + # Get SODAR API info + toml_config = load_toml_config(Config()) + + if not self.args.sodar_url: + self.args.sodar_url = toml_config.get("global", {}).get("sodar_server_url") + if not self.args.sodar_api_token: + self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token") + + # Retrieve iRODS path if destination is UUID + if is_uuid(self.args.destination): + try: + lz_info = api.landingzone.retrieve( + sodar_url=self.args.sodar_url, + sodar_api_token=self.args.sodar_api_token, + landingzone_uuid=self.args.destination, + ) + except Exception as e: + logger.error("Failed to retrieve landing zone information.") + logger.error(e) + sys.exit(1) + + # TODO: Replace with status_locked check once implemented in sodar_cli + if lz_info.status in ["ACTIVE", "FAILED"]: + self.lz_irods_path = lz_info.irods_path + logger.info(f"Target iRods path: {self.lz_irods_path}") + else: + logger.info("Target landing zone is not ACTIVE.") + sys.exit(1) + else: + self.lz_irods_path = self.args.destination + + # Build file list and add missing md5 files + source_paths = self.build_file_list() + source_paths = self.process_md5_sums(source_paths) + + # Initiate iRODS session + irods_session = self._init_irods() + + # Query target collection + logger.info("Querying landing zone collections…") + collections = [] + try: + coll = 
irods_session.collections.get(self.lz_irods_path) + for c in coll.subcollections: + collections.append(c.name) + except: + logger.error("Failed to query landing zone collections.") + sys.exit(1) + finally: + irods_session.cleanup() + + if self.args.collection is None: + user_input = "" + input_valid = False + input_message = "####################\nPlease choose target collection:\n" + for index, item in enumerate(collections): + input_message += f"{index+1}) {item}\n" + input_message += "Select by number: " + + while not input_valid: + user_input = input(input_message) + if user_input.isdigit(): + user_input = int(user_input) + if 0 < user_input < len(collections): + input_valid = True + + self.target_coll = collections[user_input - 1] + + elif self.args.collection in collections: + self.target_coll = self.args.collection + else: + logger.error("Selected target collection does not exist in landing zone.") + sys.exit(1) + + # Create sub-collections for folders + if self.args.recursive: + dirs = sorted( + {p["ipath"].parent for p in source_paths if not p["ipath"].parent == Path(".")} + ) + + logger.info("Planning to create the following sub-collections:") + for d in dirs: + print(f"{self.target_coll}/{str(d)}") + if not self.args.y: + if not input("Is this OK? 
[y/N] ").lower().startswith("y"): + logger.error("Aborting at your request.") + sys.exit(0) + + for d in dirs: + coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" + try: + irods_session.collections.create(coll_name) + except Exception as e: + logger.error("Error creating sub-collection.") + logger.error(e) + sys.exit(1) + finally: + irods_session.cleanup() + logger.info("Sub-collections created.") + + # Build transfer jobs + jobs = self.build_jobs(source_paths) + + # Final go from user & transfer + total_bytes = sum([job.bytes for job in jobs]) + logger.info("Planning to transfer the following files:") + for job in jobs: + print(job.path_src) + logger.info(f"With a total size of {sizeof_fmt(total_bytes)}") + logger.info("Into this iRODS collection:") + print(f"{self.lz_irods_path}/{self.target_coll}/") + + if not self.args.y: + if not input("Is this OK? [y/N] ").lower().startswith("y"): + logger.error("Aborting at your request.") + sys.exit(0) + # TODO: add more parenthesis after python 3.10 + with tqdm( + total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, position=1 + ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: + for job in jobs: + try: + file_log.set_description_str(f"Current file: {job.path_src}") + irods_session.data_objects.put(job.path_src, job.path_dest) + t.update(job.bytes) + except: + logger.error("Problem during transfer of " + job.path_src) + raise + finally: + irods_session.cleanup() + t.clear() + + logger.info("File transfer complete.") + + # Compute server-side checksums + if self.args.remote_checksums: + logger.info("Computing server-side checksums.") + self.compute_irods_checksums(session=irods_session, jobs=jobs) + + def build_file_list(self): + """Build list of source files to transfer. 
iRODS paths are relative to target collection.""" + + source_paths = [Path(src) for src in self.args.sources] + output_paths = list() + + for src in source_paths: + try: + abspath = src.resolve() + except: + logger.error("File not found:", src.name) + continue + + if src.is_dir(): + paths = abspath.glob("**/*" if self.args.recursive else "*") + for p in paths: + if p.is_file() and not p.suffix.lower() == ".md5": + output_paths.append({"spath": p, "ipath": p.relative_to(abspath)}) + else: + output_paths.append({"spath": src, "ipath": Path(src.name)}) + return output_paths + + def process_md5_sums(self, paths): + """Check input paths for corresponding .md5 file. Generate one if missing.""" + + output_paths = paths.copy() + for file in paths: + p = file["spath"] + i = file["ipath"].parent / (p.name + ".md5") + upper_path = p.parent / (p.name + ".MD5") + lower_path = p.parent / (p.name + ".md5") + if upper_path.exists() and lower_path.exists(): + logger.info( + f"Found both {upper_path.name} and {lower_path.name} in {str(p.parent)}/" + ) + logger.info("Removing upper case version.") + upper_path.unlink() + output_paths.append({"spath": lower_path, "ipath": i}) + elif upper_path.exists(): + logger.info(f"Found {upper_path.name} in {str(p.parent)}/") + logger.info("Renaming to " + lower_path.name) + upper_path.rename(upper_path.with_suffix(".md5")) + output_paths.append({"spath": lower_path, "ipath": i}) + elif lower_path.exists(): + output_paths.append({"spath": lower_path, "ipath": i}) + + if not lower_path.exists() and self.args.checksums: + with lower_path.open("w", encoding="utf-8") as file: + file.write(f"{compute_md5_checksum(p)} {p.name}") + output_paths.append({"spath": lower_path, "ipath": i}) + + return output_paths + + def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]: + """Build file transfer jobs.""" + + transfer_jobs = [] + + for p in source_paths: + transfer_jobs.append( + TransferJob( + path_src=str(p["spath"]), + 
path_dest=f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}", + bytes=p["spath"].stat().st_size, + ) + ) + + return tuple(sorted(transfer_jobs)) + + def compute_irods_checksums(self, session, jobs: typing.Tuple[TransferJob, ...]): + for job in jobs: + if not job.path_src.endswith(".md5"): + print( + str( + Path(job.path_dest).relative_to(f"{self.lz_irods_path}/{self.target_coll}/") + ) + ) + try: + data_object = session.data_objects.get(job.path_dest) + data_object.chksum() + except Exception as e: + logger.error("iRODS checksum error.") + logger.error(e) + finally: + session.cleanup() + + +def setup_argparse(parser: argparse.ArgumentParser) -> None: + """Setup argument parser for ``cubi-tk sodar ingest``.""" + return SodarIngest.setup_argparse(parser) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py new file mode 100644 index 00000000..1fa9540c --- /dev/null +++ b/tests/test_sodar_ingest.py @@ -0,0 +1,31 @@ +"""Tests for ``cubi_tk.sodar.ingest``.""" + +import pytest +from unittest import mock +from pyfakefs import fake_filesystem, fake_pathlib +from cubi_tk.__main__ import main, setup_argparse + + +def test_run_sodar_ingest_help(capsys): + parser, _subparsers = setup_argparse() + with pytest.raises(SystemExit) as e: + parser.parse_args(["sodar", "ingest", "--help"]) + + assert e.value.code == 0 + + res = capsys.readouterr() + assert res.out + assert not res.err + + +def test_run_sodar_ingest_nothing(capsys): + parser, _subparsers = setup_argparse() + + with pytest.raises(SystemExit) as e: + parser.parse_args(["sodar", "ingest"]) + + assert e.value.code == 2 + + res = capsys.readouterr() + assert not res.out + assert res.err From f21802c1ba190830226fc0cda0806cb79c4a42d8 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 10 May 2023 10:50:42 +0200 Subject: [PATCH 02/32] add -y option alias --yes --- cubi_tk/sodar/ingest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cubi_tk/sodar/ingest.py 
b/cubi_tk/sodar/ingest.py index d48bb2d4..d931d222 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -108,6 +108,7 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: ) parser.add_argument( "-y", + "--yes", default=False, action="store_true", help="Don't ask for permission prior to transfer.", @@ -218,7 +219,7 @@ def execute(self): logger.info("Planning to create the following sub-collections:") for d in dirs: print(f"{self.target_coll}/{str(d)}") - if not self.args.y: + if not self.args.yes: if not input("Is this OK? [y/N] ").lower().startswith("y"): logger.error("Aborting at your request.") sys.exit(0) @@ -247,7 +248,7 @@ def execute(self): logger.info("Into this iRODS collection:") print(f"{self.lz_irods_path}/{self.target_coll}/") - if not self.args.y: + if not self.args.yes: if not input("Is this OK? [y/N] ").lower().startswith("y"): logger.error("Aborting at your request.") sys.exit(0) From dec3528eedd563379a05bbb68b6c1855b619bef1 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 10 May 2023 10:54:29 +0200 Subject: [PATCH 03/32] make flake8 happy --- cubi_tk/sodar/__init__.py | 2 +- cubi_tk/sodar/ingest.py | 31 ++++++++++++++++++------------- tests/test_sodar_ingest.py | 5 ++--- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/cubi_tk/sodar/__init__.py b/cubi_tk/sodar/__init__.py index 699914e5..88575ad0 100644 --- a/cubi_tk/sodar/__init__.py +++ b/cubi_tk/sodar/__init__.py @@ -56,8 +56,8 @@ from .add_ped import setup_argparse as setup_argparse_add_ped from .check_remote import setup_argparse as setup_argparse_check_remote from .download_sheet import setup_argparse as setup_argparse_download_sheet -from .ingest_fastq import setup_argparse as setup_argparse_ingest_fastq from .ingest import setup_argparse as setup_argparse_ingest +from .ingest_fastq import setup_argparse as setup_argparse_ingest_fastq from .lz_create import setup_argparse as setup_argparse_lz_create from .lz_list import setup_argparse as 
setup_argparse_lz_list from .lz_move import setup_argparse as setup_argparse_lz_move diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index d931d222..9588e659 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -1,20 +1,21 @@ """``cubi-tk sodar ingest``: add arbitrary files to SODAR""" import argparse +import getpass import os -import sys from pathlib import Path -import getpass +import sys import typing -import attr +import attr from irods.session import iRODSSession from logzero import logger from sodar_cli import api -from ..common import load_toml_config, is_uuid, compute_md5_checksum, sizeof_fmt -from ..snappy.itransfer_common import TransferJob from tqdm import tqdm +from ..common import compute_md5_checksum, is_uuid, load_toml_config, sizeof_fmt +from ..snappy.itransfer_common import TransferJob + @attr.s(frozen=True, auto_attribs=True) class Config: @@ -125,7 +126,7 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: def get_irods_error(cls, e: Exception): """Return logger friendly iRODS exception.""" es = str(e) - return es if es != "None" else e.__class__.__name__ + return es if es else e.__class__.__name__ @classmethod def run( @@ -181,8 +182,8 @@ def execute(self): coll = irods_session.collections.get(self.lz_irods_path) for c in coll.subcollections: collections.append(c.name) - except: - logger.error("Failed to query landing zone collections.") + except Exception as e: + logger.error(f"Failed to query landing zone collections: {self.get_irods_error(e)}") sys.exit(1) finally: irods_session.cleanup() @@ -261,9 +262,10 @@ def execute(self): file_log.set_description_str(f"Current file: {job.path_src}") irods_session.data_objects.put(job.path_src, job.path_dest) t.update(job.bytes) - except: - logger.error("Problem during transfer of " + job.path_src) - raise + except Exception as e: + logger.error(f"Problem during transfer of {job.path_src}.") + logger.error(self.get_irods_error(e)) + sys.exit(1) finally: 
irods_session.cleanup() t.clear() @@ -284,8 +286,11 @@ def build_file_list(self): for src in source_paths: try: abspath = src.resolve() - except: - logger.error("File not found:", src.name) + except FileNotFoundError: + logger.error(f"File not found: {src.name}") + continue + except RuntimeError: + logger.error(f"Infinite loop: {src.name}") continue if src.is_dir(): diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index 1fa9540c..bd89996e 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -1,9 +1,8 @@ """Tests for ``cubi_tk.sodar.ingest``.""" import pytest -from unittest import mock -from pyfakefs import fake_filesystem, fake_pathlib -from cubi_tk.__main__ import main, setup_argparse + +from cubi_tk.__main__ import setup_argparse def test_run_sodar_ingest_help(capsys): From f0c3a976b8d0cdbc9c54a8605f6c536614f4c0ba Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 10 May 2023 19:10:17 +0200 Subject: [PATCH 04/32] extract iRODS functionality into separate file --- cubi_tk/irods_utils.py | 106 ++++++++++++++++++++++++++++++++++++++ cubi_tk/sodar/ingest.py | 83 +++-------------------------- tests/test_irods_utils.py | 62 ++++++++++++++++++++++ 3 files changed, 176 insertions(+), 75 deletions(-) create mode 100644 cubi_tk/irods_utils.py create mode 100644 tests/test_irods_utils.py diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py new file mode 100644 index 00000000..3eef42f6 --- /dev/null +++ b/cubi_tk/irods_utils.py @@ -0,0 +1,106 @@ +import getpass +import os.path +import sys +from typing import Tuple + +import attr +from irods.session import iRODSSession +from logzero import logger +from tqdm import tqdm + +# TODO: move this class to common.py? +# from .snappy.itransfer_common import TransferJob + + +@attr.s(frozen=True, auto_attribs=True) +class TransferJob: + """Encodes a transfer job from the local file system to the remote iRODS collection.""" + + #: Source path. + path_src: str + + #: Destination path. 
+ path_dest: str + + #: Number of bytes to transfer. + bytes: int + + +def get_irods_error(e: Exception): + """Return logger friendly iRODS exception.""" + es = str(e) + return es if es and es != "None" else e.__class__.__name__ + + +def init_irods(irods_env_path: os.PathLike) -> iRODSSession: + """Connect to iRODS.""" + irods_auth_path = irods_env_path.parent.joinpath(".irodsA") + if irods_auth_path.exists(): + try: + session = iRODSSession(irods_env_file=irods_env_path) + session.server_version # check for outdated .irodsA file + except Exception as e: + logger.error(f"iRODS connection failed: {get_irods_error(e)}") + logger.error("Are you logged in? try 'iinit'") + sys.exit(1) + finally: + session.cleanup() + else: + # Query user for password. + logger.info("iRODS authentication file not found.") + password = getpass.getpass(prompt="Please enter SODAR password:") + try: + session = iRODSSession(irods_env_file=irods_env_path, password=password) + session.server_version # check for exceptions + except Exception as e: + logger.error(f"iRODS connection failed: {get_irods_error(e)}") + sys.exit(1) + finally: + session.cleanup() + + return session + + +class iRODSTransfer: + """Transfers files to and from iRODS.""" + + def __init__(self, session: iRODSSession, jobs: Tuple[TransferJob, ...]): + self.session = session + self.jobs = jobs + self.total_bytes = sum([job.bytes for job in self.jobs]) + self.destinations = [job.path_dest for job in self.jobs] + + def put(self): + # TODO: add more parenthesis after python 3.10 + with tqdm( + total=self.total_bytes, unit="B", unit_scale=True, unit_divisor=1024, position=1 + ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: + for job in self.jobs: + try: + file_log.set_description_str(f"Current file: {job.path_src}") + self.session.data_objects.put(job.path_src, job.path_dest) + t.update(job.bytes) + except Exception as e: + logger.error(f"Problem during transfer of {job.path_src}") + 
logger.error(get_irods_error(e)) + sys.exit(1) + finally: + self.session.cleanup() + t.clear() + + def get(self): + pass + + def chksum(self): + common_prefix = os.path.commonprefix(self.destinations) + for job in self.jobs: + if not job.path_src.endswith(".md5"): + print(os.path.relpath(job.path_dest, common_prefix)) + try: + data_object = self.session.data_objects.get(job.path_dest) + data_object.chksum() + except Exception as e: + logger.error("iRODS checksum error.") + logger.error(get_irods_error(e)) + finally: + self.session.cleanup() diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 9588e659..6418bf4f 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -1,20 +1,18 @@ """``cubi-tk sodar ingest``: add arbitrary files to SODAR""" import argparse -import getpass import os from pathlib import Path import sys import typing import attr -from irods.session import iRODSSession from logzero import logger from sodar_cli import api -from tqdm import tqdm + +from cubi_tk.irods_utils import TransferJob, get_irods_error, init_irods, iRODSTransfer from ..common import compute_md5_checksum, is_uuid, load_toml_config, sizeof_fmt -from ..snappy.itransfer_common import TransferJob @attr.s(frozen=True, auto_attribs=True) @@ -42,34 +40,6 @@ def __init__(self, args): # iRODS environment self.irods_env = None - def _init_irods(self): - """Connect to iRODS.""" - irods_auth_path = self.irods_env_path.parent.joinpath(".irodsA") - if irods_auth_path.exists(): - try: - session = iRODSSession(irods_env_file=self.irods_env_path) - session.server_version # check for outdated .irodsA file - except Exception as e: - logger.error("iRODS connection failed: %s", self.get_irods_error(e)) - logger.error("Are you logged in? try 'iinit'") - sys.exit(1) - finally: - session.cleanup() - else: - # Query user for password. 
- logger.info("iRODS authentication file not found.") - password = getpass.getpass(prompt="Please enter SODAR password:") - try: - session = iRODSSession(irods_env_file=self.irods_env_path, password=password) - session.server_version # check for exceptions - except Exception as e: - logger.error("iRODS connection failed: %s", self.get_irods_error(e)) - sys.exit(1) - finally: - session.cleanup() - - return session - @classmethod def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: parser.add_argument( @@ -122,12 +92,6 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: ) parser.add_argument("destination", help="UUID or iRODS path of SODAR landing zone.") - @classmethod - def get_irods_error(cls, e: Exception): - """Return logger friendly iRODS exception.""" - es = str(e) - return es if es else e.__class__.__name__ - @classmethod def run( cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser @@ -173,7 +137,7 @@ def execute(self): source_paths = self.process_md5_sums(source_paths) # Initiate iRODS session - irods_session = self._init_irods() + irods_session = init_irods(self.irods_env_path) # Query target collection logger.info("Querying landing zone collections…") @@ -183,7 +147,7 @@ def execute(self): for c in coll.subcollections: collections.append(c.name) except Exception as e: - logger.error(f"Failed to query landing zone collections: {self.get_irods_error(e)}") + logger.error(f"Failed to query landing zone collections: {get_irods_error(e)}") sys.exit(1) finally: irods_session.cleanup() @@ -241,7 +205,8 @@ def execute(self): jobs = self.build_jobs(source_paths) # Final go from user & transfer - total_bytes = sum([job.bytes for job in jobs]) + itransfer = iRODSTransfer(irods_session, jobs) + total_bytes = itransfer.total_bytes logger.info("Planning to transfer the following files:") for job in jobs: print(job.path_src) @@ -253,29 +218,14 @@ def execute(self): if not input("Is this OK? 
[y/N] ").lower().startswith("y"): logger.error("Aborting at your request.") sys.exit(0) - # TODO: add more parenthesis after python 3.10 - with tqdm( - total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, position=1 - ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: - for job in jobs: - try: - file_log.set_description_str(f"Current file: {job.path_src}") - irods_session.data_objects.put(job.path_src, job.path_dest) - t.update(job.bytes) - except Exception as e: - logger.error(f"Problem during transfer of {job.path_src}.") - logger.error(self.get_irods_error(e)) - sys.exit(1) - finally: - irods_session.cleanup() - t.clear() + itransfer.put() logger.info("File transfer complete.") # Compute server-side checksums if self.args.remote_checksums: logger.info("Computing server-side checksums.") - self.compute_irods_checksums(session=irods_session, jobs=jobs) + itransfer.chksum() def build_file_list(self): """Build list of source files to transfer. iRODS paths are relative to target collection.""" @@ -349,23 +299,6 @@ def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]: return tuple(sorted(transfer_jobs)) - def compute_irods_checksums(self, session, jobs: typing.Tuple[TransferJob, ...]): - for job in jobs: - if not job.path_src.endswith(".md5"): - print( - str( - Path(job.path_dest).relative_to(f"{self.lz_irods_path}/{self.target_coll}/") - ) - ) - try: - data_object = session.data_objects.get(job.path_dest) - data_object.chksum() - except Exception as e: - logger.error("iRODS checksum error.") - logger.error(e) - finally: - session.cleanup() - def setup_argparse(parser: argparse.ArgumentParser) -> None: """Setup argument parser for ``cubi-tk sodar ingest``.""" diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py new file mode 100644 index 00000000..29d5779c --- /dev/null +++ b/tests/test_irods_utils.py @@ -0,0 +1,62 @@ +from pathlib import Path +import shutil +from unittest.mock import patch + 
+from irods.session import iRODSSession +import pytest + +from cubi_tk.irods_utils import TransferJob, iRODSTransfer + + +@pytest.fixture +def fake_filesystem(fs): + yield fs + + +@pytest.fixture +def jobs(): + return ( + TransferJob(path_src="myfile.csv", path_dest="dest_dir/myfile.csv", bytes=123), + TransferJob( + path_src="folder/file.csv", + path_dest="dest_dir/folder/file.csv", + bytes=1024, + ), + ) + + +@pytest.fixture +def itransfer(jobs): + session = iRODSSession( + irods_host="localhost", + irods_port=1247, + irods_user_name="pytest", + irods_zone_name="pytest", + ) + + return iRODSTransfer(session, jobs) + + +def test_irods_transfer_init(jobs, itransfer): + assert itransfer.total_bytes == sum([job.bytes for job in jobs]) + assert itransfer.destinations == [job.path_dest for job in jobs] + + +def test_irods_transfer_put(fs, itransfer): + fs.create_file("myfile.csv") + fs.create_dir("folder") + fs.create_file("folder/file.csv") + fs.create_dir("dest_dir/folder") + + with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): + itransfer.put() + assert Path("dest_dir/myfile.csv").exists() + assert Path("dest_dir/folder/file.csv").exists() + + +def test_irods_transfer_chksum(itransfer): + with patch.object(itransfer.session.data_objects, "get") as mock: + itransfer.chksum() + + assert mock.called + assert mock.called_with(itransfer.destinations) From 17e96ab2c8e663875cf242b4a38d48658f829e0c Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 10 May 2023 20:14:25 +0200 Subject: [PATCH 05/32] compute md5 always and generate temp file on upload --- cubi_tk/irods_utils.py | 54 +++++++++++++++++++++++++++------------ cubi_tk/sodar/ingest.py | 40 +---------------------------- tests/test_irods_utils.py | 25 ++++++++++++------ 3 files changed, 56 insertions(+), 63 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 3eef42f6..93efc872 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -1,6 +1,7 @@ 
import getpass import os.path import sys +import tempfile from typing import Tuple import attr @@ -25,6 +26,9 @@ class TransferJob: #: Number of bytes to transfer. bytes: int + #: MD5 hashsum of file. + md5: str + def get_irods_error(e: Exception): """Return logger friendly iRODS exception.""" @@ -71,22 +75,40 @@ def __init__(self, session: iRODSSession, jobs: Tuple[TransferJob, ...]): self.destinations = [job.path_dest for job in self.jobs] def put(self): - # TODO: add more parenthesis after python 3.10 - with tqdm( - total=self.total_bytes, unit="B", unit_scale=True, unit_divisor=1024, position=1 - ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: - for job in self.jobs: - try: - file_log.set_description_str(f"Current file: {job.path_src}") - self.session.data_objects.put(job.path_src, job.path_dest) - t.update(job.bytes) - except Exception as e: - logger.error(f"Problem during transfer of {job.path_src}") - logger.error(get_irods_error(e)) - sys.exit(1) - finally: - self.session.cleanup() - t.clear() + with tempfile.TemporaryDirectory() as temp_dir: + # Double tqdm for currently transferred file info + # TODO: add more parenthesis after python 3.10 + with tqdm( + total=self.total_bytes, + unit="B", + unit_scale=True, + unit_divisor=1024, + position=1, + ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: + for job in self.jobs: + job_name = os.path.basename(job.path_src) + + # create temporary md5 files + with open( + os.path.join(temp_dir, job_name) + ".md5", "w", encoding="utf-8" + ) as tmp: + tmp.write(f"{job.md5} {job_name}") + + try: + file_log.set_description_str(f"Current file: {job.path_src}") + self.session.data_objects.put(job.path_src, job.path_dest) + self.session.data_objects.put( + os.path.join(temp_dir, job_name) + ".md5", + job.path_dest + ".md5", + ) + t.update(job.bytes) + except Exception as e: + logger.error(f"Problem during transfer of {job.path_src}") + 
logger.error(get_irods_error(e)) + sys.exit(1) + finally: + self.session.cleanup() + t.clear() def get(self): pass diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 6418bf4f..d4702dd1 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -63,13 +63,6 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: action="store_true", help="Recursively match files in subdirectories. Creates iRODS sub-collections to match directory structure.", ) - parser.add_argument( - "-c", - "--checksums", - default=False, - action="store_true", - help="Compute missing md5 checksums for source files.", - ) parser.add_argument( "-K", "--remote-checksums", @@ -134,7 +127,6 @@ def execute(self): # Build file list and add missing md5 files source_paths = self.build_file_list() - source_paths = self.process_md5_sums(source_paths) # Initiate iRODS session irods_session = init_irods(self.irods_env_path) @@ -252,37 +244,6 @@ def build_file_list(self): output_paths.append({"spath": src, "ipath": Path(src.name)}) return output_paths - def process_md5_sums(self, paths): - """Check input paths for corresponding .md5 file. 
Generate one if missing.""" - - output_paths = paths.copy() - for file in paths: - p = file["spath"] - i = file["ipath"].parent / (p.name + ".md5") - upper_path = p.parent / (p.name + ".MD5") - lower_path = p.parent / (p.name + ".md5") - if upper_path.exists() and lower_path.exists(): - logger.info( - f"Found both {upper_path.name} and {lower_path.name} in {str(p.parent)}/" - ) - logger.info("Removing upper case version.") - upper_path.unlink() - output_paths.append({"spath": lower_path, "ipath": i}) - elif upper_path.exists(): - logger.info(f"Found {upper_path.name} in {str(p.parent)}/") - logger.info("Renaming to " + lower_path.name) - upper_path.rename(upper_path.with_suffix(".md5")) - output_paths.append({"spath": lower_path, "ipath": i}) - elif lower_path.exists(): - output_paths.append({"spath": lower_path, "ipath": i}) - - if not lower_path.exists() and self.args.checksums: - with lower_path.open("w", encoding="utf-8") as file: - file.write(f"{compute_md5_checksum(p)} {p.name}") - output_paths.append({"spath": lower_path, "ipath": i}) - - return output_paths - def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]: """Build file transfer jobs.""" @@ -294,6 +255,7 @@ def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]: path_src=str(p["spath"]), path_dest=f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}", bytes=p["spath"].stat().st_size, + md5=compute_md5_checksum(p["spath"]), ) ) diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py index 29d5779c..0abac294 100644 --- a/tests/test_irods_utils.py +++ b/tests/test_irods_utils.py @@ -16,11 +16,17 @@ def fake_filesystem(fs): @pytest.fixture def jobs(): return ( - TransferJob(path_src="myfile.csv", path_dest="dest_dir/myfile.csv", bytes=123), + TransferJob( + path_src="myfile.csv", + path_dest="dest_dir/myfile.csv", + bytes=123, + md5="ed3b3cbb18fd148bc925944ff0861ce6", + ), TransferJob( path_src="folder/file.csv", path_dest="dest_dir/folder/file.csv", 
bytes=1024, + md5="a6e9e3c859b803adb0f1d5f08a51d0f6", ), ) @@ -42,16 +48,19 @@ def test_irods_transfer_init(jobs, itransfer): assert itransfer.destinations == [job.path_dest for job in jobs] -def test_irods_transfer_put(fs, itransfer): - fs.create_file("myfile.csv") - fs.create_dir("folder") - fs.create_file("folder/file.csv") - fs.create_dir("dest_dir/folder") +def test_irods_transfer_put(fs, itransfer, jobs): + for job in jobs: + fs.create_file(job.path_src) + fs.create_dir(Path(job.path_dest).parent) with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): itransfer.put() - assert Path("dest_dir/myfile.csv").exists() - assert Path("dest_dir/folder/file.csv").exists() + + for job in jobs: + assert Path(job.path_dest).exists() + assert Path(job.path_dest + ".md5").exists() + with Path(job.path_dest + ".md5").open() as file: + assert file.readline() == f"{job.md5} {Path(job.path_dest).name}" def test_irods_transfer_chksum(itransfer): From a5803dc01cce459a45496e304199863cfa6e049e Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Thu, 11 May 2023 14:57:30 +0200 Subject: [PATCH 06/32] adjusted log levels --- cubi_tk/sodar/ingest.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index d4702dd1..e48b3452 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -120,7 +120,7 @@ def execute(self): self.lz_irods_path = lz_info.irods_path logger.info(f"Target iRods path: {self.lz_irods_path}") else: - logger.info("Target landing zone is not ACTIVE.") + logger.error("Target landing zone is not ACTIVE.") sys.exit(1) else: self.lz_irods_path = self.args.destination @@ -178,7 +178,7 @@ def execute(self): print(f"{self.target_coll}/{str(d)}") if not self.args.yes: if not input("Is this OK? 
[y/N] ").lower().startswith("y"): - logger.error("Aborting at your request.") + logger.info("Aborting at your request.") sys.exit(0) for d in dirs: @@ -208,7 +208,7 @@ def execute(self): if not self.args.yes: if not input("Is this OK? [y/N] ").lower().startswith("y"): - logger.error("Aborting at your request.") + logger.info("Aborting at your request.") sys.exit(0) itransfer.put() @@ -229,10 +229,10 @@ def build_file_list(self): try: abspath = src.resolve() except FileNotFoundError: - logger.error(f"File not found: {src.name}") + logger.warning(f"File not found: {src.name}") continue except RuntimeError: - logger.error(f"Infinite loop: {src.name}") + logger.warning(f"Infinite loop: {src.name}") continue if src.is_dir(): From 423ff88634e03c44a2be2c5190abb522de2734e3 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 12 May 2023 17:10:52 +0200 Subject: [PATCH 07/32] add test for init_irods --- cubi_tk/irods_utils.py | 8 ++++---- tests/test_irods_utils.py | 26 +++++++++++++++++++++++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 93efc872..2f63eade 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -43,7 +43,7 @@ def init_irods(irods_env_path: os.PathLike) -> iRODSSession: try: session = iRODSSession(irods_env_file=irods_env_path) session.server_version # check for outdated .irodsA file - except Exception as e: + except Exception as e: # pragma: no cover logger.error(f"iRODS connection failed: {get_irods_error(e)}") logger.error("Are you logged in? 
try 'iinit'") sys.exit(1) @@ -56,7 +56,7 @@ def init_irods(irods_env_path: os.PathLike) -> iRODSSession: try: session = iRODSSession(irods_env_file=irods_env_path, password=password) session.server_version # check for exceptions - except Exception as e: + except Exception as e: # pragma: no cover logger.error(f"iRODS connection failed: {get_irods_error(e)}") sys.exit(1) finally: @@ -102,7 +102,7 @@ def put(self): job.path_dest + ".md5", ) t.update(job.bytes) - except Exception as e: + except Exception as e: # pragma: no cover logger.error(f"Problem during transfer of {job.path_src}") logger.error(get_irods_error(e)) sys.exit(1) @@ -121,7 +121,7 @@ def chksum(self): try: data_object = self.session.data_objects.get(job.path_dest) data_object.chksum() - except Exception as e: + except Exception as e: # pragma: no cover logger.error("iRODS checksum error.") logger.error(get_irods_error(e)) finally: diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py index 0abac294..d69ed225 100644 --- a/tests/test_irods_utils.py +++ b/tests/test_irods_utils.py @@ -5,7 +5,7 @@ from irods.session import iRODSSession import pytest -from cubi_tk.irods_utils import TransferJob, iRODSTransfer +from cubi_tk.irods_utils import TransferJob, init_irods, iRODSTransfer @pytest.fixture @@ -13,6 +13,26 @@ def fake_filesystem(fs): yield fs +@patch("cubi_tk.irods_utils.iRODSSession") +@patch("getpass.getpass") +def test_init_irods(mockpass, mocksession, fs): + ienv = Path(".irods/irods_environment.json") + password = "1234" + + # .irodsA not found, asks for password + mockpass.return_value = password + init_irods(ienv) + mockpass.assert_called() + mocksession.assert_called_with(irods_env_file=ienv, password=password) + + # .irodsA there, does not ask for password + fs.create_file(".irods/.irodsA") + mockpass.reset_mock() + init_irods(ienv) + mockpass.assert_not_called() + mocksession.assert_called_with(irods_env_file=ienv) + + @pytest.fixture def jobs(): return ( @@ -67,5 +87,5 @@ def 
test_irods_transfer_chksum(itransfer): with patch.object(itransfer.session.data_objects, "get") as mock: itransfer.chksum() - assert mock.called - assert mock.called_with(itransfer.destinations) + for path in itransfer.destinations: + mock.assert_any_call(path) From 02fe93d4467811bf914ae880887fd87679aa6b3a Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 12 May 2023 18:06:21 +0200 Subject: [PATCH 08/32] add test for get_irods_error --- cubi_tk/irods_utils.py | 3 --- tests/test_irods_utils.py | 10 +++++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 2f63eade..834e9f80 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -110,9 +110,6 @@ def put(self): self.session.cleanup() t.clear() - def get(self): - pass - def chksum(self): common_prefix = os.path.commonprefix(self.destinations) for job in self.jobs: diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py index d69ed225..e88d6070 100644 --- a/tests/test_irods_utils.py +++ b/tests/test_irods_utils.py @@ -2,10 +2,11 @@ import shutil from unittest.mock import patch +import irods.exception from irods.session import iRODSSession import pytest -from cubi_tk.irods_utils import TransferJob, init_irods, iRODSTransfer +from cubi_tk.irods_utils import TransferJob, get_irods_error, init_irods, iRODSTransfer @pytest.fixture @@ -13,6 +14,13 @@ def fake_filesystem(fs): yield fs +def test_get_irods_error(): + e = irods.exception.NetworkException() + assert get_irods_error(e) == "NetworkException" + e = irods.exception.NetworkException("Connection reset") + assert get_irods_error(e) == "Connection reset" + + @patch("cubi_tk.irods_utils.iRODSSession") @patch("getpass.getpass") def test_init_irods(mockpass, mocksession, fs): From a2325a0d5c29300ee3a1130e4febce09a65b65df Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 12 May 2023 18:55:53 +0200 Subject: [PATCH 09/32] add test for transfer job builder --- 
tests/test_sodar_ingest.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index bd89996e..f6f5d639 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -1,8 +1,12 @@ """Tests for ``cubi_tk.sodar.ingest``.""" +from pathlib import Path +from unittest.mock import Mock, patch + import pytest from cubi_tk.__main__ import setup_argparse +from cubi_tk.sodar.ingest import SodarIngest def test_run_sodar_ingest_help(capsys): @@ -28,3 +32,34 @@ def test_run_sodar_ingest_nothing(capsys): res = capsys.readouterr() assert not res.out assert res.err + + +@pytest.fixture +def ingest(): + obj = SodarIngest(args={"sources": "testfolder", "recursive": True}) + obj.lz_irods_path = "/irodsZone" + obj.target_coll = "targetCollection" + return obj + + +@patch("cubi_tk.sodar.ingest.sorted") +@patch("cubi_tk.sodar.ingest.compute_md5_checksum", return_value="5555") +@patch("cubi_tk.sodar.ingest.Path.stat") +@patch("cubi_tk.sodar.ingest.TransferJob") +def test_sodar_ingest_build_jobs(mockjob, mockstats, mockmd5, mocksorted, ingest): + paths = [ + {"spath": Path("myfile.csv"), "ipath": Path("dest_dir/myfile.csv")}, + {"spath": Path("folder/file.csv"), "ipath": Path("dest_dir/folder/file.csv")}, + ] + mockstats.return_value = Mock(st_size=1024) + + ingest.build_jobs(paths) + print(mockjob.call_args_list) + for p in paths: + mockjob.assert_any_call( + path_src=str(p["spath"]), + path_dest=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath'])}", + bytes=1024, + md5="5555", + ) + From cabb2e72031c2d78886e670d9e9b122f532886f8 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 17 May 2023 20:00:39 +0200 Subject: [PATCH 10/32] add iinit-like behaviour when asking for password --- cubi_tk/irods_utils.py | 58 +++++++++++++++++++++++++----- cubi_tk/sodar/ingest.py | 10 ++++-- tests/test_irods_utils.py | 74 +++++++++++++++++++++++--------------- tests/test_sodar_ingest.py | 
23 ------------ 4 files changed, 104 insertions(+), 61 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 834e9f80..0f468069 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -1,10 +1,13 @@ import getpass import os.path +from pathlib import Path import sys import tempfile from typing import Tuple import attr +from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED +from irods.password_obfuscation import encode from irods.session import iRODSSession from logzero import logger from tqdm import tqdm @@ -36,35 +39,74 @@ def get_irods_error(e: Exception): return es if es and es != "None" else e.__class__.__name__ -def init_irods(irods_env_path: os.PathLike) -> iRODSSession: +def init_irods(irods_env_path: os.PathLike, ask: bool = False) -> iRODSSession: """Connect to iRODS.""" irods_auth_path = irods_env_path.parent.joinpath(".irodsA") if irods_auth_path.exists(): try: session = iRODSSession(irods_env_file=irods_env_path) session.server_version # check for outdated .irodsA file + return session except Exception as e: # pragma: no cover logger.error(f"iRODS connection failed: {get_irods_error(e)}") - logger.error("Are you logged in? try 'iinit'") - sys.exit(1) + pass finally: session.cleanup() - else: - # Query user for password. - logger.info("iRODS authentication file not found.") - password = getpass.getpass(prompt="Please enter SODAR password:") + + # No valid .irodsA file. Query user for password. + logger.info("No valid iRODS authentication file found.") + attempts = 0 + while attempts < 3: try: - session = iRODSSession(irods_env_file=irods_env_path, password=password) + session = iRODSSession( + irods_env_file=irods_env_path, + password=getpass.getpass(prompt="Please enter SODAR password:"), + ) session.server_version # check for exceptions + break + except PAM_AUTH_PASSWORD_FAILED: # pragma: no cover + if attempts < 2: + logger.warning("Wrong password. 
Please try again.") + attempts += 1 + continue + else: + logger.error("iRODS connection failed.") + sys.exit(1) except Exception as e: # pragma: no cover logger.error(f"iRODS connection failed: {get_irods_error(e)}") sys.exit(1) finally: session.cleanup() + if ask and input("Save iRODS session for passwordless operation? [y/N] ").lower().startswith( + "y" + ): + save_irods_token(session) + elif not ask: + save_irods_token(session) + return session +def save_irods_token(session: iRODSSession, irods_env_path=None): + """Retrieve PAM temp auth token 'obfuscate' it and save to disk.""" + if not irods_env_path: + irods_auth_path = Path.home().joinpath(".irods", ".irodsA") + else: + irods_auth_path = Path(irods_env_path).parent.joinpath(".irodsA") + + irods_auth_path.parent.mkdir(parents=True, exist_ok=True) + + try: + token = session.pam_pw_negotiated + except CAT_INVALID_AUTHENTICATION: + raise + + if isinstance(token, list) and token: + irods_auth_path.write_text(encode(token[0])) + irods_auth_path.chmod(0o600) + + class iRODSTransfer: """Transfers files to and from iRODS.""" diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index e48b3452..b78be18f 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -75,11 +75,17 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: "--yes", default=False, action="store_true", - help="Don't ask for permission prior to transfer.", + help="Don't ask for permission.", ) parser.add_argument( "--collection", type=str, help="Target iRODS collection. Skips manual selection input." ) + parser.add_argument( + "--iinit", + default=False, + action="store_true", + help="Save PAM auth token to disk. 
Keep login active.", + ) parser.add_argument( "sources", help="One or multiple files/directories to ingest.", nargs="+" ) @@ -129,7 +135,7 @@ def execute(self): source_paths = self.build_file_list() # Initiate iRODS session - irods_session = init_irods(self.irods_env_path) + irods_session = init_irods(self.irods_env_path, ask=not self.args.yes) # Query target collection logger.info("Querying landing zone collections…") diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py index e88d6070..136a516c 100644 --- a/tests/test_irods_utils.py +++ b/tests/test_irods_utils.py @@ -1,12 +1,18 @@ from pathlib import Path import shutil -from unittest.mock import patch +from unittest.mock import MagicMock, PropertyMock, patch import irods.exception from irods.session import iRODSSession import pytest -from cubi_tk.irods_utils import TransferJob, get_irods_error, init_irods, iRODSTransfer +from cubi_tk.irods_utils import ( + TransferJob, + get_irods_error, + init_irods, + iRODSTransfer, + save_irods_token, +) @pytest.fixture @@ -14,6 +20,35 @@ def fake_filesystem(fs): yield fs +@pytest.fixture +def jobs(): + return ( + TransferJob( + path_src="myfile.csv", + path_dest="dest_dir/myfile.csv", + bytes=123, + md5="ed3b3cbb18fd148bc925944ff0861ce6", + ), + TransferJob( + path_src="folder/file.csv", + path_dest="dest_dir/folder/file.csv", + bytes=1024, + md5="a6e9e3c859b803adb0f1d5f08a51d0f6", + ), + ) + + +@pytest.fixture +def itransfer(jobs): + session = iRODSSession( + irods_host="localhost", + irods_port=1247, + irods_user_name="pytest", + irods_zone_name="pytest", + ) + return iRODSTransfer(session, jobs) + + def test_get_irods_error(): e = irods.exception.NetworkException() assert get_irods_error(e) == "NetworkException" @@ -41,34 +76,17 @@ def test_init_irods(mockpass, mocksession, fs): mocksession.assert_called_with(irods_env_file=ienv) -@pytest.fixture -def jobs(): - return ( - TransferJob( - path_src="myfile.csv", - path_dest="dest_dir/myfile.csv", - bytes=123, - 
md5="ed3b3cbb18fd148bc925944ff0861ce6", - ), - TransferJob( - path_src="folder/file.csv", - path_dest="dest_dir/folder/file.csv", - bytes=1024, - md5="a6e9e3c859b803adb0f1d5f08a51d0f6", - ), - ) - +@patch("cubi_tk.irods_utils.encode", return_value="it works") +def test_write_token(mockencode, fs): + ienv = Path(".irods/irods_environment.json") -@pytest.fixture -def itransfer(jobs): - session = iRODSSession( - irods_host="localhost", - irods_port=1247, - irods_user_name="pytest", - irods_zone_name="pytest", - ) + mocksession = MagicMock() + pam_pw = PropertyMock(return_value=["secure"]) + type(mocksession).pam_pw_negotiated = pam_pw - return iRODSTransfer(session, jobs) + save_irods_token(mocksession, ienv) + assert ienv.parent.joinpath(".irodsA").exists() + mockencode.assert_called_with("secure") def test_irods_transfer_init(jobs, itransfer): diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index f6f5d639..d696a863 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -40,26 +40,3 @@ def ingest(): obj.lz_irods_path = "/irodsZone" obj.target_coll = "targetCollection" return obj - - -@patch("cubi_tk.sodar.ingest.sorted") -@patch("cubi_tk.sodar.ingest.compute_md5_checksum", return_value="5555") -@patch("cubi_tk.sodar.ingest.Path.stat") -@patch("cubi_tk.sodar.ingest.TransferJob") -def test_sodar_ingest_build_jobs(mockjob, mockstats, mockmd5, mocksorted, ingest): - paths = [ - {"spath": Path("myfile.csv"), "ipath": Path("dest_dir/myfile.csv")}, - {"spath": Path("folder/file.csv"), "ipath": Path("dest_dir/folder/file.csv")}, - ] - mockstats.return_value = Mock(st_size=1024) - - ingest.build_jobs(paths) - print(mockjob.call_args_list) - for p in paths: - mockjob.assert_any_call( - path_src=str(p["spath"]), - path_dest=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath'])}", - bytes=1024, - md5="5555", - ) - From 288cefced7b1e3229f071f09763b8909b379e372 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 19 May 2023 
13:24:50 +0200 Subject: [PATCH 11/32] test sodar ingest build file list --- cubi_tk/sodar/ingest.py | 7 +++-- tests/test_sodar_ingest.py | 53 +++++++++++++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index b78be18f..e883dd8a 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -14,6 +14,9 @@ from ..common import compute_md5_checksum, is_uuid, load_toml_config, sizeof_fmt +# for testing +logger.propagate = True + @attr.s(frozen=True, auto_attribs=True) class Config: @@ -233,12 +236,12 @@ def build_file_list(self): for src in source_paths: try: - abspath = src.resolve() + abspath = src.resolve(strict=True) except FileNotFoundError: logger.warning(f"File not found: {src.name}") continue except RuntimeError: - logger.warning(f"Infinite loop: {src.name}") + logger.warning(f"Symlink loop: {src.name}") continue if src.is_dir(): diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index d696a863..d37b94b1 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -1,7 +1,7 @@ """Tests for ``cubi_tk.sodar.ingest``.""" from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, PropertyMock import pytest @@ -35,8 +35,49 @@ def test_run_sodar_ingest_nothing(capsys): @pytest.fixture -def ingest(): - obj = SodarIngest(args={"sources": "testfolder", "recursive": True}) - obj.lz_irods_path = "/irodsZone" - obj.target_coll = "targetCollection" - return obj +def fake_filesystem(fs): + yield fs + + +def test_sodar_ingest_build_file_list(fs, caplog): + class DummyArgs(object): + pass + + fs.create_symlink("/not_existing", "/broken_link") + fs.create_symlink("/loop_src", "/loop_src2") + fs.create_symlink("/loop_src2", "/loop_src") + + args = DummyArgs() + args.sources = ["broken_link", "not_here", "loop_src", "testdir"] + args.recursive = True + dummy = MagicMock() + args_mock = 
PropertyMock(return_value=args) + type(dummy).args = args_mock + + fs.create_dir("/testdir/subdir") + fs.create_file("/testdir/file1") + fs.create_file("/testdir/file1.md5") + fs.create_file("/testdir/subdir/file2") + fs.create_file("/file3") + fs.create_symlink("/testdir/file3", "/file3") + + paths = SodarIngest.build_file_list(dummy) + + # Sources + assert "File not found: broken_link" in caplog.messages + assert "File not found: not_here" in caplog.messages + assert "Symlink loop: loop_src" in caplog.messages + + # Files + assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths + assert {"spath": Path("/testdir/file1.md5"), "ipath": Path("file1.md5")} not in paths + assert {"spath": Path("/testdir/subdir/file2"), "ipath": Path("subdir/file2")} in paths + assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + + # Re-run without recursive search + args.recursive = False + paths = SodarIngest.build_file_list(dummy) + assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths + assert {"spath": Path("/testdir/file1.md5"), "ipath": Path("file1.md5")} not in paths + assert {"spath": Path("/testdir/subdir/file2"), "ipath": Path("subdir/file2")} not in paths + assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths From db1fdbe5c03a39442692e777206fc13429f461da Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 19 May 2023 15:02:53 +0200 Subject: [PATCH 12/32] removed print statements in favour of logger --- cubi_tk/irods_utils.py | 8 +++++--- cubi_tk/sodar/ingest.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 0f468069..32fda488 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -9,11 +9,13 @@ from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED from irods.password_obfuscation import encode from irods.session import iRODSSession +import logzero from logzero import 
logger from tqdm import tqdm -# TODO: move this class to common.py? -# from .snappy.itransfer_common import TransferJob +# no-frills logger +formatter = logzero.LogFormatter(fmt="%(message)s") +output_logger = logzero.setup_logger(formatter=formatter) @attr.s(frozen=True, auto_attribs=True) @@ -156,7 +158,7 @@ def chksum(self): common_prefix = os.path.commonprefix(self.destinations) for job in self.jobs: if not job.path_src.endswith(".md5"): - print(os.path.relpath(job.path_dest, common_prefix)) + output_logger.info(os.path.relpath(job.path_dest, common_prefix)) try: data_object = self.session.data_objects.get(job.path_dest) data_object.chksum() diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index e883dd8a..d16f1d3c 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -7,6 +7,7 @@ import typing import attr +import logzero from logzero import logger from sodar_cli import api @@ -17,6 +18,10 @@ # for testing logger.propagate = True +# no-frills logger +formatter = logzero.LogFormatter(fmt="%(message)s") +output_logger = logzero.setup_logger(formatter=formatter) + @attr.s(frozen=True, auto_attribs=True) class Config: @@ -184,7 +189,7 @@ def execute(self): logger.info("Planning to create the following sub-collections:") for d in dirs: - print(f"{self.target_coll}/{str(d)}") + output_logger.info(f"{self.target_coll}/{str(d)}") if not self.args.yes: if not input("Is this OK? [y/N] ").lower().startswith("y"): logger.info("Aborting at your request.") @@ -210,10 +215,10 @@ def execute(self): total_bytes = itransfer.total_bytes logger.info("Planning to transfer the following files:") for job in jobs: - print(job.path_src) + output_logger.info(job.path_src) logger.info(f"With a total size of {sizeof_fmt(total_bytes)}") logger.info("Into this iRODS collection:") - print(f"{self.lz_irods_path}/{self.target_coll}/") + output_logger.info(f"{self.lz_irods_path}/{self.target_coll}/") if not self.args.yes: if not input("Is this OK? 
[y/N] ").lower().startswith("y"): From fc0888a5ea69ecdcaf404f5b39587e6f1f2376df Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 19 May 2023 15:06:57 +0200 Subject: [PATCH 13/32] make more clear that this is not about checksum mismatch --- cubi_tk/irods_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 32fda488..efee6309 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -163,7 +163,7 @@ def chksum(self): data_object = self.session.data_objects.get(job.path_dest) data_object.chksum() except Exception as e: # pragma: no cover - logger.error("iRODS checksum error.") + logger.error("Problem during iRODS checksumming.") logger.error(get_irods_error(e)) finally: self.session.cleanup() From 38ec833eefe6994ffed392c2f9808d9b59f6fec0 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 19 May 2023 15:40:17 +0200 Subject: [PATCH 14/32] re-add test for transfer job builder --- tests/test_sodar_ingest.py | 41 +++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index d37b94b1..bbf20f56 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -1,7 +1,8 @@ """Tests for ``cubi_tk.sodar.ingest``.""" +from argparse import ArgumentParser from pathlib import Path -from unittest.mock import MagicMock, PropertyMock +from unittest.mock import MagicMock, PropertyMock, patch import pytest @@ -39,6 +40,23 @@ def fake_filesystem(fs): yield fs +@pytest.fixture +def ingest(fs): + fs.create_dir(Path.home().joinpath(".irods")) + fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + + argv = ["--recursive", "testdir", "target"] + + parser = ArgumentParser() + SodarIngest.setup_argparse(parser) + args = parser.parse_args(argv) + + obj = SodarIngest(args) + obj.lz_irods_path = "/irodsZone" + obj.target_coll = "targetCollection" + return obj + + def 
test_sodar_ingest_build_file_list(fs, caplog): class DummyArgs(object): pass @@ -81,3 +99,24 @@ class DummyArgs(object): assert {"spath": Path("/testdir/file1.md5"), "ipath": Path("file1.md5")} not in paths assert {"spath": Path("/testdir/subdir/file2"), "ipath": Path("subdir/file2")} not in paths assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + + +@patch("cubi_tk.sodar.ingest.sorted") +@patch("cubi_tk.sodar.ingest.compute_md5_checksum", return_value="5555") +@patch("pathlib.Path.stat") +@patch("cubi_tk.sodar.ingest.TransferJob") +def test_sodar_ingest_build_jobs(mockjob, mockstats, mockmd5, mocksorted, ingest): + paths = [ + {"spath": Path("myfile.csv"), "ipath": Path("dest_dir/myfile.csv")}, + {"spath": Path("folder/file.csv"), "ipath": Path("dest_dir/folder/file.csv")}, + ] + mockstats().st_size = 1024 + + ingest.build_jobs(paths) + for p in paths: + mockjob.assert_any_call( + path_src=str(p["spath"]), + path_dest=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath'])}", + bytes=1024, + md5="5555", + ) From 381b7d6290f333c77c9791f6c4bf5e8485766b62 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Tue, 23 May 2023 15:39:20 +0200 Subject: [PATCH 15/32] add smoke check test for sodar ingest --- tests/test_sodar_ingest.py | 90 +++++++++++++++++++++++++++++++++++--- 1 file changed, 84 insertions(+), 6 deletions(-) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index bbf20f56..204dca79 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -1,12 +1,13 @@ """Tests for ``cubi_tk.sodar.ingest``.""" +import os from argparse import ArgumentParser from pathlib import Path -from unittest.mock import MagicMock, PropertyMock, patch +from unittest.mock import MagicMock, PropertyMock, call, patch import pytest -from cubi_tk.__main__ import setup_argparse +from cubi_tk.__main__ import main, setup_argparse from cubi_tk.sodar.ingest import SodarIngest @@ -88,16 +89,28 @@ class DummyArgs(object): # Files 
assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths - assert {"spath": Path("/testdir/file1.md5"), "ipath": Path("file1.md5")} not in paths - assert {"spath": Path("/testdir/subdir/file2"), "ipath": Path("subdir/file2")} in paths + assert { + "spath": Path("/testdir/file1.md5"), + "ipath": Path("file1.md5"), + } not in paths + assert { + "spath": Path("/testdir/subdir/file2"), + "ipath": Path("subdir/file2"), + } in paths assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths # Re-run without recursive search args.recursive = False paths = SodarIngest.build_file_list(dummy) assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths - assert {"spath": Path("/testdir/file1.md5"), "ipath": Path("file1.md5")} not in paths - assert {"spath": Path("/testdir/subdir/file2"), "ipath": Path("subdir/file2")} not in paths + assert { + "spath": Path("/testdir/file1.md5"), + "ipath": Path("file1.md5"), + } not in paths + assert { + "spath": Path("/testdir/subdir/file2"), + "ipath": Path("subdir/file2"), + } not in paths assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths @@ -120,3 +133,68 @@ def test_sodar_ingest_build_jobs(mockjob, mockstats, mockmd5, mocksorted, ingest bytes=1024, md5="5555", ) + + +@patch("cubi_tk.sodar.ingest.init_irods") +@patch("cubi_tk.sodar.ingest.api.landingzone.retrieve") +def test_sodar_ingest_smoketest(mockapi, mocksession, fs): + class DummyAPI(object): + pass + + class DummyColl(object): + pass + + fs.create_dir(Path.home().joinpath(".irods")) + fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + + fs.create_dir("/source/subdir") + fs.create_dir("/target/coll/") + fs.create_file("/source/file1") + fs.create_file("/source/subdir/file2") + lz_uuid = "f46b4fc3-0927-449d-b725-9ffed231507b" + argv = [ + "sodar", + "ingest", + "--sodar-url", + "sodar_url", + "--sodar-api-token", + "token", + "--collection", + "coll", + "--yes", + "--recursive", + 
"source", + lz_uuid, + ] + + # Test cancel no invalid LZ + api_return = DummyAPI() + api_return.status = "DELETED" + api_return.irods_path = "target" + mockapi.return_value = api_return + + with pytest.raises(SystemExit): + main(argv) + mockapi.assert_called_with( + sodar_url="sodar_url", sodar_api_token="token", landingzone_uuid=lz_uuid + ) + + # Test calls when LZ is active + api_return.status = "ACTIVE" + dcoll = DummyColl() + dcoll.subcollections = [ + DummyColl(), + ] + dcoll.subcollections[0].name = "coll" + mocksession.return_value.collections.get.return_value = dcoll + mocksession.return_value.collections.create = MagicMock(wraps=os.mkdir) + + main(argv) + assert call().collections.get("target") in mocksession.mock_calls + assert call().collections.create("target/coll/subdir") in mocksession.mock_calls + + # TODO: more assertions, but I don't know how to query this out of the mock... + # assert Path("/target/coll/file1").exists() + # assert Path("/target/coll/file1.md5").exists() + assert Path("/target/coll/subdir/").exists() + # assert Path("/target/coll/subdir/file2.md5").exists() From f2b8df57141c0b0be31ff39184aaa9eaa73382c3 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Tue, 23 May 2023 15:39:39 +0200 Subject: [PATCH 16/32] remove unused code --- cubi_tk/sodar/ingest.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index d16f1d3c..c772844f 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -45,9 +45,6 @@ def __init__(self, args): logger.error("iRODS environment file is missing.") sys.exit(1) - # iRODS environment - self.irods_env = None - @classmethod def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: parser.add_argument( @@ -88,12 +85,6 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: parser.add_argument( "--collection", type=str, help="Target iRODS collection. Skips manual selection input." 
) - parser.add_argument( - "--iinit", - default=False, - action="store_true", - help="Save PAM auth token to disk. Keep login active.", - ) parser.add_argument( "sources", help="One or multiple files/directories to ingest.", nargs="+" ) From 62745ffa309c3e27644ad58f989ac90ab8612bc6 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Tue, 23 May 2023 15:43:32 +0200 Subject: [PATCH 17/32] isort --- tests/test_sodar_ingest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index 204dca79..1e8fb8a2 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -1,7 +1,7 @@ """Tests for ``cubi_tk.sodar.ingest``.""" -import os from argparse import ArgumentParser +import os from pathlib import Path from unittest.mock import MagicMock, PropertyMock, call, patch From ce3eb259f3a47c95087674a716dbc096dfc5c1c5 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 24 May 2023 12:32:50 +0200 Subject: [PATCH 18/32] use more pathlib --- cubi_tk/irods_utils.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index efee6309..a895d685 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -41,7 +41,7 @@ def get_irods_error(e: Exception): return es if es and es != "None" else e.__class__.__name__ -def init_irods(irods_env_path: os.PathLike, ask: bool = False) -> iRODSSession: +def init_irods(irods_env_path: Path, ask: bool = False) -> iRODSSession: """Connect to iRODS.""" irods_auth_path = irods_env_path.parent.joinpath(".irodsA") if irods_auth_path.exists(): @@ -130,19 +130,18 @@ def put(self): position=1, ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: for job in self.jobs: - job_name = os.path.basename(job.path_src) + job_name = Path(job.path_src).name + hashpath = Path(temp_dir).joinpath(job_name + ".md5") # create temporary md5 files - with open( - os.path.join(temp_dir, 
job_name) + ".md5", "w", encoding="utf-8" - ) as tmp: + with hashpath.open("w", encoding="utf-8") as tmp: tmp.write(f"{job.md5} {job_name}") try: file_log.set_description_str(f"Current file: {job.path_src}") self.session.data_objects.put(job.path_src, job.path_dest) self.session.data_objects.put( - os.path.join(temp_dir, job_name) + ".md5", + hashpath, job.path_dest + ".md5", ) t.update(job.bytes) @@ -155,10 +154,10 @@ def put(self): t.clear() def chksum(self): - common_prefix = os.path.commonprefix(self.destinations) + common_prefix = os.path.commonpath(self.destinations) for job in self.jobs: if not job.path_src.endswith(".md5"): - output_logger.info(os.path.relpath(job.path_dest, common_prefix)) + output_logger.info(Path(job.path_dest).relative_to(common_prefix)) try: data_object = self.session.data_objects.get(job.path_dest) data_object.chksum() From 8005c8c99aa5bfe36df6cebaa1249893fe222411 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 24 May 2023 14:46:45 +0200 Subject: [PATCH 19/32] more no-coverage regions --- cubi_tk/irods_utils.py | 4 ++-- cubi_tk/sodar/ingest.py | 32 ++++++++++++++++---------------- tests/test_sodar_ingest.py | 10 +++++++++- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index a895d685..efb2d518 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -83,7 +83,7 @@ def init_irods(irods_env_path: Path, ask: bool = False) -> iRODSSession: if ask and input("Save iRODS session for passwordless operation? 
[y/N] ").lower().startswith( "y" ): - save_irods_token(session) + save_irods_token(session) # pragma: no cover elif not ask: save_irods_token(session) @@ -101,7 +101,7 @@ def save_irods_token(session: iRODSSession, irods_env_path=None): try: token = session.pam_pw_negotiated - except CAT_INVALID_AUTHENTICATION: + except CAT_INVALID_AUTHENTICATION: # pragma: no cover raise if isinstance(token, list) and token: diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index c772844f..57bce64a 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -45,6 +45,14 @@ def __init__(self, args): logger.error("iRODS environment file is missing.") sys.exit(1) + # Get SODAR API info + toml_config = load_toml_config(Config()) + + if not self.args.sodar_url: + self.args.sodar_url = toml_config.get("global", {}).get("sodar_server_url") + if not self.args.sodar_api_token: + self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token") + @classmethod def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: parser.add_argument( @@ -99,14 +107,6 @@ def run( def execute(self): """Execute ingest.""" - # Get SODAR API info - toml_config = load_toml_config(Config()) - - if not self.args.sodar_url: - self.args.sodar_url = toml_config.get("global", {}).get("sodar_server_url") - if not self.args.sodar_api_token: - self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token") - # Retrieve iRODS path if destination is UUID if is_uuid(self.args.destination): try: @@ -115,7 +115,7 @@ def execute(self): sodar_api_token=self.args.sodar_api_token, landingzone_uuid=self.args.destination, ) - except Exception as e: + except Exception as e: # pragma: no cover logger.error("Failed to retrieve landing zone information.") logger.error(e) sys.exit(1) @@ -128,7 +128,7 @@ def execute(self): logger.error("Target landing zone is not ACTIVE.") sys.exit(1) else: - self.lz_irods_path = self.args.destination + self.lz_irods_path = 
self.args.destination # pragma: no cover # Build file list and add missing md5 files source_paths = self.build_file_list() @@ -143,7 +143,7 @@ def execute(self): coll = irods_session.collections.get(self.lz_irods_path) for c in coll.subcollections: collections.append(c.name) - except Exception as e: + except Exception as e: # pragma: no cover logger.error(f"Failed to query landing zone collections: {get_irods_error(e)}") sys.exit(1) finally: @@ -168,7 +168,7 @@ def execute(self): elif self.args.collection in collections: self.target_coll = self.args.collection - else: + else: # pragma: no cover logger.error("Selected target collection does not exist in landing zone.") sys.exit(1) @@ -182,7 +182,7 @@ def execute(self): for d in dirs: output_logger.info(f"{self.target_coll}/{str(d)}") if not self.args.yes: - if not input("Is this OK? [y/N] ").lower().startswith("y"): + if not input("Is this OK? [y/N] ").lower().startswith("y"): # pragma: no cover logger.info("Aborting at your request.") sys.exit(0) @@ -190,7 +190,7 @@ def execute(self): coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" try: irods_session.collections.create(coll_name) - except Exception as e: + except Exception as e: # pragma: no cover logger.error("Error creating sub-collection.") logger.error(e) sys.exit(1) @@ -212,7 +212,7 @@ def execute(self): output_logger.info(f"{self.lz_irods_path}/{self.target_coll}/") if not self.args.yes: - if not input("Is this OK? [y/N] ").lower().startswith("y"): + if not input("Is this OK? 
[y/N] ").lower().startswith("y"): # pragma: no cover logger.info("Aborting at your request.") sys.exit(0) @@ -220,7 +220,7 @@ def execute(self): logger.info("File transfer complete.") # Compute server-side checksums - if self.args.remote_checksums: + if self.args.remote_checksums: # pragma: no cover logger.info("Computing server-side checksums.") itransfer.chksum() diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index 1e8fb8a2..da2ab234 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -46,7 +46,15 @@ def ingest(fs): fs.create_dir(Path.home().joinpath(".irods")) fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) - argv = ["--recursive", "testdir", "target"] + argv = [ + "--recursive", + "--sodar-url", + "sodar_url", + "--sodar-api-token", + "token", + "testdir", + "target", + ] parser = ArgumentParser() SodarIngest.setup_argparse(parser) From 870a5d28f0b4e3b525081e979bf6162185b8f931 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Thu, 25 May 2023 11:30:20 +0200 Subject: [PATCH 20/32] check for missing API token --- cubi_tk/sodar/ingest.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 57bce64a..2a3bf870 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -47,11 +47,15 @@ def __init__(self, args): # Get SODAR API info toml_config = load_toml_config(Config()) - - if not self.args.sodar_url: - self.args.sodar_url = toml_config.get("global", {}).get("sodar_server_url") + if toml_config: + config_url = toml_config.get("global", {}).get("sodar_server_url") + if self.args.sodar_url == "https://sodar.bihealth.org/" and config_url: + self.args.sodar_url = config_url + if not self.args.sodar_api_token: + self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token") if not self.args.sodar_api_token: - self.args.sodar_api_token = toml_config.get("global", 
{}).get("sodar_api_token") + logger.error("SODAR API token missing.") + sys.exit(1) @classmethod def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: @@ -91,7 +95,9 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: help="Don't ask for permission.", ) parser.add_argument( - "--collection", type=str, help="Target iRODS collection. Skips manual selection input." + "--collection", + type=str, + help="Target iRODS collection. Skips manual selection input.", ) parser.add_argument( "sources", help="One or multiple files/directories to ingest.", nargs="+" From caa81d3617a227b7e24023ebda5c39efc845ca8c Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Thu, 25 May 2023 14:22:22 +0200 Subject: [PATCH 21/32] adjusted docstring --- cubi_tk/irods_utils.py | 8 +++++++- cubi_tk/sodar/ingest.py | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index efb2d518..58174a75 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -110,7 +110,13 @@ def save_irods_token(session: iRODSSession, irods_env_path=None): class iRODSTransfer: - """Transfers files to and from iRODS.""" + """ + Transfer files to iRODS. + + Attributes: + session -- initialised iRODSSession + jobs -- a tuple of TransferJob objects + """ def __init__(self, session: iRODSSession, jobs: Tuple[TransferJob, ...]): self.session = session diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 2a3bf870..28643685 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -231,7 +231,10 @@ def execute(self): itransfer.chksum() def build_file_list(self): - """Build list of source files to transfer. iRODS paths are relative to target collection.""" + """ + Build list of source files to transfer. + iRODS paths are relative to target collection. 
+ """ source_paths = [Path(src) for src in self.args.sources] output_paths = list() From 5b54c7c247c6f4f215b9eeab419cdf02e5390cf7 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Tue, 30 May 2023 13:29:54 +0200 Subject: [PATCH 22/32] add documentation for sodar ingest command --- docs_manual/index.rst | 6 ++- docs_manual/man_ingest_fastq.rst | 4 +- docs_manual/man_sodar_ingest.rst | 72 ++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 4 deletions(-) create mode 100644 docs_manual/man_sodar_ingest.rst diff --git a/docs_manual/index.rst b/docs_manual/index.rst index 18a80015..c5bfe1a9 100644 --- a/docs_manual/index.rst +++ b/docs_manual/index.rst @@ -14,8 +14,9 @@ Manual | :ref:`Creating ISA-tab files ` | :ref:`Annotating ISA-tab files ` - | :ref:`Upload raw data to SODAR ` - | :ref:`Upload raw data to SODAR ` + | :ref:`Upload data to SODAR ` + | :ref:`Upload fastq files to SODAR ` + | :ref:`Upload results of the Seasnap pipeline to SODAR ` | :ref:`Create a sample info file for Sea-snap ` | :ref:`Tools for archiving old projects ` @@ -51,6 +52,7 @@ Project Info man_isa_tpl man_isa_tab + man_sodar_ingest man_ingest_fastq man_itransfer_results man_write_sample_info diff --git a/docs_manual/man_ingest_fastq.rst b/docs_manual/man_ingest_fastq.rst index 107f8a3e..d61d197a 100644 --- a/docs_manual/man_ingest_fastq.rst +++ b/docs_manual/man_ingest_fastq.rst @@ -1,7 +1,7 @@ .. _man_ingest_fastq: =========================== -Manual for ``ingest-fastq`` +Manual for ``sodar ingest-fastq`` =========================== The ``cubi-tk sodar ingest-fastq`` command lets you upload raw data files to SODAR. @@ -105,4 +105,4 @@ The options are the following: More Information ---------------- -Also see ``cubi-tk sodar ingest-fastq`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest-fastq --help`` for more information. 
\ No newline at end of file +Also see ``cubi-tk sodar ingest-fastq`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest-fastq --help`` for more information. diff --git a/docs_manual/man_sodar_ingest.rst b/docs_manual/man_sodar_ingest.rst new file mode 100644 index 00000000..b129cf44 --- /dev/null +++ b/docs_manual/man_sodar_ingest.rst @@ -0,0 +1,72 @@ +.. _man_sodar_ingest: + +=========================== +Manual for ``sodar ingest`` +=========================== + +The ``cubi-tk sodar ingest`` command can be used to upload arbitrary data files to SODAR. +It facilitates transfer of one or multiple sources into one SODAR landing zone, while optionally recursively matching and preserving the sub-folder structure. + +---------------- +Basic usage +---------------- + +.. code-block:: bash + + $ cubi-tk sodar ingest [OPTION]... SOURCE [SOURCE ...] DESTINATION + +Where each ``SOURCE`` is a path to a folder containing files and ``DESTINATION`` is either an iRODS path to a *landing zone* in SODAR or the UUID of that *landing zone*. + +For seamless usage `~/.irods/irods_environment.json `_ and :ref:`~/.cubitkrc.toml` should be present. + +---------------- +Parameters +---------------- + +- ``-r, --recursive``: Recursively find files in subdirectories and create iRODS sub-collections to match directory structure. +- ``-K, --remote-checksums``: Instruct iRODS to compute MD5 checksums of uploaded files for SODAR validation step. +- ``-y, --yes``: Don't stop for user permission. Enables scripting with this command. +- ``--collection``: Set target iRODS collection in landing zone. Skips user input for this selection. + +.. _sodar-auth: + +-------------------- +SODAR authentication +-------------------- + +To be able to access the SODAR API (which is only required, if you specify a landing zone UUID instead of an iRODS path), you also need an API token. 
+For token management in SODAR, the following docs can be used: + +- https://sodar.bihealth.org/manual/ui_user_menu.html +- https://sodar.bihealth.org/manual/ui_api_tokens.html + +There are three options for how to supply the token. +Only one is needed. +The options are the following: + +1. configure ``~/.cubitkrc.toml``. + + .. code-block:: toml + + [global] + + sodar_server_url = "https://sodar.bihealth.org/" + sodar_api_token = "" + +2. pass via command line. + + .. code-block:: bash + + $ cubi-tk sodar ingest --sodar-url "https://sodar.bihealth.org/" --sodar-api-token "" + +3. set as environment variable. + + .. code-block:: bash + + $ SODAR_API_TOKEN="" + +---------------- +More Information +---------------- + +Also see ``cubi-tk sodar ingest`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest --help`` for more information. From fb9ec2c99a8098e2c8eb144ff958fa1fbe287164 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 31 May 2023 12:54:19 +0200 Subject: [PATCH 23/32] check if file already exists in iRODS and skip --- cubi_tk/irods_utils.py | 32 +++++++++++++++++++++++++++--- tests/test_irods_utils.py | 5 +++-- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 58174a75..48142572 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -6,7 +6,11 @@ from typing import Tuple import attr -from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED +from irods.exception import ( + CAT_INVALID_AUTHENTICATION, + PAM_AUTH_PASSWORD_FAILED, + DataObjectDoesNotExist, +) from irods.password_obfuscation import encode from irods.session import iRODSSession import logzero @@ -136,15 +140,37 @@ def put(self): position=1, ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: for job in self.jobs: + file_log.set_description_str(f"Current file: {job.path_src}") job_name = Path(job.path_src).name - hashpath = Path(temp_dir).joinpath(job_name + 
".md5") + + # check if remote file exists + try: + obj = self.session.data_objects.get(job.path_dest) + if obj.checksum == job.md5: + logger.debug( + f"File {job_name} already exists in iRODS with matching checksum. Skipping upload." + ) + t.total -= job.bytes + t.refresh() + continue + elif not obj.checksum and obj.size == job.bytes: + logger.debug( + f"File {job_name} already exists in iRODS with matching file size. Skipping upload." + ) + t.total -= job.bytes + t.refresh() + continue + except DataObjectDoesNotExist: # pragma: no cover + pass + finally: + self.session.cleanup() # create temporary md5 files + hashpath = Path(temp_dir).joinpath(job_name + ".md5") with hashpath.open("w", encoding="utf-8") as tmp: tmp.write(f"{job.md5} {job_name}") try: - file_log.set_description_str(f"Current file: {job.path_src}") self.session.data_objects.put(job.path_src, job.path_dest) self.session.data_objects.put( hashpath, diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py index 136a516c..56933e85 100644 --- a/tests/test_irods_utils.py +++ b/tests/test_irods_utils.py @@ -99,8 +99,9 @@ def test_irods_transfer_put(fs, itransfer, jobs): fs.create_file(job.path_src) fs.create_dir(Path(job.path_dest).parent) - with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): - itransfer.put() + with patch.object(itransfer.session.data_objects, "get"): + with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): + itransfer.put() for job in jobs: assert Path(job.path_dest).exists() From 24c718d1cb27d1e3ca66dd2df466590e8442aa50 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 31 May 2023 14:22:13 +0200 Subject: [PATCH 24/32] moved remote file check to wrapper --- cubi_tk/irods_utils.py | 35 +++++------------------------------ cubi_tk/sodar/ingest.py | 29 +++++++++++++++++++++++++++-- tests/test_irods_utils.py | 5 ++--- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py 
index 48142572..b0c23a28 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -3,14 +3,10 @@ from pathlib import Path import sys import tempfile -from typing import Tuple +from typing import Set import attr -from irods.exception import ( - CAT_INVALID_AUTHENTICATION, - PAM_AUTH_PASSWORD_FAILED, - DataObjectDoesNotExist, -) +from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED from irods.password_obfuscation import encode from irods.session import iRODSSession import logzero @@ -122,7 +118,7 @@ class iRODSTransfer: jobs -- a tuple of TransferJob objects """ - def __init__(self, session: iRODSSession, jobs: Tuple[TransferJob, ...]): + def __init__(self, session: iRODSSession, jobs: Set[TransferJob]): self.session = session self.jobs = jobs self.total_bytes = sum([job.bytes for job in self.jobs]) @@ -143,28 +139,6 @@ def put(self): file_log.set_description_str(f"Current file: {job.path_src}") job_name = Path(job.path_src).name - # check if remote file exists - try: - obj = self.session.data_objects.get(job.path_dest) - if obj.checksum == job.md5: - logger.debug( - f"File {job_name} already exists in iRODS with matching checksum. Skipping upload." - ) - t.total -= job.bytes - t.refresh() - continue - elif not obj.checksum and obj.size == job.bytes: - logger.debug( - f"File {job_name} already exists in iRODS with matching file size. Skipping upload." 
- ) - t.total -= job.bytes - t.refresh() - continue - except DataObjectDoesNotExist: # pragma: no cover - pass - finally: - self.session.cleanup() - # create temporary md5 files hashpath = Path(temp_dir).joinpath(job_name + ".md5") with hashpath.open("w", encoding="utf-8") as tmp: @@ -192,7 +166,8 @@ def chksum(self): output_logger.info(Path(job.path_dest).relative_to(common_prefix)) try: data_object = self.session.data_objects.get(job.path_dest) - data_object.chksum() + if not data_object.checksum: + data_object.chksum() except Exception as e: # pragma: no cover logger.error("Problem during iRODS checksumming.") logger.error(get_irods_error(e)) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 28643685..e1f3c6cb 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -7,6 +7,7 @@ import typing import attr +from irods.exception import DataObjectDoesNotExist import logzero from logzero import logger from sodar_cli import api @@ -138,6 +139,9 @@ def execute(self): # Build file list and add missing md5 files source_paths = self.build_file_list() + if len(source_paths) == 0: + logger.info("Nothing to do. Quitting.") + sys.exit(1) # Initiate iRODS session irods_session = init_irods(self.irods_env_path, ask=not self.args.yes) @@ -207,7 +211,28 @@ def execute(self): # Build transfer jobs jobs = self.build_jobs(source_paths) + # check if remote files exist + joblist = jobs.copy() + for job in joblist: + try: + obj = irods_session.data_objects.get(job.path_dest) + if not obj.checksum and self.args.remote_checksums: + obj.checksum = obj.chksum() + if obj.checksum == job.md5: + logger.info( + f"File {Path(job.path_dest).name} already exists in iRODS with matching checksum. Skipping upload." + ) + jobs.remove(job) + except DataObjectDoesNotExist: # pragma: no cover + pass + finally: + irods_session.cleanup() + # Final go from user & transfer + if len(jobs) == 0: + logger.info("Nothing to do. 
Quitting.") + sys.exit(1) + itransfer = iRODSTransfer(irods_session, jobs) total_bytes = itransfer.total_bytes logger.info("Planning to transfer the following files:") @@ -258,7 +283,7 @@ def build_file_list(self): output_paths.append({"spath": src, "ipath": Path(src.name)}) return output_paths - def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]: + def build_jobs(self, source_paths) -> typing.Set[TransferJob]: """Build file transfer jobs.""" transfer_jobs = [] @@ -273,7 +298,7 @@ def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]: ) ) - return tuple(sorted(transfer_jobs)) + return set(sorted(transfer_jobs)) def setup_argparse(parser: argparse.ArgumentParser) -> None: diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py index 56933e85..136a516c 100644 --- a/tests/test_irods_utils.py +++ b/tests/test_irods_utils.py @@ -99,9 +99,8 @@ def test_irods_transfer_put(fs, itransfer, jobs): fs.create_file(job.path_src) fs.create_dir(Path(job.path_dest).parent) - with patch.object(itransfer.session.data_objects, "get"): - with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): - itransfer.put() + with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): + itransfer.put() for job in jobs: assert Path(job.path_dest).exists() From 995e8982bc03e44742cd61a146f168464aeb230a Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Thu, 1 Jun 2023 11:07:43 +0200 Subject: [PATCH 25/32] Clean exit code when nothing to do. --- cubi_tk/sodar/ingest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index e1f3c6cb..2ed09d59 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -141,7 +141,7 @@ def execute(self): source_paths = self.build_file_list() if len(source_paths) == 0: logger.info("Nothing to do. 
Quitting.") - sys.exit(1) + sys.exit(0) # Initiate iRODS session irods_session = init_irods(self.irods_env_path, ask=not self.args.yes) @@ -231,7 +231,7 @@ def execute(self): # Final go from user & transfer if len(jobs) == 0: logger.info("Nothing to do. Quitting.") - sys.exit(1) + sys.exit(0) itransfer = iRODSTransfer(irods_session, jobs) total_bytes = itransfer.total_bytes From 5ddd459abded60691c19cbb15cba3ed69dd68b87 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 2 Jun 2023 16:53:15 +0200 Subject: [PATCH 26/32] upgrade python-irodsclient version --- requirements/base.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/base.txt b/requirements/base.txt index f4badf7c..97fcc0a2 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -65,4 +65,4 @@ simplejson sodar-cli ==0.1.0 # Python iRODS client -python-irodsclient==1.1.3 +python-irodsclient==1.1.8 From 35223715cdbe23b4f404c44843d5250d7795b31e Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Fri, 14 Jul 2023 17:13:34 +0200 Subject: [PATCH 27/32] add support for exclude patterns --- cubi_tk/sodar/ingest.py | 13 ++++++++++++- tests/test_sodar_ingest.py | 7 ++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 2ed09d59..3d43d0e1 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -81,6 +81,13 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: action="store_true", help="Recursively match files in subdirectories. 
Creates iRODS sub-collections to match directory structure.", ) + parser.add_argument( + "-e", + "--exclude", + nargs="+", + type=list, + help="Exclude files by defining one or multiple glob-style patterns.", + ) parser.add_argument( "-K", "--remote-checksums", @@ -276,11 +283,15 @@ def build_file_list(self): if src.is_dir(): paths = abspath.glob("**/*" if self.args.recursive else "*") + excludes = self.args.exclude for p in paths: + if excludes and any([p.match(e) for e in excludes]): + continue if p.is_file() and not p.suffix.lower() == ".md5": output_paths.append({"spath": p, "ipath": p.relative_to(abspath)}) else: - output_paths.append({"spath": src, "ipath": Path(src.name)}) + if not any([src.match(e) for e in excludes if e]): + output_paths.append({"spath": src, "ipath": Path(src.name)}) return output_paths def build_jobs(self, source_paths) -> typing.Set[TransferJob]: diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py index da2ab234..bc5d05d2 100644 --- a/tests/test_sodar_ingest.py +++ b/tests/test_sodar_ingest.py @@ -75,8 +75,9 @@ class DummyArgs(object): fs.create_symlink("/loop_src2", "/loop_src") args = DummyArgs() - args.sources = ["broken_link", "not_here", "loop_src", "testdir"] + args.sources = ["broken_link", "not_here", "loop_src", "testdir", "testdir", "file5"] args.recursive = True + args.exclude = ["file4", "file5"] dummy = MagicMock() args_mock = PropertyMock(return_value=args) type(dummy).args = args_mock @@ -86,6 +87,8 @@ class DummyArgs(object): fs.create_file("/testdir/file1.md5") fs.create_file("/testdir/subdir/file2") fs.create_file("/file3") + fs.create_file("/testdir/file4") + fs.create_file("/file5") fs.create_symlink("/testdir/file3", "/file3") paths = SodarIngest.build_file_list(dummy) @@ -120,6 +123,8 @@ class DummyArgs(object): "ipath": Path("subdir/file2"), } not in paths assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + assert {"spath": Path("/testdir/file4"), "ipath": Path("file4")} 
not in paths + assert {"spath": Path("file5"), "ipath": Path("file5")} not in paths @patch("cubi_tk.sodar.ingest.sorted") From 897b2d943ae74350f17aab99412d395009aaffc2 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 2 Aug 2023 16:11:21 +0200 Subject: [PATCH 28/32] fix excludes --- cubi_tk/sodar/ingest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 3d43d0e1..23b94ec9 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -85,7 +85,8 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: "-e", "--exclude", nargs="+", - type=list, + default="", + type=str, help="Exclude files by defining one or multiple glob-style patterns.", ) parser.add_argument( @@ -281,9 +282,9 @@ def build_file_list(self): logger.warning(f"Symlink loop: {src.name}") continue + excludes = self.args.exclude if src.is_dir(): paths = abspath.glob("**/*" if self.args.recursive else "*") - excludes = self.args.exclude for p in paths: if excludes and any([p.match(e) for e in excludes]): continue From 3b0d1fd112e703d6ffd3ecaf04429fddb5ed2037 Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 2 Aug 2023 16:11:42 +0200 Subject: [PATCH 29/32] sort file list for output --- cubi_tk/sodar/ingest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 23b94ec9..04612b39 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -241,6 +241,7 @@ def execute(self): logger.info("Nothing to do. 
Quitting.") sys.exit(0) + jobs = sorted(jobs, key=lambda x: x.path_src) itransfer = iRODSTransfer(irods_session, jobs) total_bytes = itransfer.total_bytes logger.info("Planning to transfer the following files:") From 67b74352cd84987740b517280e7dd61eaff2a80c Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Wed, 2 Aug 2023 17:43:37 +0200 Subject: [PATCH 30/32] reworked hashsum logic - now checks for .md5 file on disk - loads hash if found - computes hash if not found and stores on disk - checks iRODS for existing file using hash --- cubi_tk/irods_utils.py | 58 +++++++++++++++-------------------------- cubi_tk/sodar/ingest.py | 58 ++++++++++++++++++++++++++--------------- 2 files changed, 58 insertions(+), 58 deletions(-) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index b0c23a28..83a5161f 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -2,7 +2,6 @@ import os.path from pathlib import Path import sys -import tempfile from typing import Set import attr @@ -31,9 +30,6 @@ class TransferJob: #: Number of bytes to transfer. bytes: int - #: MD5 hashsum of file. 
- md5: str - def get_irods_error(e: Exception): """Return logger friendly iRODS exception.""" @@ -125,39 +121,27 @@ def __init__(self, session: iRODSSession, jobs: Set[TransferJob]): self.destinations = [job.path_dest for job in self.jobs] def put(self): - with tempfile.TemporaryDirectory() as temp_dir: - # Double tqdm for currently transferred file info - # TODO: add more parenthesis after python 3.10 - with tqdm( - total=self.total_bytes, - unit="B", - unit_scale=True, - unit_divisor=1024, - position=1, - ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: - for job in self.jobs: - file_log.set_description_str(f"Current file: {job.path_src}") - job_name = Path(job.path_src).name - - # create temporary md5 files - hashpath = Path(temp_dir).joinpath(job_name + ".md5") - with hashpath.open("w", encoding="utf-8") as tmp: - tmp.write(f"{job.md5} {job_name}") - - try: - self.session.data_objects.put(job.path_src, job.path_dest) - self.session.data_objects.put( - hashpath, - job.path_dest + ".md5", - ) - t.update(job.bytes) - except Exception as e: # pragma: no cover - logger.error(f"Problem during transfer of {job.path_src}") - logger.error(get_irods_error(e)) - sys.exit(1) - finally: - self.session.cleanup() - t.clear() + # Double tqdm for currently transferred file info + # TODO: add more parenthesis after python 3.10 + with tqdm( + total=self.total_bytes, + unit="B", + unit_scale=True, + unit_divisor=1024, + position=1, + ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: + for job in self.jobs: + file_log.set_description_str(f"Current file: {job.path_src}") + try: + self.session.data_objects.put(job.path_src, job.path_dest) + t.update(job.bytes) + except Exception as e: # pragma: no cover + logger.error(f"Problem during transfer of {job.path_src}") + logger.error(get_irods_error(e)) + sys.exit(1) + finally: + self.session.cleanup() + t.clear() def chksum(self): common_prefix = 
os.path.commonpath(self.destinations) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 04612b39..558f6e66 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -217,24 +217,7 @@ def execute(self): logger.info("Sub-collections created.") # Build transfer jobs - jobs = self.build_jobs(source_paths) - - # check if remote files exist - joblist = jobs.copy() - for job in joblist: - try: - obj = irods_session.data_objects.get(job.path_dest) - if not obj.checksum and self.args.remote_checksums: - obj.checksum = obj.chksum() - if obj.checksum == job.md5: - logger.info( - f"File {Path(job.path_dest).name} already exists in iRODS with matching checksum. Skipping upload." - ) - jobs.remove(job) - except DataObjectDoesNotExist: # pragma: no cover - pass - finally: - irods_session.cleanup() + jobs = self.build_jobs(source_paths, irods_session) # Final go from user & transfer if len(jobs) == 0: @@ -296,18 +279,51 @@ def build_file_list(self): output_paths.append({"spath": src, "ipath": Path(src.name)}) return output_paths - def build_jobs(self, source_paths) -> typing.Set[TransferJob]: + def build_jobs(self, source_paths, irods_session) -> typing.Set[TransferJob]: """Build file transfer jobs.""" transfer_jobs = [] for p in source_paths: + path_dest = f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}" + md5_path = p["spath"].parent / (p["spath"].name + ".md5") + + if md5_path.exists(): + with md5_path.open() as f: + md5sum = f.readline()[:32] + logger.info(f"Found md5 hash on disk for {p['spath']}") + else: + md5sum = compute_md5_checksum(p["spath"]) + with md5_path.open("w", encoding="utf-8") as f: + f.write(f"{md5sum} {p['spath'].name}") + + try: + obj = irods_session.data_objects.get(path_dest) + if not obj.checksum and self.args.remote_checksums: + obj.checksum = obj.chksum() + if obj.checksum == md5sum: + logger.info( + f"File {Path(path_dest).name} already exists in iRODS with matching checksum. Skipping upload." 
+ ) + continue + except DataObjectDoesNotExist: # pragma: no cover + pass + finally: + irods_session.cleanup() + transfer_jobs.append( TransferJob( path_src=str(p["spath"]), - path_dest=f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}", + path_dest=path_dest, bytes=p["spath"].stat().st_size, - md5=compute_md5_checksum(p["spath"]), + ) + ) + + transfer_jobs.append( + TransferJob( + path_src=str(md5_path), + path_dest=path_dest + ".md5", + bytes=md5_path.stat().st_size, ) ) From e7df7b71b3cfb0678899936c6e9e8262096bd61c Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Mon, 7 Aug 2023 14:29:39 +0200 Subject: [PATCH 31/32] increased iRODS session timeout --- cubi_tk/irods_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py index 83a5161f..6748b90d 100644 --- a/cubi_tk/irods_utils.py +++ b/cubi_tk/irods_utils.py @@ -44,6 +44,7 @@ def init_irods(irods_env_path: Path, ask: bool = False) -> iRODSSession: try: session = iRODSSession(irods_env_file=irods_env_path) session.server_version # check for outdated .irodsA file + session.connection_timeout = 300 return session except Exception as e: # pragma: no cover logger.error(f"iRODS connection failed: {get_irods_error(e)}") From 96a93e79b276a52d1e0397fb57b140646734d8ff Mon Sep 17 00:00:00 2001 From: Thomas Sell Date: Tue, 26 Sep 2023 16:13:25 +0200 Subject: [PATCH 32/32] don't re-create already existing collections --- cubi_tk/sodar/ingest.py | 55 +++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py index 558f6e66..0d708c8d 100644 --- a/cubi_tk/sodar/ingest.py +++ b/cubi_tk/sodar/ingest.py @@ -196,25 +196,42 @@ def execute(self): {p["ipath"].parent for p in source_paths if not p["ipath"].parent == Path(".")} ) - logger.info("Planning to create the following sub-collections:") - for d in dirs: - output_logger.info(f"{self.target_coll}/{str(d)}") - if not 
self.args.yes: - if not input("Is this OK? [y/N] ").lower().startswith("y"): # pragma: no cover - logger.info("Aborting at your request.") - sys.exit(0) - - for d in dirs: - coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" - try: - irods_session.collections.create(coll_name) - except Exception as e: # pragma: no cover - logger.error("Error creating sub-collection.") - logger.error(e) - sys.exit(1) - finally: - irods_session.cleanup() - logger.info("Sub-collections created.") + # Filter out existing sub-collections + try: + dirs = [ + d + for d in dirs + if not irods_session.collections.exists( + f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" + ) + ] + except Exception as e: # pragma: no cover + logger.error("Error checking for sub-collections.") + logger.error(e) + sys.exit(1) + finally: + irods_session.cleanup() + + if dirs: + logger.info("Planning to create the following sub-collections:") + for d in dirs: + output_logger.info(f"{self.target_coll}/{str(d)}") + if not self.args.yes: + if not input("Is this OK? [y/N] ").lower().startswith("y"): # pragma: no cover + logger.info("Aborting at your request.") + sys.exit(0) + + for d in dirs: + coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" + try: + irods_session.collections.create(coll_name) + except Exception as e: # pragma: no cover + logger.error("Error creating sub-collection.") + logger.error(e) + sys.exit(1) + finally: + irods_session.cleanup() + logger.info("Sub-collections created.") # Build transfer jobs jobs = self.build_jobs(source_paths, irods_session)