Skip to content

Commit

Permalink
moved remote file check to wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
sellth committed May 31, 2023
1 parent fb9ec2c commit 24c718d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 35 deletions.
35 changes: 5 additions & 30 deletions cubi_tk/irods_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
from pathlib import Path
import sys
import tempfile
from typing import Tuple
from typing import Set

import attr
from irods.exception import (
CAT_INVALID_AUTHENTICATION,
PAM_AUTH_PASSWORD_FAILED,
DataObjectDoesNotExist,
)
from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED
from irods.password_obfuscation import encode
from irods.session import iRODSSession
import logzero
Expand Down Expand Up @@ -122,7 +118,7 @@ class iRODSTransfer:
jobs -- a tuple of TransferJob objects
"""

def __init__(self, session: iRODSSession, jobs: Tuple[TransferJob, ...]):
def __init__(self, session: iRODSSession, jobs: Set[TransferJob]):
self.session = session
self.jobs = jobs
self.total_bytes = sum([job.bytes for job in self.jobs])
Expand All @@ -143,28 +139,6 @@ def put(self):
file_log.set_description_str(f"Current file: {job.path_src}")
job_name = Path(job.path_src).name

# check if remote file exists
try:
obj = self.session.data_objects.get(job.path_dest)
if obj.checksum == job.md5:
logger.debug(
f"File {job_name} already exists in iRODS with matching checksum. Skipping upload."
)
t.total -= job.bytes
t.refresh()
continue
elif not obj.checksum and obj.size == job.bytes:
logger.debug(
f"File {job_name} already exists in iRODS with matching file size. Skipping upload."
)
t.total -= job.bytes
t.refresh()
continue
except DataObjectDoesNotExist: # pragma: no cover
pass
finally:
self.session.cleanup()

# create temporary md5 files
hashpath = Path(temp_dir).joinpath(job_name + ".md5")
with hashpath.open("w", encoding="utf-8") as tmp:
Expand Down Expand Up @@ -192,7 +166,8 @@ def chksum(self):
output_logger.info(Path(job.path_dest).relative_to(common_prefix))
try:
data_object = self.session.data_objects.get(job.path_dest)
data_object.chksum()
if not data_object.checksum:
data_object.chksum()
except Exception as e: # pragma: no cover
logger.error("Problem during iRODS checksumming.")
logger.error(get_irods_error(e))
Expand Down
29 changes: 27 additions & 2 deletions cubi_tk/sodar/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typing

import attr
from irods.exception import DataObjectDoesNotExist
import logzero
from logzero import logger
from sodar_cli import api
Expand Down Expand Up @@ -138,6 +139,9 @@ def execute(self):

# Build file list and add missing md5 files
source_paths = self.build_file_list()
if len(source_paths) == 0:
logger.info("Nothing to do. Quitting.")
sys.exit(1)

# Initiate iRODS session
irods_session = init_irods(self.irods_env_path, ask=not self.args.yes)
Expand Down Expand Up @@ -207,7 +211,28 @@ def execute(self):
# Build transfer jobs
jobs = self.build_jobs(source_paths)

# check if remote files exist
joblist = jobs.copy()
for job in joblist:
try:
obj = irods_session.data_objects.get(job.path_dest)
if not obj.checksum and self.args.remote_checksums:
obj.checksum = obj.chksum()
if obj.checksum == job.md5:
logger.info(
f"File {Path(job.path_dest).name} already exists in iRODS with matching checksum. Skipping upload."
)
jobs.remove(job)
except DataObjectDoesNotExist: # pragma: no cover
pass
finally:
irods_session.cleanup()

# Final go from user & transfer
if len(jobs) == 0:
logger.info("Nothing to do. Quitting.")
sys.exit(1)

itransfer = iRODSTransfer(irods_session, jobs)
total_bytes = itransfer.total_bytes
logger.info("Planning to transfer the following files:")
Expand Down Expand Up @@ -258,7 +283,7 @@ def build_file_list(self):
output_paths.append({"spath": src, "ipath": Path(src.name)})
return output_paths

def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]:
def build_jobs(self, source_paths) -> typing.Set[TransferJob]:
"""Build file transfer jobs."""

transfer_jobs = []
Expand All @@ -273,7 +298,7 @@ def build_jobs(self, source_paths) -> typing.Tuple[TransferJob, ...]:
)
)

return tuple(sorted(transfer_jobs))
return set(sorted(transfer_jobs))


def setup_argparse(parser: argparse.ArgumentParser) -> None:
Expand Down
5 changes: 2 additions & 3 deletions tests/test_irods_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ def test_irods_transfer_put(fs, itransfer, jobs):
fs.create_file(job.path_src)
fs.create_dir(Path(job.path_dest).parent)

with patch.object(itransfer.session.data_objects, "get"):
with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy):
itransfer.put()
with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy):
itransfer.put()

for job in jobs:
assert Path(job.path_dest).exists()
Expand Down

0 comments on commit 24c718d

Please sign in to comment.