Skip to content

Commit

Permalink
updated readme.md
Browse files Browse the repository at this point in the history
  • Loading branch information
justrach committed Nov 1, 2024
1 parent 7ec98b3 commit 2cd7348
Showing 1 changed file with 76 additions and 101 deletions.
177 changes: 76 additions & 101 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# Kew Task Queue Manager

A flexible and robust asynchronous task queue manager for Python applications with support for multiple priority queues.
A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.

## Features

- Multiple named queues with independent configurations
- Priority-based task processing
- Asynchronous task execution
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue
- Task status tracking and monitoring
- Automatic cleanup of completed tasks
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
- Thread-safe operations
- Comprehensive logging

## Installation

Expand All @@ -30,24 +32,20 @@ async def example_task(x: int):
return x * 2

async def main():
# Initialize the task queue manager
manager = TaskQueueManager()
# Initialize the task queue manager with Redis connection
manager = TaskQueueManager(redis_url="redis://localhost:6379")
await manager.initialize()

# Create queues with different priorities
manager.create_queue(QueueConfig(
# Create a high-priority queue
await manager.create_queue(QueueConfig(
name="high_priority",
max_workers=4,
max_size=1000,
priority=QueuePriority.HIGH
))

manager.create_queue(QueueConfig(
name="background",
max_workers=1,
priority=QueuePriority.LOW
))

# Submit tasks to different queues
critical_task = await manager.submit_task(
# Submit a task
task_info = await manager.submit_task(
task_id="task1",
queue_name="high_priority",
task_type="multiplication",
Expand All @@ -56,78 +54,41 @@ async def main():
x=5
)

background_task = await manager.submit_task(
task_id="task2",
queue_name="background",
task_type="multiplication",
task_func=example_task,
priority=QueuePriority.LOW,
x=10
)

# Wait for results
# Check task status
await asyncio.sleep(2)
high_status = manager.get_task_status("task1")
low_status = manager.get_task_status("task2")
print(f"High Priority Result: {high_status.result}")
print(f"Background Result: {low_status.result}")
status = await manager.get_task_status("task1")
print(f"Task Result: {status.result}")

# Cleanup
# Graceful shutdown
await manager.shutdown()

if __name__ == "__main__":
asyncio.run(main())
```

## Queue Management
## Queue Configuration

### Creating Queues

```python
from kew import QueueConfig, QueuePriority

# Create a high-priority queue with 4 workers
manager.create_queue(QueueConfig(
await manager.create_queue(QueueConfig(
name="critical",
max_workers=4,
priority=QueuePriority.HIGH,
max_size=1000,
task_timeout=3600
))

# Create a background queue with 1 worker
manager.create_queue(QueueConfig(
name="background",
max_workers=1,
priority=QueuePriority.LOW
priority=QueuePriority.HIGH
))
```

### Queue Priorities
### Queue Priority Levels

- `QueuePriority.HIGH` (1): Critical tasks
- `QueuePriority.MEDIUM` (2): Standard tasks
- `QueuePriority.LOW` (3): Background tasks
- `QueuePriority.HIGH` (1)
- `QueuePriority.MEDIUM` (2)
- `QueuePriority.LOW` (3)

### Queue Monitoring

```python
# Get queue status
status = manager.get_queue_status("critical")
print(f"Active Tasks: {status['active_tasks']}")
print(f"Queued Tasks: {status['queued_tasks']}")
print(f"Completed Tasks: {status['completed_tasks']}")
```

### Queue Operations

```python
# Wait for specific queue to complete
await manager.wait_for_queue("critical")

# Clean up old tasks in a queue
manager.cleanup_old_tasks(max_age_hours=24, queue_name="background")
```
Tasks within the same priority level are processed in FIFO order with millisecond precision.

## Task Management

Expand All @@ -145,63 +106,77 @@ task_info = await manager.submit_task(
)
```

### Task Status Monitoring
### Monitoring Task Status

```python
status = manager.get_task_status("unique_id")
print(f"Status: {status.status}") # TaskStatus.QUEUED, PROCESSING, COMPLETED, FAILED
status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}") # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
```

### Waiting for Tasks
### Queue Status Monitoring

```python
# Wait for specific task
await manager.wait_for_task("task1", timeout=30)
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
```

## Advanced Features

### Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring

# Wait for all tasks in a queue
await manager.wait_for_queue("critical", timeout=60)
### Task Expiration

Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.

### Redis Configuration

```python
manager = TaskQueueManager(
redis_url="redis://username:password@hostname:6379/0",
cleanup_on_start=True # Optional: cleans up existing tasks on startup
)
```

## Error Handling

The system handles various error scenarios:

- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID
- `TaskNotFoundError`: Raised when querying a non-existent task
- `QueueNotFoundError`: Raised when accessing an undefined queue
- `QueueProcessorError`: Raised for queue processing failures

## API Reference

### TaskQueueManager

- `__init__()`
- `create_queue(config: QueueConfig)`
Core Methods:
- `async initialize()`
- `async create_queue(config: QueueConfig)`
- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)`
- `get_task_status(task_id)`
- `get_queue_status(queue_name)`
- `async wait_for_task(task_id, timeout=None)`
- `async wait_for_queue(queue_name, timeout=None)`
- `cleanup_old_tasks(max_age_hours=24, queue_name=None)`
- `async shutdown(wait=True)`
- `async get_task_status(task_id)`
- `async get_queue_status(queue_name)`
- `async shutdown(wait=True, timeout=5.0)`

### QueueConfig

Configuration Parameters:
- `name: str`
- `max_workers: int`
- `priority: QueuePriority = QueuePriority.MEDIUM`
- `max_size: int = 1000`
- `task_timeout: int = 3600`

### TaskStatus

Enum with states:
- `QUEUED`
- `PROCESSING`
- `COMPLETED`
- `FAILED`

### QueuePriority

Enum with levels:
- `HIGH` (1)
- `MEDIUM` (2)
- `LOW` (3)
- `max_size: int`
- `priority: QueuePriority`

## Contributing

Expand Down

0 comments on commit 2cd7348

Please sign in to comment.