-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #18312 from jdavcs/dev_data_access3
More data access tests, some refactoring and cleanup
- Loading branch information
Showing
31 changed files
with
1,012 additions
and
688 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from sqlalchemy import ( | ||
asc, | ||
false, | ||
func, | ||
not_, | ||
or_, | ||
select, | ||
true, | ||
) | ||
|
||
from galaxy.model import ( | ||
Library, | ||
LibraryPermissions, | ||
) | ||
|
||
|
||
def get_library_ids(session, library_access_action): | ||
stmt = select(LibraryPermissions.library_id).where(LibraryPermissions.action == library_access_action).distinct() | ||
return session.scalars(stmt) | ||
|
||
|
||
def get_library_permissions_by_role(session, role_ids): | ||
stmt = select(LibraryPermissions).where(LibraryPermissions.role_id.in_(role_ids)) | ||
return session.scalars(stmt) | ||
|
||
|
||
def get_libraries_for_admins(session, deleted): | ||
stmt = select(Library) | ||
if deleted is None: | ||
# Flag is not specified, do not filter on it. | ||
pass | ||
elif deleted: | ||
stmt = stmt.where(Library.deleted == true()) | ||
else: | ||
stmt = stmt.where(Library.deleted == false()) | ||
stmt = stmt.order_by(asc(func.lower(Library.name))) | ||
return session.scalars(stmt) | ||
|
||
|
||
def get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids): | ||
stmt = ( | ||
select(Library) | ||
.where(Library.deleted == false()) | ||
.where( | ||
or_( | ||
not_(Library.id.in_(restricted_library_ids)), | ||
Library.id.in_(accessible_restricted_library_ids), | ||
) | ||
) | ||
) | ||
stmt = stmt.order_by(asc(func.lower(Library.name))) | ||
return session.scalars(stmt) | ||
|
||
|
||
def get_libraries_by_name(session, name): | ||
stmt = select(Library).where(Library.deleted == false()).where(Library.name == name) | ||
return session.scalars(stmt) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from sqlalchemy import ( | ||
and_, | ||
false, | ||
select, | ||
) | ||
|
||
from galaxy.model import ( | ||
Role, | ||
UserRoleAssociation, | ||
) | ||
from galaxy.model.scoped_session import galaxy_scoped_session | ||
|
||
|
||
def get_npns_roles(session): | ||
""" | ||
non-private, non-sharing roles | ||
""" | ||
stmt = ( | ||
select(Role) | ||
.where(and_(Role.deleted == false(), Role.type != Role.types.PRIVATE, Role.type != Role.types.SHARING)) | ||
.order_by(Role.name) | ||
) | ||
return session.scalars(stmt) | ||
|
||
|
||
def get_private_user_role(user, session): | ||
stmt = ( | ||
select(Role) | ||
.where( | ||
and_( | ||
UserRoleAssociation.user_id == user.id, | ||
Role.id == UserRoleAssociation.role_id, | ||
Role.type == Role.types.PRIVATE, | ||
) | ||
) | ||
.distinct() | ||
) | ||
return session.execute(stmt).scalar_one_or_none() | ||
|
||
|
||
def get_roles_by_ids(session: galaxy_scoped_session, role_ids): | ||
stmt = select(Role).where(Role.id.in_(role_ids)) | ||
return session.scalars(stmt).all() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
from typing import Optional | ||
|
||
from sqlalchemy import ( | ||
false, | ||
func, | ||
or_, | ||
select, | ||
true, | ||
) | ||
|
||
from galaxy.model import User | ||
from galaxy.model.scoped_session import galaxy_scoped_session | ||
|
||
|
||
def get_users_by_ids(session: galaxy_scoped_session, user_ids): | ||
stmt = select(User).where(User.id.in_(user_ids)) | ||
return session.scalars(stmt).all() | ||
|
||
|
||
# The get_user_by_email and get_user_by_username functions may be called from | ||
# the tool_shed app, which has its own User model, which is different from | ||
# galaxy.model.User. In that case, the tool_shed user model should be passed as | ||
# the model_class argument. | ||
def get_user_by_email(session, email: str, model_class=User, case_sensitive=True): | ||
filter_clause = model_class.email == email | ||
if not case_sensitive: | ||
filter_clause = func.lower(model_class.email) == func.lower(email) | ||
stmt = select(model_class).where(filter_clause).limit(1) | ||
return session.scalars(stmt).first() | ||
|
||
|
||
def get_user_by_username(session, username: str, model_class=User): | ||
stmt = select(model_class).filter(model_class.username == username).limit(1) | ||
return session.scalars(stmt).first() | ||
|
||
|
||
def get_users_for_index( | ||
session, | ||
deleted: bool, | ||
f_email: Optional[str] = None, | ||
f_name: Optional[str] = None, | ||
f_any: Optional[str] = None, | ||
is_admin: bool = False, | ||
expose_user_email: bool = False, | ||
expose_user_name: bool = False, | ||
): | ||
stmt = select(User) | ||
if f_email and (is_admin or expose_user_email): | ||
stmt = stmt.where(User.email.like(f"%{f_email}%")) | ||
if f_name and (is_admin or expose_user_name): | ||
stmt = stmt.where(User.username.like(f"%{f_name}%")) | ||
if f_any: | ||
if is_admin: | ||
stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) | ||
else: | ||
if expose_user_email and expose_user_name: | ||
stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) | ||
elif expose_user_email: | ||
stmt = stmt.where(User.email.like(f"%{f_any}%")) | ||
elif expose_user_name: | ||
stmt = stmt.where(User.username.like(f"%{f_any}%")) | ||
if deleted: | ||
stmt = stmt.where(User.deleted == true()) | ||
else: | ||
stmt = stmt.where(User.deleted == false()) | ||
return session.scalars(stmt).all() |
Oops, something went wrong.