Skip to content

Commit

Permalink
rm setting w undefined behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Jan 29, 2025
1 parent 7f7152d commit 1c3aba2
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 44 deletions.
12 changes: 0 additions & 12 deletions docs/v3/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -1948,18 +1948,6 @@ Whether or not to start the task run recorder service in the server application.
**Supported environment variables**:
`PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_ENABLED`, `PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED`

### `max_queue_depth`
The maximum number of messages the task run recorder will queue. Defaults to `0`, which means the queue can grow indefinitely.

**Type**: `integer`

**Default**: `0`

**TOML dotted key path**: `server.services.task_run_recorder.max_queue_depth`

**Supported environment variables**:
`PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_MAX_QUEUE_DEPTH`

---
## ServerServicesTriggersSettings
Settings for controlling the triggers service
Expand Down
9 changes: 0 additions & 9 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1697,15 +1697,6 @@
],
"title": "Enabled",
"type": "boolean"
},
"max_queue_depth": {
"default": 0,
"description": "The maximum number of messages the task run recorder will queue. Defaults to `0`, which means the queue can grow indefinitely.",
"supported_environment_variables": [
"PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_MAX_QUEUE_DEPTH"
],
"title": "Max Queue Depth",
"type": "integer"
}
},
"title": "ServerServicesTaskRunRecorderSettings",
Expand Down
6 changes: 1 addition & 5 deletions src/prefect/server/services/task_run_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
METRICS,
_metrics_lock, # type: ignore
)
from prefect.settings import get_current_settings

if TYPE_CHECKING:
import logging
Expand Down Expand Up @@ -257,10 +256,7 @@ def started_event(self, value: asyncio.Event) -> None:

async def start(self) -> None:
assert self.consumer_task is None, "TaskRunRecorder already started"
self.consumer: Consumer = create_consumer(
"events",
max_queue_depth=get_current_settings().server.services.task_run_recorder.max_queue_depth,
)
self.consumer: Consumer = create_consumer("events")

async with consumer() as handler:
self.consumer_task = asyncio.create_task(self.consumer.run(handler))
Expand Down
12 changes: 2 additions & 10 deletions src/prefect/server/utilities/messaging/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def __init__(
topic: "Topic",
max_retries: int = 3,
dead_letter_queue_path: Path | str | None = None,
max_queue_depth: int = 0,
) -> None:
self.topic = topic
self.max_retries = max_retries
Expand All @@ -90,12 +89,7 @@ def __init__(
if dead_letter_queue_path
else get_current_settings().home / "dlq"
)
logger.warning(
f"topic {self.topic.name} using max_queue_depth {max_queue_depth}"
)
self._queue: asyncio.Queue[MemoryMessage] = asyncio.Queue(
maxsize=max_queue_depth
)
self._queue: asyncio.Queue[MemoryMessage] = asyncio.Queue()
self._retry: asyncio.Queue[MemoryMessage] = asyncio.Queue()

async def deliver(self, message: MemoryMessage) -> None:
Expand Down Expand Up @@ -208,7 +202,6 @@ def clear_all(cls) -> None:
cls._topics = {}

def subscribe(self, **subscription_kwargs: Any) -> Subscription:
logger.warning(f"topic {self.name} using consumer kwargs {subscription_kwargs}")
subscription = Subscription(self, **subscription_kwargs)
self._subscriptions.append(subscription)
return subscription
Expand Down Expand Up @@ -326,12 +319,11 @@ def __init__(
self,
topic: str,
subscription: Optional[Subscription] = None,
max_queue_depth: int = 0,
concurrency: int = 2,
):
self.topic: Topic = Topic.by_name(topic)
if not subscription:
subscription = self.topic.subscribe(max_queue_depth=max_queue_depth)
subscription = self.topic.subscribe()
assert subscription.topic is self.topic
self.subscription = subscription
self.concurrency = concurrency
Expand Down
8 changes: 0 additions & 8 deletions src/prefect/settings/models/server/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,6 @@ class ServerServicesTaskRunRecorderSettings(PrefectBaseSettings):
),
)

max_queue_depth: int = Field(
default=0,
description=(
"The maximum number of messages the task run recorder will queue. Defaults to `0`, which "
"means the queue can grow indefinitely."
),
)


class ServerServicesTriggersSettings(PrefectBaseSettings):
"""
Expand Down

0 comments on commit 1c3aba2

Please sign in to comment.