diff --git a/README.md b/README.md index 8449ac2..e90fb61 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,11 @@ A robust, Redis-backed asynchronous task queue manager for Python applications w - Multiple named queues with independent configurations - Priority-based task scheduling with millisecond precision - Redis-backed persistence for reliability -- Configurable worker pools per queue +- Configurable worker pools per queue with strict concurrency control - Built-in circuit breaker for fault tolerance - Comprehensive task lifecycle management +- Proper semaphore-based worker slot management +- Race condition protection in concurrent processing - Automatic task expiration (24-hour default) - Detailed logging and monitoring - Graceful shutdown handling @@ -36,10 +38,10 @@ async def main(): manager = TaskQueueManager(redis_url="redis://localhost:6379") await manager.initialize() - # Create a high-priority queue + # Create a high-priority queue with concurrent processing limits await manager.create_queue(QueueConfig( name="high_priority", - max_workers=4, + max_workers=4, # Strictly enforced concurrent task limit max_size=1000, priority=QueuePriority.HIGH )) @@ -73,15 +75,24 @@ if __name__ == "__main__": ```python from kew import QueueConfig, QueuePriority -# Create a high-priority queue with 4 workers +# Create a high-priority queue with strictly enforced concurrent processing await manager.create_queue(QueueConfig( name="critical", - max_workers=4, + max_workers=4, # Maximum number of concurrent tasks max_size=1000, priority=QueuePriority.HIGH )) ``` +### Worker Pool Management + +The queue manager now implements strict concurrency control: +- Uses semaphores to guarantee max_workers limit is respected +- Prevents task starvation through fair scheduling +- Properly releases worker slots after task completion +- Handles error cases with automatic worker slot cleanup +- Protects against race conditions in concurrent processing + ### Queue Priority Levels - `QueuePriority.HIGH` (1) @@ -122,19 +133,28 @@ print(f"Error: {status.error}") ```python status = await manager.get_queue_status("critical") print(f"Queue Size: {status['queued_tasks']}") -print(f"Active Workers: {status['current_workers']}") +print(f"Active Workers: {status['current_workers']}") # Shows current concurrent tasks print(f"Circuit Breaker: {status['circuit_breaker_status']}") ``` ## Advanced Features +### Concurrent Processing + +Each queue now implements robust concurrent task processing: +- Strict enforcement of max_workers limit through semaphores +- Fair scheduling of tasks to prevent starvation +- Automatic cleanup of worker slots on task completion +- Protected against race conditions in high-concurrency scenarios +- Error handling with proper resource cleanup + ### Circuit Breaker Each queue has a built-in circuit breaker that helps prevent cascade failures: - - Opens after 3 consecutive failures (configurable) - Auto-resets after 60 seconds (configurable) - Provides circuit state monitoring +- Integrates with concurrent processing controls ### Task Expiration @@ -174,7 +194,7 @@ Core Methods: Configuration Parameters: - `name: str` -- `max_workers: int` +- `max_workers: int` - Strictly enforced concurrent task limit - `max_size: int` - `priority: QueuePriority` diff --git a/kew/README.md b/kew/README.md index 8449ac2..e90fb61 100644 --- a/kew/README.md +++ b/kew/README.md @@ -7,9 +7,11 @@ A robust, Redis-backed asynchronous task queue manager for Python applications w - Multiple named queues with independent configurations - Priority-based task scheduling with millisecond precision - Redis-backed persistence for reliability -- Configurable worker pools per queue +- Configurable worker pools per queue with strict concurrency control - Built-in circuit breaker for fault tolerance - Comprehensive task lifecycle management +- Proper semaphore-based worker slot management +- Race condition protection in concurrent processing - Automatic task expiration (24-hour default) - Detailed logging and monitoring - Graceful shutdown handling @@ -36,10 +38,10 @@ async def main(): manager = TaskQueueManager(redis_url="redis://localhost:6379") await manager.initialize() - # Create a high-priority queue + # Create a high-priority queue with concurrent processing limits await manager.create_queue(QueueConfig( name="high_priority", - max_workers=4, + max_workers=4, # Strictly enforced concurrent task limit max_size=1000, priority=QueuePriority.HIGH )) @@ -73,15 +75,24 @@ if __name__ == "__main__": ```python from kew import QueueConfig, QueuePriority -# Create a high-priority queue with 4 workers +# Create a high-priority queue with strictly enforced concurrent processing await manager.create_queue(QueueConfig( name="critical", - max_workers=4, + max_workers=4, # Maximum number of concurrent tasks max_size=1000, priority=QueuePriority.HIGH )) ``` +### Worker Pool Management + +The queue manager now implements strict concurrency control: +- Uses semaphores to guarantee max_workers limit is respected +- Prevents task starvation through fair scheduling +- Properly releases worker slots after task completion +- Handles error cases with automatic worker slot cleanup +- Protects against race conditions in concurrent processing + ### Queue Priority Levels - `QueuePriority.HIGH` (1) @@ -122,19 +133,28 @@ print(f"Error: {status.error}") ```python status = await manager.get_queue_status("critical") print(f"Queue Size: {status['queued_tasks']}") -print(f"Active Workers: {status['current_workers']}") +print(f"Active Workers: {status['current_workers']}") # Shows current concurrent tasks print(f"Circuit Breaker: {status['circuit_breaker_status']}") ``` ## Advanced Features +### Concurrent Processing + +Each queue now implements robust concurrent task processing: +- Strict enforcement of max_workers limit through semaphores +- Fair scheduling of tasks to prevent starvation +- Automatic cleanup of worker slots on task completion +- Protected against race conditions in high-concurrency scenarios +- Error handling with proper resource cleanup + ### Circuit Breaker Each queue has a built-in circuit breaker that helps prevent cascade failures: - - Opens after 3 consecutive failures (configurable) - Auto-resets after 60 seconds (configurable) - Provides circuit state monitoring +- Integrates with concurrent processing controls ### Task Expiration @@ -174,7 +194,7 @@ Core Methods: Configuration Parameters: - `name: str` -- `max_workers: int` +- `max_workers: int` - Strictly enforced concurrent task limit - `max_size: int` - `priority: QueuePriority`