diff --git a/src/prefect/server/utilities/messaging/memory.py b/src/prefect/server/utilities/messaging/memory.py index 172dcd5738be0..bcb87aee22855 100644 --- a/src/prefect/server/utilities/messaging/memory.py +++ b/src/prefect/server/utilities/messaging/memory.py @@ -323,30 +323,22 @@ def __init__( topic: str, subscription: Optional[Subscription] = None, max_queue_depth: int = 0, - concurrency: int = 4, ): self.topic: Topic = Topic.by_name(topic) if not subscription: subscription = self.topic.subscribe(max_queue_depth=max_queue_depth) assert subscription.topic is self.topic self.subscription = subscription - self.concurrency = concurrency async def run(self, handler: MessageHandler) -> None: - async with anyio.create_task_group() as tg: - for _ in range(self.concurrency): - tg.start_soon(self._consume_loop, handler) - - async def _consume_loop(self, handler: MessageHandler) -> None: while True: message = await self.subscription.get() try: await handler(message) - await update_metric(self.topic.name, "consumed") except StopConsumer as e: if not e.ack: await self.subscription.retry(message) - raise # ends task group + return except Exception: await self.subscription.retry(message) diff --git a/tests/test_settings.py b/tests/test_settings.py index deb2076a51f2b..976e5d8fa957f 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -184,6 +184,10 @@ "test_value": True, "legacy": True, }, + "PREFECT_API_SERVICES_TASK_RUN_RECORDER_MAX_QUEUE_DEPTH": { + "test_value": 10, + "legacy": True, + }, "PREFECT_API_SERVICES_TRIGGERS_ENABLED": {"test_value": True, "legacy": True}, "PREFECT_API_SSL_CERT_FILE": {"test_value": "/path/to/cert"}, "PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH": {"test_value": 10, "legacy": True},