diff --git a/README.md b/README.md index 5a02761..8449ac2 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,19 @@ # Kew Task Queue Manager -A flexible and robust asynchronous task queue manager for Python applications with support for multiple priority queues. +A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns. ## Features - Multiple named queues with independent configurations -- Priority-based task processing -- Asynchronous task execution +- Priority-based task scheduling with millisecond precision +- Redis-backed persistence for reliability - Configurable worker pools per queue -- Task status tracking and monitoring -- Automatic cleanup of completed tasks +- Built-in circuit breaker for fault tolerance +- Comprehensive task lifecycle management +- Automatic task expiration (24-hour default) +- Detailed logging and monitoring +- Graceful shutdown handling - Thread-safe operations -- Comprehensive logging ## Installation @@ -30,24 +32,20 @@ async def example_task(x: int): return x * 2 async def main(): - # Initialize the task queue manager - manager = TaskQueueManager() + # Initialize the task queue manager with Redis connection + manager = TaskQueueManager(redis_url="redis://localhost:6379") + await manager.initialize() - # Create queues with different priorities - manager.create_queue(QueueConfig( + # Create a high-priority queue + await manager.create_queue(QueueConfig( name="high_priority", max_workers=4, + max_size=1000, priority=QueuePriority.HIGH )) - manager.create_queue(QueueConfig( - name="background", - max_workers=1, - priority=QueuePriority.LOW - )) - - # Submit tasks to different queues - critical_task = await manager.submit_task( + # Submit a task + task_info = await manager.submit_task( task_id="task1", queue_name="high_priority", task_type="multiplication", @@ -56,30 +54,19 @@ async def main(): x=5 ) - background_task = await manager.submit_task( - task_id="task2", - queue_name="background", - task_type="multiplication", - task_func=example_task, - priority=QueuePriority.LOW, - x=10 - ) - - # Wait for results + # Check task status await asyncio.sleep(2) - high_status = manager.get_task_status("task1") - low_status = manager.get_task_status("task2") - print(f"High Priority Result: {high_status.result}") - print(f"Background Result: {low_status.result}") + status = await manager.get_task_status("task1") + print(f"Task Result: {status.result}") - # Cleanup + # Graceful shutdown await manager.shutdown() if __name__ == "__main__": asyncio.run(main()) ``` -## Queue Management +## Queue Configuration ### Creating Queues @@ -87,47 +74,21 @@ if __name__ == "__main__": from kew import QueueConfig, QueuePriority # Create a high-priority queue with 4 workers -manager.create_queue(QueueConfig( +await manager.create_queue(QueueConfig( name="critical", max_workers=4, - priority=QueuePriority.HIGH, max_size=1000, - task_timeout=3600 -)) - -# Create a background queue with 1 worker -manager.create_queue(QueueConfig( - name="background", - max_workers=1, - priority=QueuePriority.LOW + priority=QueuePriority.HIGH )) ``` -### Queue Priorities +### Queue Priority Levels -- `QueuePriority.HIGH` (1): Critical tasks -- `QueuePriority.MEDIUM` (2): Standard tasks -- `QueuePriority.LOW` (3): Background tasks +- `QueuePriority.HIGH` (1) +- `QueuePriority.MEDIUM` (2) +- `QueuePriority.LOW` (3) -### Queue Monitoring - -```python -# Get queue status -status = manager.get_queue_status("critical") -print(f"Active Tasks: {status['active_tasks']}") -print(f"Queued Tasks: {status['queued_tasks']}") -print(f"Completed Tasks: {status['completed_tasks']}") -``` - -### Queue Operations - -```python -# Wait for specific queue to complete -await manager.wait_for_queue("critical") - -# Clean up old tasks in a queue -manager.cleanup_old_tasks(max_age_hours=24, queue_name="background") -``` +Tasks within the same priority level are processed in FIFO order with millisecond precision. ## Task Management @@ -145,63 +106,77 @@ task_info = await manager.submit_task( ) ``` -### Task Status Monitoring +### Monitoring Task Status ```python -status = manager.get_task_status("unique_id") -print(f"Status: {status.status}") # TaskStatus.QUEUED, PROCESSING, COMPLETED, FAILED +status = await manager.get_task_status("unique_id") +print(f"Status: {status.status}") # QUEUED, PROCESSING, COMPLETED, FAILED print(f"Queue: {status.queue_name}") print(f"Priority: {status.priority}") print(f"Result: {status.result}") print(f"Error: {status.error}") ``` -### Waiting for Tasks +### Queue Status Monitoring ```python -# Wait for specific task -await manager.wait_for_task("task1", timeout=30) +status = await manager.get_queue_status("critical") +print(f"Queue Size: {status['queued_tasks']}") +print(f"Active Workers: {status['current_workers']}") +print(f"Circuit Breaker: {status['circuit_breaker_status']}") +``` + +## Advanced Features + +### Circuit Breaker + +Each queue has a built-in circuit breaker that helps prevent cascade failures: + +- Opens after 3 consecutive failures (configurable) +- Auto-resets after 60 seconds (configurable) +- Provides circuit state monitoring -# Wait for all tasks in a queue -await manager.wait_for_queue("critical", timeout=60) +### Task Expiration + +Tasks automatically expire after 24 hours (configurable) to prevent resource leaks. + +### Redis Configuration + +```python +manager = TaskQueueManager( + redis_url="redis://username:password@hostname:6379/0", + cleanup_on_start=True # Optional: cleans up existing tasks on startup +) ``` +## Error Handling + +The system handles various error scenarios: + +- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID +- `TaskNotFoundError`: Raised when querying a non-existent task +- `QueueNotFoundError`: Raised when accessing an undefined queue +- `QueueProcessorError`: Raised for queue processing failures + ## API Reference ### TaskQueueManager -- `__init__()` -- `create_queue(config: QueueConfig)` +Core Methods: +- `async initialize()` +- `async create_queue(config: QueueConfig)` - `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)` -- `get_task_status(task_id)` -- `get_queue_status(queue_name)` -- `async wait_for_task(task_id, timeout=None)` -- `async wait_for_queue(queue_name, timeout=None)` -- `cleanup_old_tasks(max_age_hours=24, queue_name=None)` -- `async shutdown(wait=True)` +- `async get_task_status(task_id)` +- `async get_queue_status(queue_name)` +- `async shutdown(wait=True, timeout=5.0)` ### QueueConfig +Configuration Parameters: - `name: str` - `max_workers: int` -- `priority: QueuePriority = QueuePriority.MEDIUM` -- `max_size: int = 1000` -- `task_timeout: int = 3600` - -### TaskStatus - -Enum with states: -- `QUEUED` -- `PROCESSING` -- `COMPLETED` -- `FAILED` - -### QueuePriority - -Enum with levels: -- `HIGH` (1) -- `MEDIUM` (2) -- `LOW` (3) +- `max_size: int` +- `priority: QueuePriority` ## Contributing