Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Common functions for interfacing with python-irodsclient #202

Merged
merged 31 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
95c76ab
feat: Common functions for interfacing with python-irodsclient
sellth Nov 7, 2023
6b72d9d
fix tests
sellth Nov 7, 2023
2636c8c
fix chksum test
sellth Nov 7, 2023
779d402
move all basic irods logic into new class iRODSCommon
sellth Nov 8, 2023
1f58f7e
adapt tests to new class
sellth Nov 8, 2023
da40f44
iRODSTransfer is now a child of iRODSCommon
sellth Nov 8, 2023
d8dbbbf
flake8
sellth Nov 8, 2023
064a1e6
custom irods_env_path support; pass along kwargs
sellth Nov 8, 2023
7837b2a
flake8
sellth Nov 8, 2023
32aaa5f
make internal variables private
sellth Nov 8, 2023
f36efff
cleanup
sellth Nov 8, 2023
523bc28
improved docstrings
sellth Nov 9, 2023
c62faf1
TransferJob computes file size during init
sellth Nov 17, 2023
f66cc42
flake8
sellth Nov 17, 2023
e2bda03
using attrs import instead of attr shadow
sellth Nov 17, 2023
9d480c9
show file counts in progress bar
sellth Nov 17, 2023
179aee5
add recursive put
sellth Nov 17, 2023
4b0e1d2
use proper context manager
sellth Nov 17, 2023
4bde914
add sync mode; check for existing files on remote
sellth Nov 23, 2023
538e9a8
do not clean up session too soon
sellth Nov 27, 2023
41c8e12
prepare TransferJob to also handle gets
sellth Nov 27, 2023
5df1d7e
add get method
sellth Nov 27, 2023
205c6e4
restructure session creation; check valid token every time
sellth Nov 28, 2023
27e54da
create irods sessions dynamically
sellth Nov 28, 2023
f1e5105
also check for timed out session token
sellth Nov 28, 2023
78e01a1
make session available as property
sellth Nov 28, 2023
e1625a8
linting
sellth Nov 28, 2023
1a58949
remove irods session context manager and multiple sessions
sellth Nov 28, 2023
d6f6300
improve logging during chksum
sellth Dec 8, 2023
cb5d10a
increase session timeout
sellth Dec 8, 2023
bab45a0
better error handling
sellth Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 203 additions & 0 deletions cubi_tk/irods_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from contextlib import contextmanager
import getpass
import os.path
from pathlib import Path
import sys
from typing import Iterable

import attr
from irods.exception import CAT_INVALID_AUTHENTICATION, PAM_AUTH_PASSWORD_FAILED
from irods.password_obfuscation import encode
from irods.session import NonAnonymousLoginWithoutPassword, iRODSSession
import logzero
from logzero import logger
from tqdm import tqdm

# no-frills logger
formatter = logzero.LogFormatter(fmt="%(message)s")
output_logger = logzero.setup_logger(formatter=formatter)

NUM_PARALLEL_SESSIONS = 4


@attr.s(frozen=True, auto_attribs=True)
class TransferJob:
"""Encodes a transfer job from the local file system to the remote iRODS collection."""

#: Source path.
path_src: str

#: Destination path.
path_dest: str

#: Number of bytes to transfer.
bytes: int


class iRODSCommon:
"""
Implementation of common iRODS utility functions.

:param ask: Confirm with user before certain actions.
:type ask: bool, optional
:param irods_env_path: Path to irods_environment.json
:type irods_env_path: pathlib.Path, optional
"""

def __init__(self, ask: bool = False, irods_env_path: Path = None):
# Path to iRODS environment file
if irods_env_path is None:
self.irods_env_path = Path.home().joinpath(".irods", "irods_environment.json")
else:
self.irods_env_path = irods_env_path
self.ask = ask
self._check_auth()

@staticmethod
def get_irods_error(e: Exception):
"""Return logger friendly iRODS exception."""
es = str(e)
return es if es and es != "None" else e.__class__.__name__

def _init_irods(self) -> iRODSSession:
"""Connect to iRODS."""
try:
session = iRODSSession(irods_env_file=self.irods_env_path)
session.connection_timeout = 300
return session
except Exception as e: # pragma: no cover
logger.error(f"iRODS connection failed: {self.get_irods_error(e)}")
raise

def _check_auth(self):
"""Check auth status and login if needed."""
try:
self._init_irods().server_version
return 0
except NonAnonymousLoginWithoutPassword as e: # pragma: no cover
logger.info(self.get_irods_error(e))
pass
except CAT_INVALID_AUTHENTICATION: # pragma: no cover
logger.warning("Problem with your session token.")
self.irods_env_path.parent.joinpath(".irodsA").unlink()
pass
sellth marked this conversation as resolved.
Show resolved Hide resolved

# No valid .irodsA file. Query user for password.
attempts = 0
while attempts < 3:
try:
session = iRODSSession(
irods_env_file=self.irods_env_path,
password=getpass.getpass(prompt="Please enter SODAR password:"),
)
token = session.pam_pw_negotiated
session.cleanup()
break
except PAM_AUTH_PASSWORD_FAILED: # pragma: no cover
if attempts < 2:
logger.warning("Wrong password. Please try again.")
attempts += 1
continue
else:
logger.error("iRODS connection failed.")
sys.exit(1)
except Exception as e: # pragma: no cover
logger.error(f"iRODS connection failed: {self.get_irods_error(e)}")
sys.exit(1)

if self.ask and input(
"Save iRODS session for passwordless operation? [y/N] "
).lower().startswith("y"):
self._save_irods_token(token) # pragma: no cover
elif not self.ask:
self._save_irods_token(token)

def _save_irods_token(self, token: str):
"""Retrieve PAM temp auth token 'obfuscate' it and save to disk."""
irods_auth_path = self.irods_env_path.parent.joinpath(".irodsA")
irods_auth_path.parent.mkdir(parents=True, exist_ok=True)

if isinstance(token, list) and token:
irods_auth_path.write_text(encode(token[0]))
irods_auth_path.chmod(0o600)
else:
logger.warning("No token found to be saved.")

@contextmanager
def _get_irods_sessions(self, count=NUM_PARALLEL_SESSIONS):
if count < 1:
count = 1
irods_sessions = [self._init_irods() for _ in range(count)]
try:
yield irods_sessions
finally:
for irods in irods_sessions:
irods.cleanup()


class iRODSTransfer(iRODSCommon):
"""
Transfer files to iRODS.

:param jobs: Iterable of TransferJob objects
:type jobs: Union[list,tuple,dict,set]
"""

def __init__(self, jobs: Iterable[TransferJob], **kwargs):
super().__init__(**kwargs)
with self._get_irods_sessions(1) as s:
self.session = s[0] # TODO: use more sessions
self.__jobs = jobs
self.__total_bytes = sum([job.bytes for job in self.__jobs])
self.__destinations = [job.path_dest for job in self.__jobs]

@property
def jobs(self):
return self.__jobs

@property
def size(self):
return self.__total_bytes

@property
def destinations(self):
return self.__destinations

def put(self):
# Double tqdm for currently transferred file info
# TODO: add more parenthesis after python 3.10
with tqdm(
total=self.__total_bytes,
unit="B",
unit_scale=True,
unit_divisor=1024,
position=1,
) as t, tqdm(total=0, position=0, bar_format="{desc}", leave=False) as file_log:
for job in self.__jobs:
file_log.set_description_str(f"Current file: {job.path_src}")
try:
self.session.data_objects.put(job.path_src, job.path_dest)
t.update(job.bytes)
except Exception as e: # pragma: no cover
logger.error(f"Problem during transfer of {job.path_src}")
logger.error(self.get_irods_error(e))
sys.exit(1)
finally:
self.session.cleanup()
t.clear()

def chksum(self):
"""Compute remote md5 checksums for all jobs."""
common_prefix = os.path.commonpath(self.__destinations)
for job in self.__jobs:
if not job.path_src.endswith(".md5"):
sellth marked this conversation as resolved.
Show resolved Hide resolved
output_logger.info(Path(job.path_dest).relative_to(common_prefix))
try:
data_object = self.session.data_objects.get(job.path_dest)
if not data_object.checksum:
data_object.chksum()
except Exception as e: # pragma: no cover
logger.error("Problem during iRODS checksumming.")
logger.error(self.get_irods_error(e))
finally:
self.session.cleanup()
132 changes: 132 additions & 0 deletions tests/test_irods_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from pathlib import Path
import shutil
from unittest.mock import ANY, MagicMock, patch

import irods.exception
from irods.session import NonAnonymousLoginWithoutPassword
import pytest

from cubi_tk.irods_common import TransferJob, iRODSCommon, iRODSTransfer


@patch("cubi_tk.irods_common.iRODSSession")
def test_common_init(mocksession):
assert iRODSCommon().irods_env_path is not None
icommon = iRODSCommon(irods_env_path="a/b/c.json")
assert icommon.irods_env_path == "a/b/c.json"
assert type(iRODSCommon().ask) is bool


@patch("cubi_tk.irods_common.iRODSSession")
def test_get_irods_error(mocksession):
e = irods.exception.NetworkException()
assert iRODSCommon().get_irods_error(e) == "NetworkException"
e = irods.exception.NetworkException("Connection reset")
assert iRODSCommon().get_irods_error(e) == "Connection reset"


@patch("getpass.getpass")
@patch("cubi_tk.irods_common.iRODSSession")
def test_check_auth(mocksession, mockpass, fs):
fs.create_file(".irods/irods_environment.json")
password = "1234"

icommon = iRODSCommon()
mockpass.return_value = password
with patch.object(icommon, "_init_irods") as mockinit:
mockinit.side_effect = NonAnonymousLoginWithoutPassword()

# .irodsA not found, asks for password
icommon._check_auth()
mockpass.assert_called()
mocksession.assert_any_call(irods_env_file=ANY, password=password)

# .irodsA there, does not ask for password
mockpass.reset_mock()
mocksession.reset_mock()
icommon._check_auth()
mockpass.assert_not_called()
mocksession.assert_called_once()


@patch("cubi_tk.irods_common.encode", return_value="it works")
@patch("cubi_tk.irods_common.iRODSSession")
def test_save_irods_token(mocksession, mockencode, fs):
token = [
"secure",
]
icommon = iRODSCommon()
icommon.irods_env_path = Path("testdir/env.json")
icommon._save_irods_token(token=token)

assert icommon.irods_env_path.parent.joinpath(".irodsA").exists()
mockencode.assert_called_with("secure")


@patch("cubi_tk.irods_common.iRODSSession")
def test_init_irods(mocksession, fs):
fs.create_file(".irods/irods_environment.json")
fs.create_file(".irods/.irodsA")

iRODSCommon()._init_irods()
mocksession.assert_called()


@patch("cubi_tk.irods_common.iRODSSession")
def test_get_irods_sessions(mocksession):
with iRODSCommon()._get_irods_sessions(count=3) as sessions:
assert len(sessions) == 3
with iRODSCommon()._get_irods_sessions(count=-1) as sessions:
assert len(sessions) == 1


# Test iRODSTransfer #########
@pytest.fixture
def jobs():
return (
TransferJob(path_src="myfile.csv", path_dest="dest_dir/myfile.csv", bytes=123),
TransferJob(path_src="folder/file.csv", path_dest="dest_dir/folder/file.csv", bytes=1024),
)


@pytest.fixture
def itransfer(jobs):
with patch("cubi_tk.irods_common.iRODSSession"):
return iRODSTransfer(jobs)


def test_irods_transfer_init(jobs, itransfer):
assert itransfer.jobs == jobs
assert itransfer.size == sum([job.bytes for job in jobs])
assert itransfer.destinations == [job.path_dest for job in jobs]

with patch("cubi_tk.irods_common.iRODSSession"):
itransferc = iRODSTransfer(jobs=jobs, irods_env_path="a/b/c", ask=True)
assert itransferc.irods_env_path == "a/b/c"
assert itransferc.ask is True


def test_irods_transfer_put(fs, itransfer, jobs):
for job in jobs:
fs.create_file(job.path_src)
fs.create_dir(Path(job.path_dest).parent)

with patch.object(itransfer.session.data_objects, "put", wraps=shutil.copy):
itransfer.put()

for job in jobs:
assert Path(job.path_dest).exists()


def test_irods_transfer_chksum(itransfer):
with patch.object(itransfer.session.data_objects, "get") as mockget:
mock_data_object = MagicMock()
mockget.return_value = mock_data_object
mock_data_object.checksum = None
mock_data_object.chksum = MagicMock()

itransfer.chksum()

assert mock_data_object.chksum.call_count == len(itransfer.destinations)
for path in itransfer.destinations:
mockget.assert_any_call(path)
Loading