Skip to content

Commit

Permalink
updated the readme.mds
Browse files Browse the repository at this point in the history
"
  • Loading branch information
justrach committed Nov 3, 2024
1 parent e25831f commit 32a9135
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 16 deletions.
36 changes: 28 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ A robust, Redis-backed asynchronous task queue manager for Python applications w
- Multiple named queues with independent configurations
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue
- Configurable worker pools per queue with strict concurrency control
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Proper semaphore-based worker slot management
- Race condition protection in concurrent processing
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
Expand All @@ -36,10 +38,10 @@ async def main():
manager = TaskQueueManager(redis_url="redis://localhost:6379")
await manager.initialize()

# Create a high-priority queue
# Create a high-priority queue with concurrent processing limits
await manager.create_queue(QueueConfig(
name="high_priority",
max_workers=4,
max_workers=4, # Strictly enforced concurrent task limit
max_size=1000,
priority=QueuePriority.HIGH
))
Expand Down Expand Up @@ -73,15 +75,24 @@ if __name__ == "__main__":
```python
from kew import QueueConfig, QueuePriority

# Create a high-priority queue with 4 workers
# Create a high-priority queue with strictly enforced concurrent processing
await manager.create_queue(QueueConfig(
name="critical",
max_workers=4,
max_workers=4, # Maximum number of concurrent tasks
max_size=1000,
priority=QueuePriority.HIGH
))
```

### Worker Pool Management

The queue manager now implements strict concurrency control:
- Uses semaphores to guarantee max_workers limit is respected
- Prevents task starvation through fair scheduling
- Properly releases worker slots after task completion
- Handles error cases with automatic worker slot cleanup
- Protects against race conditions in concurrent processing

### Queue Priority Levels

- `QueuePriority.HIGH` (1)
Expand Down Expand Up @@ -122,19 +133,28 @@ print(f"Error: {status.error}")
```python
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Active Workers: {status['current_workers']}") # Shows current concurrent tasks
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
```

## Advanced Features

### Concurrent Processing

Each queue now implements robust concurrent task processing:
- Strict enforcement of max_workers limit through semaphores
- Fair scheduling of tasks to prevent starvation
- Automatic cleanup of worker slots on task completion
- Protected against race conditions in high-concurrency scenarios
- Error handling with proper resource cleanup

### Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring
- Integrates with concurrent processing controls

### Task Expiration

Expand Down Expand Up @@ -174,7 +194,7 @@ Core Methods:

Configuration Parameters:
- `name: str`
- `max_workers: int`
- `max_workers: int` - Strictly enforced concurrent task limit
- `max_size: int`
- `priority: QueuePriority`

Expand Down
36 changes: 28 additions & 8 deletions kew/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ A robust, Redis-backed asynchronous task queue manager for Python applications w
- Multiple named queues with independent configurations
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue
- Configurable worker pools per queue with strict concurrency control
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Proper semaphore-based worker slot management
- Race condition protection in concurrent processing
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
Expand All @@ -36,10 +38,10 @@ async def main():
manager = TaskQueueManager(redis_url="redis://localhost:6379")
await manager.initialize()

# Create a high-priority queue
# Create a high-priority queue with concurrent processing limits
await manager.create_queue(QueueConfig(
name="high_priority",
max_workers=4,
max_workers=4, # Strictly enforced concurrent task limit
max_size=1000,
priority=QueuePriority.HIGH
))
Expand Down Expand Up @@ -73,15 +75,24 @@ if __name__ == "__main__":
```python
from kew import QueueConfig, QueuePriority

# Create a high-priority queue with 4 workers
# Create a high-priority queue with strictly enforced concurrent processing
await manager.create_queue(QueueConfig(
name="critical",
max_workers=4,
max_workers=4, # Maximum number of concurrent tasks
max_size=1000,
priority=QueuePriority.HIGH
))
```

### Worker Pool Management

The queue manager now implements strict concurrency control:
- Uses semaphores to guarantee max_workers limit is respected
- Prevents task starvation through fair scheduling
- Properly releases worker slots after task completion
- Handles error cases with automatic worker slot cleanup
- Protects against race conditions in concurrent processing

### Queue Priority Levels

- `QueuePriority.HIGH` (1)
Expand Down Expand Up @@ -122,19 +133,28 @@ print(f"Error: {status.error}")
```python
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Active Workers: {status['current_workers']}") # Shows current concurrent tasks
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
```

## Advanced Features

### Concurrent Processing

Each queue now implements robust concurrent task processing:
- Strict enforcement of max_workers limit through semaphores
- Fair scheduling of tasks to prevent starvation
- Automatic cleanup of worker slots on task completion
- Protected against race conditions in high-concurrency scenarios
- Error handling with proper resource cleanup

### Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring
- Integrates with concurrent processing controls

### Task Expiration

Expand Down Expand Up @@ -174,7 +194,7 @@ Core Methods:

Configuration Parameters:
- `name: str`
- `max_workers: int`
- `max_workers: int` - Strictly enforced concurrent task limit
- `max_size: int`
- `priority: QueuePriority`

Expand Down

0 comments on commit 32a9135

Please sign in to comment.