From 7f318685afe4c1413f7db45714ab6d9921b575ee Mon Sep 17 00:00:00 2001 From: Zamil Majdy <zamil.majdy@agpt.co> Date: Wed, 23 Oct 2024 07:41:59 +0700 Subject: [PATCH] Revert "fix(backend): Fix DatabaseManager usage by calling it on-demand" This reverts commit e5f5005ab846ddf146ace1dc42a670532e9fe851. --- .../store.py | 9 ++---- .../backend/backend/executor/database.py | 6 +--- .../backend/backend/executor/manager.py | 15 ++++------ .../backend/backend/executor/scheduler.py | 10 ++----- .../backend/integrations/creds_manager.py | 7 +++-- .../backend/server/integrations/router.py | 3 +- .../backend/backend/server/rest_api.py | 15 +++++----- .../utils => backend/backend/util}/cache.py | 0 .../backend/backend/util/service.py | 28 ++++++------------- .../backend/test/executor/test_scheduler.py | 6 +++- .../backend/test/util/test_service.py | 8 ++---- 11 files changed, 41 insertions(+), 66 deletions(-) rename autogpt_platform/{autogpt_libs/autogpt_libs/utils => backend/backend/util}/cache.py (100%) diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py b/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py index 8b539c15c58b..f4ce921937e7 100644 --- a/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py +++ b/autogpt_platform/autogpt_libs/autogpt_libs/supabase_integration_credentials_store/store.py @@ -6,7 +6,6 @@ from redis import Redis from backend.executor.database import DatabaseManager -from autogpt_libs.utils.cache import thread_cached_property from autogpt_libs.utils.synchronize import RedisKeyedMutex from .types import ( @@ -19,13 +18,9 @@ class SupabaseIntegrationCredentialsStore: - def __init__(self, redis: "Redis"): + def __init__(self, redis: "Redis", db: "DatabaseManager"): + self.db_manager: DatabaseManager = db self.locks = RedisKeyedMutex(redis) - - @thread_cached_property - def db_manager(self) -> "DatabaseManager": - from backend.executor.database import DatabaseManager - return DatabaseManager.client def add_creds(self, user_id: str, credentials: Credentials) -> None: with self.locked_user_metadata(user_id): diff --git a/autogpt_platform/backend/backend/executor/database.py b/autogpt_platform/backend/backend/executor/database.py index b404ac6ecf17..aea2dd4177fc 100644 --- a/autogpt_platform/backend/backend/executor/database.py +++ b/autogpt_platform/backend/backend/executor/database.py @@ -27,15 +27,11 @@ class DatabaseManager(AppService): def __init__(self): - super().__init__() + super().__init__(port=Config().database_api_port) self.use_db = True self.use_redis = True self.event_queue = RedisEventQueue() - @classmethod - def get_port(cls) -> int: - return Config().database_api_port - @expose def send_execution_update(self, execution_result_dict: dict[Any, Any]): self.event_queue.put(ExecutionResult(**execution_result_dict)) diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py index 033d2bec3de1..ce6de9e9a623 100644 --- a/autogpt_platform/backend/backend/executor/manager.py +++ b/autogpt_platform/backend/backend/executor/manager.py @@ -16,8 +16,6 @@ if TYPE_CHECKING: from backend.executor import DatabaseManager -from autogpt_libs.utils.cache import thread_cached - from backend.data import redis from backend.data.block import Block, BlockData, BlockInput, BlockType, get_block from backend.data.execution import ( @@ -33,6 +31,7 @@ from backend.data.model import CREDENTIALS_FIELD_NAME, CredentialsMetaInput from backend.integrations.creds_manager import IntegrationCredentialsManager from backend.util import json +from backend.util.cache import thread_cached from backend.util.decorator import error_logged, time_measured from backend.util.logging import configure_logging from backend.util.process import set_service_name @@ -420,7 +419,7 @@ def on_node_executor_start(cls): redis.connect() cls.pid = os.getpid() cls.db_client = get_db_client() - cls.creds_manager = IntegrationCredentialsManager() + cls.creds_manager = IntegrationCredentialsManager(db_manager=cls.db_client) # Set up shutdown handlers cls.shutdown_lock = threading.Lock() @@ -660,24 +659,20 @@ def callback(_): class ExecutionManager(AppService): def __init__(self): - super().__init__() + super().__init__(port=settings.config.execution_manager_port) self.use_redis = True self.use_supabase = True self.pool_size = settings.config.num_graph_workers self.queue = ExecutionQueue[GraphExecution]() self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {} - @classmethod - def get_port(cls) -> int: - return settings.config.execution_manager_port - def run_service(self): from autogpt_libs.supabase_integration_credentials_store import ( SupabaseIntegrationCredentialsStore, ) self.credentials_store = SupabaseIntegrationCredentialsStore( - redis=redis.get_redis() + redis=redis.get_redis(), db=self.db_client ) self.executor = ProcessPoolExecutor( max_workers=self.pool_size, @@ -868,7 +863,7 @@ def _validate_node_input_credentials(self, graph: Graph, user_id: str): def get_db_client() -> "DatabaseManager": from backend.executor import DatabaseManager - return get_service_client(DatabaseManager) + return get_service_client(DatabaseManager, settings.config.database_api_port) @contextmanager diff --git a/autogpt_platform/backend/backend/executor/scheduler.py b/autogpt_platform/backend/backend/executor/scheduler.py index 979631c0585e..574765c34845 100644 --- a/autogpt_platform/backend/backend/executor/scheduler.py +++ b/autogpt_platform/backend/backend/executor/scheduler.py @@ -4,7 +4,6 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger -from autogpt_libs.utils.cache import thread_cached_property from backend.data.block import BlockInput from backend.data.schedule import ( @@ -15,6 +14,7 @@ update_schedule, ) from backend.executor.manager import ExecutionManager +from backend.util.cache import thread_cached_property from backend.util.service import AppService, expose, get_service_client from backend.util.settings import Config @@ -28,18 +28,14 @@ def log(msg, **kwargs): class ExecutionScheduler(AppService): def __init__(self, refresh_interval=10): - super().__init__() + super().__init__(port=Config().execution_scheduler_port) self.use_db = True self.last_check = datetime.min self.refresh_interval = refresh_interval - @classmethod - def get_port(cls) -> int: - return Config().execution_scheduler_port - @thread_cached_property def execution_client(self) -> ExecutionManager: - return get_service_client(ExecutionManager) + return get_service_client(ExecutionManager, Config().execution_manager_port) def run_service(self): scheduler = BackgroundScheduler() diff --git a/autogpt_platform/backend/backend/integrations/creds_manager.py b/autogpt_platform/backend/backend/integrations/creds_manager.py index 96f9d1a3c56d..3bfde70e2817 100644 --- a/autogpt_platform/backend/backend/integrations/creds_manager.py +++ b/autogpt_platform/backend/backend/integrations/creds_manager.py @@ -10,6 +10,7 @@ from redis.lock import Lock as RedisLock from backend.data import redis +from backend.executor.database import DatabaseManager from backend.integrations.oauth import HANDLERS_BY_NAME, BaseOAuthHandler from backend.util.settings import Settings @@ -49,10 +50,12 @@ class IntegrationCredentialsManager: cause so much latency that it's worth implementing. """ - def __init__(self): + def __init__(self, db_manager: DatabaseManager): redis_conn = redis.get_redis() self._locks = RedisKeyedMutex(redis_conn) - self.store = SupabaseIntegrationCredentialsStore(redis=redis_conn) + self.store = SupabaseIntegrationCredentialsStore( + redis=redis_conn, db=db_manager + ) def create(self, user_id: str, credentials: Credentials) -> None: return self.store.add_creds(user_id, credentials) diff --git a/autogpt_platform/backend/backend/server/integrations/router.py b/autogpt_platform/backend/backend/server/integrations/router.py index 5163de0b2fa3..440ce7921cbe 100644 --- a/autogpt_platform/backend/backend/server/integrations/router.py +++ b/autogpt_platform/backend/backend/server/integrations/router.py @@ -10,6 +10,7 @@ from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Request from pydantic import BaseModel, Field, SecretStr +from backend.executor.manager import get_db_client from backend.integrations.creds_manager import IntegrationCredentialsManager from backend.integrations.oauth import HANDLERS_BY_NAME, BaseOAuthHandler from backend.util.settings import Settings @@ -20,7 +21,7 @@ settings = Settings() router = APIRouter() -creds_manager = IntegrationCredentialsManager() +creds_manager = IntegrationCredentialsManager(db_manager=get_db_client()) class LoginResponse(BaseModel): diff --git a/autogpt_platform/backend/backend/server/rest_api.py b/autogpt_platform/backend/backend/server/rest_api.py index f0d922c19686..edeb025803c6 100644 --- a/autogpt_platform/backend/backend/server/rest_api.py +++ b/autogpt_platform/backend/backend/server/rest_api.py @@ -7,7 +7,6 @@ import uvicorn from autogpt_libs.auth.middleware import auth_middleware -from autogpt_libs.utils.cache import thread_cached_property from fastapi import APIRouter, Body, Depends, FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse @@ -20,7 +19,10 @@ from backend.data.credit import get_block_costs, get_user_credit_model from backend.data.user import get_or_create_user from backend.executor import ExecutionManager, ExecutionScheduler +from backend.executor.manager import get_db_client +from backend.integrations.creds_manager import IntegrationCredentialsManager from backend.server.model import CreateGraph, SetGraphActiveVersion +from backend.util.cache import thread_cached_property from backend.util.service import AppService, get_service_client from backend.util.settings import AppEnvironment, Config, Settings @@ -35,13 +37,9 @@ class AgentServer(AppService): _user_credit_model = get_user_credit_model() def __init__(self): - super().__init__() + super().__init__(port=Config().agent_server_port) self.use_redis = True - @classmethod - def get_port(cls) -> int: - return Config().agent_server_port - @asynccontextmanager async def lifespan(self, _: FastAPI): await db.connect() @@ -100,6 +98,7 @@ def run_service(self): tags=["integrations"], dependencies=[Depends(auth_middleware)], ) + self.integration_creds_manager = IntegrationCredentialsManager(get_db_client()) api_router.include_router( backend.server.routers.analytics.router, @@ -309,11 +308,11 @@ async def wrapper(*args, **kwargs): @thread_cached_property def execution_manager_client(self) -> ExecutionManager: - return get_service_client(ExecutionManager) + return get_service_client(ExecutionManager, Config().execution_manager_port) @thread_cached_property def execution_scheduler_client(self) -> ExecutionScheduler: - return get_service_client(ExecutionScheduler) + return get_service_client(ExecutionScheduler, Config().execution_scheduler_port) @classmethod def handle_internal_http_error(cls, request: Request, exc: Exception): diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py b/autogpt_platform/backend/backend/util/cache.py similarity index 100% rename from autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py rename to autogpt_platform/backend/backend/util/cache.py diff --git a/autogpt_platform/backend/backend/util/service.py b/autogpt_platform/backend/backend/util/service.py index e1f742483a64..3c4532b5ad00 100644 --- a/autogpt_platform/backend/backend/util/service.py +++ b/autogpt_platform/backend/backend/util/service.py @@ -5,7 +5,6 @@ import threading import time import typing -from abc import ABC, abstractmethod from enum import Enum from types import NoneType, UnionType from typing import ( @@ -100,24 +99,16 @@ def custom_dict_to_class(qualname, data: dict): return custom_dict_to_class -class AppService(AppProcess, ABC): +class AppService(AppProcess): shared_event_loop: asyncio.AbstractEventLoop use_db: bool = False use_redis: bool = False use_supabase: bool = False - def __init__(self): + def __init__(self, port): + self.port = port self.uri = None - @classmethod - @abstractmethod - def get_port(cls) -> int: - pass - - @classmethod - def get_host(cls) -> str: - return os.environ.get(f"{cls.service_name.upper()}_HOST", Config().pyro_host) - def run_service(self) -> None: while True: time.sleep(10) @@ -166,7 +157,8 @@ def cleanup(self): @conn_retry("Pyro", "Starting Pyro Service") def __start_pyro(self): - daemon = Pyro5.api.Daemon(host=self.get_host(), port=self.get_port()) + host = Config().pyro_host + daemon = Pyro5.api.Daemon(host=host, port=self.port) self.uri = daemon.register(self, objectId=self.service_name) logger.info(f"[{self.service_name}] Connected to Pyro; URI = {self.uri}") daemon.requestLoop() @@ -175,20 +167,16 @@ def __start_async_loop(self): self.shared_event_loop.run_forever() -# --------- UTILITIES --------- # - - AS = TypeVar("AS", bound=AppService) -def get_service_client(service_type: Type[AS]) -> AS: +def get_service_client(service_type: Type[AS], port: int) -> AS: service_name = service_type.service_name class DynamicClient: @conn_retry("Pyro", f"Connecting to [{service_name}]") def __init__(self): - host = service_type.get_host() - port = service_type.get_port() + host = os.environ.get(f"{service_name.upper()}_HOST", "localhost") uri = f"PYRO:{service_type.service_name}@{host}:{port}" logger.debug(f"Connecting to service [{service_name}]. URI = {uri}") self.proxy = Pyro5.api.Proxy(uri) @@ -203,6 +191,8 @@ def __getattr__(self, name: str) -> Callable[..., Any]: return cast(AS, DynamicClient()) +# --------- UTILITIES --------- # + builtin_types = [*vars(builtins).values(), NoneType, Enum] diff --git a/autogpt_platform/backend/test/executor/test_scheduler.py b/autogpt_platform/backend/test/executor/test_scheduler.py index 49e46510a120..c0bcc8307925 100644 --- a/autogpt_platform/backend/test/executor/test_scheduler.py +++ b/autogpt_platform/backend/test/executor/test_scheduler.py @@ -5,6 +5,7 @@ from backend.server.model import CreateGraph from backend.usecases.sample import create_test_graph, create_test_user from backend.util.service import get_service_client +from backend.util.settings import Config from backend.util.test import SpinTestServer @@ -18,7 +19,10 @@ async def test_agent_schedule(server: SpinTestServer): user_id=test_user.id, ) - scheduler = get_service_client(ExecutionScheduler) + scheduler = get_service_client( + ExecutionScheduler, Config().execution_scheduler_port + ) + schedules = scheduler.get_execution_schedules(test_graph.id, test_user.id) assert len(schedules) == 0 diff --git a/autogpt_platform/backend/test/util/test_service.py b/autogpt_platform/backend/test/util/test_service.py index a20810dbb1e5..e03063fff32d 100644 --- a/autogpt_platform/backend/test/util/test_service.py +++ b/autogpt_platform/backend/test/util/test_service.py @@ -7,11 +7,7 @@ class ServiceTest(AppService): def __init__(self): - super().__init__() - - @classmethod - def get_port(cls) -> int: - return TEST_SERVICE_PORT + super().__init__(port=TEST_SERVICE_PORT) @expose def add(self, a: int, b: int) -> int: @@ -32,7 +28,7 @@ async def add_async(a: int, b: int) -> int: @pytest.mark.asyncio(scope="session") async def test_service_creation(server): with ServiceTest(): - client = get_service_client(ServiceTest) + client = get_service_client(ServiceTest, TEST_SERVICE_PORT) assert client.add(5, 3) == 8 assert client.subtract(10, 4) == 6 assert client.fun_with_async(5, 3) == 8