Skip to content

Commit

Permalink
undo random rename
Browse files Browse the repository at this point in the history
oops
  • Loading branch information
zzstoatzz committed Jan 29, 2025
1 parent a7aa0d7 commit 719f7ec
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/prefect/server/utilities/messaging/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ async def get(self) -> MemoryMessage:
"""
Get a message from the subscription's queue.
"""
if not self._retry.empty():
return await self._retry.get()
return await self._queue.get()

async def send_to_dead_letter_queue(self, message: MemoryMessage) -> None:
Expand Down Expand Up @@ -349,16 +351,16 @@ async def run(self, handler: MessageHandler) -> None:

async def _consume_loop(self, handler: MessageHandler) -> None:
while True:
msg = await self.subscription.get()
message = await self.subscription.get()
try:
await handler(msg)
await handler(message)
await update_metric(self.topic.name, "consumed")
except StopConsumer as e:
if not e.ack:
await self.subscription.retry(msg)
await self.subscription.retry(message)
raise # Propagate to task group
except Exception:
await self.subscription.retry(msg)
await self.subscription.retry(message)


@asynccontextmanager
Expand Down

0 comments on commit 719f7ec

Please sign in to comment.