Skip to content

Commit

Permalink
denoise debug logs and take suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Jan 29, 2025
1 parent 0048f5a commit 793d5ef
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 37 deletions.
21 changes: 1 addition & 20 deletions src/prefect/server/services/task_run_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,14 @@
MessageHandler,
create_consumer,
)
from prefect.server.utilities.messaging.memory import (
METRICS,
_metrics_lock, # type: ignore
)
from prefect.server.utilities.messaging.memory import log_metrics_periodically

if TYPE_CHECKING:
import logging

logger: "logging.Logger" = get_logger(__name__)


async def log_metrics_periodically(interval: float = 2.0) -> None:
while True:
await asyncio.sleep(interval)
async with _metrics_lock:
for topic, data in METRICS.items():
depth = data["published"] - data["consumed"]
logger.debug(
"Topic=%r | published=%d consumed=%d retried=%d depth=%d",
topic,
data["published"],
data["consumed"],
data["retried"],
depth,
)


def causal_ordering() -> CausalOrdering:
return CausalOrdering(
"task-run-recorder",
Expand Down
41 changes: 24 additions & 17 deletions src/prefect/server/utilities/messaging/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import copy
import threading
from collections import defaultdict
from collections.abc import AsyncGenerator, Iterable, Mapping, MutableMapping
from contextlib import asynccontextmanager
Expand Down Expand Up @@ -31,7 +32,7 @@
logger: "logging.Logger" = get_logger(__name__)

# Simple global counters by topic with thread-safe access
_metrics_lock: asyncio.Lock | None = None
_metrics_lock: threading.Lock | None = None
METRICS: dict[str, dict[str, int]] = defaultdict(
lambda: {
"published": 0,
Expand All @@ -41,11 +42,31 @@
)


async def log_metrics_periodically(interval: float = 2.0) -> None:
if _metrics_lock is None:
return
while True:
await asyncio.sleep(interval)
with _metrics_lock:
for topic, data in METRICS.items():
if data["published"] == 0:
continue
depth = data["published"] - data["consumed"]
logger.debug(
"Topic=%r | published=%d consumed=%d retried=%d depth=%d",
topic,
data["published"],
data["consumed"],
data["retried"],
depth,
)


async def update_metric(topic: str, key: str, amount: int = 1) -> None:
global _metrics_lock
if _metrics_lock is None:
_metrics_lock = asyncio.Lock()
async with _metrics_lock:
_metrics_lock = threading.Lock()
with _metrics_lock:
METRICS[topic][key] += amount


Expand Down Expand Up @@ -141,20 +162,6 @@ async def get(self) -> MemoryMessage:
"""
Get a message from the subscription's queue.
"""
if not self._retry.empty():
logger.debug(
"Getting message from RETRY queue on topic=%r queue_size=%d retry_queue_size=%d",
self.topic.name,
self._queue.qsize(),
self._retry.qsize(),
)
return await self._retry.get()
logger.debug(
"Getting message from MAIN queue on topic=%r queue_size=%d retry_queue_size=%d",
self.topic.name,
self._queue.qsize(),
self._retry.qsize(),
)
return await self._queue.get()

async def send_to_dead_letter_queue(self, message: MemoryMessage) -> None:
Expand Down

0 comments on commit 793d5ef

Please sign in to comment.