From c92fca4ca4735ff72fb3100f345a994427cfc5f6 Mon Sep 17 00:00:00 2001 From: cl117 Date: Wed, 21 Aug 2024 12:32:10 -0600 Subject: [PATCH] add message queue send function --- flask/explorer.py | 83 ++++++++++++++++++++++++++++++++++++++++++++--- flask/search.py | 4 ++- 2 files changed, 81 insertions(+), 6 deletions(-) diff --git a/flask/explorer.py b/flask/explorer.py index c10f8e9..38dd09f 100644 --- a/flask/explorer.py +++ b/flask/explorer.py @@ -7,6 +7,7 @@ import logging import threading import time +import pika from flask_debugtoolbar import DebugToolbarExtension from flask_debugtoolbar_lineprofilerpanel.profile import line_profile @@ -17,7 +18,16 @@ import utils import query -# Configure logging, This will affect all loggers in your application, not just the Werkzeug logger. +def profile_flask_app(): + app.run(debug=True) + +if __name__ == "__main__": + #profiler = profile.Profile() + #profiler.enable() + profile_flask_app() + #profiler.disable() + #profiler.print_stats(sort='time') + log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) @@ -43,7 +53,7 @@ # Initialize the debug toolbar toolbar = DebugToolbarExtension(app) -# Error handler + @app.errorhandler(Exception) def handle_error(e): log.error(f'[ERROR] Returning error {e}\n Traceback:\n{traceback.format_exc()}') @@ -57,11 +67,9 @@ def auto_update_index(): update_interval = int(utils.get_config().get('updateTimeInDays', 0)) * 86400 while True: time.sleep(update_interval) - # Implement your update logic here if utils.get_config().get('autoUpdateIndex', False): update_index() - # Start the background thread for auto-updating the index update_thread = threading.Thread(target=auto_update_index, daemon=True) update_thread.start() @@ -71,6 +79,7 @@ def auto_update_index(): os.remove(log_file) utils.log('SBOLExplorer started :)') + utils.log('Hello') # Check and create index if necessary try: @@ -181,6 +190,7 @@ def SBOLExplore_test_endpoint(): @app.route('/', methods=['GET']) @line_profile def sparql_search_endpoint(): + utils.log('sparql_search_endpoint') try: es = utils.get_es() index_name = utils.get_config().get('elasticsearch_index_name') @@ -188,6 +198,10 @@ def sparql_search_endpoint(): abort(503, 'Elasticsearch is not working or the index does not exist.') sparql_query = request.args.get('query') + utils.log(f'Preparing to send message to queue for query: {sparql_query}') + # Send the search query to the message queue + send_to_queue('search_queue', f'Search query received: {sparql_query}') + if sparql_query: default_graph_uri = request.args.get('default-graph-uri') response = jsonify(search.search( @@ -209,8 +223,59 @@ def sparql_search_endpoint(): log.error(f'Error during SPARQL search: {e}') raise +# Utility function to send messages to RabbitMQ +def send_to_queue(queue_name, message): + connection = None + try: + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600)) + channel = connection.channel() + channel.queue_declare(queue=queue_name, durable=True) + channel.basic_publish(exchange='', routing_key=queue_name, body=message) + utils.log(f'Sent message to queue "{queue_name}": {message}') + print(f'Sent message to queue "{queue_name}": {message}') + except Exception as e: + log.error(f"Failed to send message to queue: {e}") + finally: + if connection and not connection.is_closed: + connection.close() + +# Receiver function to process messages from RabbitMQ +# This function connects to the RabbitMQ queue and starts consuming messages +# using the callback function. +# The callback function processes each message received from the queue. +def start_receiver(queue_name): + def callback(ch, method, properties, body): + utils.log(f"Received message from queue '{queue_name}': {body}") + # Here you can process the message + # For example: process_search_request(body) + # In a typical Flask route, you would return response directly, but since this is within + # a callback function, returning a response doesn't make sense. Instead, you might log + # the result or send it to another queue, depending on your application's architecture. + if queue_name == 'search_queue': + response = jsonify(search.search_es(body)['hits']) + utils.log(f"Response: '{response}'") + + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600)) + channel = connection.channel() + channel.queue_declare(queue=queue_name, durable=True) + + channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) + + utils.log(f"Started consuming from queue '{queue_name}'") + channel.start_consuming() + +# Start the receiver in a separate thread +# By using a daemon thread, the receiver thread will automatically exit +# when the main Flask application exits. +def start_receiver_thread(queue_name): + receiver_thread = threading.Thread(target=start_receiver, args=(queue_name,)) + receiver_thread.daemon = True # Set as a daemon thread so it exits when the main program exits + receiver_thread.start() + @app.route('/search', methods=['GET']) def search_by_string(): + utils.log('search_by_string endpoint called') + print("Hello") try: es = utils.get_es() index_name = utils.get_config().get('elasticsearch_index_name') @@ -218,11 +283,19 @@ def search_by_string(): abort(503, 'Elasticsearch is not working or the index does not exist.') query = request.args.get('query') + utils.log(f'Preparing to send message to queue for query: {query}') + # Send the search query to the message queue + send_to_queue('search_queue', f'Search query received: {query}') + # TODOs: change the return response add to another queue response = jsonify(search.search_es(query)['hits']) + utils.log('Message sent to queue successfully') return response except Exception as e: log.error(f'Error during search by string: {e}') raise - + if __name__ == "__main__": + # Start the receiver before running the Flask app + # This ensures it runs concurrently with the Flask application. + start_receiver_thread('search_queue') app.run(debug=True) diff --git a/flask/search.py b/flask/search.py index 4d316db..8bd2a93 100644 --- a/flask/search.py +++ b/flask/search.py @@ -186,7 +186,9 @@ def parse_sparql_query(sparql_query, is_count_query): # Construct es_query es_query = ' '.join(keywords).strip() - print("Hello es_query: ", es_query) + if not is_count_query: + print("Hello es_query: ", es_query) + # if is not count query, print return es_query, _from, criteria, offset, limit, sequence, flags