From ccad037d53c92c4be4b3b3c88544571f7ac2458d Mon Sep 17 00:00:00 2001
From: cl117
Date: Wed, 28 Aug 2024 16:04:40 -0600
Subject: [PATCH] fix bugs in search.py and refactor index.py

---
 flask/index.py  | 218 +++++++++++++++++++++---------------------
 flask/search.py |  98 +++++++++++++++++-----
 2 files changed, 171 insertions(+), 145 deletions(-)

diff --git a/flask/index.py b/flask/index.py
index 0762a72..de3a796 100644
--- a/flask/index.py
+++ b/flask/index.py
@@ -5,222 +5,198 @@ import json
 from logger import Logger
 
+# Load config and initialize managers once
 config_manager = ConfigManager()
+config = config_manager.load_config()
 elasticsearch_manager = ElasticsearchManager(config_manager)
 logger_ = Logger()
 
 
 def add_pagerank(parts_response, uri2rank):
     """
-    Adds the pagerank score for each part
+    Adds the pagerank score for each part.
 
     Arguments:
         parts_response {List} -- List containing all parts from the SPARQL query
-        uri2rank {List} -- List of each part and its calculated pagerank score
+        uri2rank {Dict} -- Dictionary of each part and its calculated pagerank score
     """
-
     for part in parts_response:
-        subject = part['subject']
-
-        if subject in uri2rank:
-            part['pagerank'] = uri2rank[subject]
-        else:
-            part['pagerank'] = 1
+        part['pagerank'] = uri2rank.get(part['subject'], 1)
 
 
 def add_keywords(parts_response):
     """
-    Adds the displayId to the 'keyword' category
+    Adds the displayId to the 'keyword' category.
 
     Arguments:
         parts_response {List} -- List containing all parts from the SPARQL query
     """
-
     for part in parts_response:
-        keywords = []
-
-        displayId = part.get('displayId')
-        if displayId is not None:
-            keywords.extend(displayId.split('_'))
+        display_id = part.get('displayId')
+        if display_id:
+            part['keywords'] = ' '.join(display_id.split('_'))
+        else:
+            part['keywords'] = ''
 
-        part['keywords'] = ' '.join(keywords)
 
-def add_roles(parts_response):
+def add_roles(parts_response, term_list):
     """
-    Adds the synonyms from the SO-Ontologies list to each part's keyword category
-
+    Adds the synonyms from the SO-Ontologies list to each part's keyword category.
+
     Arguments:
         parts_response {List} -- List containing all parts from the SPARQL query
+        term_list {List} -- List of terms from the SO-Ontologies
     """
-    with open('so-simplified.json','r') as so_json:
-        term_list = json.load(so_json)
-
     for part in parts_response:
         # Split the CSV of roles from sparql
         role = part.get('role')
+        if role and 'identifiers.org' in role:
+            keywords_list = []
+            so_term = role[-10:].replace(':','_')
+
+            for term in term_list:
+                if so_term in term['id']:
+                    keywords_list.append(term['lbl'])
+                    synonyms = term.get('synonyms', [])
+                    for synonym in synonyms:
+                        # remove the annoying header from the synonyms
+                        if 'INSDC' in synonym:
+                            synonym = synonym.replace('INSDC_qualifier:', '')
+                        if synonym not in keywords_list:
+                            keywords_list.append(synonym)
+
+            part['keywords'] += ' ' + ' '.join(keywords_list)
 
-        if role is not None and 'identifiers.org' in role:
-            keywords_list = []
-            so_term = role[-10:]
-            so_term = so_term.replace(':','_')
-
-            for term in term_list:
-                if so_term in term['id']:
-                    keywords_list.append(term['lbl'])
-
-                    if 'synonyms' in term and term['synonyms'] is not None:
-                        for synonym in term['synonyms']:
-
-                            # remove the annoying header from the synonyms
-                            if 'INSDC' in synonym:
-                                synonym = synonym.replace('INSDC_qualifier:', '')
-
-                            if synonym not in keywords_list:
-                                keywords_list.append(synonym)
-
-            for keyword in keywords_list:
-                part['keywords'] += ' ' + keyword
 
 def add_sbol_type(parts_response):
     for part in parts_response:
         sbol_type = part.get('sboltype')
+        if sbol_type and 'http://www.biopax.org/release/biopax-level3.owl#' in sbol_type:
+            type_ = sbol_type[48:]
+            if 'region' in type_:
+                type_ = type_.replace('Region','')
+            part['keywords'] += ' ' + type_
 
-        if sbol_type is not None and 'http://www.biopax.org/release/biopax-level3.owl#' in sbol_type:
-            type = sbol_type[48:]
-
-            if 'region' in type:
-                type = type.replace('Region','')
-
-            part['keywords'] += ' ' + type
 
 def create_parts_index(index_name):
     """
-    Creates a new index
+    Creates a new index.
 
     Arguments:
         index_name {String} -- Name of the new index
     """
-
-    if elasticsearch_manager.get_es().indices.exists(index_name):
+    es = elasticsearch_manager.get_es()
+    if es.indices.exists(index=index_name):
         logger_.log('Index already exists -> deleting', True)
-        elasticsearch_manager.get_es().indices.delete(index=index_name)
+        es.indices.delete(index=index_name)
 
     body = {
         'mappings': {
-            index_name: {
-                'properties': {
-                    'subject': {
-                        'type': 'keyword'
-                    },
-                    'graph': {
-                        'type': 'keyword'
-                    }
-                },
+            'properties': {
+                'subject': {'type': 'keyword'},
+                'graph': {'type': 'keyword'}
             }
         },
-        "settings": {
-            "number_of_shards": 1
+        'settings': {
+            'number_of_shards': 1
         }
     }
-    elasticsearch_manager.get_es().indices.create(index=index_name, body=body)
+    es.indices.create(index=index_name, body=body)
     logger_.log('Index created', True)
 
 
 def bulk_index_parts(parts_response, index_name):
     """
-    Adds each part as a document to the index
-
+    Adds each part as a document to the index.
+
     Arguments:
         parts_response {List} -- List containing all parts from the SPARQL query
         index_name {String} -- Name of the index
-
-    Raises:
-        Exception -- Indexing fails
     """
-
-    actions = []
-    for i in range(len(parts_response)):
-        action = {
-            '_index': index_name,
-            '_type': index_name,
-            '_id': parts_response[i].get('subject'),
-            '_source': parts_response[i]
-        }
-
-        actions.append(action)
+    es = elasticsearch_manager.get_es()
+
+    def actions():
+        for part in parts_response:
+            yield {
+                '_index': index_name,
+                '_id': part['subject'],
+                '_source': part
+            }
 
     logger_.log('Bulk indexing', True)
     try:
-        stats = helpers.bulk(elasticsearch_manager.get_es(), actions)
+        helpers.bulk(es, actions())
         logger_.log('Bulk indexing complete', True)
-    except:
-        logger_.log('[ERROR] Error_messages: ' + '\n'.join(stats[1]), True)
-        raise Exception("Bulk indexing failed")
+    except Exception as e:
+        logger_.log(f'[ERROR] Bulk indexing failed: {e}', True)
+        raise
 
 
 def update_index(uri2rank):
     """
-    Main method
-    Args:
-        uri2rank: List of pageranks for each URI
-
-    Returns:
+    Main method to update the index.
+    Args:
+        uri2rank: Dictionary of pageranks for each URI
     """
-    index_name = config_manager.load_config()['elasticsearch_index_name']
+    index_name = config['elasticsearch_index_name']
 
     logger_.log('------------ Updating index ------------', True)
-
     logger_.log('******** Query for parts ********', True)
-    parts_response = query.query_parts(indexing = True)
+    parts_response = query.query_parts(indexing=True)
     logger_.log('******** Query for parts complete ********', True)
 
     logger_.log('******** Adding parts to new index ********', True)
     add_pagerank(parts_response, uri2rank)
     add_keywords(parts_response)
-    add_roles(parts_response)
+
+    # Load the SO-Ontologies list once
+    with open('so-simplified.json', 'r') as so_json:
+        term_list = json.load(so_json)
+    add_roles(parts_response, term_list)
+
     add_sbol_type(parts_response)
     create_parts_index(index_name)
     bulk_index_parts(parts_response, index_name)
-    logger_.log('******** Finished adding ' + str(len(parts_response)) + ' parts to index ********', True)
-
+    logger_.log(f'******** Finished adding {len(parts_response)} parts to index ********', True)
     logger_.log('------------ Successfully updated index ------------\n', True)
 
 
 def delete_subject(subject):
     """
-    Delete part for incremental indexing
-    Args:
-        subject:
-
-    Returns:
+    Delete part for incremental indexing.
+    Args:
+        subject: The subject to delete from the index.
""" - index_name = config_manager.load_config()['elasticsearch_index_name'] + index_name = config['elasticsearch_index_name'] + es = elasticsearch_manager.get_es() body = { 'query': { 'bool': { 'must': [ - {'ids': {'values': subject}} + {'ids': {'values': [subject]}} ] } }, 'conflicts': 'proceed' } - elasticsearch_manager.get_es().delete_by_query(index=index_name, doc_type=index_name, body=body) + es.delete_by_query(index=index_name, body=body) def index_part(part): delete_subject(part['subject']) - index_name = config_manager.load_config()['elasticsearch_index_name'] - elasticsearch_manager.get_es().index(index=index_name, doc_type=index_name, id=part['subject'], body=part) + index_name = config['elasticsearch_index_name'] + es = elasticsearch_manager.get_es() + es.index(index=index_name, id=part['subject'], body=part) def refresh_index(subject, uri2rank): delete_subject(subject) - - part_response = query.query_parts('', 'FILTER (?subject = <' + subject + '>)', True) + part_response = query.query_parts('', f'FILTER (?subject = <{subject}>)', True) if len(part_response) == 1: add_pagerank(part_response, uri2rank) @@ -246,18 +222,16 @@ def incremental_remove(subject): def incremental_remove_collection(subject, uri_prefix): - collection_membership_query = ''' + collection_membership_query = f''' SELECT ?s - WHERE { - <''' + subject + '''> sbol2:member ?s . - FILTER(STRSTARTS(str(?s),''' + "'" + uri_prefix + "'" + ''')) - } + WHERE {{ + <{subject}> sbol2:member ?s . + FILTER(STRSTARTS(str(?s), '{uri_prefix}')) + }} ''' members = query.query_sparql(collection_membership_query) delete_subject(subject) for member in members: delete_subject(member['s']) - - diff --git a/flask/search.py b/flask/search.py index d772b8f..e8e7926 100644 --- a/flask/search.py +++ b/flask/search.py @@ -62,13 +62,20 @@ def search_es(es_query: str) -> Dict: } try: return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) - except Exception as e: - logger_.log(f"ES search failed: {e}") + except: + logger_.log("search_es(es_query: str)") raise def empty_search_es(offset: int, limit: int, allowed_graphs: List[str]) -> Dict: """ Empty string search based solely on pagerank. + Arguments: + offset {int} -- Offset for search results + limit {int} -- Size of search + allowed_graphs {List} -- List of allowed graphs to search on + + Returns: + List -- List of search results """ query = {'term': {'graph': allowed_graphs[0]}} if len(allowed_graphs) == 1 else {'terms': {'graph': allowed_graphs}} @@ -88,13 +95,19 @@ def empty_search_es(offset: int, limit: int, allowed_graphs: List[str]) -> Dict: } try: return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) - except Exception as e: - logger_.log(f"ES search failed: {e}") + except: + logger_.log("empty_search_es(offset: int, limit: int, allowed_graphs: List[str])") raise def search_es_allowed_subjects(es_query: str, allowed_subjects: List[str]) -> Dict: """ String query for ES searches limited to allowed parts. 
+
+    Arguments:
+        es_query {string} -- String to search for
+        allowed_subjects {List} -- List of allowed subjects from Virtuoso
+
+    Returns:
+        Dict -- Search results from Elasticsearch
     """
     body = {
         'query': {
@@ -116,7 +129,7 @@ def search_es_allowed_subjects(es_query: str, allowed_subjects: List[str]) -> Di
                                 'operator': 'or',
                                 'fuzziness': 'AUTO'
                             }},
-                            {'ids': {'values': allowed_subjects}}
+                            {'ids': {'values': list(allowed_subjects)}}
                         ]
                     }
                 },
@@ -124,21 +137,26 @@ def search_es_allowed_subjects(es_query: str, allowed_subjects: List[str]) -> Di
                     'script': {
                         'source': "_score * Math.log(doc['pagerank'].value + 1)"
                     }
-                }
-            }
+                },
+            },
         },
         'from': 0,
         'size': 10000
     }
     try:
         return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body)
     except Exception as e:
-        logger_.log(f"ES search failed: {e}")
+        logger_.log(f"search_es_allowed_subjects failed: {e}")
         raise
 
 def search_es_allowed_subjects_empty_string(allowed_subjects: List[str]) -> Dict:
     """
     ES search purely limited to allowed parts.
+
+    Arguments:
+        allowed_subjects {List} -- List of allowed subjects from Virtuoso
+
+    Returns:
+        Dict -- Search results from Elasticsearch
     """
     body = {
         'query': {
@@ -146,7 +164,7 @@ def search_es_allowed_subjects_empty_string(allowed_subjects: List[str]) -> Dict
             'query': {
                 'bool': {
                     'must': [
-                        {'ids': {'values': allowed_subjects}}
+                        {'ids': {'values': list(allowed_subjects)}}
                     ]
                 }
            },
@@ -154,16 +172,16 @@ def search_es_allowed_subjects_empty_string(allowed_subjects: List[str]) -> Dict
                     'script': {
                         'source': "_score * Math.log(doc['pagerank'].value + 1)"
                     }
-                }
-            }
+                },
+            },
         },
         'from': 0,
         'size': 10000
     }
     try:
         return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body)
     except Exception as e:
-        logger_.log(f"ES search failed: {e}")
+        logger_.log(f"search_es_allowed_subjects_empty_string failed: {e}")
         raise
 
 def parse_sparql_query(sparql_query, is_count_query):
     # Find FROM clause
@@ -209,13 +227,18 @@ def extract_query(sparql_query):
         List -- List of information extracted
     """
     return parse_sparql_query(sparql_query, is_count_query(sparql_query))
-
-
+
 def extract_allowed_graphs(_from: str, default_graph_uri: str) -> List[str]:
     """
     Extracts the allowed graphs to search over.
+
+    Arguments:
+        _from {string} -- Graph where search originated
+        default_graph_uri {string} -- The default graph URI pulled from SBH
+
+    Returns:
+        List -- List of allowed graphs
     """
-    allowed_graphs = [default_graph_uri] if not _from else [graph.strip()[1:-1] for graph in _from.split('FROM') if graph.strip()]
+    allowed_graphs = [default_graph_uri] if not _from else [graph.strip()[1:-1] for graph in _from.split('FROM') if graph.strip()[1:-1]]
     if config_manager.load_config()['distributed_search']:
         allowed_graphs.extend(instance['instanceUrl'] + '/public' for instance in wor_client_.get_wor_instance())
     return allowed_graphs
@@ -226,6 +249,14 @@ def is_count_query(sparql_query: str) -> bool:
 
 def create_response(count: int, bindings: List[Dict], return_count: bool) -> Dict:
     """
     Creates response to be sent back to SBH.
+
+    Arguments:
+        count {int} -- Total number of hits
+        bindings {List[Dict]} -- The bindings
+        return_count {bool} -- Whether to return only the count
+
+    Returns:
+        Dict -- Response to be sent back to SBH
""" if return_count: return { @@ -254,6 +285,24 @@ def create_binding(subject: str, displayId: Optional[str], version: Optional[int percentMatch: float = -1, strandAlignment: str = 'N/A', CIGAR: str = 'N/A') -> Dict: """ Creates bindings to be sent to SBH. + Arguments: + subject {string} -- URI of part + displayId {string} -- DisplayId of part + version {int} -- Version of part + name {string} -- Name of part + description {string} -- Description of part + _type {string} -- SBOL type of part + role {string} -- S.O. role of part + order_by {?} -- ? + + Keyword Arguments: + percentMatch {number} -- Percent match of query part to the target part (default: {-1}) + strandAlignment {str} -- Strand alignment of the query part relatve to the target part (default: {'N/A'}) + CIGAR {str} -- Alignment of query part relative to the target part (default: {'N/A'}) + + Returns: + Dict -- Part and its information + """ binding = {} attributes = { @@ -292,6 +341,10 @@ def create_bindings(es_response, clusters, allowed_graphs, allowed_subjects=None Returns: Dict -- All parts and their corresponding information """ + if es_response is None or 'hits' not in es_response or 'hits' not in es_response['hits']: + logger_.log("[ERROR] Elasticsearch response is None or malformed.") + return [] + bindings = [] cluster_duplicates = set() @@ -314,7 +367,7 @@ def create_bindings(es_response, clusters, allowed_graphs, allowed_subjects=None elif subject in clusters: cluster_duplicates.update(clusters[subject]) - if 'http://sbols.org/v2#Sequence' in _source.get('type', ''): + if _source.get('type') is not None and 'http://sbols.org/v2#Sequence' in _source.get('type'): _score /= 10.0 binding = create_binding( @@ -348,7 +401,8 @@ def create_criteria_bindings(criteria_response, uri2rank, sequence_search=False, Dict -- Binding of parts """ bindings = [] - for part in criteria_response: + parts = (p for p in criteria_response if p.get('role') is None or 'http://wiki.synbiohub.org' in p.get('role')) + for part in parts: subject = part.get('subject') pagerank = uri2rank.get(subject, 1) @@ -570,6 +624,4 @@ def filter_sequence_search_subjects(_from, uris): uris {list} -- List of URI's from sequence search """ from_uris = set(re.findall(r"\<([A-Za-z0-9:\/.]+)\>*", _from)) - return [uri for uri in uris if any(f in uri for f in from_uris)] - - + return [uri for uri in uris if any(f in uri for f in from_uris)] \ No newline at end of file