Skip to content

Commit

Permalink
add message queue send function
Browse files Browse the repository at this point in the history
  • Loading branch information
cl117 committed Aug 21, 2024
1 parent db907e6 commit c92fca4
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 6 deletions.
83 changes: 78 additions & 5 deletions flask/explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import threading
import time
import pika
from flask_debugtoolbar import DebugToolbarExtension
from flask_debugtoolbar_lineprofilerpanel.profile import line_profile

Expand All @@ -17,7 +18,16 @@
import utils
import query

# Configure logging, This will affect all loggers in your application, not just the Werkzeug logger.
def profile_flask_app():
app.run(debug=True)

if __name__ == "__main__":
#profiler = profile.Profile()
#profiler.enable()
profile_flask_app()
#profiler.disable()
#profiler.print_stats(sort='time')

log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)

Expand All @@ -43,7 +53,7 @@
# Initialize the debug toolbar
toolbar = DebugToolbarExtension(app)

# Error handler

@app.errorhandler(Exception)
def handle_error(e):
log.error(f'[ERROR] Returning error {e}\n Traceback:\n{traceback.format_exc()}')
Expand All @@ -57,11 +67,9 @@ def auto_update_index():
update_interval = int(utils.get_config().get('updateTimeInDays', 0)) * 86400
while True:
time.sleep(update_interval)
# Implement your update logic here
if utils.get_config().get('autoUpdateIndex', False):
update_index()

# Start the background thread for auto-updating the index
update_thread = threading.Thread(target=auto_update_index, daemon=True)
update_thread.start()

Expand All @@ -71,6 +79,7 @@ def auto_update_index():
os.remove(log_file)

utils.log('SBOLExplorer started :)')
utils.log('Hello')

# Check and create index if necessary
try:
Expand Down Expand Up @@ -181,13 +190,18 @@ def SBOLExplore_test_endpoint():
@app.route('/', methods=['GET'])
@line_profile
def sparql_search_endpoint():
utils.log('sparql_search_endpoint')
try:
es = utils.get_es()
index_name = utils.get_config().get('elasticsearch_index_name')
if not es.indices.exists(index=index_name) or es.cat.indices(format='json')[0]['health'] == 'red':
abort(503, 'Elasticsearch is not working or the index does not exist.')

sparql_query = request.args.get('query')
utils.log(f'Preparing to send message to queue for query: {sparql_query}')
# Send the search query to the message queue
send_to_queue('search_queue', f'Search query received: {sparql_query}')

if sparql_query:
default_graph_uri = request.args.get('default-graph-uri')
response = jsonify(search.search(
Expand All @@ -209,20 +223,79 @@ def sparql_search_endpoint():
log.error(f'Error during SPARQL search: {e}')
raise

# Utility function to send messages to RabbitMQ
def send_to_queue(queue_name, message):
connection = None
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(exchange='', routing_key=queue_name, body=message)
utils.log(f'Sent message to queue "{queue_name}": {message}')
print(f'Sent message to queue "{queue_name}": {message}')
except Exception as e:
log.error(f"Failed to send message to queue: {e}")
finally:
if connection and not connection.is_closed:
connection.close()

# Receiver function to process messages from RabbitMQ
# This function connects to the RabbitMQ queue and starts consuming messages
# using the callback function.
# The callback function processes each message received from the queue.
def start_receiver(queue_name):
def callback(ch, method, properties, body):
utils.log(f"Received message from queue '{queue_name}': {body}")
# Here you can process the message
# For example: process_search_request(body)
# In a typical Flask route, you would return response directly, but since this is within
# a callback function, returning a response doesn't make sense. Instead, you might log
# the result or send it to another queue, depending on your application's architecture.
if queue_name == 'search_queue':
response = jsonify(search.search_es(body)['hits'])
utils.log(f"Response: '{response}'")

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', heartbeat=600))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

utils.log(f"Started consuming from queue '{queue_name}'")
channel.start_consuming()

# Start the receiver in a separate thread
# By using a daemon thread, the receiver thread will automatically exit
# when the main Flask application exits.
def start_receiver_thread(queue_name):
receiver_thread = threading.Thread(target=start_receiver, args=(queue_name,))
receiver_thread.daemon = True # Set as a daemon thread so it exits when the main program exits
receiver_thread.start()

@app.route('/search', methods=['GET'])
def search_by_string():
utils.log('search_by_string endpoint called')
print("Hello")
try:
es = utils.get_es()
index_name = utils.get_config().get('elasticsearch_index_name')
if not es.indices.exists(index=index_name) or es.cat.indices(format='json')[0]['health'] == 'red':
abort(503, 'Elasticsearch is not working or the index does not exist.')

query = request.args.get('query')
utils.log(f'Preparing to send message to queue for query: {query}')
# Send the search query to the message queue
send_to_queue('search_queue', f'Search query received: {query}')
# TODOs: change the return response add to another queue
response = jsonify(search.search_es(query)['hits'])
utils.log('Message sent to queue successfully')
return response
except Exception as e:
log.error(f'Error during search by string: {e}')
raise

if __name__ == "__main__":
# Start the receiver before running the Flask app
# This ensures it runs concurrently with the Flask application.
start_receiver_thread('search_queue')
app.run(debug=True)
4 changes: 3 additions & 1 deletion flask/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ def parse_sparql_query(sparql_query, is_count_query):

# Construct es_query
es_query = ' '.join(keywords).strip()
print("Hello es_query: ", es_query)
if not is_count_query:
print("Hello es_query: ", es_query)
# if is not count query, print

return es_query, _from, criteria, offset, limit, sequence, flags

Expand Down

0 comments on commit c92fca4

Please sign in to comment.