diff --git a/lib/galaxy/managers/hdcas.py b/lib/galaxy/managers/hdcas.py index ad5bcda58e8b..50fd82e3affb 100644 --- a/lib/galaxy/managers/hdcas.py +++ b/lib/galaxy/managers/hdcas.py @@ -8,6 +8,8 @@ import logging from typing import Dict +from sqlalchemy import select + from galaxy import model from galaxy.managers import ( annotatable, @@ -334,3 +336,12 @@ def serialize_job_state_summary(self, item, key, **context): def serialize_elements_datatypes(self, item, key, **context): extensions_set = item.dataset_dbkeys_and_extensions_summary[1] return list(extensions_set) + + +def get_hdca_by_name(session, name): + stmt = ( + select(model.HistoryDatasetCollectionAssociation) + .where(model.HistoryDatasetCollectionAssociation.name == name) + .limit(1) + ) + return session.scalars(stmt).first() diff --git a/lib/galaxy/managers/users.py b/lib/galaxy/managers/users.py index 0429e37007a0..813d8c96ef30 100644 --- a/lib/galaxy/managers/users.py +++ b/lib/galaxy/managers/users.py @@ -20,7 +20,9 @@ from sqlalchemy import ( and_, exc, + false, func, + or_, select, true, ) @@ -893,3 +895,35 @@ def get_user_by_email(session, email: str, model_class=User, case_sensitive=True def get_user_by_username(session, username: str, model_class=User): stmt = select(model_class).filter(model_class.username == username).limit(1) return session.scalars(stmt).first() + + +def get_users_for_index( + session, + deleted: bool, + f_email: Optional[str] = None, + f_name: Optional[str] = None, + f_any: Optional[str] = None, + is_admin: bool = False, + expose_user_email: bool = False, + expose_user_name: bool = False, +): + stmt = select(User) + if f_email and (is_admin or expose_user_email): + stmt = stmt.where(User.email.like(f"%{f_email}%")) + if f_name and (is_admin or expose_user_name): + stmt = stmt.where(User.username.like(f"%{f_name}%")) + if f_any: + if is_admin: + stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) + else: + if expose_user_email and 
expose_user_name: + stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) + elif expose_user_email: + stmt = stmt.where(User.email.like(f"%{f_any}%")) + elif expose_user_name: + stmt = stmt.where(User.username.like(f"%{f_any}%")) + if deleted: + stmt = stmt.where(User.deleted == true()) + else: + stmt = stmt.where(User.deleted == false()) + return session.scalars(stmt).all() diff --git a/lib/galaxy/model/security.py b/lib/galaxy/model/security.py index 7a761d562edb..f048e2b3e319 100644 --- a/lib/galaxy/model/security.py +++ b/lib/galaxy/model/security.py @@ -80,22 +80,11 @@ def sort_by_attr(self, seq, attr): intermed.sort() return [_[-1] for _ in intermed] - def _get_npns_roles(self, trans): - """ - non-private, non-sharing roles - """ - stmt = ( - select(Role) - .where(and_(Role.deleted == false(), Role.type != Role.types.PRIVATE, Role.type != Role.types.SHARING)) - .order_by(Role.name) - ) - return trans.sa_session.scalars(stmt) - def get_all_roles(self, trans, cntrller): admin_controller = cntrller in ["library_admin"] roles = set() if not trans.user: - return self._get_npns_roles(trans) + return get_npns_roles(trans.sa_session) if admin_controller: # The library is public and the user is an admin, so all roles are legitimate stmt = select(Role).where(Role.deleted == false()).order_by(Role.name) @@ -108,7 +97,7 @@ def get_all_roles(self, trans, cntrller): for role in self.get_sharing_roles(trans.user): roles.add(role) # Add all remaining non-private, non-sharing roles - for role in self._get_npns_roles(trans): + for role in get_npns_roles(trans.sa_session): roles.add(role) return self.sort_by_attr(list(roles), "name") @@ -189,7 +178,7 @@ def get_valid_roles(self, trans, item, query=None, page=None, page_limit=None, i for role in self.get_sharing_roles(trans.user): roles.append(role) # Add all remaining non-private, non-sharing roles - for role in self._get_npns_roles(trans): + for role in get_npns_roles(trans.sa_session): 
roles.append(role)
         # User will see all the roles derived from the access roles on the item
         else:
@@ -765,23 +754,9 @@ def create_private_user_role(self, user):
         return self.get_private_user_role(user)
 
     def get_private_user_role(self, user, auto_create=False):
-        stmt = (
-            select(Role)
-            .where(
-                and_(
-                    UserRoleAssociation.user_id == user.id,
-                    Role.id == UserRoleAssociation.role_id,
-                    Role.type == Role.types.PRIVATE,
-                )
-            )
-            .distinct()
-        )
-        role = self.sa_session.execute(stmt).scalar_one_or_none()
-        if not role:
-            if auto_create:
-                return self.create_private_user_role(user)
-            else:
-                return None
+        role = get_private_user_role(user, self.sa_session)
+        if not role and auto_create:
+            role = self.create_private_user_role(user)
         return role
 
     def get_role(self, name, type=None):
@@ -1695,3 +1670,27 @@ def _walk_action_roles(permissions, query_action):
             yield action, roles
         elif action == query_action.action and roles:
             yield action, roles
+
+
+def get_npns_roles(session):
+    """
+    non-private, non-sharing roles
+    """
+    stmt = (
+        select(Role)
+        .where(and_(Role.deleted == false(), Role.type != Role.types.PRIVATE, Role.type != Role.types.SHARING))
+        .order_by(Role.name)
+    )
+    return session.scalars(stmt)
+
+
+def get_private_user_role(user, session):
+    stmt = select(Role).where(
+        and_(
+            UserRoleAssociation.user_id == user.id,
+            Role.id == UserRoleAssociation.role_id,
+            Role.type == Role.types.PRIVATE,
+        )
+    ).distinct()
+    role = session.execute(stmt).scalar_one_or_none()
+    return role
diff --git a/lib/galaxy/webapps/galaxy/services/users.py b/lib/galaxy/webapps/galaxy/services/users.py
index 909a9898f193..18eb594c659b 100644
--- a/lib/galaxy/webapps/galaxy/services/users.py
+++ b/lib/galaxy/webapps/galaxy/services/users.py
@@ -4,13 +4,6 @@
     Union,
 )
 
-from sqlalchemy import (
-    false,
-    or_,
-    select,
-    true,
-)
-
 import galaxy.managers.base as managers_base
 from galaxy import (
     exceptions as glx_exceptions,
@@ -22,6 +15,7 @@
     ProvidesUserContext,
 )
 from galaxy.managers.users import (
+ get_users_for_index, UserDeserializer, UserManager, UserSerializer, @@ -205,31 +199,11 @@ def get_index( f_name: Optional[str], f_any: Optional[str], ) -> List[Union[UserModel, LimitedUserModel]]: - rval = [] - stmt = select(User) - - if f_email and (trans.user_is_admin or trans.app.config.expose_user_email): - stmt = stmt.filter(User.email.like(f"%{f_email}%")) - - if f_name and (trans.user_is_admin or trans.app.config.expose_user_name): - stmt = stmt.filter(User.username.like(f"%{f_name}%")) - - if f_any: - if trans.user_is_admin: - stmt = stmt.filter(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) - else: - if trans.app.config.expose_user_email and trans.app.config.expose_user_name: - stmt = stmt.filter(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) - elif trans.app.config.expose_user_email: - stmt = stmt.filter(User.email.like(f"%{f_any}%")) - elif trans.app.config.expose_user_name: - stmt = stmt.filter(User.username.like(f"%{f_any}%")) - + # check for early return conditions if deleted: - # only admins can see deleted users if not trans.user_is_admin: + # only admins can see deleted users return [] - stmt = stmt.filter(User.deleted == true()) else: # special case: user can see only their own user # special case2: if the galaxy admin has specified that other user email/names are @@ -244,8 +218,19 @@ def get_index( return [item] else: return [] - stmt = stmt.filter(User.deleted == false()) - for user in trans.sa_session.scalars(stmt).all(): + + users = get_users_for_index( + trans.sa_session, + deleted, + f_email, + f_name, + f_any, + trans.user_is_admin, + trans.app.config.expose_user_email, + trans.app.config.expose_user_name, + ) + rval = [] + for user in users: item = user.to_dict() # If NOT configured to expose_email, do not expose email UNLESS the user is self, or # the user is an admin diff --git a/test/unit/data/data_access/__init__.py b/test/unit/data/data_access/__init__.py new file mode 100644 index 
000000000000..4aa7a22a7513 --- /dev/null +++ b/test/unit/data/data_access/__init__.py @@ -0,0 +1,20 @@ +from collections import namedtuple + +PRIVATE_OBJECT_STORE_ID = "my_private_data" + +MockTransaction = namedtuple("MockTransaction", "user") + + +class MockObjectStore: + + def is_private(self, object): + if object.object_store_id == PRIVATE_OBJECT_STORE_ID: + return True + else: + return False + + +def verify_items(items1, length, items2=None): + assert len(items1) == length + if items2: + assert set(items2) == set(items1) diff --git a/test/unit/data/data_access/conftest.py b/test/unit/data/data_access/conftest.py new file mode 100644 index 000000000000..18e439b5b425 --- /dev/null +++ b/test/unit/data/data_access/conftest.py @@ -0,0 +1,352 @@ +import contextlib +import random +import string + +import pytest +from sqlalchemy import ( + create_engine, + text, +) +from sqlalchemy.orm import Session + +from galaxy import model as m +from galaxy.datatypes.registry import Registry as DatatypesRegistry +from galaxy.model.triggers.update_audit_table import install as install_timestamp_triggers +from . import MockObjectStore + +# utility fixtures + + +@contextlib.contextmanager +def transaction(session): + if not session.in_transaction(): + with session.begin(): + yield + else: + yield + + +@pytest.fixture(scope="module") +def engine(): + db_uri = "sqlite:///:memory:" + return create_engine(db_uri) + + +@pytest.fixture(autouse=True, scope="module") +def setup(engine): + m.mapper_registry.metadata.create_all(engine) + m.Dataset.object_store = MockObjectStore() # type:ignore[assignment] + datatypes_registry = DatatypesRegistry() + datatypes_registry.load_datatypes() + m.set_datatypes_registry(datatypes_registry) + install_timestamp_triggers(engine) + print("\nSETUP CALLED") + + +@pytest.fixture(autouse=True) +def teardown(engine): + """Delete all rows from all tables. 
Called after each test.""" + yield + with engine.begin() as conn: + for table in m.mapper_registry.metadata.tables: + stmt = text(f"DELETE FROM {table}") + conn.execute(stmt) + + +@pytest.fixture +def session(engine): + engine = engine + return Session(engine) + + +@pytest.fixture +def make_random_users(session, make_user): + def f(count): + return [make_user() for _ in range(count)] + + return f + + +# utility functions + + +def random_str(): + alphabet = string.ascii_lowercase + string.digits + size = random.randint(5, 10) + return "".join(random.choices(alphabet, k=size)) + + +def random_email(): + text = random_str() + return f"{text}@galaxy.testing" + + +# model fixture factories + + +@pytest.fixture +def make_dataset_collection(session): + def f(**kwd): + dc = m.DatasetCollection(**kwd) + with transaction(session): + session.add(dc) + session.commit() + return dc + + return f + + +@pytest.fixture +def make_dataset_collection_element(session, make_hda): + def f(**kwd): + kwd["element"] = kwd.get("element", make_hda()) + dce = m.DatasetCollectionElement(**kwd) + with transaction(session): + session.add(dce) + session.commit() + return dce + + return f + + +@pytest.fixture +def make_dataset_permissions(session): + def f(**kwd): + dp = m.DatasetPermissions(**kwd) + with transaction(session): + session.add(dp) + session.commit() + return dp + + return f + + +@pytest.fixture +def make_galaxy_session(session): + def f(**kwd): + gs = m.GalaxySession(**kwd) + with transaction(session): + session.add(gs) + session.commit() + return gs + + return f + + +@pytest.fixture +def make_history(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user", make_user()) + history = m.History(**kwd) + with transaction(session): + session.add(history) + session.commit() + return history + + return f + + +@pytest.fixture +def make_hda(session, make_history): + def f(**kwd): + kwd["history"] = kwd.get("history", make_history()) + hda = m.HistoryDatasetAssociation(**kwd) + with 
transaction(session): + session.add(hda) + session.commit() + return hda + + return f + + +@pytest.fixture +def make_hdca(session): + def f(**kwd): + hdca = m.HistoryDatasetCollectionAssociation(**kwd) + with transaction(session): + session.add(hdca) + session.commit() + return hdca + + return f + + +@pytest.fixture +def make_job(session): + def f(**kwd): + job = m.Job(**kwd) + with transaction(session): + session.add(job) + session.commit() + return job + + return f + + +@pytest.fixture +def make_ldca(session): + def f(**kwd): + ldca = m.LibraryDatasetCollectionAssociation(**kwd) + with transaction(session): + session.add(ldca) + session.commit() + return ldca + + return f + + +@pytest.fixture +def make_ldda(session): + def f(**kwd): + ldda = m.LibraryDatasetDatasetAssociation(**kwd) + with transaction(session): + session.add(ldda) + session.commit() + return ldda + + return f + + +@pytest.fixture +def make_library(session): + def f(**kwd): + lib = m.Library(**kwd) + with transaction(session): + session.add(lib) + session.commit() + return lib + + return f + + +@pytest.fixture +def make_library_folder(session): + def f(**kwd): + lib_folder = m.LibraryFolder(**kwd) + with transaction(session): + session.add(lib_folder) + session.commit() + return lib_folder + + return f + + +@pytest.fixture +def make_library_permissions(session, make_library, make_role): + def f(**kwd): + action = kwd.get("action", random_str()) + library = kwd.get("library", make_library()) + role = kwd.get("role", make_role()) + lp = m.LibraryPermissions(action, library, role) + with transaction(session): + session.add(lp) + session.commit() + return lp + + return f + + +@pytest.fixture +def make_page(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user", make_user()) + page = m.Page(**kwd) + with transaction(session): + session.add(page) + session.commit() + return page + + return f + + +@pytest.fixture +def make_role(session): + def f(**kwd): + role = m.Role(**kwd) + with 
transaction(session): + session.add(role) + session.commit() + return role + + return f + + +@pytest.fixture +def make_stored_workflow(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user", make_user()) + sw = m.StoredWorkflow(**kwd) + with transaction(session): + session.add(sw) + session.commit() + return sw + + return f + + +@pytest.fixture +def make_task(session, make_job): + def f(**kwd): + kwd["job"] = kwd.get("job", make_job()) + # Assumption: if the following args are needed, a test should supply them + kwd["working_directory"] = kwd.get("working_directory", random_str()) + kwd["prepare_files_cmd"] = kwd.get("prepare_files_cmd", random_str()) + task = m.Task(**kwd) + with transaction(session): + session.add(task) + session.commit() + return task + + return f + + +@pytest.fixture +def make_user(session): + def f(**kwd): + kwd["username"] = kwd.get("username", random_str()) + kwd["email"] = kwd.get("email", random_email()) + kwd["password"] = kwd.get("password", random_str()) + user = m.User(**kwd) + with transaction(session): + session.add(user) + session.commit() + return user + + return f + + +@pytest.fixture +def make_user_item_rating_association(session): + def f(assoc_class, user, item, rating): + assoc = assoc_class(user, item, rating) + with transaction(session): + session.add(assoc) + session.commit() + return assoc + + return f + + +@pytest.fixture +def make_user_role_association(session): + def f(user, role): + assoc = m.UserRoleAssociation(user, role) + with transaction(session): + session.add(assoc) + session.commit() + return assoc + + return f + + +@pytest.fixture +def make_visualization(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user", make_user()) + vis = m.Visualization(**kwd) + with transaction(session): + session.add(vis) + session.commit() + return vis + + return f diff --git a/test/unit/data/data_access/test_library.py b/test/unit/data/data_access/test_library.py new file mode 100644 index 
000000000000..74ac9c3ecebe --- /dev/null +++ b/test/unit/data/data_access/test_library.py @@ -0,0 +1,57 @@ +from galaxy.managers import libraries as lib +from . import verify_items + + +def test_get_library_ids(session, make_library, make_library_permissions): + l1, l2, l3, l4 = make_library(), make_library(), make_library(), make_library() + make_library_permissions(library=l1, action="a") + make_library_permissions(library=l2, action="b") + make_library_permissions(library=l3, action="b") + make_library_permissions(library=l3, action="b") # intentional duplicate + make_library_permissions(library=l4, action="c") + + ids = lib.get_library_ids(session, "b").all() + verify_items(ids, 2, (l2.id, l3.id)) + + +def test_get_library_permissions_by_role(session, make_role, make_library_permissions): + r1, r2 = make_role(), make_role() + make_library_permissions() + make_library_permissions() + make_library_permissions(role=r1) + make_library_permissions(role=r2) + lps = lib.get_library_permissions_by_role(session, (r1.id, r2.id)).all() + + lp_roles = [lp.role for lp in lps] + verify_items(lp_roles, 2, (r1, r2)) + + +def test_get_libraries_for_admins(session, make_library): + libs = [make_library() for _ in range(5)] + libs[0].deleted = True + libs[1].deleted = True + libs[2].deleted = False + libs[3].deleted = False + libs[4].deleted = False + + libs_deleted = lib.get_libraries_for_admins(session, True).all() + verify_items(libs_deleted, 2, (libs[0], libs[1])) + + libs_not_deleted = lib.get_libraries_for_admins(session, False).all() + verify_items(libs_not_deleted, 3, (libs[2], libs[3], libs[4])) + + libs_all = lib.get_libraries_for_admins(session, None).all() + verify_items(libs_all, 5, libs) + # TODO: verify sorted by lib name, case insensitive + + +def test_get_libraries_for_non_admins(session, make_library): + libs = [make_library() for _ in range(6)] + restricted_ids = (libs[0].id, libs[1].id, libs[2].id, libs[3].id) + accessible_restricted_ids = (libs[2].id, 
libs[3].id)
+    libs[3].deleted = True
+    libs[4].deleted = True
+    # Expected ids: 2 (accessible restricted, not deleted), 5 (not deleted)
+    # Not returned: 1 (restricted), 3(deleted), 4(deleted)
+    allowed = lib.get_libraries_for_nonadmins(session, restricted_ids, accessible_restricted_ids).all()
+    verify_items(allowed, 2, (libs[2], libs[5]))
diff --git a/test/unit/data/data_access/test_misc.py b/test/unit/data/data_access/test_misc.py
new file mode 100644
index 000000000000..ff6335189284
--- /dev/null
+++ b/test/unit/data/data_access/test_misc.py
@@ -0,0 +1,330 @@
+import random
+
+import pytest
+from sqlalchemy import inspect
+from sqlalchemy.exc import IntegrityError
+
+from galaxy import model as m
+from galaxy.managers import hdcas as lib
+from . import (
+    MockTransaction,
+    PRIVATE_OBJECT_STORE_ID,
+)
+
+# test model definitions
+
+
+class TestUser:
+    def test_username_is_unique(self, make_user):
+        make_user(username="a")
+        with pytest.raises(IntegrityError):
+            make_user(username="a")
+
+
+def test_history_update(make_history, make_hda, session):
+    h1 = make_history()
+    old_update_time = h1.update_time
+
+    hda = make_hda(history=h1, create_dataset=True, sa_session=session)
+    # history updated due to hda insert
+    assert h1.update_time > old_update_time
+
+    old_update_time = h1.update_time
+    hda.name = "new name"
+    session.add(hda)
+    session.commit()
+    # history updated due to hda update
+    assert h1.update_time > old_update_time
+
+    old_update_time = h1.update_time
+    hda2 = hda.copy()
+    assert hda2
+    # history NOT updated when hda copied
+    assert h1.update_time == old_update_time
+
+
+def test_ratings(
+    make_user,
+    make_stored_workflow,
+    make_history,
+    make_page,
+    make_visualization,
+    make_hdca,
+    make_ldca,
+    make_user_item_rating_association,
+):
+    def _test_rating(assoc_class, item, assoc_class_item_attr_name):
+        user = make_user()
+        rating = random.randint(0, 100)
+        rating_assoc = make_user_item_rating_association(assoc_class, user, item, rating)
+        assert 
rating_assoc.user == user + assert getattr(rating_assoc, assoc_class_item_attr_name) == item + assert rating_assoc.rating == rating + + _test_rating(m.StoredWorkflowRatingAssociation, make_stored_workflow(), "stored_workflow") + _test_rating(m.HistoryRatingAssociation, make_history(), "history") + _test_rating(m.PageRatingAssociation, make_page(), "page") + _test_rating(m.VisualizationRatingAssociation, make_visualization(), "visualization") + _test_rating(m.HistoryDatasetCollectionRatingAssociation, make_hdca(), "dataset_collection") + _test_rating(m.LibraryDatasetCollectionRatingAssociation, make_ldca(), "dataset_collection") + + +def test_hda_to_library_dataset_dataset_association(session, make_user, make_history, make_hda, make_library_folder): + hda = make_hda(create_dataset=True, sa_session=session) + target_folder = make_library_folder() + mock_trans = MockTransaction(user=None) + + ldda = hda.to_library_dataset_dataset_association( + trans=mock_trans, + target_folder=target_folder, + ) + assert target_folder.item_count == 1 + assert ldda.id + assert ldda.library_dataset.id + assert ldda.library_dataset.library_dataset_dataset_association.id + + new_ldda = hda.to_library_dataset_dataset_association( + trans=mock_trans, target_folder=target_folder, replace_dataset=ldda.library_dataset + ) + assert new_ldda.id != ldda.id + assert new_ldda.library_dataset_id == ldda.library_dataset_id + assert new_ldda.library_dataset.library_dataset_dataset_association_id == new_ldda.id + assert len(new_ldda.library_dataset.expired_datasets) == 1 + assert new_ldda.library_dataset.expired_datasets[0] == ldda + assert target_folder.item_count == 1 + + +def test_hda_to_library_dataset_dataset_association_fails_if_private( + session, make_user, make_history, make_hda, make_library_folder +): + hda = make_hda(create_dataset=True, sa_session=session) + hda.dataset.object_store_id = PRIVATE_OBJECT_STORE_ID + target_folder = make_library_folder() + mock_trans = 
MockTransaction(user=None) + + with pytest.raises(Exception) as exec_info: + hda.to_library_dataset_dataset_association( + trans=mock_trans, + target_folder=target_folder, + ) + assert m.CANNOT_SHARE_PRIVATE_DATASET_MESSAGE in str(exec_info.value) + + +def test_collection_get_interface(session, make_hda, make_dataset_collection): + c = make_dataset_collection(collection_type="list") + d = make_hda(create_dataset=True, sa_session=session) + elements = 100 + dces = [ + m.DatasetCollectionElement(collection=c, element=d, element_identifier=f"{i}", element_index=i) + for i in range(elements) + ] + for i in range(elements): + assert c[i] == dces[i] + + +def test_collections_in_histories(session, make_dataset_collection, make_dataset_collection_element, make_hdca): + c = make_dataset_collection(collection_type="pair") + dce1 = make_dataset_collection_element(collection=c, element_identifier="left") + dce2 = make_dataset_collection_element(collection=c, element_identifier="right") + make_hdca(name="foo", collection=c) + loaded_dataset_collection = lib.get_hdca_by_name(session, "foo").collection + + assert len(loaded_dataset_collection.elements) == 2 + assert loaded_dataset_collection.collection_type == "pair" + assert loaded_dataset_collection["left"] == dce1 + assert loaded_dataset_collection["right"] == dce2 + + +def test_dataset_action_tuples( + session, + make_user, + make_history, + make_hda, + make_role, + make_dataset_permissions, + make_dataset_collection, + make_dataset_collection_element, +): + role = make_role() + hda1 = make_hda(create_dataset=True, sa_session=session) + hda2 = make_hda(create_dataset=True, sa_session=session) + make_dataset_permissions(action="action1", dataset=hda1.dataset, role=role) + make_dataset_permissions(action=None, dataset=hda1.dataset, role=role) + make_dataset_permissions(action="action3", dataset=hda1.dataset, role=role) + c = make_dataset_collection(collection_type="type1") + make_dataset_collection_element(collection=c, 
element=hda1) + make_dataset_collection_element(collection=c, element=hda2) + assert c.dataset_action_tuples == [("action1", role.id), ("action3", role.id)] + + +def test_dataset_dbkeys_and_extensions_summary( + session, make_hda, make_dataset_collection, make_dataset_collection_element, make_hdca +): + d1 = make_hda(extension="bam", dbkey="hg19", create_dataset=True, sa_session=session) + d2 = make_hda(extension="txt", dbkey="hg19", create_dataset=True, sa_session=session) + c1 = make_dataset_collection(collection_type="paired") + make_dataset_collection_element(collection=c1, element=d1) + make_dataset_collection_element(collection=c1, element=d2) + + hdca = make_hdca(collection=c1) + assert hdca.dataset_dbkeys_and_extensions_summary[0] == {"hg19"} + assert hdca.dataset_dbkeys_and_extensions_summary[1] == {"bam", "txt"} + + +def test_populated_optimized_ok(session, make_dataset_collection, make_dataset_collection_element, make_hda): + c1 = make_dataset_collection(collection_type="paired") + make_dataset_collection_element(collection=c1, element=make_hda(create_dataset=True, sa_session=session)) + make_dataset_collection_element(collection=c1, element=make_hda(create_dataset=True, sa_session=session)) + assert c1.populated + assert c1.populated_optimized + + +def test_populated_optimized_empty_list_list_ok(make_dataset_collection, make_dataset_collection_element): + c1 = make_dataset_collection(collection_type="list") + c2 = make_dataset_collection(collection_type="list:list") + make_dataset_collection_element(collection=c2, element=c1) + assert c1.populated + assert c1.populated_optimized + assert c2.populated + assert c2.populated_optimized + + +def test_populated_optimized_list_list_not_populated(make_dataset_collection, make_dataset_collection_element): + c1 = make_dataset_collection(collection_type="list", populated=False) + c2 = make_dataset_collection(collection_type="list:list") + make_dataset_collection_element(collection=c2, element=c1) + assert not 
c1.populated + assert not c1.populated_optimized + assert not c2.populated + assert not c2.populated_optimized + + +def test_default_disk_usage(session, make_user): + u = make_user() + u.adjust_total_disk_usage(1, None) + user_reload = session.get(m.User, u.id) + assert user_reload.disk_usage == 1 + + +def test_history_contents(session, make_history, make_hda): + h1 = make_history() + d1 = make_hda(history=h1, name="1") + d2 = make_hda(history=h1, name="2", visible=False, create_dataset=True, sa_session=session) + d2.dataset.object_store_id = "foobar" + d3 = make_hda(history=h1, name="3", deleted=True, create_dataset=True, sa_session=session) + d3.dataset.object_store_id = "three_store" + d4 = make_hda(history=h1, name="4", visible=False, deleted=True) + + def contents_iter_names(**kwds): + history = session.get(m.History, h1.id) + return [h.name for h in history.contents_iter(**kwds)] + + assert contents_iter_names() == ["1", "2", "3", "4"] + assert contents_iter_names(deleted=False) == ["1", "2"] + assert contents_iter_names(visible=True) == ["1", "3"] + assert contents_iter_names(visible=True, object_store_ids=["three_store"]) == ["3"] + assert contents_iter_names(visible=False) == ["2", "4"] + assert contents_iter_names(deleted=True, visible=False) == ["4"] + assert contents_iter_names(deleted=False, object_store_ids=["foobar"]) == ["2"] + assert contents_iter_names(deleted=False, object_store_ids=["foobar2"]) == [] + assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id]) == ["1", "2", "3", "4"] + assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id], max_in_filter_length=1) == ["1", "2", "3", "4"] + assert contents_iter_names(ids=[d1.id, d3.id]) == ["1", "3"] + + +def test_current_galaxy_session(make_user, make_galaxy_session): + user = make_user() + galaxy_session = make_galaxy_session(user=user) + assert user.current_galaxy_session == galaxy_session + new_galaxy_session = make_galaxy_session() + user.galaxy_sessions.append(new_galaxy_session) + 
assert user.current_galaxy_session == new_galaxy_session + + +def test_next_hid(make_history): + h = make_history() + assert h.hid_counter == 1 + h._next_hid() + assert h.hid_counter == 2 + h._next_hid(n=3) + assert h.hid_counter == 5 + + +def test_history_hid_counter_is_expired_after_next_hid_call(make_history): + h = make_history() + state = inspect(h) + assert h.hid_counter == 1 + assert "hid_counter" not in state.unloaded + assert "id" not in state.unloaded + + h._next_hid() + + assert "hid_counter" in state.unloaded # this attribute has been expired + assert "id" not in state.unloaded # but other attributes have NOT been expired + assert h.hid_counter == 2 # check this last: this causes this hid_counter to be reloaded + + +def test_get_display_name(make_ldda, make_hda, make_history, make_library, make_library_folder): + + def assert_display_name_converts_to_unicode(item, name): + assert isinstance(item.get_display_name(), str) + assert item.get_display_name() == name + + ldda = make_ldda(name="ldda_name") + assert_display_name_converts_to_unicode(ldda, "ldda_name") + + hda = make_hda(name="hda_name") + assert_display_name_converts_to_unicode(hda, "hda_name") + + history = make_history(name="history_name") + assert_display_name_converts_to_unicode(history, "history_name") + + library = make_library(name="library_name") + assert_display_name_converts_to_unicode(library, "library_name") + + library_folder = make_library_folder(name="library_folder") + assert_display_name_converts_to_unicode(library_folder, "library_folder") + + history = make_history(name="Hello₩◎ґʟⅾ") + assert isinstance(history.name, str) + assert isinstance(history.get_display_name(), str) + assert history.get_display_name() == "Hello₩◎ґʟⅾ" + + +def test_metadata_spec(make_hda): + metadata = dict(chromCol=1, startCol=2, endCol=3) + d = make_hda(extension="interval", metadata=metadata) + assert d.metadata.chromCol == 1 + assert d.metadata.anyAttribute is None + assert "items" not in d.metadata 
+ + +def test_job_metrics(make_job): + job = make_job() + job.add_metric("gx", "galaxy_slots", 5) + job.add_metric("system", "system_name", "localhost") + + assert len(job.text_metrics) == 1 + assert job.text_metrics[0].plugin == "system" + assert job.text_metrics[0].metric_name == "system_name" + assert job.text_metrics[0].metric_value == "localhost" + assert len(job.numeric_metrics) == 1 + assert job.numeric_metrics[0].plugin == "gx" + assert job.numeric_metrics[0].metric_name == "galaxy_slots" + assert job.numeric_metrics[0].metric_value == 5 + + +def test_task_metrics(make_task): + task = make_task() + task.add_metric("foo", "some-name", "some-value") + big_value = ":".join(f"{i}" for i in range(2000)) + task.add_metric("env", "BIG_PATH", big_value) + + assert len(task.text_metrics) == 2 + assert task.text_metrics[0].plugin == "foo" + assert task.text_metrics[0].metric_name == "some-name" + assert task.text_metrics[0].metric_value == "some-value" + assert task.text_metrics[1].plugin == "env" + assert task.text_metrics[1].metric_name == "BIG_PATH" + # Ensure big values truncated + assert len(task.text_metrics[1].metric_value) <= 1023 diff --git a/test/unit/data/data_access/test_role.py b/test/unit/data/data_access/test_role.py new file mode 100644 index 000000000000..922aba3beb23 --- /dev/null +++ b/test/unit/data/data_access/test_role.py @@ -0,0 +1,30 @@ +from galaxy import model as m +from galaxy.model.security import ( + get_npns_roles, + get_private_user_role, +) +from . 
import verify_items + + +def test_get_npns_roles(session, make_role): + make_role(deleted=True) + make_role(type=m.Role.types.PRIVATE) + make_role(type=m.Role.types.SHARING) + r4 = make_role() + r5 = make_role() + + roles = get_npns_roles(session).all() + verify_items(roles, 2, (r4, r5)) + + +def test_get_private_user_role(session, make_user, make_role, make_user_role_association): + u1, u2 = make_user(), make_user() + r1 = make_role(type=m.Role.types.PRIVATE) + r2 = make_role(type=m.Role.types.PRIVATE) + r3 = make_role() + make_user_role_association(u1, r1) # user1 private + make_user_role_association(u1, r3) # user1 non-private + make_user_role_association(u2, r2) # user2 private + + role = get_private_user_role(u1, session) + assert role is r1 diff --git a/test/unit/data/data_access/test_user.py b/test/unit/data/data_access/test_user.py new file mode 100644 index 000000000000..f27b61258834 --- /dev/null +++ b/test/unit/data/data_access/test_user.py @@ -0,0 +1,81 @@ +from galaxy.managers import users as lib +from . 
import verify_items + + +def test_get_user_by_username(session, make_user, make_random_users): + make_random_users(3) + my_user = make_user(username="a") + + user = lib.get_user_by_username(session, "a") + assert user is my_user + + +def test_get_user_by_email(session, make_user, make_random_users): + make_random_users(3) + my_user = make_user(email="a@foo.bar") + + user = lib.get_user_by_email(session, "a@foo.bar") + assert user is my_user + + +def test_get_users_by_ids(session, make_random_users): + users = make_random_users(10) + u1, u2, u3 = users[0], users[3], users[7] # select some random users + ids = [u1.id, u2.id, u3.id] + + users2 = lib.get_users_by_ids(session, ids) + verify_items(users2, 3, (u1, u2, u3)) + + +def test_get_users_for_index(session, make_user): + u1 = make_user(email="a", username="b") + u2 = make_user(email="c", username="d") + u3 = make_user(email="e", username="f") + u4 = make_user(email="g", username="h") + u5 = make_user(email="i", username="z") + u6 = make_user(email="z", username="i") + + users = lib.get_users_for_index(session, False, f_email="a", expose_user_email=True) + verify_items(users, 1, [u1]) + users = lib.get_users_for_index(session, False, f_email="c", is_admin=True) + verify_items(users, 1, [u2]) + users = lib.get_users_for_index(session, False, f_name="f", expose_user_name=True) + verify_items(users, 1, [u3]) + users = lib.get_users_for_index(session, False, f_name="h", is_admin=True) + verify_items(users, 1, [u4]) + users = lib.get_users_for_index(session, False, f_any="i", is_admin=True) + verify_items(users, 2, [u5, u6]) + users = lib.get_users_for_index(session, False, f_any="i", expose_user_email=True, expose_user_name=True) + verify_items(users, 2, [u5, u6]) + users = lib.get_users_for_index(session, False, f_any="i", expose_user_email=True) + verify_items(users, 1, [u5]) + users = lib.get_users_for_index(session, False, f_any="i", expose_user_name=True) + verify_items(users, 1, [u6]) + + u1.deleted = True + 
users = lib.get_users_for_index(session, True) + verify_items(users, 1, [u1]) + + +# TODO: factor out +# def test_get_users_by_role(session, make_user, make_role, make_user_role_association): +# user1, user2, user3 = make_user(), make_user(), make_user() +# role1, role2, role3 = make_role(), make_role(), make_role() +# make_user_role_association(user1, role1) +# make_user_role_association(user2, role1) +# make_user_role_association(user2, role2) +# make_user_role_association(user3, role2) +# +# role1_users = lib.get_users_by_role(session, role1) +# role2_users = lib.get_users_by_role(session, role2) +# role3_users = lib.get_users_by_role(session, role3) +# verify_items(role1_users, 2, (user1, user2)) +# verify_items(role2_users, 2, (user2, user3)) +# verify_items(role3_users, 0) + + +# TODO: factor out from model +# def test_email_exists(session, make_user): +# make_user(email="a@foo.bar") +# assert lib.email_exists(session, "a@foo.bar") +# assert not lib.email_exists(session, "b@foo.bar") diff --git a/test/unit/data/test_galaxy_mapping.py b/test/unit/data/test_galaxy_mapping.py index 5cb87936b71a..f5e5c69f26fe 100644 --- a/test/unit/data/test_galaxy_mapping.py +++ b/test/unit/data/test_galaxy_mapping.py @@ -1,4 +1,3 @@ -import collections import os import random import uuid @@ -76,189 +75,6 @@ def expunge(cls): class TestMappings(BaseModelTestCase): - def test_ratings(self): - user_email = "rater@example.com" - u = model.User(email=user_email, password="password") - self.persist(u) - - def persist_and_check_rating(rating_class, item): - rating = 5 - rating_association = rating_class(u, item, rating) - self.persist(rating_association) - self.expunge() - stored_rating = self.model.session.scalars(select(rating_class)).all()[0] - assert stored_rating.rating == rating - assert stored_rating.user.email == user_email - - sw = model.StoredWorkflow() - add_object_to_object_session(sw, u) - sw.user = u - self.persist(sw) - 
persist_and_check_rating(model.StoredWorkflowRatingAssociation, sw) - - h = model.History(name="History for Rating", user=u) - self.persist(h) - persist_and_check_rating(model.HistoryRatingAssociation, h) - - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h, create_dataset=True, sa_session=self.model.session - ) - self.persist(d1) - persist_and_check_rating(model.HistoryDatasetAssociationRatingAssociation, d1) - - page = model.Page() - page.user = u - self.persist(page) - persist_and_check_rating(model.PageRatingAssociation, page) - - visualization = model.Visualization() - visualization.user = u - self.persist(visualization) - persist_and_check_rating(model.VisualizationRatingAssociation, visualization) - - dataset_collection = model.DatasetCollection(collection_type="paired") - history_dataset_collection = model.HistoryDatasetCollectionAssociation(collection=dataset_collection) - self.persist(history_dataset_collection) - persist_and_check_rating(model.HistoryDatasetCollectionRatingAssociation, history_dataset_collection) - - library_dataset_collection = model.LibraryDatasetCollectionAssociation(collection=dataset_collection) - self.persist(library_dataset_collection) - persist_and_check_rating(model.LibraryDatasetCollectionRatingAssociation, library_dataset_collection) - - def test_display_name(self): - def assert_display_name_converts_to_unicode(item, name): - assert isinstance(item.get_display_name(), str) - assert item.get_display_name() == name - - ldda = model.LibraryDatasetDatasetAssociation(name="ldda_name") - assert_display_name_converts_to_unicode(ldda, "ldda_name") - - hda = model.HistoryDatasetAssociation(name="hda_name") - assert_display_name_converts_to_unicode(hda, "hda_name") - - history = model.History(name="history_name") - assert_display_name_converts_to_unicode(history, "history_name") - - library = model.Library(name="library_name") - assert_display_name_converts_to_unicode(library, "library_name") - - library_folder = 
model.LibraryFolder(name="library_folder") - assert_display_name_converts_to_unicode(library_folder, "library_folder") - - history = model.History(name="Hello₩◎ґʟⅾ") - - assert isinstance(history.name, str) - assert isinstance(history.get_display_name(), str) - assert history.get_display_name() == "Hello₩◎ґʟⅾ" - - def test_hda_to_library_dataset_dataset_association(self): - model = self.model - u = self.model.User(email="mary@example.com", password="password") - h1 = model.History(name="History 1", user=u) - hda = model.HistoryDatasetAssociation( - name="hda_name", create_dataset=True, history=h1, sa_session=model.session - ) - self.persist(hda) - trans = collections.namedtuple("trans", "user") - target_folder = model.LibraryFolder(name="library_folder") - ldda = hda.to_library_dataset_dataset_association( - trans=trans(user=u), - target_folder=target_folder, - ) - assert target_folder.item_count == 1 - assert ldda.id - assert ldda.library_dataset.id - assert ldda.library_dataset_id - assert ldda.library_dataset.library_dataset_dataset_association - assert ldda.library_dataset.library_dataset_dataset_association_id - library_dataset_id = ldda.library_dataset_id - replace_dataset = ldda.library_dataset - new_ldda = hda.to_library_dataset_dataset_association( - trans=trans(user=u), target_folder=target_folder, replace_dataset=replace_dataset - ) - assert new_ldda.id != ldda.id - assert new_ldda.library_dataset_id == library_dataset_id - assert new_ldda.library_dataset.library_dataset_dataset_association_id == new_ldda.id - assert len(new_ldda.library_dataset.expired_datasets) == 1 - assert new_ldda.library_dataset.expired_datasets[0] == ldda - assert target_folder.item_count == 1 - - def test_hda_to_library_dataset_dataset_association_fails_if_private(self): - model = self.model - u = model.User(email="mary2@example.com", password="password") - h1 = model.History(name="History 1", user=u) - hda = model.HistoryDatasetAssociation( - name="hda_name", 
create_dataset=True, history=h1, sa_session=model.session - ) - hda.dataset.object_store_id = PRIVATE_OBJECT_STORE_ID - self.persist(hda) - trans = collections.namedtuple("trans", "user") - target_folder = model.LibraryFolder(name="library_folder") - with pytest.raises(Exception) as exec_info: - hda.to_library_dataset_dataset_association( - trans=trans(user=u), - target_folder=target_folder, - ) - assert galaxy.model.CANNOT_SHARE_PRIVATE_DATASET_MESSAGE in str(exec_info.value) - - def test_tags(self): - TAG_NAME = "Test Tag" - my_tag = model.Tag(name=TAG_NAME) - u = model.User(email="tagger@example.com", password="password") - self.persist(my_tag, u) - - def tag_and_test(taggable_object, tag_association_class): - q = select(tag_association_class).join(model.Tag).where(model.Tag.name == TAG_NAME) - - assert len(self.model.session.execute(q).all()) == 0 - - tag_association = tag_association_class() - tag_association.tag = my_tag - taggable_object.tags = [tag_association] - self.persist(tag_association, taggable_object) - - assert len(self.model.session.execute(q).all()) == 1 - - sw = model.StoredWorkflow(user=u) - tag_and_test(sw, model.StoredWorkflowTagAssociation) - - h = model.History(name="History for Tagging", user=u) - tag_and_test(h, model.HistoryTagAssociation) - - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h, create_dataset=True, sa_session=self.model.session - ) - tag_and_test(d1, model.HistoryDatasetAssociationTagAssociation) - - page = model.Page(user=u) - tag_and_test(page, model.PageTagAssociation) - - visualization = model.Visualization(user=u) - tag_and_test(visualization, model.VisualizationTagAssociation) - - dataset_collection = model.DatasetCollection(collection_type="paired") - history_dataset_collection = model.HistoryDatasetCollectionAssociation(collection=dataset_collection) - tag_and_test(history_dataset_collection, model.HistoryDatasetCollectionTagAssociation) - - library_dataset_collection = 
model.LibraryDatasetCollectionAssociation(collection=dataset_collection) - tag_and_test(library_dataset_collection, model.LibraryDatasetCollectionTagAssociation) - - def test_collection_get_interface(self): - u = model.User(email="mary@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - c1 = model.DatasetCollection(collection_type="list") - elements = 100 - dces = [ - model.DatasetCollectionElement(collection=c1, element=d1, element_identifier=f"{i}", element_index=i) - for i in range(elements) - ] - self.persist(u, h1, d1, c1, *dces, commit=False, expunge=False) - self.model.session.flush() - for i in range(elements): - assert c1[i] == dces[i] def test_dataset_instance_order(self) -> None: u = model.User(email="mary@example.com", password="password") @@ -307,71 +123,6 @@ def test_dataset_instance_order(self) -> None: assert all(d.name == f"forward_{i}" for i, d in enumerate(forward_hdas)) assert all(d.name == f"reverse_{i}" for i, d in enumerate(reverse_hdas)) - def test_collections_in_histories(self): - u = model.User(email="mary@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - d2 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - - c1 = model.DatasetCollection(collection_type="pair") - hc1 = model.HistoryDatasetCollectionAssociation(history=h1, collection=c1, name="HistoryCollectionTest1") - - dce1 = model.DatasetCollectionElement(collection=c1, element=d1, element_identifier="left") - dce2 = model.DatasetCollectionElement(collection=c1, element=d2, element_identifier="right") - - self.persist(u, h1, d1, d2, c1, hc1, dce1, dce2) - - stmt = ( - 
select(model.HistoryDatasetCollectionAssociation) - .filter(model.HistoryDatasetCollectionAssociation.name == "HistoryCollectionTest1") - .limit(1) - ) - loaded_dataset_collection = self.model.session.scalars(stmt).first().collection - assert len(loaded_dataset_collection.elements) == 2 - assert loaded_dataset_collection.collection_type == "pair" - assert loaded_dataset_collection["left"] == dce1 - assert loaded_dataset_collection["right"] == dce2 - - def test_collections_in_library_folders(self): - u = model.User(email="mary2@example.com", password="password") - lf = model.LibraryFolder(name="RootFolder") - library = model.Library(name="Library1", root_folder=lf) - ld1 = model.LibraryDataset() - ld2 = model.LibraryDataset() - - ldda1 = model.LibraryDatasetDatasetAssociation(extension="txt", library_dataset=ld1) - ldda2 = model.LibraryDatasetDatasetAssociation(extension="txt", library_dataset=ld1) - - c1 = model.DatasetCollection(collection_type="pair") - dce1 = model.DatasetCollectionElement(collection=c1, element=ldda1) - dce2 = model.DatasetCollectionElement(collection=c1, element=ldda2) - self.persist(u, library, lf, ld1, ld2, c1, ldda1, ldda2, dce1, dce2) - - # TODO: - # loaded_dataset_collection = self.query( model.DatasetCollection ).filter( model.DatasetCollection.name == "LibraryCollectionTest1" ).first() - # assert len(loaded_dataset_collection.datasets) == 2 - # assert loaded_dataset_collection.collection_type == "pair" - - def test_dataset_action_tuples(self): - u = model.User(email="foo", password="foo") - h1 = model.History(user=u) - hda1 = model.HistoryDatasetAssociation(history=h1, create_dataset=True, sa_session=self.model.session) - hda2 = model.HistoryDatasetAssociation(history=h1, create_dataset=True, sa_session=self.model.session) - r1 = model.Role() - dp1 = model.DatasetPermissions(action="action1", dataset=hda1.dataset, role=r1) - dp2 = model.DatasetPermissions(action=None, dataset=hda1.dataset, role=r1) - dp3 = 
model.DatasetPermissions(action="action3", dataset=hda1.dataset, role=r1) - c1 = model.DatasetCollection(collection_type="type1") - dce1 = model.DatasetCollectionElement(collection=c1, element=hda1) - dce2 = model.DatasetCollectionElement(collection=c1, element=hda2) - self.model.session.add_all([u, h1, hda1, hda2, r1, dp1, dp2, dp3, c1, dce1, dce2]) - self.model.session.flush() - assert c1.dataset_action_tuples == [("action1", r1.id), ("action3", r1.id)] - def test_nested_collection_attributes(self): u = model.User(email="mary2@example.com", password="password") h1 = model.History(name="History 1", user=u) @@ -460,213 +211,6 @@ def test_nested_collection_attributes(self): ] assert c4.dataset_elements == [dce1, dce2] - def test_dataset_dbkeys_and_extensions_summary(self): - u = model.User(email="mary2@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="bam", dbkey="hg19", history=h1, create_dataset=True, sa_session=self.model.session - ) - d2 = model.HistoryDatasetAssociation( - extension="txt", dbkey="hg19", history=h1, create_dataset=True, sa_session=self.model.session - ) - c1 = model.DatasetCollection(collection_type="paired") - dce1 = model.DatasetCollectionElement(collection=c1, element=d1, element_identifier="forward", element_index=0) - dce2 = model.DatasetCollectionElement(collection=c1, element=d2, element_identifier="reverse", element_index=1) - hdca = model.HistoryDatasetCollectionAssociation(collection=c1, history=h1) - self.model.session.add_all([d1, d2, c1, dce1, dce2, hdca]) - self.model.session.flush() - assert hdca.dataset_dbkeys_and_extensions_summary[0] == {"hg19"} - assert hdca.dataset_dbkeys_and_extensions_summary[1] == {"bam", "txt"} - - def test_populated_optimized_ok(self): - u = model.User(email="mary2@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h1, 
create_dataset=True, sa_session=self.model.session - ) - d2 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - c1 = model.DatasetCollection(collection_type="paired") - dce1 = model.DatasetCollectionElement(collection=c1, element=d1, element_identifier="forward", element_index=0) - dce2 = model.DatasetCollectionElement(collection=c1, element=d2, element_identifier="reverse", element_index=1) - self.model.session.add_all([d1, d2, c1, dce1, dce2]) - self.model.session.flush() - assert c1.populated - assert c1.populated_optimized - - def test_populated_optimized_empty_list_list_ok(self): - c1 = model.DatasetCollection(collection_type="list") - c2 = model.DatasetCollection(collection_type="list:list") - dce1 = model.DatasetCollectionElement( - collection=c2, element=c1, element_identifier="empty_list", element_index=0 - ) - self.model.session.add_all([c1, c2, dce1]) - self.model.session.flush() - assert c1.populated - assert c1.populated_optimized - assert c2.populated - assert c2.populated_optimized - - def test_populated_optimized_list_list_not_populated(self): - c1 = model.DatasetCollection(collection_type="list") - c1.populated_state = False - c2 = model.DatasetCollection(collection_type="list:list") - dce1 = model.DatasetCollectionElement( - collection=c2, element=c1, element_identifier="empty_list", element_index=0 - ) - self.model.session.add_all([c1, c2, dce1]) - self.model.session.flush() - assert not c1.populated - assert not c1.populated_optimized - assert not c2.populated - assert not c2.populated_optimized - - def test_default_disk_usage(self): - u = model.User(email="disk_default@test.com", password="password") - self.persist(u) - u.adjust_total_disk_usage(1, None) - u_id = u.id - self.expunge() - user_reload = self.model.session.get(model.User, u_id) - assert user_reload.disk_usage == 1 - - def test_basic(self): - original_user_count = 
len(self.model.session.scalars(select(model.User)).all()) - - # Make some changes and commit them - u = model.User(email="james@foo.bar.baz", password="password") - h1 = model.History(name="History 1", user=u) - h2 = model.History(name=("H" * 1024)) - self.persist(u, h1, h2) - metadata = dict(chromCol=1, startCol=2, endCol=3) - d1 = model.HistoryDatasetAssociation( - extension="interval", metadata=metadata, history=h2, create_dataset=True, sa_session=self.model.session - ) - self.persist(d1) - - # Check - users = self.model.session.scalars(select(model.User)).all() - assert len(users) == original_user_count + 1 - user = [user for user in users if user.email == "james@foo.bar.baz"][0] - assert user.email == "james@foo.bar.baz" - assert user.password == "password" - assert len(user.histories) == 1 - assert user.histories[0].name == "History 1" - hists = self.model.session.scalars(select(model.History)).all() - hist0 = [history for history in hists if history.id == h1.id][0] - hist1 = [history for history in hists if history.id == h2.id][0] - assert hist0.name == "History 1" - assert hist1.name == ("H" * 255) - assert hist0.user == user - assert hist1.user is None - assert hist1.datasets[0].metadata.chromCol == 1 - # The filename test has moved to objectstore - # id = hist1.datasets[0].id - # assert hist1.datasets[0].file_name == os.path.join( "/tmp", *directory_hash_id( id ) ) + f"/dataset_{id}.dat" - # Do an update and check - hist1.name = "History 2b" - self.expunge() - hists = self.model.session.scalars(select(model.History)).all() - hist0 = [history for history in hists if history.name == "History 1"][0] - hist1 = [history for history in hists if history.name == "History 2b"][0] - assert hist0.name == "History 1" - assert hist1.name == "History 2b" - # gvk TODO need to ad test for GalaxySessions, but not yet sure what they should look like. 
- - def test_metadata_spec(self): - metadata = dict(chromCol=1, startCol=2, endCol=3) - d = model.HistoryDatasetAssociation(extension="interval", metadata=metadata, sa_session=self.model.session) - assert d.metadata.chromCol == 1 - assert d.metadata.anyAttribute is None - assert "items" not in d.metadata - - def test_dataset_job_relationship(self): - dataset = model.Dataset() - job = model.Job() - dataset.job = job - self.persist(job, dataset) - loaded_dataset = self.model.session.execute( - select(model.Dataset).filter(model.Dataset.id == dataset.id) - ).scalar_one() - assert loaded_dataset.job_id == job.id - - def test_jobs(self): - u = model.User(email="jobtest@foo.bar.baz", password="password") - job = model.Job() - job.user = u - job.tool_id = "cat1" - - self.persist(u, job) - - loaded_job = self.model.session.scalars(select(model.Job).filter(model.Job.user == u).limit(1)).first() - assert loaded_job.tool_id == "cat1" - - def test_job_metrics(self): - u = model.User(email="jobtest@foo.bar.baz", password="password") - job = model.Job() - job.user = u - job.tool_id = "cat1" - - job.add_metric("gx", "galaxy_slots", 5) - job.add_metric("system", "system_name", "localhost") - - self.persist(u, job) - - task = model.Task(job=job, working_directory="/tmp", prepare_files_cmd="split.sh") - task.add_metric("gx", "galaxy_slots", 5) - task.add_metric("system", "system_name", "localhost") - - big_value = ":".join(f"{i}" for i in range(2000)) - task.add_metric("env", "BIG_PATH", big_value) - self.persist(task) - # Ensure big values truncated - assert len(task.text_metrics[1].metric_value) <= 1023 - - def test_tasks(self): - u = model.User(email="jobtest@foo.bar.baz", password="password") - job = model.Job() - task = model.Task(job=job, working_directory="/tmp", prepare_files_cmd="split.sh") - job.user = u - self.persist(u, job, task) - - loaded_task = self.model.session.scalars(select(model.Task).filter(model.Task.job == job).limit(1)).first() - assert 
loaded_task.prepare_input_files_cmd == "split.sh" - - def test_history_contents(self): - u = model.User(email="contents@foo.bar.baz", password="password") - # gs = model.GalaxySession() - h1 = model.History(name="HistoryContentsHistory1", user=u) - - self.persist(u, h1, expunge=False) - - d1 = self.new_hda(h1, name="1") - d2 = self.new_hda(h1, name="2", visible=False, object_store_id="foobar") - d3 = self.new_hda(h1, name="3", deleted=True, object_store_id="three_store") - d4 = self.new_hda(h1, name="4", visible=False, deleted=True) - - self.session().flush() - - def contents_iter_names(**kwds): - history = self.model.session.scalars( - select(model.History).filter(model.History.name == "HistoryContentsHistory1").limit(1) - ).first() - return [hda.name for hda in history.contents_iter(**kwds)] - - assert contents_iter_names() == ["1", "2", "3", "4"] - assert contents_iter_names(deleted=False) == ["1", "2"] - assert contents_iter_names(visible=True) == ["1", "3"] - assert contents_iter_names(visible=True, object_store_ids=["three_store"]) == ["3"] - assert contents_iter_names(visible=False) == ["2", "4"] - assert contents_iter_names(deleted=True, visible=False) == ["4"] - assert contents_iter_names(deleted=False, object_store_ids=["foobar"]) == ["2"] - assert contents_iter_names(deleted=False, object_store_ids=["foobar2"]) == [] - - assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id]) == ["1", "2", "3", "4"] - assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id], max_in_filter_length=1) == ["1", "2", "3", "4"] - - assert contents_iter_names(ids=[d1.id, d3.id]) == ["1", "3"] - def test_history_audit(self): u = model.User(email="contents@foo.bar.baz", password="password") h1 = model.History(name="HistoryAuditHistory", user=u) @@ -722,17 +266,6 @@ def _non_empty_flush(self): session.add(lf) session.flush() - def test_current_session(self): - user = model.User(email="testworkflows@bx.psu.edu", password="password") - galaxy_session = model.GalaxySession() 
- galaxy_session.user = user - self.persist(user, galaxy_session) - assert user.current_galaxy_session == galaxy_session - new_galaxy_session = model.GalaxySession() - user.galaxy_sessions.append(new_galaxy_session) - self.persist(user, new_galaxy_session) - assert user.current_galaxy_session == new_galaxy_session - def test_flush_refreshes(self): # Normally I don't believe in unit testing library code, but the behaviors around attribute # states and flushing in SQL Alchemy is very subtle and it is good to have a executable @@ -1031,31 +564,6 @@ def test_can_manage_private_dataset(self): assert security_agent.can_manage_dataset(u_from.all_roles(), d1.dataset) assert not security_agent.can_manage_dataset(u_other.all_roles(), d1.dataset) - def test_history_hid_counter_is_expired_after_next_hid_call(self): - u = model.User(email="hid_abuser@example.com", password="password") - h = model.History(name="History for hid testing", user=u) - self.persist(u, h) - state = inspect(h) - assert h.hid_counter == 1 - assert "hid_counter" not in state.unloaded - assert "id" not in state.unloaded - - h._next_hid() - - assert "hid_counter" in state.unloaded # this attribute has been expired - assert "id" not in state.unloaded # but other attributes have NOT been expired - assert h.hid_counter == 2 # check this last: this causes thie hid_counter to be reloaded - - def test_next_hid(self): - u = model.User(email="hid_abuser@example.com", password="password") - h = model.History(name="History for hid testing", user=u) - self.persist(u, h) - assert h.hid_counter == 1 - h._next_hid() - assert h.hid_counter == 2 - h._next_hid(n=3) - assert h.hid_counter == 5 - def test_cannot_make_private_objectstore_dataset_public(self): security_agent = GalaxyRBACAgent(self.model) u_from, u_to, _ = self._three_users("cannot_make_private_public")