From 0d433833f84911860d9f5bb1761a4c80a08ffe10 Mon Sep 17 00:00:00 2001 From: olli <80164276+ollibowers@users.noreply.github.com> Date: Sun, 1 Sep 2024 15:41:38 +1000 Subject: [PATCH] fix(api): refactor utility functions to detangle dependencies and reduce coupling (#1189) * refactor(utility): removed reliance on get_course route function, created get_course_details in utility.py * refactor(utility): created term offering helpers to remove reliance on terms_offered route function * refactor(utility): created specialisation getter, refactored setupDegreeWizard to not rely on route functions * refactor(utility): created get_program_structure utility and moved out all dependencies from program routers * refactor(utility): moved out regex_search to utility.py * wip: move out user helpers into their own file * refactor(utility): moved out logic for and some graph helpers into utility.py * fix: switched up setup_degree_wizard so we can get a bit better type inference * fix: new line at end of user.py * fix: remove unused CODE_MAPPING cache constant * refactor: routers folder structure - move middleware.py into sessions utility folder * refactor: routers folder - rename auth_utility folder, move utility.py into it and rename * refactor: routers folder - move manual_fixes into utility folder * fix: remove my todo list --- backend/server/routers/auth.py | 10 +- backend/server/routers/courses.py | 92 +--- backend/server/routers/ctf.py | 7 +- backend/server/routers/model.py | 5 +- backend/server/routers/planner.py | 15 +- backend/server/routers/programs.py | 236 +---------- backend/server/routers/user.py | 155 ++----- backend/server/routers/utility.py | 66 --- .../{auth_utility => utility}/__init__.py | 0 backend/server/routers/utility/common.py | 394 ++++++++++++++++++ .../{ => routers/utility}/manual_fixes.py | 0 .../oidc/__init__.py | 0 .../oidc/constants.py | 0 .../{auth_utility => utility}/oidc/errors.py | 0 .../oidc/requests.py | 0 .../sessions/__init__.py | 0 .../sessions/errors.py | 0 .../sessions/interface.py | 0 .../sessions}/middleware.py | 4 +- backend/server/routers/utility/user.py | 92 ++++ 20 files changed, 573 insertions(+), 503 deletions(-) delete mode 100644 backend/server/routers/utility.py rename backend/server/routers/{auth_utility => utility}/__init__.py (100%) create mode 100644 backend/server/routers/utility/common.py rename backend/server/{ => routers/utility}/manual_fixes.py (100%) rename backend/server/routers/{auth_utility => utility}/oidc/__init__.py (100%) rename backend/server/routers/{auth_utility => utility}/oidc/constants.py (100%) rename backend/server/routers/{auth_utility => utility}/oidc/errors.py (100%) rename backend/server/routers/{auth_utility => utility}/oidc/requests.py (100%) rename backend/server/routers/{auth_utility => utility}/sessions/__init__.py (100%) rename backend/server/routers/{auth_utility => utility}/sessions/errors.py (100%) rename backend/server/routers/{auth_utility => utility}/sessions/interface.py (100%) rename backend/server/routers/{auth_utility => utility/sessions}/middleware.py (96%) create mode 100644 backend/server/routers/utility/user.py diff --git a/backend/server/routers/auth.py b/backend/server/routers/auth.py index c2e333b43..57e6c637f 100644 --- a/backend/server/routers/auth.py +++ b/backend/server/routers/auth.py @@ -10,12 +10,12 @@ from server.db.helpers.models import NotSetupUserStorage, GuestSessionInfoModel, RefreshToken, SessionID, SessionInfoModel, SessionOIDCInfoModel, SessionToken from server.db.helpers.users import delete_user, insert_new_user -from .auth_utility.sessions.errors import ExpiredRefreshTokenError, ExpiredSessionTokenError, OldRefreshTokenError -from .auth_utility.sessions.interface import create_new_guest_token_pair, get_session_info_from_refresh_token, get_session_info_from_session_token, logout_session, setup_new_csesoc_session, create_new_csesoc_token_pair, setup_new_guest_session +from .utility.sessions.errors import ExpiredRefreshTokenError, ExpiredSessionTokenError, OldRefreshTokenError +from .utility.sessions.interface import create_new_guest_token_pair, get_session_info_from_refresh_token, get_session_info_from_session_token, logout_session, setup_new_csesoc_session, create_new_csesoc_token_pair, setup_new_guest_session -from .auth_utility.middleware import HTTPBearer401, set_secure_cookie -from .auth_utility.oidc.requests import DecodedIDToken, exchange_and_validate, generate_oidc_auth_url, get_userinfo_and_validate, refresh_and_validate, revoke_token, validate_authorization_response -from .auth_utility.oidc.errors import OIDCInvalidGrant, OIDCInvalidToken, OIDCTokenError, OIDCValidationError +from .utility.sessions.middleware import HTTPBearer401, set_secure_cookie +from .utility.oidc.requests import DecodedIDToken, exchange_and_validate, generate_oidc_auth_url, get_userinfo_and_validate, refresh_and_validate, revoke_token, validate_authorization_response +from .utility.oidc.errors import OIDCInvalidGrant, OIDCInvalidToken, OIDCTokenError, OIDCValidationError REFRESH_TOKEN_COOKIE = f"{"__Host-" if SECURE_COOKIES else ""}refresh-token" diff --git a/backend/server/routers/courses.py b/backend/server/routers/courses.py index 427a4bbd0..8419d8b63 100644 --- a/backend/server/routers/courses.py +++ b/backend/server/routers/courses.py @@ -4,21 +4,21 @@ import pickle import re from contextlib import suppress -from typing import Annotated, Dict, List, Mapping, Optional, Set, Tuple +from typing import Annotated, Dict, List, Optional, Set, Tuple from algorithms.create_program import PROGRAM_RESTRICTIONS_PICKLE_FILE from algorithms.objects.program_restrictions import NoRestriction, ProgramRestriction from algorithms.objects.user import User -from data.config import ARCHIVED_YEARS, GRAPH_CACHE_FILE, LIVE_YEAR -from data.utility.data_helpers import read_data +from data.config import ARCHIVED_YEARS from fastapi import APIRouter, HTTPException, Security from fuzzywuzzy import fuzz # type: ignore -from server.routers.auth_utility.middleware import HTTPBearerToUserID +from server.routers.utility.sessions.middleware import HTTPBearerToUserID +from server.routers.utility.user import get_setup_user from server.db.mongo.conn import archivesDB, coursesCOL from server.routers.model import (CACHED_HANDBOOK_NOTE, CONDITIONS, CourseCodes, CourseDetails, CoursesPath, CoursesPathDict, CoursesState, CoursesUnlockedWhenTaken, ProgramCourses, TermsList, TermsOffered, UserData) -from server.routers.utility import get_core_courses, map_suppressed_errors +from server.routers.utility.common import get_core_courses, get_course_details, get_incoming_edges, get_legacy_course_details, get_program_structure, get_terms_offered_multiple_years router = APIRouter( prefix="/courses", @@ -29,9 +29,6 @@ # TODO: would prefer to initialise ALL_COURSES here but that fails on CI for some reason ALL_COURSES: Optional[Dict[str, str]] = None -CODE_MAPPING: Dict = read_data("data/utility/programCodeMappings.json")["title_to_code"] -GRAPH: Dict[str, Dict[str, List[str]]] = read_data(GRAPH_CACHE_FILE) -INCOMING_ADJACENCY: Dict[str, List[str]] = GRAPH.get("incoming_adjacency_list", {}) def fetch_all_courses() -> Dict[str, str]: """ @@ -68,7 +65,7 @@ def fix_user_data(userData: dict): if not isinstance(userData["courses"][course], list) ] filledInCourses = { - course: [get_course(course)['UOC'], userData["courses"][course]] + course: [get_course_details(course)['UOC'], userData["courses"][course]] for course in coursesWithoutUoc } userData["courses"].update(filledInCourses) @@ -167,27 +164,11 @@ def get_course(courseCode: str): - start with the current database - if not found, check the archives """ - result = coursesCOL.find_one({"code": courseCode}) - if not result: - for year in sorted(ARCHIVED_YEARS, reverse=True): - result = archivesDB[str(year)].find_one({"code": courseCode}) - if result is not None: - result.setdefault("raw_requirements", "") - result["is_legacy"] = True - break - else: - result["is_legacy"] = False + result = get_course_details(courseCode) - if not result: - raise HTTPException( - status_code=400, detail=f"Course code {courseCode} was not found" - ) - result.setdefault("school", None) result['is_accurate'] = CONDITIONS.get(courseCode) is not None result['handbook_note'] = CACHED_HANDBOOK_NOTE.get(courseCode, "") - del result["_id"] - with suppress(KeyError): - del result["exclusions"]["leftover_plaintext"] + return result @@ -227,16 +208,13 @@ def search(search_string: str, uid: Annotated[str, Security(require_uid)]) -> Di "COMP1531": "SoftEng Fundamentals", ……. } """ - # TODO: remove these because circular imports - from server.routers.user import get_setup_user - from server.routers.programs import get_structure all_courses = fetch_all_courses() user = get_setup_user(uid) specialisations = list(user['degree']['specs']) majors = list(filter(lambda x: x.endswith("1") or x.endswith("H"), specialisations)) minors = list(filter(lambda x: x.endswith("2"), specialisations)) - structure = get_structure(user['degree']['programCode'], "+".join(specialisations))['structure'] + structure = get_program_structure(user['degree']['programCode'], specs=specialisations)[0] top_results = sorted(all_courses.items(), reverse=True, key=lambda course: fuzzy_match(course, search_string) @@ -251,23 +229,6 @@ def search(search_string: str, uid: Annotated[str, Security(require_uid)]) -> Di return dict(weighted_results) -def regex_search(search_string: str) -> Mapping[str, str]: - """ - Uses the search string as a regex to match all courses with an exact pattern. - """ - - pat = re.compile(search_string, re.I) - courses = list(coursesCOL.find({"code": {"$regex": pat}})) - - # TODO: do we want to always include matching legacy courses (excluding duplicates)? - if not courses: - for year in sorted(ARCHIVED_YEARS, reverse=True): - courses = list(archivesDB[str(year)].find({"code": {"$regex": pat}})) - if courses: - break - - return {course["code"]: course["title"] for course in courses} - @router.post( "/getAllUnlocked/", @@ -364,11 +325,7 @@ def get_legacy_course(year: str, courseCode: str): Like /getCourse/ but for legacy courses in the given year. Returns information relating to the given course """ - result = archivesDB.get_collection(year).find_one({"code": courseCode}) - if not result: - raise HTTPException(status_code=400, detail="invalid course code or year") - del result["_id"] - result["is_legacy"] = False # not a legacy, assuming you know what you are doing + result = get_legacy_course_details(courseCode, year) return result @@ -453,10 +410,9 @@ def get_path_from(course: str) -> CoursesPathDict: fetches courses which can be used to satisfy 'course' eg 2521 -> 1511 """ - out: List[str] = list(INCOMING_ADJACENCY.get(course, [])) return { "original" : course, - "courses" : out, + "courses" : get_incoming_edges(course), } @@ -480,7 +436,7 @@ def courses_unlocked_when_taken(userData: UserData, courseToBeTaken: str) -> Dic ## initial state courses_initially_unlocked = unlocked_set(get_all_unlocked(userData)['courses_state']) ## add course to the user - userData.courses[courseToBeTaken] = [get_course(courseToBeTaken)['UOC'], None] + userData.courses[courseToBeTaken] = [get_course_details(courseToBeTaken)['UOC'], None] ## final state courses_now_unlocked = unlocked_set(get_all_unlocked(userData)['courses_state']) new_courses = courses_now_unlocked - courses_initially_unlocked @@ -542,14 +498,10 @@ def terms_offered(course: str, years:str) -> TermsOffered: fails: [(year, exception)] } """ - fails: list[tuple] = [] - terms = { - year: map_suppressed_errors(get_term_offered, fails, course, year) or [] - for year in years.split("+") - } + offerings, fails = get_terms_offered_multiple_years(course, years.split("+")) return { - "terms": terms, + "terms": offerings, "fails": fails, } @@ -643,22 +595,6 @@ def weight_course(course: tuple[str, str], search_term: str, structure: dict, ma return weight -def get_course_info(course: str, year: str | int = LIVE_YEAR): - """ - Returns the course info for the given course and year. - If no year is given, the current year is used. - If the year is not the LIVE_YEAR, then uses legacy information - """ - return get_course(course) if int(year) >= int(LIVE_YEAR) else get_legacy_course(str(year), course) - -def get_term_offered(course: str, year: int | str=LIVE_YEAR) -> list[str]: - """ - Returns the terms in which the given course is offered, for the given year. - If the year is from the future then, backfill the LIVE_YEAR's results - """ - year_to_fetch: int | str = LIVE_YEAR if int(year) > LIVE_YEAR else year - return get_course_info(course, year_to_fetch)['terms'] or [] - def get_program_restriction(program_code: Optional[str]) -> Optional[ProgramRestriction]: """ Returns the program restriction for the given program code. diff --git a/backend/server/routers/ctf.py b/backend/server/routers/ctf.py index a6a5d7ce8..e96509dc2 100644 --- a/backend/server/routers/ctf.py +++ b/backend/server/routers/ctf.py @@ -45,9 +45,8 @@ from typing import Annotated, Callable, Optional from fastapi import APIRouter, Security -from server.routers.auth_utility.middleware import HTTPBearerToUserID -from server.routers.user import get_setup_user -from server.routers.planner import convert_to_planner_data +from server.routers.utility.sessions.middleware import HTTPBearerToUserID +from server.routers.utility.user import get_setup_user from server.routers.model import ValidPlannerData router = APIRouter( @@ -255,6 +254,8 @@ def validate_ctf(uid: Annotated[str, Security(require_uid)]): """ Validates the CTF """ + from server.routers.planner import convert_to_planner_data # TODO: remove when converted + data = convert_to_planner_data(get_setup_user(uid)) passed: list[str] = [] flags: list[str] = [] diff --git a/backend/server/routers/model.py b/backend/server/routers/model.py index 92b50c7e3..5f72d94a2 100644 --- a/backend/server/routers/model.py +++ b/backend/server/routers/model.py @@ -189,8 +189,7 @@ def to_user(self) -> User: user.specialisations = self.specialisations[:] # prevent circular import; TODO: There has to be a better way - from server.routers.courses import get_course - from server.routers.utility import get_core_courses + from server.routers.utility.common import get_core_courses, get_course_details for year in self.plan: for term in year: @@ -198,7 +197,7 @@ def to_user(self) -> User: for course_name, course_value in term.items(): cleaned_term[course_name] = ( (course_value[0], course_value[1]) if course_value - else (get_course(course_name)["UOC"], None) # type: ignore + else (get_course_details(course_name)['UOC'], None) # type: ignore ) user.add_courses(cleaned_term) # get the cores of the user diff --git a/backend/server/routers/planner.py b/backend/server/routers/planner.py index 53f2f42ba..452201860 100644 --- a/backend/server/routers/planner.py +++ b/backend/server/routers/planner.py @@ -10,12 +10,11 @@ from algorithms.transcript import parse_transcript from algorithms.validate_term_planner import validate_terms from fastapi import APIRouter, HTTPException, Security, UploadFile -from server.routers.auth_utility.middleware import HTTPBearerToUserID -from server.routers.courses import get_course +from server.routers.utility.sessions.middleware import HTTPBearerToUserID +from server.routers.utility.user import get_setup_user, set_user from server.routers.model import (CourseCode, PlannedToTerm, PlannerData, ProgramTime, Storage, UnPlannedToTerm, ValidCoursesState, ValidPlannerData, markMap) -from server.routers.user import get_setup_user, set_user -from server.routers.utility import get_course_object +from server.routers.utility.common import get_course_details, get_course_object MIN_COMPLETED_COURSE_UOC = 6 @@ -81,12 +80,12 @@ def add_to_unplanned(data: CourseCode, uid: Annotated[str, Security(require_uid) if data.courseCode in user['courses'].keys() or data.courseCode in user['planner']['unplanned']: raise HTTPException(status_code=400, detail=f'{data.courseCode} is already planned.') - course = get_course(data.courseCode) # raises exception anyway when unfound + uoc = get_course_details(data.courseCode)['UOC'] # raises exception anyway when unfound user['planner']['unplanned'].append(data.courseCode) user['courses'][data.courseCode] = { 'code': data.courseCode, 'mark': None, - 'uoc': course['UOC'], + 'uoc': uoc, 'ignoreFromProgression': False } set_user(uid, user, True) @@ -110,7 +109,7 @@ def set_unplanned_course_to_term(data: UnPlannedToTerm, uid: Annotated[str, Secu HTTPException: Moving a multiterm course somewhere that would result in a course being placed before or after the planner """ - course = get_course(data.courseCode) + course = get_course_details(data.courseCode) uoc, terms = itemgetter('UOC', 'terms')(course) user = get_setup_user(uid) planner = user['planner'] @@ -163,7 +162,7 @@ def set_planned_course_to_term(data: PlannedToTerm, uid: Annotated[str, Security course being placed before or after the planner """ # pylint: disable=too-many-locals - course = get_course(data.courseCode) + course = get_course_details(data.courseCode) user = get_setup_user(uid) uoc, terms_offered, is_multiterm = itemgetter( diff --git a/backend/server/routers/programs.py b/backend/server/routers/programs.py index e534c81b7..b2581fcb8 100644 --- a/backend/server/routers/programs.py +++ b/backend/server/routers/programs.py @@ -1,19 +1,15 @@ """ API for fetching data about programs and specialisations """ import functools -import re from contextlib import suppress -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, cast +from typing import Any, Callable, Dict, List, Optional -from data.processors.models import CourseContainer, Program, ProgramContainer, Specialisation -from data.utility import data_helpers -from fastapi import APIRouter, HTTPException -from server.db.mongo.conn import programsCOL, specialisationsCOL -from server.manual_fixes import apply_manual_fixes -from server.routers.courses import get_path_from, regex_search +from fastapi import APIRouter +from server.db.mongo.conn import programsCOL +from server.routers.utility.manual_fixes import apply_manual_fixes from server.routers.model import (CourseCodes, Courses, CoursesPathDict, Graph, Programs, Structure, StructureContainer, StructureDict) -from server.routers.utility import get_core_courses, map_suppressed_errors +from server.routers.utility.common import add_program_code_details_to_structure, add_specialisations_to_structure, convert_adj_list_to_edge_list, get_core_courses, get_gen_eds, get_incoming_edges, get_program_structure, prune_edges router = APIRouter( prefix="/programs", @@ -80,89 +76,6 @@ def get_programs() -> dict[str, dict[str, str]]: } } -def convert_subgroup_object_to_courses_dict(object: str, description: str | list[str]) -> Mapping[str, str | list[str]]: - """ Gets a subgroup object (format laid out in the processor) and fetches the exact courses its referring to """ - if " or " in object and isinstance(description, list): - return {c: description[index] for index, c in enumerate(object.split(" or "))} - if not re.match(r"[A-Z]{4}[0-9]{4}", object): - return regex_search(rf"^{object}") - - return { object: description } - -def add_subgroup_container(structure: dict[str, StructureContainer], type: str, container: ProgramContainer | CourseContainer, exceptions: list[str]) -> list[str]: - """ Returns the added courses """ - # TODO: further standardise non_spec_data to remove these lines: - title = container.get("title", "") - if container.get("type") == "gened": - title = "General Education" - conditional_type = container.get("type") - if conditional_type is not None and "rule" in conditional_type: - type = "Rules" - structure[type]["content"][title] = { - "UOC": container.get("credits_to_complete") or 0, - "courses": functools.reduce( - lambda rest, current: rest | { - course: description for course, description - in convert_subgroup_object_to_courses_dict(current[0], current[1]).items() - if course not in exceptions - }, container.get("courses", {}).items(), {} - ), - "type": container.get("type", ""), - "notes": container.get("notes", "") if type == "Rules" else "" - } - return list(structure[type]["content"][title]["courses"].keys()) - -def add_geneds_courses(programCode: str, structure: dict[str, StructureContainer], container: ProgramContainer) -> list[str]: - """ Returns the added courses """ - if container.get("type") != "gened": - return [] - - item = structure["General"]["content"]["General Education"] - item["courses"] = {} - if container.get("courses") is None: - gen_ed_courses = list(set(get_gen_eds(programCode)["courses"].keys()) - set(sum( - ( - sum(( - list(value["courses"].keys()) - for sub_group, value in spec["content"].items() - if 'core' in sub_group.lower() - ), []) - for spec_name, spec in structure.items() - if "Major" in spec_name or "Honours" in spec_name) - , []))) - geneds = get_gen_eds(programCode) - item["courses"] = {course: geneds["courses"][course] for course in gen_ed_courses} - - - return list(item["courses"].keys()) - - -def add_specialisation(structure: dict[str, StructureContainer], code: str) -> None: - """ Add a specialisation to the structure of a getStructure call """ - # in a specialisation, the first container takes priority - no duplicates may exist - if code.endswith("1"): - type = "Major" - elif code.endswith("2"): - type = "Minor" - else: - type = "Honours" - - spnResult = cast(Optional[Specialisation], specialisationsCOL.find_one({"code": code})) - type = f"{type} - {code}" - if not spnResult: - raise HTTPException( - status_code=400, detail=f"{code} of type {type} not found") - structure[type] = {"name": spnResult["name"], "content": {}} - # NOTE: takes Core Courses are first - exceptions: list[str] = [] - for cores in filter(lambda a: "Core" in a["title"], spnResult["curriculum"]): - new = add_subgroup_container(structure, type, cores, exceptions) - exceptions.extend(new) - - for container in spnResult["curriculum"]: - if "Core" not in container["title"]: - add_subgroup_container(structure, type, container, exceptions) - @router.get( "/getStructure/{programCode}/{spec}", response_model=Structure, @@ -248,19 +161,11 @@ def get_structure( programCode: str, spec: Optional[str] = None, ignore: Optional[str] = None ) -> StructureDict: """ get the structure of a course given specs and program code """ - # TODO: This ugly, use compose instead ignored = ignore.split("+") if ignore else [] + specs = spec.split("+") if spec else [] - structure: dict[str, StructureContainer] = {} - uoc = 0 # ensure always atleast set regardless of ignore # TODO-OLLI: None? - if "spec" not in ignored: - structure = add_specialisations(structure, spec) - if "code_details" not in ignored: - structure, uoc = add_program_code_details(structure, programCode) - if "gened" not in ignored: - structure = add_geneds_to_structure(structure, programCode) - apply_manual_fixes(structure, programCode) + structure, uoc = get_program_structure(programCode, specs=specs, ignored=ignored) return { "structure": structure, @@ -278,9 +183,10 @@ def get_structure_course_list( TODO: Add a test for this. """ structure: dict[str, StructureContainer] = {} + specs = spec.split("+") if spec is not None else [] - structure = add_specialisations(structure, spec) - structure, _ = add_program_code_details(structure, programCode) + structure = add_specialisations_to_structure(structure, specs) + structure, _ = add_program_code_details_to_structure(structure, programCode) apply_manual_fixes(structure, programCode) return { @@ -314,32 +220,9 @@ def get_structure_course_list( ) def get_gen_eds_route(programCode: str) -> Dict[str, Dict[str, str]]: """ Fetches the geneds for a given program code """ - course_list: List[str] = course_list_from_structure(get_structure(programCode, ignore="gened")["structure"]) + course_list: List[str] = course_list_from_structure(get_program_structure(programCode, ignored=["gened"])[0]) return get_gen_eds(programCode, course_list) -def get_gen_eds( - programCode: str, excluded_courses: Optional[List[str]] = None - ) -> Dict[str, Dict[str, str]]: - """ - fetches gen eds from file and removes excluded courses. - - `programCode` is the program code to fetch geneds for - - `excluded_courses` is a list of courses to exclude from the gened list. - Typically the result of a `courseList` from `getStructure` to prevent - duplicate courses between cores, electives and geneds. - """ - excluded_courses = excluded_courses if excluded_courses is not None else [] - try: - geneds: Dict[str, str] = data_helpers.read_data("data/scrapers/genedPureRaw.json")[programCode] - except KeyError as err: - raise HTTPException(status_code=400, detail=f"No geneds for progrm code {programCode}") from err - - for course in excluded_courses: - if course in geneds: - del geneds[course] - - return {"courses": geneds} - - @router.get("/graph/{programCode}/{spec}", response_model=Graph) @router.get("/graph/{programCode}", response_model=Graph) def graph( @@ -363,18 +246,19 @@ def graph( caught by the processor """ courses = get_structure_course_list(programCode, spec)["courses"] - edges = [] - - failed_courses: list[tuple] = [] - proto_edges: list[Optional[CoursesPathDict]] = [map_suppressed_errors( - get_path_from, failed_courses, course - ) for course in courses] + proto_edges: list[CoursesPathDict] = [ + { + "original": course, + "courses": get_incoming_edges(course) + } + for course in courses + ] edges = prune_edges( - proto_edges_to_edges(proto_edges), - courses - ) + convert_adj_list_to_edge_list(proto_edges), + courses + ) return { "edges": edges, @@ -416,87 +300,9 @@ def __recursive_course_search(structure: dict) -> None: __recursive_course_search(dict(structure)) return courses -def add_specialisations(structure: dict[str, StructureContainer], spec: Optional[str]) -> dict[str, StructureContainer]: - """ - Take a string of `+` joined specialisations and add - them to the structure - """ - if spec: - specs = spec.split("+") if "+" in spec else [spec] - for m in specs: - add_specialisation(structure, m) - return structure - -def add_program_code_details(structure: dict[str, StructureContainer], programCode: str) -> Tuple[dict[str, StructureContainer], int]: - """ - Add the details for given program code to the structure. - Returns: - - structure - - uoc (int) associated with the program code. - """ - programsResult = cast(Optional[Program], programsCOL.find_one({"code": programCode})) - if not programsResult: - raise HTTPException( - status_code=400, detail="Program code was not found") - - structure['General'] = {"name": "General Program Requirements", "content": {}} - structure['Rules'] = {"name": "General Program Rules", "content": {}} - return (structure, programsResult["UOC"]) - -# TODO: This should be computed at scrape-time -def add_geneds_to_structure(structure: dict[str, StructureContainer], programCode: str) -> dict[str, StructureContainer]: - """ - Insert geneds of the given programCode into the structure - provided - """ - programsResult = cast(Optional[Program], programsCOL.find_one({"code": programCode})) - if programsResult is None: - raise HTTPException( - status_code=400, detail="Program code was not found") - - with suppress(KeyError): - for container in programsResult['components']['non_spec_data']: - add_subgroup_container(structure, "General", container, []) - if container.get("type") == "gened": - add_geneds_courses(programCode, structure, container) - return structure - - def compose(*functions: Callable) -> Callable: """ Compose a list of functions into a single function. The functions are applied in the order they are given. """ return functools.reduce(lambda f, g: lambda *args, **kwargs: f(g(*args, **kwargs)), functions) - -def proto_edges_to_edges(proto_edges: list[Optional[CoursesPathDict]]) -> List[Dict[str, str]]: - """ - Take the proto-edges created by calls to `path_from` and convert them into - a full list of edges of form. - [ - { - "source": (str) - course_code, # This is the 'original' value - "target": (str) - course_code, # This is the value of 'courses' - } - ] - Effectively, turning an adjacency list into a flat list of edges - """ - edges = [] - for proto_edge in proto_edges: - # Incoming: { original: str, courses: list[str]} - # Outcome: { "src": str, "target": str } - if not proto_edge or not proto_edge["courses"]: - continue - for course in proto_edge["courses"]: - edges.append({ - "source": course, - "target": proto_edge["original"], - } - ) - return edges - -def prune_edges(edges: list[dict[str, str]], courses: list[str]) -> list[dict[str, str]]: - """ - Remove edges between vertices that are not in the list of courses provided. - """ - return [edge for edge in edges if edge["source"] in courses and edge["target"] in courses] diff --git a/backend/server/routers/user.py b/backend/server/routers/user.py index 871202416..7f4fcbaef 100644 --- a/backend/server/routers/user.py +++ b/backend/server/routers/user.py @@ -1,17 +1,14 @@ from itertools import chain -from typing import Annotated, Any, Dict, Optional, cast +from typing import Annotated, Dict, Optional, cast from fastapi import APIRouter, HTTPException, Security -from starlette.status import HTTP_403_FORBIDDEN -from server.routers.auth_utility.middleware import HTTPBearerToUserID -from server.routers.courses import get_course +from data.processors.models import SpecData +from server.routers.utility.common import get_all_specialisations, get_course_details +from server.routers.utility.sessions.middleware import HTTPBearerToUserID +from server.routers.utility.user import get_setup_user, set_user from server.routers.model import CourseMark, CourseStorage, DegreeLength, DegreeWizardInfo, HiddenYear, SettingsStorage, StartYear, CourseStorageWithExtra, DegreeLocalStorage, LocalStorage, PlannerLocalStorage, Storage, SpecType -from server.routers.programs import get_programs -from server.routers.specialisations import get_specialisation_types, get_specialisations import server.db.helpers.users as udb -from server.db.helpers.models import PartialUserStorage, UserStorage as NEWUserStorage, UserDegreeStorage as NEWUserDegreeStorage, UserPlannerStorage as NEWUserPlannerStorage, UserCoursesStorage as NEWUserCoursesStorage, UserCourseStorage as NEWUserCourseStorage, UserSettingsStorage as NEWUserSettingsStorage - router = APIRouter( prefix="/user", @@ -20,91 +17,6 @@ require_uid = HTTPBearerToUserID() -# TODO-OLLI(pm): remove these underwrite helpers once we get rid of the old TypedDicts -# nto and otn means new-to-old and old-to-new -def _otn_planner(s: PlannerLocalStorage) -> NEWUserPlannerStorage: - return NEWUserPlannerStorage.model_validate(s) - -def _otn_degree(s: DegreeLocalStorage) -> NEWUserDegreeStorage: - return NEWUserDegreeStorage.model_validate(s) - -def _otn_courses(s: dict[str, CourseStorage]) -> NEWUserCoursesStorage: - return { code: NEWUserCourseStorage.model_validate(info) for code, info in s.items() } - -def _otn_settings(s: SettingsStorage) -> NEWUserSettingsStorage: - return NEWUserSettingsStorage.model_validate(s.model_dump()) - -def _nto_courses(s: NEWUserCoursesStorage) -> dict[str, CourseStorage]: - return { - code: { - 'code': info.code, - 'ignoreFromProgression': info.ignoreFromProgression, - 'mark': info.mark, - 'uoc': info.uoc, - } for code, info in s.items() - } - -def _nto_planner(s: NEWUserPlannerStorage) -> PlannerLocalStorage: - return { - 'isSummerEnabled': s.isSummerEnabled, - 'lockedTerms': s.lockedTerms, - 'startYear': s.startYear, - 'unplanned': s.unplanned, - 'years': [{ - 'T0': y.T0, - 'T1': y.T1, - 'T2': y.T2, - 'T3': y.T3, - } for y in s.years], - } - -def _nto_degree(s: NEWUserDegreeStorage) -> DegreeLocalStorage: - return { - 'programCode': s.programCode, - 'specs': s.specs, - } - -def _nto_settings(s: NEWUserSettingsStorage) -> SettingsStorage: - return SettingsStorage(showMarks=s.showMarks, hiddenYears=s.hiddenYears) - -def _nto_storage(s: NEWUserStorage) -> Storage: - return { - 'courses': _nto_courses(s.courses), - 'degree': _nto_degree(s.degree), - 'planner': _nto_planner(s.planner), - 'settings': _nto_settings(s.settings), - } - - -def get_setup_user(uid: str) -> Storage: - data = udb.get_user(uid) - assert data is not None # this uid should only come from a token exchange, and we only delete users after logout - if data.setup is False: - raise HTTPException( - status_code=HTTP_403_FORBIDDEN, - detail="User must be setup to access this resource.", - ) - - return _nto_storage(data) - -# keep this private -def set_user(uid: str, item: Storage, overwrite: bool = False): - if not overwrite and udb.user_is_setup(uid): - # TODO-OLLI(pm): get rid of the overwrite field when we get rid of this function all together - print("Tried to overwrite existing user. Use overwrite=True to overwrite.") - print("++ ABOUT TO ASSERT FALSE:", uid) - assert False # want to remove these cases too - - res = udb.update_user(uid, PartialUserStorage( - courses=_otn_courses(item['courses']), - degree=_otn_degree(item['degree']), - planner=_otn_planner(item['planner']), - settings=_otn_settings(item['settings']), - )) - - assert res - - # Ideally not used often. @router.post("/saveLocalStorage") def save_local_storage(localStorage: LocalStorage, uid: Annotated[str, Security(require_uid)]): @@ -115,7 +27,7 @@ def save_local_storage(localStorage: LocalStorage, uid: Annotated[str, Security( course: { 'code': course, 'mark': None, # wtf we nuking marks? - 'uoc': get_course(course)['UOC'], + 'uoc': get_course_details(course)['UOC'], 'ignoreFromProgression': False } for course in chain(planned, unplanned) @@ -166,7 +78,7 @@ def get_user_p(uid: Annotated[str, Security(require_uid)]) -> Dict[str, CourseSt res: Dict[str, CourseStorageWithExtra] = {} for raw_course in raw_courses.values(): - course_info = get_course(raw_course['code']) + course_info = get_course_details(raw_course['code']) with_extra_info: CourseStorageWithExtra = { 'code': raw_course['code'], @@ -315,46 +227,43 @@ def setup_degree_wizard(wizard: DegreeWizardInfo, uid: Annotated[str, Security(r if num_years < 1: raise HTTPException(status_code=400, detail="Invalid year range") - # Ensure valid prog code - done by the specialisatoin check so this is - # techincally redundant - progs = get_programs()["programs"] - if progs is None: - raise HTTPException(status_code=400, detail="Invalid program code") - # Ensure that all specialisations are valid # Need a bidirectoinal validate # All specs in wizard (lhs) must be in the RHS # All specs in the RHS that are "required" must have an associated LHS selection - avail_spec_types: list[SpecType] = get_specialisation_types(wizard.programCode)["types"] - # Type of elemn in the following is - # Dict[Literal['specs'], Dict[str(programTitle), specInfo]] + # Keys in the specInfo # - 'specs': List[str] - name of the specialisations - thing that matters # - 'notes': str - dw abt this (Fe's prob tbh) # - 'is_optional': bool - if true then u need to validate associated elem in LHS - avail_specs = list(chain.from_iterable( - cast(list[Any], specs) # for some bs, specs is still List[Any | None] ?????? - for spec_type in avail_spec_types - if (specs := list(get_specialisations(wizard.programCode, spec_type).values())) is not None - )) - # LHS subset All specs + + avail_specs = get_all_specialisations(wizard.programCode) + if avail_specs is None: + raise HTTPException(status_code=400, detail="Invalid program code") + + # list[(is_optional, spec_codes)] + flattened_containers: list[tuple[bool, list[str]]] = [ + ( + program_sub_container["is_optional"], + list(program_sub_container["specs"].keys()) + ) + for spec_type_container in cast(dict[SpecType, dict[str, SpecData]], avail_specs).values() + for program_sub_container in spec_type_container.values() + ] + invalid_lhs_specs = set(wizard.specs).difference( spec_code - for specs in avail_specs - for actl_spec in specs.values() - for spec_code in actl_spec.get('specs', []).keys() + for (_, spec_codes) in flattened_containers + for spec_code in spec_codes ) - # All compulsory in RHS has an entry in LHS - spec_reqs_not_met = [ - actl_spec - for specs in avail_specs - for actl_spec in specs.values() - if ( - actl_spec.get('is_optional') is False - and not set(actl_spec.get('specs', []).keys()).intersection(wizard.specs) - ) - ] + spec_reqs_not_met = any( + ( + not is_optional + and not set(spec_codes).intersection(wizard.specs) + ) + for (is_optional, spec_codes) in flattened_containers + ) # ceebs returning the bad data because FE should be valid anyways if invalid_lhs_specs or spec_reqs_not_met: diff --git a/backend/server/routers/utility.py b/backend/server/routers/utility.py deleted file mode 100644 index 844627424..000000000 --- a/backend/server/routers/utility.py +++ /dev/null @@ -1,66 +0,0 @@ -""" -General purpose utility functions for the server, that do not fit -specifically in any one function -""" - -from typing import Callable, Optional, TypeVar - -from algorithms.objects.course import Course -from data.utility import data_helpers -from server.routers.model import CONDITIONS, ProgramTime - -COURSES = data_helpers.read_data("data/final_data/coursesProcessed.json") - -R = TypeVar('R') -def map_suppressed_errors(func: Callable[..., R], errors_log: list[tuple], *args, **kwargs) -> Optional[R]: - """ - Map a function to a list of arguments, and return the result of the function - if no error is raised. If an error is raised, log the error and return None. - """ - try: - return func(*args, **kwargs) - except Exception as e: - errors_log.append((*args, str(e))) - return None - - -def get_core_courses(program: str, specialisations: list[str]): - from server.routers.programs import get_structure - - req = get_structure(program, "+".join(specialisations)) - return sum(( - sum(( - list(value["courses"].keys()) - for sub_group, value in spec["content"].items() - if 'core' in sub_group.lower() - ), []) - for spec_name, spec in req["structure"].items() - if "Major" in spec_name or "Honours" in spec_name) - , []) - - -def get_course_object(code: str, prog_time: ProgramTime, locked_offering: Optional[tuple[int, int]] = None, mark: Optional[int] = 100) -> Course: - ''' - This return the Course object for the given course code. - Note the difference between this and the get_course function in courses.py - ''' - if mark is None: - mark = 100 - from server.routers.courses import terms_offered - years = "+".join(str(year) for year in range(prog_time.startTime[0], prog_time.endTime[0] + 1)) - terms_result = terms_offered(code, years)["terms"] - terms_possible = {} - for year, terms in terms_result.items(): - terms_possible[int(year)] = [int(term[1]) for term in terms] - - try: - return Course( - code, - CONDITIONS[code], - mark, - COURSES[code]["UOC"], - terms_possible, - locked_offering - ) - except KeyError as err: - raise KeyError(f"Course {code} not found (course is most likely discontinued)") from err diff --git a/backend/server/routers/auth_utility/__init__.py b/backend/server/routers/utility/__init__.py similarity index 100% rename from backend/server/routers/auth_utility/__init__.py rename to backend/server/routers/utility/__init__.py diff --git a/backend/server/routers/utility/common.py b/backend/server/routers/utility/common.py new file mode 100644 index 000000000..90335b0d0 --- /dev/null +++ b/backend/server/routers/utility/common.py @@ -0,0 +1,394 @@ +""" +General purpose utility functions for the server, that do not fit +specifically in any one function +""" + +from contextlib import suppress +import functools +import re +from typing import Callable, Mapping, Optional, Tuple, TypeVar, cast + +from fastapi import HTTPException + +from algorithms.objects.course import Course +from data.processors.models import CourseContainer, Program, ProgramContainer, SpecData, Specialisation, SpecsData +from data.config import ARCHIVED_YEARS, GRAPH_CACHE_FILE, LIVE_YEAR +from data.utility import data_helpers +from server.routers.utility.manual_fixes import apply_manual_fixes +from server.routers.model import CONDITIONS, CoursesPathDict, ProgramTime, StructureContainer +from server.db.mongo.conn import archivesDB, coursesCOL, programsCOL, specialisationsCOL + +# TODO: move these constants out into new file, or move model.py ones into here (once we dont have top-level connection initialisation) +COURSES = data_helpers.read_data("data/final_data/coursesProcessed.json") +GRAPH: dict[str, dict[str, list[str]]] = data_helpers.read_data(GRAPH_CACHE_FILE) +INCOMING_ADJACENCY: dict[str, list[str]] = GRAPH.get("incoming_adjacency_list", {}) + +# P = ParamSpec('P') # TODO: type the args and kwargs here using ParamSpec, pylint cries rn +R = TypeVar('R') # TODO: might be able to inline this https://typing.readthedocs.io/en/latest/spec/generics.html#variance-inference +def map_suppressed_errors(func: Callable[..., R], errors_log: list[tuple], *args, **kwargs) -> Optional[R]: + """ + Map a function to a list of arguments, and return the result of the function + if no error is raised. If an error is raised, log the error and return None. + """ + try: + return func(*args, **kwargs) + except Exception as e: + errors_log.append((*args, str(e))) + return None + + +def get_core_courses(program: str, specialisations: list[str]): + structure = get_program_structure(program, specs=specialisations)[0] + return sum(( + sum(( + list(value["courses"].keys()) + for sub_group, value in spec["content"].items() + if 'core' in sub_group.lower() + ), []) + for spec_name, spec in structure.items() + if "Major" in spec_name or "Honours" in spec_name) + , []) + + +def get_course_object(code: str, prog_time: ProgramTime, locked_offering: Optional[tuple[int, int]] = None, mark: Optional[int] = 100) -> Course: + ''' + This return the Course object for the given course code. + Note the difference between this and the get_course_details function + ''' + if mark is None: + mark = 100 + + years = [str(year) for year in range(prog_time.startTime[0], prog_time.endTime[0] + 1)] + terms_result = get_terms_offered_multiple_years(code, years)[0] + terms_possible = { + int(year): [int(term[1]) for term in terms] + for year, terms in terms_result.items() + } + + try: + return Course( + code, + CONDITIONS[code], + mark, + COURSES[code]["UOC"], + terms_possible, + locked_offering + ) + except KeyError as err: + raise KeyError(f"Course {code} not found (course is most likely discontinued)") from err + +def get_course_details(code: str) -> dict: # TODO: type this output + ''' + Get info about a course given its courseCode + - start with the current database + - if not found, check the archives + + Raises if not found + ''' + # TODO: make take in a year, and be explicit about getting archived or not + + result = coursesCOL.find_one({"code": code}) + if not result: + for year in sorted(ARCHIVED_YEARS, reverse=True): + result = archivesDB[str(year)].find_one({"code": code}) + if result is not None: + result["is_legacy"] = True # TODO: is_legacy might not actually mean what it is used for here + break + else: + result["is_legacy"] = False + + # if not result: + # return None + if not result: + raise HTTPException( + status_code=400, detail=f"Course code {code} was not found" + ) + + result.setdefault("school", None) + result.setdefault("raw_requirements", "") + + with suppress(KeyError): + del result["exclusions"]["leftover_plaintext"] + del result["_id"] + + return result + +# TODO: make all of these year params ints +def get_legacy_course_details(code: str, year: str) -> dict: # TODO: type output + ''' + Returns the course details for a legacy year, similarly to get_course_details. + + Will raise a 400 if the course code could not be found in that year. + ''' + result = archivesDB.get_collection(year).find_one({"code": code}) + if not result: + raise HTTPException(status_code=400, detail="invalid course code or year") + + del result["_id"] + result["is_legacy"] = False # not a legacy, assuming you know what you are doing + + return result + +def get_terms_offered(code: str, year: str) -> list[str]: + ''' + Returns the terms in which the given course is offered, for the given year. + If the year is from the future then, backfill the LIVE_YEAR's results + ''' + course_info = get_course_details(code) if int(year) >= LIVE_YEAR else get_legacy_course_details(code, year) + return course_info['terms'] or [] + +def get_terms_offered_multiple_years(code: str, years: list[str]) -> Tuple[dict[str, list[str]], list[tuple]]: + ''' + Returns the term offerings for a course code over multiple years, or an empty list if it could not be found. + Also returns the years that were not found, and their specific errors. + ''' + fails: list[tuple] = [] + offerings: dict[str, list[str]] = { + year: map_suppressed_errors(get_terms_offered, fails, code, year) or [] + for year in years + } + + return offerings, fails + +def get_all_specialisations(program_code: str) -> Optional[SpecsData]: + ''' + Returns the specs for the given programCode, removing any specs that are not supported. + ''' + program = cast(Optional[Program], programsCOL.find_one({"code": program_code})) + if program is None: + return None + + # TODO: this can be done in a single aggregate + raw_specs = program["components"]["spec_data"] + for spec_type_container in raw_specs.values(): + spec_type_container = cast(dict[str, SpecData], spec_type_container) + for program_specs in spec_type_container.values(): + for code in [*program_specs["specs"].keys()]: + if not specialisationsCOL.find_one({"code": code}): + del program_specs["specs"][code] + + return raw_specs + +def get_program_structure(program_code: str, specs: Optional[list[str]] = None, ignored: Optional[list[str]] = None) -> Tuple[dict[str, StructureContainer], int]: + """Gets the structure of a course given specs and program code, ignoring what is specified.""" + # TODO: This ugly, use compose instead + if ignored is None: + ignored = [] + + structure: dict[str, StructureContainer] = {} + uoc = 0 # ensure always atleast set regardless of ignore # TODO: None? + if "spec" not in ignored and specs is not None: + structure = add_specialisations_to_structure(structure, specs) + if "code_details" not in ignored: + structure, uoc = add_program_code_details_to_structure(structure, program_code) + if "gened" not in ignored: + structure = add_geneds_to_structure(structure, program_code) + apply_manual_fixes(structure, program_code) + + return structure, uoc + +def add_specialisation_to_structure(structure: dict[str, StructureContainer], code: str) -> None: + """ Add a specialisation to the structure of a getStructure call """ + # in a specialisation, the first container takes priority - no duplicates may exist + if code.endswith("1"): + type = "Major" + elif code.endswith("2"): + type = "Minor" + else: + type = "Honours" + + spnResult = cast(Optional[Specialisation], specialisationsCOL.find_one({"code": code})) + type = f"{type} - {code}" + if not spnResult: + raise HTTPException(status_code=400, detail=f"{code} of type {type} not found") + + structure[type] = {"name": spnResult["name"], "content": {}} + # NOTE: takes Core Courses are first + exceptions: list[str] = [] + for cores in filter(lambda a: "Core" in a["title"], spnResult["curriculum"]): + new = add_subgroup_container_to_structure(structure, type, cores, exceptions) + exceptions.extend(new) + + for container in spnResult["curriculum"]: + if "Core" not in container["title"]: + add_subgroup_container_to_structure(structure, type, container, exceptions) + +def add_specialisations_to_structure(structure: dict[str, StructureContainer], specs: list[str]) -> dict[str, StructureContainer]: + """ + Take a list of specs and adds them to the structure + """ + for m in specs: + add_specialisation_to_structure(structure, m) + return structure + +def add_program_code_details_to_structure(structure: dict[str, StructureContainer], program_code: str) -> Tuple[dict[str, StructureContainer], int]: + """ + Add the details for given program code to the structure. + Returns: + - structure + - uoc (int) associated with the program code. + """ + programsResult = cast(Optional[Program], programsCOL.find_one({"code": program_code})) + if not programsResult: + raise HTTPException(status_code=400, detail="Program code was not found") + + structure['General'] = {"name": "General Program Requirements", "content": {}} + structure['Rules'] = {"name": "General Program Rules", "content": {}} + return (structure, programsResult["UOC"]) + +def add_subgroup_container_to_structure(structure: dict[str, StructureContainer], type: str, container: ProgramContainer | CourseContainer, exceptions: list[str]) -> list[str]: + """ Returns the added courses """ + # TODO: further standardise non_spec_data to remove these lines: + + title = container.get("title", "") + if container.get("type") == "gened": + title = "General Education" + conditional_type = container.get("type") + if conditional_type is not None and "rule" in conditional_type: + type = "Rules" + structure[type]["content"][title] = { + "UOC": container.get("credits_to_complete") or 0, + "courses": functools.reduce( + lambda rest, current: rest | { + course: description for course, description + in convert_subgroup_object_to_courses_dict(current[0], current[1]).items() + if course not in exceptions + }, container.get("courses", {}).items(), {} + ), + "type": container.get("type", ""), + "notes": container.get("notes", "") if type == "Rules" else "" + } + return list(structure[type]["content"][title]["courses"].keys()) + +def get_gen_eds( + programCode: str, excluded_courses: Optional[list[str]] = None + ) -> dict[str, dict[str, str]]: + """ + fetches gen eds from file and removes excluded courses. + - `programCode` is the program code to fetch geneds for + - `excluded_courses` is a list of courses to exclude from the gened list. + Typically the result of a `courseList` from `getStructure` to prevent + duplicate courses between cores, electives and geneds. + """ + excluded_courses = excluded_courses if excluded_courses is not None else [] + try: + geneds: dict[str, str] = data_helpers.read_data("data/scrapers/genedPureRaw.json")[programCode] + except KeyError as err: + raise HTTPException(status_code=400, detail=f"No geneds for progrm code {programCode}") from err + + for course in excluded_courses: + if course in geneds: + del geneds[course] + + return {"courses": geneds} + +def add_geneds_courses_to_structure(programCode: str, structure: dict[str, StructureContainer], container: ProgramContainer) -> list[str]: + """ Returns the added courses """ + if container.get("type") != "gened": + return [] + + item = structure["General"]["content"]["General Education"] + item["courses"] = {} + if container.get("courses") is None: + gen_ed_courses = list(set(get_gen_eds(programCode)["courses"].keys()) - set(sum( + ( + sum(( + list(value["courses"].keys()) + for sub_group, value in spec["content"].items() + if 'core' in sub_group.lower() + ), []) + for spec_name, spec in structure.items() + if "Major" in spec_name or "Honours" in spec_name) + , []))) + geneds = get_gen_eds(programCode) + item["courses"] = {course: geneds["courses"][course] for course in gen_ed_courses} + + + return list(item["courses"].keys()) + +# TODO: This should be computed at scrape-time +def add_geneds_to_structure(structure: dict[str, StructureContainer], programCode: str) -> dict[str, StructureContainer]: + """ + Insert geneds of the given programCode into the structure + provided + """ + programsResult = cast(Optional[Program], programsCOL.find_one({"code": programCode})) + if programsResult is None: + raise HTTPException( + status_code=400, detail="Program code was not found") + + with suppress(KeyError): + for container in programsResult['components']['non_spec_data']: + add_subgroup_container_to_structure(structure, "General", container, []) + if container.get("type") == "gened": + add_geneds_courses_to_structure(programCode, structure, container) + return structure + +def convert_subgroup_object_to_courses_dict(object: str, description: str | list[str]) -> Mapping[str, str | list[str]]: + """ Gets a subgroup object (format laid out in the processor) and fetches the exact courses its referring to """ + + if " or " in object and isinstance(description, list): + return {c: description[index] for index, c in enumerate(object.split(" or "))} + if not re.match(r"[A-Z]{4}[0-9]{4}", object): + return regex_search(rf"^{object}") + + return { object: description } + +def regex_search(search_string: str) -> Mapping[str, str]: + """ + Uses the search string as a regex to match all courses with an exact pattern. + """ + + pat = re.compile(search_string, re.I) + courses = list(coursesCOL.find({"code": {"$regex": pat}})) + + # TODO: do we want to always include matching legacy courses (excluding duplicates)? + if not courses: + for year in sorted(ARCHIVED_YEARS, reverse=True): + courses = list(archivesDB[str(year)].find({"code": {"$regex": pat}})) + if courses: + break + + return {course["code"]: course["title"] for course in courses} + +def get_incoming_edges(course_code: str) -> list[str]: + """ + returns the course codes that can be used to satisfy 'course', eg 2521 -> 1511. + + (previously from `get_path_from`) + """ + return INCOMING_ADJACENCY.get(course_code, []) + +def convert_adj_list_to_edge_list(proto_edges: list[CoursesPathDict]) -> list[dict[str, str]]: + """ + Take the proto-edges created by calls to `path_from` and convert them into + a full list of edges of form. + [ + { + "source": (str) - course_code, # This is the 'original' value + "target": (str) - course_code, # This is the value of 'courses' + } + ] + Effectively, turning an adjacency list into a flat list of edges. + + (previously `proto_edges_to_edges`) + """ + edges = [] + for proto_edge in proto_edges: + # Incoming: { original: str, courses: list[str]} + # Outcome: { "src": str, "target": str } + if not proto_edge["courses"]: + continue + for course in proto_edge["courses"]: + edges.append({ + "source": course, + "target": proto_edge["original"], + } + ) + return edges + +def prune_edges(edges: list[dict[str, str]], courses: list[str]) -> list[dict[str, str]]: + """ + Remove edges between vertices that are not in the list of courses provided. + """ + return [edge for edge in edges if edge["source"] in courses and edge["target"] in courses] diff --git a/backend/server/manual_fixes.py b/backend/server/routers/utility/manual_fixes.py similarity index 100% rename from backend/server/manual_fixes.py rename to backend/server/routers/utility/manual_fixes.py diff --git a/backend/server/routers/auth_utility/oidc/__init__.py b/backend/server/routers/utility/oidc/__init__.py similarity index 100% rename from backend/server/routers/auth_utility/oidc/__init__.py rename to backend/server/routers/utility/oidc/__init__.py diff --git a/backend/server/routers/auth_utility/oidc/constants.py b/backend/server/routers/utility/oidc/constants.py similarity index 100% rename from backend/server/routers/auth_utility/oidc/constants.py rename to backend/server/routers/utility/oidc/constants.py diff --git a/backend/server/routers/auth_utility/oidc/errors.py b/backend/server/routers/utility/oidc/errors.py similarity index 100% rename from backend/server/routers/auth_utility/oidc/errors.py rename to backend/server/routers/utility/oidc/errors.py diff --git a/backend/server/routers/auth_utility/oidc/requests.py b/backend/server/routers/utility/oidc/requests.py similarity index 100% rename from backend/server/routers/auth_utility/oidc/requests.py rename to backend/server/routers/utility/oidc/requests.py diff --git a/backend/server/routers/auth_utility/sessions/__init__.py b/backend/server/routers/utility/sessions/__init__.py similarity index 100% rename from backend/server/routers/auth_utility/sessions/__init__.py rename to backend/server/routers/utility/sessions/__init__.py diff --git a/backend/server/routers/auth_utility/sessions/errors.py b/backend/server/routers/utility/sessions/errors.py similarity index 100% rename from backend/server/routers/auth_utility/sessions/errors.py rename to backend/server/routers/utility/sessions/errors.py diff --git a/backend/server/routers/auth_utility/sessions/interface.py b/backend/server/routers/utility/sessions/interface.py similarity index 100% rename from backend/server/routers/auth_utility/sessions/interface.py rename to backend/server/routers/utility/sessions/interface.py diff --git a/backend/server/routers/auth_utility/middleware.py b/backend/server/routers/utility/sessions/middleware.py similarity index 96% rename from backend/server/routers/auth_utility/middleware.py rename to backend/server/routers/utility/sessions/middleware.py index 4d98e891c..11f4298d7 100644 --- a/backend/server/routers/auth_utility/middleware.py +++ b/backend/server/routers/utility/sessions/middleware.py @@ -9,8 +9,8 @@ from server.config import SECURE_COOKIES from server.db.helpers.models import SessionToken -from .sessions.errors import ExpiredSessionTokenError -from .sessions.interface import get_token_info +from .errors import ExpiredSessionTokenError +from .interface import get_token_info def extract_bearer_token(request: Request) -> Optional[str]: authorization = request.headers.get("Authorization") diff --git a/backend/server/routers/utility/user.py b/backend/server/routers/utility/user.py new file mode 100644 index 000000000..eb034717b --- /dev/null +++ b/backend/server/routers/utility/user.py @@ -0,0 +1,92 @@ +from fastapi import HTTPException +from starlette.status import HTTP_403_FORBIDDEN + +from server.routers.model import CourseStorage, SettingsStorage, DegreeLocalStorage, PlannerLocalStorage, Storage + +import server.db.helpers.users as udb +from server.db.helpers.models import PartialUserStorage, UserStorage as NEWUserStorage, UserDegreeStorage as NEWUserDegreeStorage, UserPlannerStorage as NEWUserPlannerStorage, UserCoursesStorage as NEWUserCoursesStorage, UserCourseStorage as NEWUserCourseStorage, UserSettingsStorage as NEWUserSettingsStorage + + +# TODO-OLLI(pm): remove these underwrite helpers once we get rid of the old TypedDicts +# nto and otn means new-to-old and old-to-new +def _otn_planner(s: PlannerLocalStorage) -> NEWUserPlannerStorage: + return NEWUserPlannerStorage.model_validate(s) + +def _otn_degree(s: DegreeLocalStorage) -> NEWUserDegreeStorage: + return NEWUserDegreeStorage.model_validate(s) + +def _otn_courses(s: dict[str, CourseStorage]) -> NEWUserCoursesStorage: + return { code: NEWUserCourseStorage.model_validate(info) for code, info in s.items() } + +def _otn_settings(s: SettingsStorage) -> NEWUserSettingsStorage: + return NEWUserSettingsStorage.model_validate(s.model_dump()) + +def _nto_courses(s: NEWUserCoursesStorage) -> dict[str, CourseStorage]: + return { + code: { + 'code': info.code, + 'ignoreFromProgression': info.ignoreFromProgression, + 'mark': info.mark, + 'uoc': info.uoc, + } for code, info in s.items() + } + +def _nto_planner(s: NEWUserPlannerStorage) -> PlannerLocalStorage: + return { + 'isSummerEnabled': s.isSummerEnabled, + 'lockedTerms': s.lockedTerms, + 'startYear': s.startYear, + 'unplanned': s.unplanned, + 'years': [{ + 'T0': y.T0, + 'T1': y.T1, + 'T2': y.T2, + 'T3': y.T3, + } for y in s.years], + } + +def _nto_degree(s: NEWUserDegreeStorage) -> DegreeLocalStorage: + return { + 'programCode': s.programCode, + 'specs': s.specs, + } + +def _nto_settings(s: NEWUserSettingsStorage) -> SettingsStorage: + return SettingsStorage(showMarks=s.showMarks, hiddenYears=s.hiddenYears) + +def _nto_storage(s: NEWUserStorage) -> Storage: + return { + 'courses': _nto_courses(s.courses), + 'degree': _nto_degree(s.degree), + 'planner': _nto_planner(s.planner), + 'settings': _nto_settings(s.settings), + } + + +def get_setup_user(uid: str) -> Storage: + data = udb.get_user(uid) + assert data is not None # this uid should only come from a token exchange, and we only delete users after logout + if data.setup is False: + raise HTTPException( + status_code=HTTP_403_FORBIDDEN, + detail="User must be setup to access this resource.", + ) + + return _nto_storage(data) + +# keep this private +def set_user(uid: str, item: Storage, overwrite: bool = False): + if not overwrite and udb.user_is_setup(uid): + # TODO-OLLI(pm): get rid of the overwrite field when we get rid of this function all together + print("Tried to overwrite existing user. Use overwrite=True to overwrite.") + print("++ ABOUT TO ASSERT FALSE:", uid) + assert False # want to remove these cases too + + res = udb.update_user(uid, PartialUserStorage( + courses=_otn_courses(item['courses']), + degree=_otn_degree(item['degree']), + planner=_otn_planner(item['planner']), + settings=_otn_settings(item['settings']), + )) + + assert res