Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priorities to tasks published using RabbitMQ #112

Merged
merged 14 commits into from
Dec 20, 2023
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ WORKDIR /opt/servicelayer
RUN pip3 install -q --no-cache-dir -e /opt/servicelayer[dev]
RUN pip3 install -r requirements.txt

CMD /bin/bash
CMD /bin/bash
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ install:
pip install -q twine coverage nose moto boto3

dev:
python3 -m pip install --upgrade pip
python3 -m pip install --upgrade pip setuptools
python3 -m pip install -q -r requirements.txt
python3 -m pip install -q -r requirements-dev.txt

Expand Down
1 change: 1 addition & 0 deletions servicelayer/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT = env.to_int(
"RABBITMQ_BLOCKED_CONNECTION_TIMEOUT", 300
)
RABBITMQ_MAX_PRIORITY = 10
QUEUE_ALEPH = "aleph_queue"
QUEUE_INGEST = "ingest_queue"
QUEUE_INDEX = "index_queue"
Expand Down
42 changes: 36 additions & 6 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Task:
operation: str
context: dict
payload: dict
priority: int
collection_id: Optional[str] = None

@property
Expand Down Expand Up @@ -211,6 +212,7 @@ def get_task(body, delivery_tag) -> Task:
operation=body["operation"],
context=body["context"] or {},
payload=body["payload"] or {},
priority=body["priority"] or 0,
)


Expand Down Expand Up @@ -370,8 +372,8 @@ def ack_message(self, task, channel):
skip_ack = task.context.get("skip_ack")
if skip_ack:
log.info(
f"Skipping acknowledging message {task.delivery_tag}"
"for task_id {task.task_id}"
f"""Skipping acknowledging message
{task.delivery_tag} for task_id {task.task_id}"""
)
else:
log.info(
Expand All @@ -395,6 +397,11 @@ def run(self):
def process():
return self.process(blocking=True)

if not self.num_threads:
# TODO - seems like we need at least one thread
# consuming and processing require separate threads
self.num_threads = 1

threads = []
for _ in range(self.num_threads):
thread = threading.Thread(target=process)
Expand All @@ -409,7 +416,11 @@ def process():
channel.basic_qos(prefetch_count=self.prefetch_count)
on_message_callback = functools.partial(self.on_message, args=(connection,))
for queue in self.queues:
channel.queue_declare(queue=queue, durable=True)
channel.queue_declare(
queue=queue,
durable=True,
arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
)
channel.basic_consume(queue=queue, on_message_callback=on_message_callback)
channel.start_consuming()

Expand All @@ -430,15 +441,34 @@ def get_rabbitmq_connection():
)
)
local.connection = connection

if local.connection.is_open:
channel = local.connection.channel()
channel.queue_declare(queue=settings.QUEUE_ALEPH, durable=True)
channel.queue_declare(queue=settings.QUEUE_INGEST, durable=True)
channel.queue_declare(queue=settings.QUEUE_INDEX, durable=True)

channel.queue_declare(
queue=settings.QUEUE_ALEPH,
durable=True,
arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
)

channel.queue_declare(
queue=settings.QUEUE_INGEST,
durable=True,
arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
)

channel.queue_declare(
queue=settings.QUEUE_INDEX,
durable=True,
arguments={"x-max-priority": settings.RABBITMQ_MAX_PRIORITY},
)

channel.close()
return local.connection

except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPError):
log.exception("RabbitMQ error")
local.connection = None

backoff(failures=attempt)
raise RuntimeError("Could not connect to RabbitMQ")
6 changes: 6 additions & 0 deletions tests/test_taskqueue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from unittest import TestCase
from unittest.mock import patch
import json
from random import randrange

import pika

from servicelayer import settings
from servicelayer.cache import get_fakeredis
Expand Down Expand Up @@ -29,18 +31,21 @@ def test_task_queue(self):
conn = get_fakeredis()
collection_id = 2
task_id = "test-task"
priority = randrange(1, settings.RABBITMQ_MAX_PRIORITY + 1)
body = {
"collection_id": 2,
"job_id": "test-job",
"task_id": "test-task",
"operation": "test-op",
"context": {},
"payload": {},
"priority": priority,
}
connection = get_rabbitmq_connection()
channel = connection.channel()
channel.queue_purge(settings.QUEUE_INGEST)
channel.basic_publish(
properties=pika.BasicProperties(priority=priority),
exchange="",
routing_key=settings.QUEUE_INGEST,
body=json.dumps(body),
Expand Down Expand Up @@ -71,6 +76,7 @@ def test_task_queue(self):
channel = connection.channel()
channel.queue_purge(settings.QUEUE_INGEST)
channel.basic_publish(
properties=pika.BasicProperties(priority=priority),
exchange="",
routing_key=settings.QUEUE_INGEST,
body=json.dumps(body),
Expand Down