diff --git a/Dockerfile b/Dockerfile index e79c820..fdfc716 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,4 +17,4 @@ WORKDIR /opt/servicelayer RUN pip3 install -q --no-cache-dir -e /opt/servicelayer[dev] RUN pip3 install -r requirements.txt -CMD /bin/bash \ No newline at end of file +CMD /bin/bash diff --git a/Makefile b/Makefile index 0c0f17d..1f70608 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ install: pip install -q twine coverage nose moto boto3 dev: - python3 -m pip install --upgrade pip + python3 -m pip install --upgrade pip setuptools python3 -m pip install -q -r requirements.txt python3 -m pip install -q -r requirements-dev.txt diff --git a/servicelayer/settings.py b/servicelayer/settings.py index 25cb151..810bac8 100644 --- a/servicelayer/settings.py +++ b/servicelayer/settings.py @@ -41,6 +41,7 @@ RABBITMQ_BLOCKED_CONNECTION_TIMEOUT = env.to_int( "RABBITMQ_BLOCKED_CONNECTION_TIMEOUT", 300 ) +RABBITMQ_MAX_PRIORITY = 10 QUEUE_ALEPH = "aleph_queue" QUEUE_INGEST = "ingest_queue" QUEUE_INDEX = "index_queue" diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 8c51b7f..393bad4 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -45,6 +45,7 @@ class Task: operation: str context: dict payload: dict + priority: int collection_id: Optional[str] = None @property @@ -211,6 +212,7 @@ def get_task(body, delivery_tag) -> Task: operation=body["operation"], context=body["context"] or {}, payload=body["payload"] or {}, + priority=body["priority"] or 0, ) @@ -370,8 +372,8 @@ def ack_message(self, task, channel): skip_ack = task.context.get("skip_ack") if skip_ack: log.info( - f"Skipping acknowledging message {task.delivery_tag}" - "for task_id {task.task_id}" + f"""Skipping acknowledging message + {task.delivery_tag} for task_id {task.task_id}""" ) else: log.info( @@ -395,6 +397,11 @@ def run(self): def process(): return self.process(blocking=True) + if not self.num_threads: + # TODO - seems like we need at least one thread + # consuming and processing require separate threads + self.num_threads = 1 + threads = [] for _ in range(self.num_threads): thread = threading.Thread(target=process) @@ -409,7 +416,11 @@ def process(): channel.basic_qos(prefetch_count=self.prefetch_count) on_message_callback = functools.partial(self.on_message, args=(connection,)) for queue in self.queues: - channel.queue_declare(queue=queue, durable=True) + channel.queue_declare( + queue=queue, + durable=True, + arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY}, + ) channel.basic_consume(queue=queue, on_message_callback=on_message_callback) channel.start_consuming() @@ -430,15 +441,34 @@ def get_rabbitmq_connection(): ) ) local.connection = connection + if local.connection.is_open: channel = local.connection.channel() - channel.queue_declare(queue=settings.QUEUE_ALEPH, durable=True) - channel.queue_declare(queue=settings.QUEUE_INGEST, durable=True) - channel.queue_declare(queue=settings.QUEUE_INDEX, durable=True) + + channel.queue_declare( + queue=settings.QUEUE_ALEPH, + durable=True, + arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY}, + ) + + channel.queue_declare( + queue=settings.QUEUE_INGEST, + durable=True, + arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY}, + ) + + channel.queue_declare( + queue=settings.QUEUE_INDEX, + durable=True, + arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY}, + ) + channel.close() return local.connection + except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPError): log.exception("RabbitMQ error") local.connection = None + backoff(failures=attempt) raise RuntimeError("Could not connect to RabbitMQ") diff --git a/tests/test_taskqueue.py b/tests/test_taskqueue.py index dfa7e9b..aa45589 100644 --- a/tests/test_taskqueue.py +++ b/tests/test_taskqueue.py @@ -1,7 +1,9 @@ from unittest import TestCase from unittest.mock import patch import json +from random import randrange +import pika from servicelayer import settings from servicelayer.cache import get_fakeredis @@ -29,6 +31,7 @@ def test_task_queue(self): conn = get_fakeredis() collection_id = 2 task_id = "test-task" + priority = randrange(1, settings.RABBITMQ_MAX_PRIORITY + 1) body = { "collection_id": 2, "job_id": "test-job", @@ -36,11 +39,13 @@ def test_task_queue(self): "operation": "test-op", "context": {}, "payload": {}, + "priority": priority, } connection = get_rabbitmq_connection() channel = connection.channel() channel.queue_purge(settings.QUEUE_INGEST) channel.basic_publish( + properties=pika.BasicProperties(priority=priority), exchange="", routing_key=settings.QUEUE_INGEST, body=json.dumps(body), @@ -71,6 +76,7 @@ def test_task_queue(self): channel = connection.channel() channel.queue_purge(settings.QUEUE_INGEST) channel.basic_publish( + properties=pika.BasicProperties(priority=priority), exchange="", routing_key=settings.QUEUE_INGEST, body=json.dumps(body),