diff --git a/cubi_tk/irods_utils.py b/cubi_tk/irods_utils.py new file mode 100644 index 00000000..6748b90d --- /dev/null +++ b/cubi_tk/irods_utils.py @@ -0,0 +1,160 @@ +import getpass +import os.path +from pathlib import Path +import sys +from typing import Set + +import attr +from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED +from irods.password_obfuscation import encode +from irods.session import iRODSSession +import logzero +from logzero import logger +from tqdm import tqdm + +# no-frills logger +formatter = logzero.LogFormatter(fmt="%(message)s") +output_logger = logzero.setup_logger(formatter=formatter) + + +@attr.s(frozen=True, auto_attribs=True) +class TransferJob: + """Encodes a transfer job from the local file system to the remote iRODS collection.""" + + #: Source path. + path_src: str + + #: Destination path. + path_dest: str + + #: Number of bytes to transfer. + bytes: int + + +def get_irods_error(e: Exception): + """Return logger friendly iRODS exception.""" + es = str(e) + return es if es and es != "None" else e.__class__.__name__ + + +def init_irods(irods_env_path: Path, ask: bool = False) -> iRODSSession: + """Connect to iRODS.""" + irods_auth_path = irods_env_path.parent.joinpath(".irodsA") + if irods_auth_path.exists(): + try: + session = iRODSSession(irods_env_file=irods_env_path) + session.server_version # check for outdated .irodsA file + session.connection_timeout = 300 + return session + except Exception as e: # pragma: no cover + logger.error(f"iRODS connection failed: {get_irods_error(e)}") + pass + finally: + session.cleanup() + + # No valid .irodsA file. Query user for password. 
+ logger.info("No valid iRODS authentication file found.") + attempts = 0 + while attempts < 3: + try: + session = iRODSSession( + irods_env_file=irods_env_path, + password=getpass.getpass(prompt="Please enter SODAR password:"), + ) + session.server_version # check for exceptions + break + except PAM_AUTH_PASSWORD_FAILED: # pragma: no cover + if attempts < 2: + logger.warning("Wrong password. Please try again.") + attempts += 1 + continue + else: + logger.error("iRODS connection failed.") + sys.exit(1) + except Exception as e: # pragma: no cover + logger.error(f"iRODS connection failed: {get_irods_error(e)}") + sys.exit(1) + finally: + session.cleanup() + + if ask and input("Save iRODS session for passwordless operation? [y/N] ").lower().startswith( + "y" + ): + save_irods_token(session) # pragma: no cover + elif not ask: + save_irods_token(session) + + return session + + +def save_irods_token(session: iRODSSession, irods_env_path=None): + """Retrieve PAM temp auth token 'obfuscate' it and save to disk.""" + if not irods_env_path: + irods_auth_path = Path.home().joinpath(".irods", ".irodsA") + else: + irods_auth_path = Path(irods_env_path).parent.joinpath(".irodsA") + + irods_auth_path.parent.mkdir(parents=True, exist_ok=True) + + try: + token = session.pam_pw_negotiated + except CAT_INVALID_AUTHENTICATION: # pragma: no cover + raise + + if isinstance(token, list) and token: + irods_auth_path.write_text(encode(token[0])) + irods_auth_path.chmod(0o600) + + +class iRODSTransfer: + """ + Transfer files to iRODS. 
+ + Attributes: + session -- initialised iRODSSession + jobs -- a tuple of TransferJob objects + """ + + def __init__(self, session: iRODSSession, jobs: Set[TransferJob]): + self.session = session + self.jobs = jobs + self.total_bytes = sum([job.bytes for job in self.jobs]) + self.destinations = [job.path_dest for job in self.jobs] + + def put(self): + # Double tqdm for currently transferred file info + # TODO: add more parenthesis after python 3.10 + with tqdm( + total=self.total_bytes, + unit="B", + unit_scale=True, + unit_divisor=1024, + position=1, + ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log: + for job in self.jobs: + file_log.set_description_str(f"Current file: {job.path_src}") + try: + self.session.data_objects.put(job.path_src, job.path_dest) + t.update(job.bytes) + except Exception as e: # pragma: no cover + logger.error(f"Problem during transfer of {job.path_src}") + logger.error(get_irods_error(e)) + sys.exit(1) + finally: + self.session.cleanup() + t.clear() + + def chksum(self): + common_prefix = os.path.commonpath(self.destinations) + for job in self.jobs: + if not job.path_src.endswith(".md5"): + output_logger.info(Path(job.path_dest).relative_to(common_prefix)) + try: + data_object = self.session.data_objects.get(job.path_dest) + if not data_object.checksum: + data_object.chksum() + except Exception as e: # pragma: no cover + logger.error("Problem during iRODS checksumming.") + logger.error(get_irods_error(e)) + finally: + self.session.cleanup() diff --git a/cubi_tk/sodar/__init__.py b/cubi_tk/sodar/__init__.py index 95321569..88575ad0 100644 --- a/cubi_tk/sodar/__init__.py +++ b/cubi_tk/sodar/__init__.py @@ -37,6 +37,9 @@ Upload external files to SODAR (defaults for fastq files). 
+``ingest`` + Upload arbitrary files to SODAR + ``check-remote`` Check if or which local files with md5 sums are already deposited in iRODs/Sodar @@ -53,6 +56,7 @@ from .add_ped import setup_argparse as setup_argparse_add_ped from .check_remote import setup_argparse as setup_argparse_check_remote from .download_sheet import setup_argparse as setup_argparse_download_sheet +from .ingest import setup_argparse as setup_argparse_ingest from .ingest_fastq import setup_argparse as setup_argparse_ingest_fastq from .lz_create import setup_argparse as setup_argparse_lz_create from .lz_list import setup_argparse as setup_argparse_lz_list @@ -87,6 +91,7 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None: "ingest-fastq", help="Upload external files to SODAR (defaults for fastq)" ) ) + setup_argparse_ingest(subparsers.add_parser("ingest", help="Upload arbitrary files to SODAR")) setup_argparse_check_remote( subparsers.add_parser( "check-remote", help="Compare local files with md5 sum against SODAR/iRODS" diff --git a/cubi_tk/sodar/ingest.py b/cubi_tk/sodar/ingest.py new file mode 100644 index 00000000..0d708c8d --- /dev/null +++ b/cubi_tk/sodar/ingest.py @@ -0,0 +1,352 @@ +"""``cubi-tk sodar ingest``: add arbitrary files to SODAR""" + +import argparse +import os +from pathlib import Path +import sys +import typing + +import attr +from irods.exception import DataObjectDoesNotExist +import logzero +from logzero import logger +from sodar_cli import api + +from cubi_tk.irods_utils import TransferJob, get_irods_error, init_irods, iRODSTransfer + +from ..common import compute_md5_checksum, is_uuid, load_toml_config, sizeof_fmt + +# for testing +logger.propagate = True + +# no-frills logger +formatter = logzero.LogFormatter(fmt="%(message)s") +output_logger = logzero.setup_logger(formatter=formatter) + + +@attr.s(frozen=True, auto_attribs=True) +class Config: + """Configuration for the ingest command.""" + + config: str = attr.field(default=None) + sodar_server_url: str = 
attr.field(default=None) + sodar_api_token: str = attr.field(default=None, repr=lambda value: "***") # type: ignore + + +class SodarIngest: + """Implementation of sodar ingest command.""" + + def __init__(self, args): + # Command line arguments. + self.args = args + + # Path to iRODS environment file + self.irods_env_path = Path(Path.home(), ".irods", "irods_environment.json") + if not self.irods_env_path.exists(): + logger.error("iRODS environment file is missing.") + sys.exit(1) + + # Get SODAR API info + toml_config = load_toml_config(Config()) + if toml_config: + config_url = toml_config.get("global", {}).get("sodar_server_url") + if self.args.sodar_url == "https://sodar.bihealth.org/" and config_url: + self.args.sodar_url = config_url + if not self.args.sodar_api_token: + self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token") + if not self.args.sodar_api_token: + logger.error("SODAR API token missing.") + sys.exit(1) + + @classmethod + def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--hidden-cmd", dest="sodar_cmd", default=cls.run, help=argparse.SUPPRESS + ) + group_sodar = parser.add_argument_group("SODAR-related") + group_sodar.add_argument( + "--sodar-url", + default=os.environ.get("SODAR_URL", "https://sodar.bihealth.org/"), + help="URL to SODAR, defaults to SODAR_URL environment variable or fallback to https://sodar.bihealth.org/", + ) + group_sodar.add_argument( + "--sodar-api-token", + default=os.environ.get("SODAR_API_TOKEN", None), + help="SODAR API token. Defaults to SODAR_API_TOKEN environment variable.", + ) + parser.add_argument( + "-r", + "--recursive", + default=False, + action="store_true", + help="Recursively match files in subdirectories. 
Creates iRODS sub-collections to match directory structure.", + ) + parser.add_argument( + "-e", + "--exclude", + nargs="+", + default="", + type=str, + help="Exclude files by defining one or multiple glob-style patterns.", + ) + parser.add_argument( + "-K", + "--remote-checksums", + default=False, + action="store_true", + help="Trigger checksum computation on the iRODS side.", + ) + parser.add_argument( + "-y", + "--yes", + default=False, + action="store_true", + help="Don't ask for permission.", + ) + parser.add_argument( + "--collection", + type=str, + help="Target iRODS collection. Skips manual selection input.", + ) + parser.add_argument( + "sources", help="One or multiple files/directories to ingest.", nargs="+" + ) + parser.add_argument("destination", help="UUID or iRODS path of SODAR landing zone.") + + @classmethod + def run( + cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser + ) -> typing.Optional[int]: + """Entry point into the command.""" + return cls(args).execute() + + def execute(self): + """Execute ingest.""" + # Retrieve iRODS path if destination is UUID + if is_uuid(self.args.destination): + try: + lz_info = api.landingzone.retrieve( + sodar_url=self.args.sodar_url, + sodar_api_token=self.args.sodar_api_token, + landingzone_uuid=self.args.destination, + ) + except Exception as e: # pragma: no cover + logger.error("Failed to retrieve landing zone information.") + logger.error(e) + sys.exit(1) + + # TODO: Replace with status_locked check once implemented in sodar_cli + if lz_info.status in ["ACTIVE", "FAILED"]: + self.lz_irods_path = lz_info.irods_path + logger.info(f"Target iRods path: {self.lz_irods_path}") + else: + logger.error("Target landing zone is not ACTIVE.") + sys.exit(1) + else: + self.lz_irods_path = self.args.destination # pragma: no cover + + # Build file list and add missing md5 files + source_paths = self.build_file_list() + if len(source_paths) == 0: + logger.info("Nothing to do. 
Quitting.") + sys.exit(0) + + # Initiate iRODS session + irods_session = init_irods(self.irods_env_path, ask=not self.args.yes) + + # Query target collection + logger.info("Querying landing zone collections…") + collections = [] + try: + coll = irods_session.collections.get(self.lz_irods_path) + for c in coll.subcollections: + collections.append(c.name) + except Exception as e: # pragma: no cover + logger.error(f"Failed to query landing zone collections: {get_irods_error(e)}") + sys.exit(1) + finally: + irods_session.cleanup() + + if self.args.collection is None: + user_input = "" + input_valid = False + input_message = "####################\nPlease choose target collection:\n" + for index, item in enumerate(collections): + input_message += f"{index+1}) {item}\n" + input_message += "Select by number: " + + while not input_valid: + user_input = input(input_message) + if user_input.isdigit(): + user_input = int(user_input) + if 0 < user_input < len(collections): + input_valid = True + + self.target_coll = collections[user_input - 1] + + elif self.args.collection in collections: + self.target_coll = self.args.collection + else: # pragma: no cover + logger.error("Selected target collection does not exist in landing zone.") + sys.exit(1) + + # Create sub-collections for folders + if self.args.recursive: + dirs = sorted( + {p["ipath"].parent for p in source_paths if not p["ipath"].parent == Path(".")} + ) + + # Filter out existing sub-collections + try: + dirs = [ + d + for d in dirs + if not irods_session.collections.exists( + f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" + ) + ] + except Exception as e: # pragma: no cover + logger.error("Error checking for sub-collections.") + logger.error(e) + sys.exit(1) + finally: + irods_session.cleanup() + + if dirs: + logger.info("Planning to create the following sub-collections:") + for d in dirs: + output_logger.info(f"{self.target_coll}/{str(d)}") + if not self.args.yes: + if not input("Is this OK? 
[y/N] ").lower().startswith("y"): # pragma: no cover + logger.info("Aborting at your request.") + sys.exit(0) + + for d in dirs: + coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}" + try: + irods_session.collections.create(coll_name) + except Exception as e: # pragma: no cover + logger.error("Error creating sub-collection.") + logger.error(e) + sys.exit(1) + finally: + irods_session.cleanup() + logger.info("Sub-collections created.") + + # Build transfer jobs + jobs = self.build_jobs(source_paths, irods_session) + + # Final go from user & transfer + if len(jobs) == 0: + logger.info("Nothing to do. Quitting.") + sys.exit(0) + + jobs = sorted(jobs, key=lambda x: x.path_src) + itransfer = iRODSTransfer(irods_session, jobs) + total_bytes = itransfer.total_bytes + logger.info("Planning to transfer the following files:") + for job in jobs: + output_logger.info(job.path_src) + logger.info(f"With a total size of {sizeof_fmt(total_bytes)}") + logger.info("Into this iRODS collection:") + output_logger.info(f"{self.lz_irods_path}/{self.target_coll}/") + + if not self.args.yes: + if not input("Is this OK? [y/N] ").lower().startswith("y"): # pragma: no cover + logger.info("Aborting at your request.") + sys.exit(0) + + itransfer.put() + logger.info("File transfer complete.") + + # Compute server-side checksums + if self.args.remote_checksums: # pragma: no cover + logger.info("Computing server-side checksums.") + itransfer.chksum() + + def build_file_list(self): + """ + Build list of source files to transfer. + iRODS paths are relative to target collection. 
+ """ + + source_paths = [Path(src) for src in self.args.sources] + output_paths = list() + + for src in source_paths: + try: + abspath = src.resolve(strict=True) + except FileNotFoundError: + logger.warning(f"File not found: {src.name}") + continue + except RuntimeError: + logger.warning(f"Symlink loop: {src.name}") + continue + + excludes = self.args.exclude + if src.is_dir(): + paths = abspath.glob("**/*" if self.args.recursive else "*") + for p in paths: + if excludes and any([p.match(e) for e in excludes]): + continue + if p.is_file() and not p.suffix.lower() == ".md5": + output_paths.append({"spath": p, "ipath": p.relative_to(abspath)}) + else: + if not any([src.match(e) for e in excludes if e]): + output_paths.append({"spath": src, "ipath": Path(src.name)}) + return output_paths + + def build_jobs(self, source_paths, irods_session) -> typing.Set[TransferJob]: + """Build file transfer jobs.""" + + transfer_jobs = [] + + for p in source_paths: + path_dest = f"{self.lz_irods_path}/{self.target_coll}/{str(p['ipath'])}" + md5_path = p["spath"].parent / (p["spath"].name + ".md5") + + if md5_path.exists(): + with md5_path.open() as f: + md5sum = f.readline()[:32] + logger.info(f"Found md5 hash on disk for {p['spath']}") + else: + md5sum = compute_md5_checksum(p["spath"]) + with md5_path.open("w", encoding="utf-8") as f: + f.write(f"{md5sum} {p['spath'].name}") + + try: + obj = irods_session.data_objects.get(path_dest) + if not obj.checksum and self.args.remote_checksums: + obj.checksum = obj.chksum() + if obj.checksum == md5sum: + logger.info( + f"File {Path(path_dest).name} already exists in iRODS with matching checksum. Skipping upload." 
+ ) + continue + except DataObjectDoesNotExist: # pragma: no cover + pass + finally: + irods_session.cleanup() + + transfer_jobs.append( + TransferJob( + path_src=str(p["spath"]), + path_dest=path_dest, + bytes=p["spath"].stat().st_size, + ) + ) + + transfer_jobs.append( + TransferJob( + path_src=str(md5_path), + path_dest=path_dest + ".md5", + bytes=md5_path.stat().st_size, + ) + ) + + return set(sorted(transfer_jobs)) + + +def setup_argparse(parser: argparse.ArgumentParser) -> None: + """Setup argument parser for ``cubi-tk sodar ingest``.""" + return SodarIngest.setup_argparse(parser) diff --git a/docs_manual/index.rst b/docs_manual/index.rst index 18a80015..c5bfe1a9 100644 --- a/docs_manual/index.rst +++ b/docs_manual/index.rst @@ -14,8 +14,9 @@ Manual | :ref:`Creating ISA-tab files ` | :ref:`Annotating ISA-tab files ` - | :ref:`Upload raw data to SODAR ` - | :ref:`Upload raw data to SODAR ` + | :ref:`Upload data to SODAR ` + | :ref:`Upload fastq files to SODAR ` + | :ref:`Upload results of the Seasnap pipeline to SODAR ` | :ref:`Create a sample info file for Sea-snap ` | :ref:`Tools for archiving old projects ` @@ -51,6 +52,7 @@ Project Info man_isa_tpl man_isa_tab + man_sodar_ingest man_ingest_fastq man_itransfer_results man_write_sample_info diff --git a/docs_manual/man_ingest_fastq.rst b/docs_manual/man_ingest_fastq.rst index 107f8a3e..d61d197a 100644 --- a/docs_manual/man_ingest_fastq.rst +++ b/docs_manual/man_ingest_fastq.rst @@ -1,7 +1,7 @@ .. _man_ingest_fastq: =========================== -Manual for ``ingest-fastq`` +Manual for ``sodar ingest-fastq`` =========================== The ``cubi-tk sodar ingest-fastq`` command lets you upload raw data files to SODAR. @@ -105,4 +105,4 @@ The options are the following: More Information ---------------- -Also see ``cubi-tk sodar ingest-fastq`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest-fastq --help`` for more information. 
\ No newline at end of file +Also see ``cubi-tk sodar ingest-fastq`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest-fastq --help`` for more information. diff --git a/docs_manual/man_sodar_ingest.rst b/docs_manual/man_sodar_ingest.rst new file mode 100644 index 00000000..b129cf44 --- /dev/null +++ b/docs_manual/man_sodar_ingest.rst @@ -0,0 +1,72 @@ +.. _man_sodar_ingest: + +=========================== +Manual for ``sodar ingest`` +=========================== + +The ``cubi-tk sodar ingest`` command can be used to upload arbitrary data files to SODAR. +It facilitates transfer of one or multiple sources into one SODAR landing zone, while optionally recursively matching and preserving the sub-folder structure. + +---------------- +Basic usage +---------------- + +.. code-block:: bash + + $ cubi-tk sodar ingest [OPTION]... SOURCE [SOURCE ...] DESTINATION + +Where each ``SOURCE`` is a path to a folder containing files and ``DESTINATION`` is either an iRODS path to a *landing zone* in SODAR or the UUID of that *landing zone*. + +For seamless usage `~/.irods/irods_environment.json `_ and :ref:`~/.cubitkrc.toml` should be present. + +---------------- +Parameters +---------------- + +- ``-r, --recursive``: Recursively find files in subdirectories and create iRODS sub-collections to match directory structure. +- ``-K, --remote-checksums``: Instruct iRODS to compute MD5 checksums of uploaded files for SODAR validation step. +- ``-y, --yes``: Don't stop for user permission. Enables scripting with this command. +- ``--collection``: Set target iRODS collection in landing zone. Skips user input for this selection. + +.. _sodar-auth: + +-------------------- +SODAR authentication +-------------------- + +To be able to access the SODAR API (which is only required, if you specify a landing zone UUID instead of an iRODS path), you also need an API token. 
+For token management in SODAR, the following docs can be used: + +- https://sodar.bihealth.org/manual/ui_user_menu.html +- https://sodar.bihealth.org/manual/ui_api_tokens.html + +There are three options for supplying the token. +Only one is needed. +The options are the following: + +1. configure ``~/.cubitkrc.toml``. + + .. code-block:: toml + + [global] + + sodar_server_url = "https://sodar.bihealth.org/" + sodar_api_token = "<your API token>" + +2. pass via command line. + + .. code-block:: bash + + $ cubi-tk sodar ingest --sodar-url "https://sodar.bihealth.org/" --sodar-api-token "<your API token>" + +3. set as environment variable. + + .. code-block:: bash + + $ SODAR_API_TOKEN="<your API token>" + +---------------- +More Information +---------------- + +Also see ``cubi-tk sodar ingest`` :ref:`CLI documentation ` and ``cubi-tk sodar ingest --help`` for more information. diff --git a/requirements/base.txt b/requirements/base.txt index f4badf7c..97fcc0a2 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -65,4 +65,4 @@ simplejson sodar-cli ==0.1.0 # Python iRODS client -python-irodsclient==1.1.3 +python-irodsclient==1.1.8 diff --git a/tests/test_irods_utils.py b/tests/test_irods_utils.py new file mode 100644 index 00000000..136a516c --- /dev/null +++ b/tests/test_irods_utils.py @@ -0,0 +1,117 @@ +from pathlib import Path +import shutil +from unittest.mock import MagicMock, PropertyMock, patch + +import irods.exception +from irods.session import iRODSSession +import pytest + +from cubi_tk.irods_utils import ( + TransferJob, + get_irods_error, + init_irods, + iRODSTransfer, + save_irods_token, +) + + +@pytest.fixture +def fake_filesystem(fs): + yield fs + + +@pytest.fixture +def jobs(): + return ( + TransferJob( + path_src="myfile.csv", + path_dest="dest_dir/myfile.csv", + bytes=123, + md5="ed3b3cbb18fd148bc925944ff0861ce6", + ), + TransferJob( + path_src="folder/file.csv", + path_dest="dest_dir/folder/file.csv", + bytes=1024, + md5="a6e9e3c859b803adb0f1d5f08a51d0f6", + ), + ) + 
+@pytest.fixture +def itransfer(jobs): + session = iRODSSession( + irods_host="localhost", + irods_port=1247, + irods_user_name="pytest", + irods_zone_name="pytest", + ) + return iRODSTransfer(session, jobs) + + +def test_get_irods_error(): + e = irods.exception.NetworkException() + assert get_irods_error(e) == "NetworkException" + e = irods.exception.NetworkException("Connection reset") + assert get_irods_error(e) == "Connection reset" + + +@patch("cubi_tk.irods_utils.iRODSSession") +@patch("getpass.getpass") +def test_init_irods(mockpass, mocksession, fs): + ienv = Path(".irods/irods_environment.json") + password = "1234" + + # .irodsA not found, asks for password + mockpass.return_value = password + init_irods(ienv) + mockpass.assert_called() + mocksession.assert_called_with(irods_env_file=ienv, password=password) + + # .irodsA there, does not ask for password + fs.create_file(".irods/.irodsA") + mockpass.reset_mock() + init_irods(ienv) + mockpass.assert_not_called() + mocksession.assert_called_with(irods_env_file=ienv) + + +@patch("cubi_tk.irods_utils.encode", return_value="it works") +def test_write_token(mockencode, fs): + ienv = Path(".irods/irods_environment.json") + + mocksession = MagicMock() + pam_pw = PropertyMock(return_value=["secure"]) + type(mocksession).pam_pw_negotiated = pam_pw + + save_irods_token(mocksession, ienv) + assert ienv.parent.joinpath(".irodsA").exists() + mockencode.assert_called_with("secure") + + +def test_irods_transfer_init(jobs, itransfer): + assert itransfer.total_bytes == sum([job.bytes for job in jobs]) + assert itransfer.destinations == [job.path_dest for job in jobs] + + +def test_irods_transfer_put(fs, itransfer, jobs): + for job in jobs: + fs.create_file(job.path_src) + fs.create_dir(Path(job.path_dest).parent) + + with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy): + itransfer.put() + + for job in jobs: + assert Path(job.path_dest).exists() + assert Path(job.path_dest + ".md5").exists() + with 
Path(job.path_dest + ".md5").open() as file: + assert file.readline() == f"{job.md5} {Path(job.path_dest).name}" + + +def test_irods_transfer_chksum(itransfer): + with patch.object(itransfer.session.data_objects, "get") as mock: + itransfer.chksum() + + for path in itransfer.destinations: + mock.assert_any_call(path) diff --git a/tests/test_sodar_ingest.py b/tests/test_sodar_ingest.py new file mode 100644 index 00000000..bc5d05d2 --- /dev/null +++ b/tests/test_sodar_ingest.py @@ -0,0 +1,213 @@ +"""Tests for ``cubi_tk.sodar.ingest``.""" + +from argparse import ArgumentParser +import os +from pathlib import Path +from unittest.mock import MagicMock, PropertyMock, call, patch + +import pytest + +from cubi_tk.__main__ import main, setup_argparse +from cubi_tk.sodar.ingest import SodarIngest + + +def test_run_sodar_ingest_help(capsys): + parser, _subparsers = setup_argparse() + with pytest.raises(SystemExit) as e: + parser.parse_args(["sodar", "ingest", "--help"]) + + assert e.value.code == 0 + + res = capsys.readouterr() + assert res.out + assert not res.err + + +def test_run_sodar_ingest_nothing(capsys): + parser, _subparsers = setup_argparse() + + with pytest.raises(SystemExit) as e: + parser.parse_args(["sodar", "ingest"]) + + assert e.value.code == 2 + + res = capsys.readouterr() + assert not res.out + assert res.err + + +@pytest.fixture +def fake_filesystem(fs): + yield fs + + +@pytest.fixture +def ingest(fs): + fs.create_dir(Path.home().joinpath(".irods")) + fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + + argv = [ + "--recursive", + "--sodar-url", + "sodar_url", + "--sodar-api-token", + "token", + "testdir", + "target", + ] + + parser = ArgumentParser() + SodarIngest.setup_argparse(parser) + args = parser.parse_args(argv) + + obj = SodarIngest(args) + obj.lz_irods_path = "/irodsZone" + obj.target_coll = "targetCollection" + return obj + + +def test_sodar_ingest_build_file_list(fs, caplog): + class DummyArgs(object): + pass + + 
fs.create_symlink("/not_existing", "/broken_link") + fs.create_symlink("/loop_src", "/loop_src2") + fs.create_symlink("/loop_src2", "/loop_src") + + args = DummyArgs() + args.sources = ["broken_link", "not_here", "loop_src", "testdir", "testdir", "file5"] + args.recursive = True + args.exclude = ["file4", "file5"] + dummy = MagicMock() + args_mock = PropertyMock(return_value=args) + type(dummy).args = args_mock + + fs.create_dir("/testdir/subdir") + fs.create_file("/testdir/file1") + fs.create_file("/testdir/file1.md5") + fs.create_file("/testdir/subdir/file2") + fs.create_file("/file3") + fs.create_file("/testdir/file4") + fs.create_file("/file5") + fs.create_symlink("/testdir/file3", "/file3") + + paths = SodarIngest.build_file_list(dummy) + + # Sources + assert "File not found: broken_link" in caplog.messages + assert "File not found: not_here" in caplog.messages + assert "Symlink loop: loop_src" in caplog.messages + + # Files + assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths + assert { + "spath": Path("/testdir/file1.md5"), + "ipath": Path("file1.md5"), + } not in paths + assert { + "spath": Path("/testdir/subdir/file2"), + "ipath": Path("subdir/file2"), + } in paths + assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + + # Re-run without recursive search + args.recursive = False + paths = SodarIngest.build_file_list(dummy) + assert {"spath": Path("/testdir/file1"), "ipath": Path("file1")} in paths + assert { + "spath": Path("/testdir/file1.md5"), + "ipath": Path("file1.md5"), + } not in paths + assert { + "spath": Path("/testdir/subdir/file2"), + "ipath": Path("subdir/file2"), + } not in paths + assert {"spath": Path("/testdir/file3"), "ipath": Path("file3")} in paths + assert {"spath": Path("/testdir/file4"), "ipath": Path("file4")} not in paths + assert {"spath": Path("file5"), "ipath": Path("file5")} not in paths + + +@patch("cubi_tk.sodar.ingest.sorted") +@patch("cubi_tk.sodar.ingest.compute_md5_checksum", 
return_value="5555") +@patch("pathlib.Path.stat") +@patch("cubi_tk.sodar.ingest.TransferJob") +def test_sodar_ingest_build_jobs(mockjob, mockstats, mockmd5, mocksorted, ingest): + paths = [ + {"spath": Path("myfile.csv"), "ipath": Path("dest_dir/myfile.csv")}, + {"spath": Path("folder/file.csv"), "ipath": Path("dest_dir/folder/file.csv")}, + ] + mockstats().st_size = 1024 + + ingest.build_jobs(paths) + for p in paths: + mockjob.assert_any_call( + path_src=str(p["spath"]), + path_dest=f"{ingest.lz_irods_path}/{ingest.target_coll}/{str(p['ipath'])}", + bytes=1024, + md5="5555", + ) + + +@patch("cubi_tk.sodar.ingest.init_irods") +@patch("cubi_tk.sodar.ingest.api.landingzone.retrieve") +def test_sodar_ingest_smoketest(mockapi, mocksession, fs): + class DummyAPI(object): + pass + + class DummyColl(object): + pass + + fs.create_dir(Path.home().joinpath(".irods")) + fs.create_file(Path.home().joinpath(".irods", "irods_environment.json")) + + fs.create_dir("/source/subdir") + fs.create_dir("/target/coll/") + fs.create_file("/source/file1") + fs.create_file("/source/subdir/file2") + lz_uuid = "f46b4fc3-0927-449d-b725-9ffed231507b" + argv = [ + "sodar", + "ingest", + "--sodar-url", + "sodar_url", + "--sodar-api-token", + "token", + "--collection", + "coll", + "--yes", + "--recursive", + "source", + lz_uuid, + ] + + # Test cancel no invalid LZ + api_return = DummyAPI() + api_return.status = "DELETED" + api_return.irods_path = "target" + mockapi.return_value = api_return + + with pytest.raises(SystemExit): + main(argv) + mockapi.assert_called_with( + sodar_url="sodar_url", sodar_api_token="token", landingzone_uuid=lz_uuid + ) + + # Test calls when LZ is active + api_return.status = "ACTIVE" + dcoll = DummyColl() + dcoll.subcollections = [ + DummyColl(), + ] + dcoll.subcollections[0].name = "coll" + mocksession.return_value.collections.get.return_value = dcoll + mocksession.return_value.collections.create = MagicMock(wraps=os.mkdir) + + main(argv) + assert 
call().collections.get("target") in mocksession.mock_calls + assert call().collections.create("target/coll/subdir") in mocksession.mock_calls + + # TODO: more assertions, but I don't know how to query this out of the mock... + # assert Path("/target/coll/file1").exists() + # assert Path("/target/coll/file1.md5").exists() + assert Path("/target/coll/subdir/").exists() + # assert Path("/target/coll/subdir/file2.md5").exists()