From fe13a3dfc17bf6bc02497065e9eb0f78057eb19e Mon Sep 17 00:00:00 2001 From: Rach <54503978+justrach@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:21:18 -0800 Subject: [PATCH] Update README.md --- README.md | 339 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 194 insertions(+), 145 deletions(-) diff --git a/README.md b/README.md index e90fb61..cd71aec 100644 --- a/README.md +++ b/README.md @@ -1,207 +1,256 @@ -# Kew Task Queue Manager - -A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns. - -## Features - -- Multiple named queues with independent configurations -- Priority-based task scheduling with millisecond precision -- Redis-backed persistence for reliability -- Configurable worker pools per queue with strict concurrency control -- Built-in circuit breaker for fault tolerance -- Comprehensive task lifecycle management -- Proper semaphore-based worker slot management -- Race condition protection in concurrent processing -- Automatic task expiration (24-hour default) -- Detailed logging and monitoring -- Graceful shutdown handling -- Thread-safe operations - -## Installation +# Kew: Modern Async Task Queue + +A Redis-backed task queue built for modern async Python applications. Handles background processing with precise concurrency control, priority queues, and circuit breakers - all running in your existing async process. + +## Why Kew? + +Building async applications often means dealing with background tasks. Existing solutions like Celery require separate worker processes and complex configuration. 
Kew takes a different approach: + +- **Runs in Your Process**: No separate workers to manage - tasks run in your existing async process +- **True Async**: Native async/await support - no sync/async bridges needed +- **Precise Control**: Semaphore-based concurrency ensures exact worker limits +- **Simple Setup**: Just Redis and a few lines of code to get started + +## How It Works + +Kew manages task execution using a combination of Redis for persistence and asyncio for processing: +```mermaid +graph LR + A[Application] -->|Submit Task| B[Task Queue] + B -->|Semaphore Control| C[Worker Pool] + C -->|Execute Task| D[Task Processing] + D -->|Success| E[Complete] + D -->|Error| F[Circuit Breaker] + F -->|Reset| B + style A fill:#f9f,stroke:#333 + style B fill:#bbf,stroke:#333 + style C fill:#bfb,stroke:#333 + style D fill:#fbb,stroke:#333 +``` +Tasks flow through several states with built-in error handling: +```mermaid +stateDiagram-v2 + [*] --> Submitted: Task Created + Submitted --> Queued: Priority Assignment + Queued --> Processing: Worker Available + Processing --> Completed: Success + Processing --> Failed: Error + Failed --> CircuitOpen: Multiple Failures + CircuitOpen --> Queued: Circuit Reset + Completed --> [*] +``` +## Quick Start +1. Install Kew: ```bash pip install kew ``` -## Quick Start - +2. 
Create a simple task processor: ```python import asyncio -from kew import TaskQueueManager, QueueConfig, QueuePriority +from kew import TaskQueueManager, QueueConfig -async def example_task(x: int): +async def process_order(order_id: str): + # Simulate order processing await asyncio.sleep(1) - return x * 2 + return f"Order {order_id} processed" async def main(): - # Initialize the task queue manager with Redis connection + # Initialize queue manager manager = TaskQueueManager(redis_url="redis://localhost:6379") await manager.initialize() - # Create a high-priority queue with concurrent processing limits + # Create processing queue await manager.create_queue(QueueConfig( - name="high_priority", - max_workers=4, # Strictly enforced concurrent task limit - max_size=1000, - priority=QueuePriority.HIGH + name="orders", + max_workers=4, # Only 4 concurrent tasks + max_size=1000 )) - # Submit a task - task_info = await manager.submit_task( - task_id="task1", - queue_name="high_priority", - task_type="multiplication", - task_func=example_task, - priority=QueuePriority.HIGH, - x=5 - ) - - # Check task status - await asyncio.sleep(2) - status = await manager.get_task_status("task1") - print(f"Task Result: {status.result}") + # Submit some tasks + tasks = [] + for i in range(10): + task = await manager.submit_task( + task_id=f"order-{i}", + queue_name="orders", + task_func=process_order, + order_id=str(i) + ) + tasks.append(task) - # Graceful shutdown - await manager.shutdown() + # Check results + for task in tasks: + status = await manager.get_task_status(task.task_id) + print(f"{task.task_id}: {status.result}") if __name__ == "__main__": asyncio.run(main()) ``` -## Queue Configuration - -### Creating Queues +## Real-World Examples +### Async Web Application ```python -from kew import QueueConfig, QueuePriority +from fastapi import FastAPI +from kew import TaskQueueManager, QueueConfig -# Create a high-priority queue with strictly enforced concurrent processing -await 
manager.create_queue(QueueConfig( - name="critical", - max_workers=4, # Maximum number of concurrent tasks - max_size=1000, - priority=QueuePriority.HIGH -)) -``` +app = FastAPI() +manager = TaskQueueManager() -### Worker Pool Management +@app.on_event("startup") +async def startup(): + await manager.initialize() + await manager.create_queue(QueueConfig( + name="emails", + max_workers=2 + )) -The queue manager now implements strict concurrency control: -- Uses semaphores to guarantee max_workers limit is respected -- Prevents task starvation through fair scheduling -- Properly releases worker slots after task completion -- Handles error cases with automatic worker slot cleanup -- Protects against race conditions in concurrent processing +@app.post("/signup") +async def signup(email: str): + # Handle signup immediately + user = await create_user(email) + + # Queue welcome email for background processing + await manager.submit_task( + task_id=f"welcome-{user.id}", + queue_name="emails", + task_func=send_welcome_email, + user_id=user.id + ) + return {"status": "success"} +``` -### Queue Priority Levels +### Data Processing Script +```python +async def process_batch(items: list): + manager = TaskQueueManager() + await manager.initialize() + + # Create high and low priority queues + await manager.create_queue(QueueConfig( + name="critical", + max_workers=4, + priority=QueuePriority.HIGH + )) + + await manager.create_queue(QueueConfig( + name="batch", + max_workers=2, + priority=QueuePriority.LOW + )) + + # Process priority items first + for item in filter(is_priority, items): + await manager.submit_task( + task_id=f"item-{item.id}", + queue_name="critical", + task_func=process_item, + item=item + ) + + # Queue remaining items + for item in filter(lambda x: not is_priority(x), items): + await manager.submit_task( + task_id=f"item-{item.id}", + queue_name="batch", + task_func=process_item, + item=item + ) +``` -- `QueuePriority.HIGH` (1) -- `QueuePriority.MEDIUM` (2) -- 
`QueuePriority.LOW` (3) +## Key Features -Tasks within the same priority level are processed in FIFO order with millisecond precision. +### Concurrency Control +```python +# Strictly enforce 4 concurrent tasks max +await manager.create_queue(QueueConfig( + name="api_calls", + max_workers=4 # Guaranteed not to exceed +)) +``` -## Task Management +### Priority Queues +```python +# High priority queue for urgent tasks +await manager.create_queue(QueueConfig( + name="urgent", + priority=QueuePriority.HIGH +)) -### Submitting Tasks +# Lower priority for batch processing +await manager.create_queue(QueueConfig( + name="batch", + priority=QueuePriority.LOW +)) +``` +### Circuit Breakers ```python -task_info = await manager.submit_task( - task_id="unique_id", - queue_name="critical", - task_type="example", - task_func=my_async_function, - priority=QueuePriority.HIGH, - *args, - **kwargs -) +# Configure circuit breaker for external API calls +await manager.create_queue(QueueConfig( + name="api_calls", + circuit_breaker_max_failures=5, # Open after 5 failures + circuit_breaker_reset_timeout=30 # Reset after 30 seconds +)) ``` -### Monitoring Task Status - +### Task Monitoring ```python -status = await manager.get_task_status("unique_id") -print(f"Status: {status.status}") # QUEUED, PROCESSING, COMPLETED, FAILED -print(f"Queue: {status.queue_name}") -print(f"Priority: {status.priority}") +# Check task status +status = await manager.get_task_status("task-123") +print(f"Status: {status.status}") print(f"Result: {status.result}") print(f"Error: {status.error}") + +# Monitor queue health +queue_status = await manager.get_queue_status("api_calls") +print(f"Active Tasks: {queue_status['current_workers']}") +print(f"Circuit Breaker: {queue_status['circuit_breaker_status']}") ``` -### Queue Status Monitoring +## Configuration +### Redis Settings ```python -status = await manager.get_queue_status("critical") -print(f"Queue Size: {status['queued_tasks']}") -print(f"Active Workers: 
{status['current_workers']}") # Shows current concurrent tasks -print(f"Circuit Breaker: {status['circuit_breaker_status']}") +manager = TaskQueueManager( + redis_url="redis://username:password@hostname:6379/0", + cleanup_on_start=True # Optional: clean stale tasks +) ``` -## Advanced Features - -### Concurrent Processing - -Each queue now implements robust concurrent task processing: -- Strict enforcement of max_workers limit through semaphores -- Fair scheduling of tasks to prevent starvation -- Automatic cleanup of worker slots on task completion -- Protected against race conditions in high-concurrency scenarios -- Error handling with proper resource cleanup - -### Circuit Breaker - -Each queue has a built-in circuit breaker that helps prevent cascade failures: -- Opens after 3 consecutive failures (configurable) -- Auto-resets after 60 seconds (configurable) -- Provides circuit state monitoring -- Integrates with concurrent processing controls - ### Task Expiration - -Tasks automatically expire after 24 hours (configurable) to prevent resource leaks. 
- -### Redis Configuration - ```python +# Tasks expire after 24 hours by default +# Configure custom expiration: manager = TaskQueueManager( - redis_url="redis://username:password@hostname:6379/0", - cleanup_on_start=True # Optional: cleans up existing tasks on startup + task_expiry_seconds=3600 # 1 hour ) ``` ## Error Handling -The system handles various error scenarios: - -- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID -- `TaskNotFoundError`: Raised when querying a non-existent task -- `QueueNotFoundError`: Raised when accessing an undefined queue -- `QueueProcessorError`: Raised for queue processing failures - -## API Reference +Kew provides comprehensive error handling: -### TaskQueueManager +- `TaskAlreadyExistsError`: Task ID already in use +- `TaskNotFoundError`: Task doesn't exist +- `QueueNotFoundError`: Queue not configured +- `QueueProcessorError`: Task processing failed -Core Methods: -- `async initialize()` -- `async create_queue(config: QueueConfig)` -- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)` -- `async get_task_status(task_id)` -- `async get_queue_status(queue_name)` -- `async shutdown(wait=True, timeout=5.0)` - -### QueueConfig - -Configuration Parameters: -- `name: str` -- `max_workers: int` - Strictly enforced concurrent task limit -- `max_size: int` -- `priority: QueuePriority` +```python +try: + await manager.submit_task(...) +except TaskAlreadyExistsError: + pass # Handle duplicate task +except QueueProcessorError as e: + # Handle processing error + print(f"Task failed: {e}") +``` ## Contributing -Contributions are welcome! Please feel free to submit a Pull Request. +We welcome contributions! Please check our [Contributing Guide](CONTRIBUTING.md) for details. ## License -This project is licensed under the MIT License - see the LICENSE file for details. \ No newline at end of file +MIT License - see the [LICENSE](LICENSE) file for details.