diff --git a/backend/db/utils.py b/backend/db/utils.py index 9ad2141e..b1e0bf72 100644 --- a/backend/db/utils.py +++ b/backend/db/utils.py @@ -327,6 +327,28 @@ def current_datetime(time_zone=TIMEZONE_DEFAULT) -> str: return tz_datetime_str(datetime.now(), time_zone=time_zone) +# todo: Can update update_db_status_var() so that it can accept optional param 'con' to improve performance. +def update_db_status_var(key: str, val: str, local=False): + """Update the `manage` table with information for a given variable, e.g. when a table was last updated""" + with get_db_connection(schema='', local=local) as con: + run_sql(con, f"DELETE FROM public.manage WHERE key = '{key}';") + sql_str = f"INSERT INTO public.manage (key, value) VALUES (:key, :val);" + run_sql(con, sql_str, {'key': key, 'val': val}) + + +def check_db_status_var(key: str, local=False): + """Check the value of a given variable the `manage`table """ + with get_db_connection(schema='', local=local) as con: + results: List = sql_query_single_col(con, f"SELECT value FROM public.manage WHERE key = '{key}';") + return results[0] if results else None + + +def delete_db_status_var(key: str, local=False): + """Delete information from the `manage` table """ + with get_db_connection(schema='', local=local) as con2: + run_sql(con2, f"DELETE FROM public.manage WHERE key = '{key}';") + + def last_refresh_timestamp(con: Connection) -> str: """Get the timestamp of the last database refresh""" results: List[List] = sql_query( @@ -343,14 +365,11 @@ def is_up_to_date(last_updated: Union[datetime, str], threshold_hours=24) -> boo return hours_since_update < threshold_hours -def check_if_updated(key: str, skip_if_updated_within_hours: int = None) -> bool: +def check_if_updated(key: str, skip_if_updated_within_hours: int = None, local=False) -> bool: """Check if table is up to date""" - with get_db_connection(schema='') as con2: - results: List[List] = sql_query(con2, f"SELECT value FROM public.manage WHERE key = '{key}';", return_with_keys=False) - last_updated = results[0][0] if results else None + last_updated: str = check_db_status_var(key, local) return last_updated and is_up_to_date(last_updated, skip_if_updated_within_hours) - def is_table_up_to_date(table_name: str, skip_if_updated_within_hours: int = None) -> bool: """Check if table is up to date""" if not skip_if_updated_within_hours: @@ -421,28 +440,6 @@ def is_derived_refresh_active(local=False) -> bool: return is_refresh_active('derived', local) -# todo: Can update update_db_status_var() so that it can accept optional param 'con' to improve performance. -def update_db_status_var(key: str, val: str, local=False): - """Update the `manage` table with information for a given variable, e.g. when a table was last updated""" - with get_db_connection(schema='', local=local) as con: - run_sql(con, f"DELETE FROM public.manage WHERE key = '{key}';") - sql_str = f"INSERT INTO public.manage (key, value) VALUES (:key, :val);" - run_sql(con, sql_str, {'key': key, 'val': val}) - - -def check_db_status_var(key: str, local=False): - """Check the value of a given variable the `manage`table """ - with get_db_connection(schema='', local=local) as con: - results: List = sql_query_single_col(con, f"SELECT value FROM public.manage WHERE key = '{key}';") - return results[0] if results else None - - -def delete_db_status_var(key: str, local=False): - """Delete information from the `manage` table """ - with get_db_connection(schema='', local=local) as con2: - run_sql(con2, f"DELETE FROM public.manage WHERE key = '{key}';") - - def insert_fetch_statuses(rows: List[Dict], local=False): """Update fetch status of record :param: rows: expects keys 'comment', 'primary_key', 'table', and 'status_initially'.""" diff --git a/backend/routes/graph.py b/backend/routes/graph.py index 1d5b52db..3c91b696 100644 --- a/backend/routes/graph.py +++ b/backend/routes/graph.py @@ -1,29 +1,20 @@ """Graph related functions and routes""" import os, warnings -# import csv -# import io -# import json +import dateutil.parser as dp +from datetime import datetime from pathlib import Path from typing import Any, Iterable, List, Set, Tuple, Union, Dict, Optional import pickle import networkx as nx -# import pydot from fastapi import APIRouter, Query, Request from networkx import DiGraph from sqlalchemy import Row, RowMapping from sqlalchemy.sql import text -# from fastapi.responses import JSONResponse -# from fastapi.responses import Response -# from fastapi.encoders import jsonable_encoder -# from collections import OrderedDict -# from igraph import Graph -# from networkx.drawing.nx_pydot import to_pydot, from_pydot - from backend.routes.db import get_cset_members_items from backend.db.queries import get_concepts -from backend.db.utils import get_db_connection, SCHEMA +from backend.db.utils import check_db_status_var, get_db_connection, SCHEMA from backend.api_logger import Api_logger from backend.utils import get_timer, commify @@ -45,17 +36,15 @@ async def concept_graph_get( ) -> Dict[str, Any]: """Return concept graph""" cids = cids if cids else [] - return await concept_graph_post(request, codeset_ids, cids, hide_vocabs, - hide_nonstandard_concepts, verbose) + return await concept_graph_post(request, codeset_ids, cids, hide_vocabs, hide_nonstandard_concepts, verbose) -# todo: match return of concept_graph() @router.post("/concept-graph") async def concept_graph_post( request: Request, codeset_ids: List[int], cids: Union[List[int], None] = [], hide_vocabs = ['RxNorm Extension'], hide_nonstandard_concepts=False, verbose = VERBOSE, -) -> List[List[Union[int, Any]]]: - +) -> Dict: + """Return concept graph via HTTP POST""" rpt = Api_logger() try: await rpt.start_rpt(request, params={'codeset_ids': codeset_ids, 'cids': cids}) @@ -140,6 +129,7 @@ async def concept_graph( # TODO: @Siggie: move below to frontend # noinspection PyPep8Naming def MOVE_TO_FRONT_END(): + """Move to front end""" hidden_by_voc = {} hide_if_over = 50 tree = [] # this used to be indented tree stuff that we're no longer using @@ -190,6 +180,10 @@ def filter_concepts( # noinspection PyPep8Naming def get_missing_in_between_nodes(G: nx.DiGraph, subgraph_nodes: Union[List[int], Set[int]], verbose=VERBOSE) -> Set: + """Find any missing nodes that exist in a subgraph. + + This can happen when a concept set expansions that are indirect subtrees of other expansions. + For""" missing_in_between_nodes = set() missing_in_between_nodes_tmp = set() subgraph_nodes = set(subgraph_nodes) @@ -249,23 +243,6 @@ def get_missing_in_between_nodes(G: nx.DiGraph, subgraph_nodes: Union[List[int], return missing_in_between_nodes -def test_get_missing_in_between_nodes( - whole_graph_edges=None, non_subgraph_nodes=None, expected_missing_in_between_nodes=None, subgraph_nodes=None, - fail=True, verbose=False -): - # add code to load whole REL_GRAPH - G = DiGraph(whole_graph_edges) - subgraph_nodes = subgraph_nodes or set(G.nodes) - set(non_subgraph_nodes) - missing_in_between_nodes = get_missing_in_between_nodes(G, subgraph_nodes, verbose=verbose) - if fail: - assert missing_in_between_nodes == set(expected_missing_in_between_nodes) - else: - if missing_in_between_nodes == set(expected_missing_in_between_nodes): - print(f"passed with {missing_in_between_nodes}") - else: - print(f"expected {expected_missing_in_between_nodes}, got {missing_in_between_nodes}") - - @router.get("/wholegraph") def wholegraph(): """Get subgraph edges for the whole graph""" @@ -375,23 +352,26 @@ def create_rel_graphs(save_to_pickle: bool) -> DiGraph: return G # , Gu -def load_relationship_graph(save_if_not_exists=True): +def is_graph_up_to_date(graph_path: str = GRAPH_PATH) -> bool: + """Determine if the networkx relationship_graph derived from OMOP vocab is current""" + voc_last_updated = dp.parse(check_db_status_var('last_refreshed_vocab_tables')) + graph_last_updated = datetime.fromtimestamp(os.path.getmtime(graph_path)) + if voc_last_updated.tzinfo and not graph_last_updated.tzinfo: # if one has timezone, both need + graph_last_updated = graph_last_updated.replace(tzinfo=voc_last_updated.tzinfo) + return graph_last_updated > voc_last_updated + + +# noinspection PyPep8Naming for_G +def load_relationship_graph(graph_path: str = GRAPH_PATH, update_if_outdated=True, save=True) -> DiGraph: """Load relationship graph from disk""" timer = get_timer('./load_relationship_graph') - timer(f'loading {GRAPH_PATH}') - if os.path.isfile(GRAPH_PATH): - with open(GRAPH_PATH, 'rb') as pickle_file: - # noinspection PyPep8Naming - G = pickle.load(pickle_file) - # while True: - # try: - # chunk = pickle.load(pickle_file) - # G.add_edges_from(chunk) - # except EOFError: - # break # End of file reached + timer(f'loading {graph_path}') + up_to_date = True if not update_if_outdated else is_graph_up_to_date(GRAPH_PATH) + if os.path.isfile(graph_path) and up_to_date: + with open(graph_path, 'rb') as pickle_file: + G: DiGraph = pickle.load(pickle_file) else: - # noinspection PyPep8Naming - G = create_rel_graphs(save_if_not_exists) + G: DiGraph = create_rel_graphs(save) timer('done') return G @@ -406,20 +386,7 @@ def load_relationship_graph(save_if_not_exists=True): # import builtins # builtins.DONT_LOAD_GRAPH = True import builtins - if hasattr(builtins, 'DONT_LOAD_GRAPH') and builtins.DONT_LOAD_GRAPH: warnings.warn('not loading relationship graph') else: - REL_GRAPH = load_relationship_graph(save_if_not_exists=True) - # REVERSE_GRAPH = REL_GRAPH.reverse() - - # G_ROOTS = set([n for n in REL_GRAPH.nodes if REL_GRAPH.in_degree(n) == 0]) - # def distance_to_root(G, node): - # n = node - # d = 0 - # for p in G.predecessors(node): - # if n in G_ROOTS: - # return d - # d += 1 - # n = p - # raise Exception(f"can't find root for {node}") + REL_GRAPH = load_relationship_graph() diff --git a/testing_get_missing_in_between_nodes.py b/testing_get_missing_in_between_nodes.py index b58e60ef..b8f75866 100644 --- a/testing_get_missing_in_between_nodes.py +++ b/testing_get_missing_in_between_nodes.py @@ -1,10 +1,39 @@ +"""Testing get_missing_in_between_nodes() + +todo: @Siggie: (1) Should this be a proper Python test file?, (2) I moved test_get_missing_in_between_nodes() here + because the Python debugger was making that file run this as a test instead of doing what I wanted. But IDK if this + breaks anything for you. +""" import networkx as nx import builtins + +from networkx import DiGraph + builtins.DONT_LOAD_GRAPH = True -from backend.routes.graph import get_missing_in_between_nodes, test_get_missing_in_between_nodes +# from backend.routes.graph import get_missing_in_between_nodes, test_get_missing_in_between_nodes +from backend.routes.graph import get_missing_in_between_nodes print_stack = lambda s: '; '.join([f"""{n}{'<--' if p else ''}{','.join(p)}""" for n,p in reversed(s)]) + +def test_get_missing_in_between_nodes( + whole_graph_edges=None, non_subgraph_nodes=None, expected_missing_in_between_nodes=None, subgraph_nodes=None, + fail=True, verbose=False +): + # add code to load whole REL_GRAPH + # noinspection PyPep8Naming + G = DiGraph(whole_graph_edges) + subgraph_nodes = subgraph_nodes or set(G.nodes) - set(non_subgraph_nodes) + missing_in_between_nodes = get_missing_in_between_nodes(G, subgraph_nodes, verbose=verbose) + if fail: + assert missing_in_between_nodes == set(expected_missing_in_between_nodes) + else: + if missing_in_between_nodes == set(expected_missing_in_between_nodes): + print(f"passed with {missing_in_between_nodes}") + else: + print(f"expected {expected_missing_in_between_nodes}, got {missing_in_between_nodes}") + + def get_missing(edges, subgraph_nodes, verbose=False): G = nx.DiGraph(edges) n = get_missing_in_between_nodes(G, subgraph_nodes, verbose=verbose)