Skip to content

Commit

Permalink
feat: Implement Redis-backed priority queue system
Browse files Browse the repository at this point in the history
This commit introduces Redis as the backend storage system for the task queue
manager and implements priority-based task processing. Key changes include:

Core Features:
- Add Redis integration for persistent task storage
- Implement priority queue system with sorted sets
- Add task status tracking and state management
- Add worker pool management with configurable concurrency

Technical Details:
- Use Redis ZADD with priority scores for task ordering
- Implement atomic task state transitions
- Add task metadata storage with expiration
- Add circuit breaker pattern for error handling
- Add graceful shutdown with task completion waiting

Testing:
- Add integration tests for priority ordering
- Add tests for concurrent task processing
- Add tests for error handling and recovery
- Add cleanup and shutdown tests

Documentation:
- Update README with Redis configuration
- Add API documentation for new features
- Add usage examples for priority queues
- Add configuration guidelines

Breaking Changes:
- Requires Redis server (>= 5.0)
- Changed task submission API to include priorities
- Changed queue configuration format

This change provides a robust foundation for distributed task processing
with proper priority handling and persistent state management.
  • Loading branch information
justrach committed Nov 1, 2024
1 parent 203d2b3 commit 113bfd1
Show file tree
Hide file tree
Showing 8 changed files with 645 additions and 378 deletions.
Binary file modified kew/.coverage
Binary file not shown.
4 changes: 4 additions & 0 deletions kew/kew/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ class TaskNotFoundError(TaskQueueError):

class QueueNotFoundError(TaskQueueError):
"""Raised when attempting to access a non-existent queue"""
pass

class QueueProcessorError(TaskQueueError):
"""Raised when the queue processor encounters an error"""
pass
421 changes: 244 additions & 177 deletions kew/kew/manager.py

Large diffs are not rendered by default.

44 changes: 39 additions & 5 deletions kew/kew/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import datetime
from enum import Enum
from typing import Optional, TypeVar, Generic
from typing import Optional, TypeVar, Generic, Dict, Any
from dataclasses import dataclass

import json
T = TypeVar('T') # Generic type for task result

class TaskStatus(Enum):
Expand All @@ -15,6 +15,7 @@ class QueuePriority(Enum):
HIGH = 1
MEDIUM = 2
LOW = 3
T = TypeVar('T') # Generic type for task result

@dataclass
class QueueConfig:
Expand All @@ -26,19 +27,30 @@ class QueueConfig:
task_timeout: int = 3600

class TaskInfo(Generic[T]):
def __init__(self, task_id: str, task_type: str, queue_name: str, priority: int):
def __init__(
self,
task_id: str,
task_type: str,
queue_name: str,
priority: int,
status: TaskStatus = TaskStatus.QUEUED # Made optional with default
):
self.task_id = task_id
self.task_type = task_type
self.queue_name = queue_name
self.priority = priority
self.status = TaskStatus.QUEUED
self.status = status
self.queued_time = datetime.now()
self.started_time: Optional[datetime] = None
self.completed_time: Optional[datetime] = None
self.result: Optional[T] = None
self.error: Optional[str] = None
# Store function and arguments for execution
self._func = None
self._args = ()
self._kwargs = {}

def to_dict(self):
def to_dict(self) -> Dict[str, Any]:
return {
"task_id": self.task_id,
"task_type": self.task_type,
Expand All @@ -51,3 +63,25 @@ def to_dict(self):
"result": self.result,
"error": self.error
}

def to_json(self) -> str:
"""Convert task info to JSON string"""
return json.dumps(self.to_dict())

@classmethod
def from_json(cls, json_str: str) -> 'TaskInfo':
"""Create TaskInfo instance from JSON string"""
data = json.loads(json_str)
task = cls(
task_id=data["task_id"],
task_type=data["task_type"],
queue_name=data["queue_name"],
priority=data["priority"],
status=TaskStatus(data["status"])
)
task.queued_time = datetime.fromisoformat(data["queued_time"])
task.started_time = datetime.fromisoformat(data["started_time"]) if data["started_time"] else None
task.completed_time = datetime.fromisoformat(data["completed_time"]) if data["completed_time"] else None
task.result = data["result"]
task.error = data["error"]
return task
20 changes: 12 additions & 8 deletions kew/kew/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
# kew/tests/conftest.py
import pytest
import asyncio
from kew import TaskQueueManager

@pytest.fixture
async def manager():
"""Fixture that provides a TaskQueueManager instance"""
manager = TaskQueueManager()
yield manager
await manager.shutdown()
@pytest.fixture(scope="session")
def event_loop():
"""Create an instance of the default event loop for each test case."""
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
yield loop
loop.close()

@pytest.fixture(scope="session")
def anyio_backend():
"""Backend for anyio/pytest-asyncio."""
return 'asyncio'
Loading

0 comments on commit 113bfd1

Please sign in to comment.