Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SODAR ingest command based on irods-pythonclient #168

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
73fa2b7
feat: generic ingest function based on irods pythonclient
sellth May 3, 2023
f21802c
add -y option alias --yes
sellth May 10, 2023
dec3528
make flake8 happy
sellth May 10, 2023
f0c3a97
extract iRODS functionality into separate file
sellth May 10, 2023
17e96ab
compute md5 always and generate temp file on upload
sellth May 10, 2023
a5803dc
adjusted log levels
sellth May 11, 2023
423ff88
add test for init_irods
sellth May 12, 2023
02fe93d
add test for get_irods_error
sellth May 12, 2023
a2325a0
add test for transfer job builder
sellth May 12, 2023
cabb2e7
add iinit-like behaviour when asking for password
sellth May 17, 2023
288cefc
test sodar ingest build file list
sellth May 19, 2023
db1fdbe
removed print statements in favour of logger
sellth May 19, 2023
fc0888a
make more clear that this is not about checksum mismatch
sellth May 19, 2023
38ec833
re-add test for transfer job builder
sellth May 19, 2023
381b7d6
add smoke check test for sodar ingest
sellth May 23, 2023
f2b8df5
remove unused code
sellth May 23, 2023
62745ff
isort
sellth May 23, 2023
ce3eb25
use more pathlib
sellth May 24, 2023
8005c8c
more no-coverage regions
sellth May 24, 2023
870a5d2
check for missing API token
sellth May 25, 2023
caa81d3
adjusted docstring
sellth May 25, 2023
5b54c7c
add documentation for sodar ingest command
sellth May 30, 2023
fb9ec2c
check if file already exists in iRODS and skip
sellth May 31, 2023
24c718d
moved remote file check to wrapper
sellth May 31, 2023
995e898
Clean exit code when nothing to do.
sellth Jun 1, 2023
5ddd459
upgrade python-irodsclient version
sellth Jun 2, 2023
3522371
add support for exclude patterns
sellth Jul 14, 2023
897b2d9
fix excludes
sellth Aug 2, 2023
3b0d1fd
sort file list for output
sellth Aug 2, 2023
67b7435
reworked hashsum logic
sellth Aug 2, 2023
e7df7b7
increased iRODS session timeout
sellth Aug 7, 2023
96a93e7
don't re-create already existing collections
sellth Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cubi_tk/sodar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
Upload external files to SODAR
(defaults for fastq files).

``ingest``
Upload arbitrary files to SODAR

``check-remote``
Check if or which local files with md5 sums are already deposited in iRODs/Sodar

Expand All @@ -54,6 +57,7 @@
from .check_remote import setup_argparse as setup_argparse_check_remote
from .download_sheet import setup_argparse as setup_argparse_download_sheet
from .ingest_fastq import setup_argparse as setup_argparse_ingest_fastq
from .ingest import setup_argparse as setup_argparse_ingest
from .lz_create import setup_argparse as setup_argparse_lz_create
from .lz_list import setup_argparse as setup_argparse_lz_list
from .lz_move import setup_argparse as setup_argparse_lz_move
Expand Down Expand Up @@ -87,6 +91,7 @@ def setup_argparse(parser: argparse.ArgumentParser) -> None:
"ingest-fastq", help="Upload external files to SODAR (defaults for fastq)"
)
)
setup_argparse_ingest(subparsers.add_parser("ingest", help="Upload arbitrary files to SODAR"))
setup_argparse_check_remote(
subparsers.add_parser(
"check-remote", help="Compare local files with md5 sum against SODAR/iRODS"
Expand Down
366 changes: 366 additions & 0 deletions cubi_tk/sodar/ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,366 @@
"""``cubi-tk sodar ingest``: add arbitrary files to SODAR"""

import argparse
import os
import sys
from pathlib import Path
import getpass
import typing
import attr

from irods.session import iRODSSession
from logzero import logger
from sodar_cli import api
from ..common import load_toml_config, is_uuid, compute_md5_checksum, sizeof_fmt
from ..snappy.itransfer_common import TransferJob
from tqdm import tqdm


@attr.s(frozen=True, auto_attribs=True)
class Config:
    """Configuration for the ingest command.

    All fields default to ``None``, so they are annotated as optional. The
    instance is frozen, i.e. attributes cannot be re-assigned after creation.
    """

    # NOTE(review): presumably the path to the TOML configuration file — confirm
    # against ``load_toml_config``, which receives an instance of this class.
    config: typing.Optional[str] = attr.field(default=None)
    # Base URL of the SODAR server.
    sodar_server_url: typing.Optional[str] = attr.field(default=None)
    # API token; the custom ``repr`` masks the value so it never shows up in logs.
    sodar_api_token: typing.Optional[str] = attr.field(  # type: ignore
        default=None, repr=lambda value: "***"
    )


class SodarIngest:
sellth marked this conversation as resolved.
Show resolved Hide resolved
"""Implementation of sodar ingest command."""

def __init__(self, args):
    """Store the parsed arguments and locate the iRODS environment file.

    The environment file is expected at ``~/.irods/irods_environment.json``.
    If it does not exist, an error is logged and the process exits with
    status 1.
    """
    # Command line arguments.
    self.args = args

    # Path to iRODS environment file
    env_file = Path.home() / ".irods" / "irods_environment.json"
    self.irods_env_path = env_file
    if not env_file.exists():
        logger.error("iRODS environment file is missing.")
        sys.exit(1)

    # iRODS environment
    self.irods_env = None

def _init_irods(self):
    """Connect to iRODS and return the session.

    If an ``.irodsA`` authentication file sits next to the environment file,
    the session is created from the environment file alone. Otherwise the
    user is asked for the password interactively.

    The connection is verified by reading ``session.server_version``. On any
    failure the error is logged and the process exits with status 1.

    Returns:
        The ``iRODSSession`` object, after ``cleanup()`` has been called on it.
    """
    irods_auth_path = self.irods_env_path.parent.joinpath(".irodsA")
    has_auth_file = irods_auth_path.exists()

    session_kwargs = {"irods_env_file": self.irods_env_path}
    if not has_auth_file:
        # Query user for password.
        logger.info("iRODS authentication file not found.")
        session_kwargs["password"] = getpass.getpass(prompt="Please enter SODAR password:")

    # Bind the name up front: if the constructor itself raises, the
    # ``finally`` clause must not fail with UnboundLocalError and thereby
    # mask the intended exit.
    session = None
    try:
        session = iRODSSession(**session_kwargs)
        # Accessing the attribute forces a round trip to the server; with an
        # authentication file this detects an outdated .irodsA file.
        session.server_version
    except Exception as e:
        logger.error("iRODS connection failed: %s", self.get_irods_error(e))
        if has_auth_file:
            logger.error("Are you logged in? try 'iinit'")
        sys.exit(1)
    finally:
        if session is not None:
            session.cleanup()

    return session

@classmethod
def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
    """Register all arguments of the ingest command on ``parser``.

    Adds the hidden command hook, the SODAR connection options, the boolean
    switches, the optional target collection and the positional
    ``sources``/``destination`` arguments.
    """
    # Hidden option that carries the command's entry point.
    parser.add_argument(
        "--hidden-cmd", dest="sodar_cmd", default=cls.run, help=argparse.SUPPRESS
    )

    sodar_options = parser.add_argument_group("SODAR-related")
    sodar_options.add_argument(
        "--sodar-url",
        default=os.environ.get("SODAR_URL", "https://sodar.bihealth.org/"),
        help="URL to SODAR, defaults to SODAR_URL environment variable or fallback to https://sodar.bihealth.org/",
    )
    sodar_options.add_argument(
        "--sodar-api-token",
        default=os.environ.get("SODAR_API_TOKEN"),
        help="SODAR API token. Defaults to SODAR_API_TOKEN environment variable.",
    )

    # All boolean switches share the same shape: off unless given.
    # Remote checksums (-K) stay opt-in on purpose: computing them right after
    # the transfer takes a significant amount of time for large files.
    switches = (
        (
            ("-r", "--recursive"),
            "Recursively match files in subdirectories. Creates iRODS sub-collections to match directory structure.",
        ),
        (("-c", "--checksums"), "Compute missing md5 checksums for source files."),
        (("-K", "--remote-checksums"), "Trigger checksum computation on the iRODS side."),
        (("-y",), "Don't ask for permission prior to transfer."),
    )
    for option_strings, description in switches:
        parser.add_argument(
            *option_strings, default=False, action="store_true", help=description
        )

    parser.add_argument(
        "--collection", type=str, help="Target iRODS collection. Skips manual selection input."
    )
    parser.add_argument(
        "sources", help="One or multiple files/directories to ingest.", nargs="+"
    )
    parser.add_argument("destination", help="UUID or iRODS path of SODAR landing zone.")

@classmethod
def get_irods_error(cls, e: Exception):
    """Return logger friendly iRODS exception.

    Exceptions raised without a usable message stringify to ``"None"`` or to
    an empty string. In both cases the exception's class name is returned
    instead, so the log line is never blank.
    """
    message = str(e)
    if message and message != "None":
        return message
    return type(e).__name__

@classmethod
def run(
    cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser
) -> typing.Optional[int]:
    """Entry point into the command.

    Builds a command instance from ``args`` and returns whatever its
    ``execute()`` method returns. The parser arguments are unused.
    """
    command = cls(args)
    return command.execute()

def execute(self):
    """Execute ingest.

    Steps, in order:

    1. Fill in a missing SODAR URL / API token from the TOML configuration.
    2. Resolve the destination: a UUID is looked up via the SODAR API and
       must belong to a landing zone with status ``ACTIVE`` or ``FAILED``;
       anything else is taken as an iRODS path.
    3. Build the list of source files and add their ``.md5`` files.
    4. Open the iRODS session and list the sub-collections of the zone.
    5. Determine the target collection (``--collection`` or interactive).
    6. With ``--recursive``: create the required sub-collections.
    7. Ask for confirmation (unless ``-y``) and upload all files.
    8. With ``--remote-checksums``: trigger checksums on the iRODS side.

    Fatal problems are logged and terminate the process with status 1;
    declining a confirmation prompt terminates it with status 0.
    """
    # Get SODAR API info
    toml_config = load_toml_config(Config())

    if not self.args.sodar_url:
        self.args.sodar_url = toml_config.get("global", {}).get("sodar_server_url")
    if not self.args.sodar_api_token:
        self.args.sodar_api_token = toml_config.get("global", {}).get("sodar_api_token")

    # Retrieve iRODS path if destination is UUID
    if is_uuid(self.args.destination):
        try:
            lz_info = api.landingzone.retrieve(
                sodar_url=self.args.sodar_url,
                sodar_api_token=self.args.sodar_api_token,
                landingzone_uuid=self.args.destination,
            )
        except Exception as e:
            logger.error("Failed to retrieve landing zone information.")
            logger.error(e)
            sys.exit(1)

        # TODO: Replace with status_locked check once implemented in sodar_cli
        if lz_info.status in ["ACTIVE", "FAILED"]:
            self.lz_irods_path = lz_info.irods_path
            logger.info(f"Target iRods path: {self.lz_irods_path}")
        else:
            logger.info("Target landing zone is not ACTIVE.")
            sys.exit(1)
    else:
        self.lz_irods_path = self.args.destination

    # Build file list and add missing md5 files
    source_paths = self.build_file_list()
    source_paths = self.process_md5_sums(source_paths)

    # Initiate iRODS session
    irods_session = self._init_irods()

    # Query target collection
    logger.info("Querying landing zone collections…")
    collections = []
    try:
        coll = irods_session.collections.get(self.lz_irods_path)
        collections = [c.name for c in coll.subcollections]
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # ``sys.exit`` are not reported as a query failure.
        logger.error("Failed to query landing zone collections.")
        logger.error(e)
        sys.exit(1)
    finally:
        irods_session.cleanup()

    # Exactly one target collection is supported. Uploading into several
    # collections at once is deliberately left to outside scripts or more
    # specialised commands.
    if self.args.collection is None:
        if not collections:
            # Without this guard the prompt below could never be satisfied.
            logger.error("Landing zone does not contain any collections.")
            sys.exit(1)

        input_message = "####################\nPlease choose target collection:\n"
        input_message += "".join(
            f"{index}) {item}\n" for index, item in enumerate(collections, start=1)
        )
        input_message += "Select by number: "

        selection = 0
        # Valid answers are 1..len(collections), both ends included.
        while not 0 < selection <= len(collections):
            user_input = input(input_message)
            # isdecimal() only accepts strings that int() can parse;
            # isdigit() would also accept e.g. superscript digits.
            if user_input.isdecimal():
                selection = int(user_input)

        self.target_coll = collections[selection - 1]

    elif self.args.collection in collections:
        self.target_coll = self.args.collection
    else:
        logger.error("Selected target collection does not exist in landing zone.")
        sys.exit(1)

    # Create sub-collections for folders
    if self.args.recursive:
        dirs = sorted(
            {p["ipath"].parent for p in source_paths if p["ipath"].parent != Path(".")}
        )

        logger.info("Planning to create the following sub-collections:")
        for d in dirs:
            print(f"{self.target_coll}/{str(d)}")
        if not self.args.y:
            if not input("Is this OK? [y/N] ").lower().startswith("y"):
                logger.error("Aborting at your request.")
                sys.exit(0)

        for d in dirs:
            coll_name = f"{self.lz_irods_path}/{self.target_coll}/{str(d)}"
            try:
                irods_session.collections.create(coll_name)
            except Exception as e:
                logger.error("Error creating sub-collection.")
                logger.error(e)
                sys.exit(1)
            finally:
                irods_session.cleanup()
        logger.info("Sub-collections created.")

    # Build transfer jobs
    jobs = self.build_jobs(source_paths)

    # Final go from user & transfer
    total_bytes = sum(job.bytes for job in jobs)
    logger.info("Planning to transfer the following files:")
    for job in jobs:
        print(job.path_src)
    logger.info(f"With a total size of {sizeof_fmt(total_bytes)}")
    logger.info("Into this iRODS collection:")
    print(f"{self.lz_irods_path}/{self.target_coll}/")

    if not self.args.y:
        if not input("Is this OK? [y/N] ").lower().startswith("y"):
            logger.error("Aborting at your request.")
            sys.exit(0)
    # TODO: add more parenthesis after python 3.10
    # Two bars: the lower one tracks bytes, the upper one names the file.
    with tqdm(
        total=total_bytes, unit="B", unit_scale=True, unit_divisor=1024, position=1
    ) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log:
        for job in jobs:
            try:
                file_log.set_description_str(f"Current file: {job.path_src}")
                irods_session.data_objects.put(job.path_src, job.path_dest)
                t.update(job.bytes)
            except BaseException:
                # Log which file was affected, then let the error (including
                # an interrupt by the user) propagate unchanged.
                logger.error("Problem during transfer of " + job.path_src)
                raise
            finally:
                irods_session.cleanup()
        t.clear()

    logger.info("File transfer complete.")

    # Compute server-side checksums
    if self.args.remote_checksums:
        logger.info("Computing server-side checksums.")
        self.compute_irods_checksums(session=irods_session, jobs=jobs)

def build_file_list(self):
    """Build list of source files to transfer. iRODS paths are relative to target collection.

    Each entry of ``self.args.sources`` is handled as follows:

    * A path that does not exist (or cannot be resolved) is logged and skipped.
    * A directory contributes every regular file directly inside it, or in
      all of its sub-directories when ``self.args.recursive`` is set. Files
      with an ``.md5`` suffix (any case) are left out.
    * Anything else is added as a single file.

    Returns:
        A list of dicts with the keys ``"spath"`` (local path) and
        ``"ipath"`` (path relative to the target collection).
    """
    output_paths = []

    for src in map(Path, self.args.sources):
        try:
            # strict=True is required: the default, non-strict mode never
            # raises for a missing path, so missing sources would slip through.
            abspath = src.resolve(strict=True)
        except FileNotFoundError:
            logger.error("File not found: %s", src.name)
            continue
        except (OSError, RuntimeError) as e:
            # E.g. symlink loops (RuntimeError before Python 3.13, OSError since).
            logger.error("Cannot resolve %s: %s", src, e)
            continue

        if src.is_dir():
            pattern = "**/*" if self.args.recursive else "*"
            output_paths.extend(
                {"spath": p, "ipath": p.relative_to(abspath)}
                for p in abspath.glob(pattern)
                if p.is_file() and p.suffix.lower() != ".md5"
            )
        else:
            output_paths.append({"spath": src, "ipath": Path(src.name)})
    return output_paths

def process_md5_sums(self, paths):
    """Check input paths for corresponding .md5 file. Generate one if missing.

    For every entry in ``paths`` the checksum file ``<name>.md5`` is looked
    up next to the source file:

    * If both ``<name>.MD5`` and ``<name>.md5`` exist as separate files, the
      upper case one is deleted.
    * If only ``<name>.MD5`` exists, it is renamed to ``<name>.md5``.
    * If no checksum file exists and ``self.args.checksums`` is set, one is
      computed and written.

    Args:
        paths: list of dicts with the keys ``"spath"`` and ``"ipath"``.

    Returns:
        A new list holding the input entries followed by one entry per
        checksum file. ``paths`` itself is not modified.
    """
    output_paths = paths.copy()
    for entry in paths:
        p = entry["spath"]
        md5_ipath = entry["ipath"].parent / (p.name + ".md5")
        upper_path = p.parent / (p.name + ".MD5")
        lower_path = p.parent / (p.name + ".md5")

        has_upper = upper_path.exists()
        has_lower = lower_path.exists()
        # On a case-insensitive file system both names point to the same
        # file. Deleting the "upper case version" there would delete the
        # only checksum file, so make sure there really are two files.
        two_files = has_upper and has_lower and not upper_path.samefile(lower_path)

        if two_files:
            logger.info(
                f"Found both {upper_path.name} and {lower_path.name} in {str(p.parent)}/"
            )
            logger.info("Removing upper case version.")
            upper_path.unlink()
            output_paths.append({"spath": lower_path, "ipath": md5_ipath})
        elif has_lower:
            output_paths.append({"spath": lower_path, "ipath": md5_ipath})
        elif has_upper:
            logger.info(f"Found {upper_path.name} in {str(p.parent)}/")
            logger.info("Renaming to " + lower_path.name)
            upper_path.rename(lower_path)
            output_paths.append({"spath": lower_path, "ipath": md5_ipath})

        # Re-check on disk: none of the branches above creates a missing file.
        if not lower_path.exists() and self.args.checksums:
            with lower_path.open("w", encoding="utf-8") as handle:
                handle.write(f"{compute_md5_checksum(p)} {p.name}")
            output_paths.append({"spath": lower_path, "ipath": md5_ipath})

    return output_paths

def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]:
    """Build file transfer jobs.

    Creates one ``TransferJob`` per entry of ``source_paths``: the local
    path as string, the destination below
    ``<lz_irods_path>/<target_coll>/`` and the file size in bytes as reported
    by ``stat()``. The jobs are returned as a sorted tuple.
    """
    destination_root = f"{self.lz_irods_path}/{self.target_coll}"
    unsorted_jobs = [
        TransferJob(
            path_src=str(entry["spath"]),
            path_dest=f"{destination_root}/{str(entry['ipath'])}",
            bytes=entry["spath"].stat().st_size,
        )
        for entry in source_paths
    ]
    unsorted_jobs.sort()
    return tuple(unsorted_jobs)

def compute_irods_checksums(self, session, jobs: typing.Tuple[TransferJob, ...]):
    """Trigger checksum computation on the iRODS side for uploaded files.

    Jobs whose source path ends in ``.md5`` are skipped. For every other job
    the destination path relative to the target collection is printed, the
    data object is fetched and ``chksum()`` is called on it. Errors are
    logged and do not stop the loop; ``session.cleanup()`` runs after every
    attempt.
    """
    for job in jobs:
        if job.path_src.endswith(".md5"):
            continue

        collection_prefix = f"{self.lz_irods_path}/{self.target_coll}/"
        relative_dest = Path(job.path_dest).relative_to(collection_prefix)
        print(str(relative_dest))
        try:
            session.data_objects.get(job.path_dest).chksum()
        except Exception as e:
            logger.error("iRODS checksum error.")
            logger.error(e)
        finally:
            session.cleanup()


def setup_argparse(parser: argparse.ArgumentParser) -> None:
    """Setup argument parser for ``cubi-tk sodar ingest``.

    Module-level hook that hands ``parser`` over to
    ``SodarIngest.setup_argparse``.
    """
    configure = SodarIngest.setup_argparse
    return configure(parser)
Loading