Skip to content

Commit

Permalink
[Reduce code change size] Take Redis generic refactor out of the PR
Browse files Browse the repository at this point in the history
  • Loading branch information
majdyz committed Oct 24, 2024
1 parent 5158b87 commit 5e70973
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 28 deletions.
35 changes: 11 additions & 24 deletions autogpt_platform/backend/backend/data/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Generic, TypeVar

from pydantic import BaseModel

from backend.data import redis
from backend.data.execution import ExecutionResult

M = TypeVar("M", bound=BaseModel)

logger = logging.getLogger(__name__)


Expand All @@ -21,43 +16,35 @@ def default(self, o):
return super().default(o)


class AbstractEventQueue(ABC, Generic[M]):
class AbstractEventQueue(ABC):
@abstractmethod
def put(self, item: M):
def put(self, execution_result: ExecutionResult):
pass

@abstractmethod
def get(self) -> M | None:
def get(self) -> ExecutionResult | None:
pass


class RedisEventQueue(AbstractEventQueue[M]):
Model: type[M]

class RedisEventQueue(AbstractEventQueue):
def __init__(self):
self.queue_name = redis.QUEUE_NAME

@property
def connection(self):
return redis.get_redis()

def put(self, item: M):
message = json.dumps(item.model_dump(), cls=DateTimeEncoder)
logger.info(f"Putting item to Redis queue [{self.queue_name}]: {message}")
def put(self, execution_result: ExecutionResult):
message = json.dumps(execution_result.model_dump(), cls=DateTimeEncoder)
logger.info(f"Putting execution result to Redis {message}")
self.connection.lpush(self.queue_name, message)

def get(self) -> M | None:
def get(self) -> ExecutionResult | None:
message = self.connection.rpop(self.queue_name)
if message is not None and isinstance(message, (str, bytes, bytearray)):
data = json.loads(message)
logger.info(f"Getting item from Redis queue [{self.queue_name}]: {data}")
return self.Model(**data)
logger.info(f"Getting execution result from Redis {data}")
return ExecutionResult(**data)
elif message is not None:
logger.error(
f"Failed to get item from Redis queue [{self.queue_name}]: {message}"
)
logger.error(f"Failed to get execution result from Redis {message}")
return None


class RedisExecutionEventQueue(RedisEventQueue[ExecutionResult]):
Model = ExecutionResult
5 changes: 3 additions & 2 deletions autogpt_platform/backend/backend/executor/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
upsert_execution_output,
)
from backend.data.graph import get_graph, get_node
from backend.data.queue import RedisExecutionEventQueue
from backend.data.queue import RedisEventQueue
from backend.data.user import get_user_metadata, update_user_metadata
from backend.util.service import AppService, expose
from backend.util.settings import Config
Expand All @@ -25,11 +25,12 @@


class DatabaseManager(AppService):

def __init__(self):
super().__init__()
self.use_db = True
self.use_redis = True
self.event_queue = RedisExecutionEventQueue()
self.event_queue = RedisEventQueue()

@classmethod
def get_port(cls) -> int:
Expand Down
4 changes: 2 additions & 2 deletions autogpt_platform/backend/backend/server/ws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fastapi.middleware.cors import CORSMiddleware

from backend.data import redis
from backend.data.queue import RedisExecutionEventQueue
from backend.data.queue import RedisEventQueue
from backend.data.user import DEFAULT_USER_ID
from backend.server.conn_manager import ConnectionManager
from backend.server.model import ExecutionSubscription, Methods, WsMessage
Expand Down Expand Up @@ -51,7 +51,7 @@ def get_connection_manager():
async def event_broadcaster(manager: ConnectionManager):
try:
redis.connect()
event_queue = RedisExecutionEventQueue()
event_queue = RedisEventQueue()
while True:
event = event_queue.get()
if event:
Expand Down

0 comments on commit 5e70973

Please sign in to comment.