Skip to content

Commit

Permalink
singleton engine, and garbage collection
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaraphael committed Sep 18, 2024
1 parent 2e41d88 commit 0eff4aa
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
12 changes: 9 additions & 3 deletions cerulean_cloud/cloud_function_ais_analysis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
slick_to_curves,
)

from cerulean_cloud.database_client import DatabaseClient, get_engine
from cerulean_cloud.database_client import DatabaseClient, close_engine, get_engine


def verify_api_key(request):
Expand Down Expand Up @@ -80,10 +80,14 @@ async def handle_aaa_request(request):
- It uses the `DatabaseClient` for database operations.
"""
request_json = request.get_json()
if not request_json.get("dry_run"):

if request_json.get("dry_run"):
return "Success!"

try:
scene_id = request_json.get("scene_id")
print(f"Running AAA on scene_id: {scene_id}")
db_engine = get_engine(db_url=os.getenv("DB_URL"))
db_engine = get_engine()
async with DatabaseClient(db_engine) as db_client:
async with db_client.session.begin():
s1 = await db_client.get_scene_from_id(scene_id)
Expand Down Expand Up @@ -140,6 +144,8 @@ async def handle_aaa_request(request):
geojson_fc=traj["geojson_fc"],
geometry=traj["geometry"],
)
finally:
await close_engine() # Ensure resources are cleaned up after request

return "Success!"

Expand Down
10 changes: 8 additions & 2 deletions cerulean_cloud/cloud_run_orchestrator/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
OrchestratorInput,
OrchestratorResult,
)
from cerulean_cloud.database_client import DatabaseClient, get_engine
from cerulean_cloud.database_client import DatabaseClient, close_engine, get_engine
from cerulean_cloud.models import get_model
from cerulean_cloud.roda_sentinelhub_client import RodaSentinelHubClient
from cerulean_cloud.tiling import TMS, offset_bounds_from_base_tiles
Expand Down Expand Up @@ -129,7 +129,7 @@ def get_roda_sentinelhub_client():

def get_database_engine():
    """Return the shared (singleton) database engine for this process."""
    shared_engine = get_engine()
    return shared_engine


@app.get("/", description="Health Check", tags=["Health Check"])
Expand Down Expand Up @@ -385,3 +385,9 @@ async def _orchestrate(
if success is False:
raise exc
return OrchestratorResult(status="Success")


@app.on_event("shutdown")
async def on_shutdown():
    """Dispose of the shared database engine when the application stops.

    Ensures pooled connections are released instead of lingering until the
    process dies.
    """
    # NOTE(review): @app.on_event is deprecated in newer FastAPI releases in
    # favor of lifespan handlers — consider migrating on the next upgrade.
    await close_engine()
40 changes: 21 additions & 19 deletions cerulean_cloud/database_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from geoalchemy2.shape import from_shape
from shapely.geometry import MultiPolygon, Polygon, base, box, shape
from sqlalchemy import and_, select, update
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine

import cerulean_cloud.database_schema as db

Expand All @@ -19,24 +19,26 @@ class InstanceNotFoundError(Exception):
pass


def get_engine(db_url=None):
    """Create a new async database engine.

    Args:
        db_url: Database connection URL. When omitted, falls back to the
            ``DB_URL`` environment variable, read at call time.

    Returns:
        A newly created SQLAlchemy async engine.
    """
    # Resolve the URL at call time. The previous default argument
    # (db_url=os.getenv("DB_URL")) was evaluated once at import time, so
    # later changes to the environment variable were silently ignored.
    if db_url is None:
        db_url = os.getenv("DB_URL")
    # Connect args ref: https://docs.sqlalchemy.org/en/20/core/engines.html#use-the-connect-args-dictionary-parameter
    # Note: the (commented-out) statement_timeout option is assumed to be in
    # milliseconds if no unit is specified.
    # Ref: https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-STATEMENT-TIMEOUT
    # Note: specifying a 1 minute timeout per statement, since each orchestrator
    # run may attempt to execute many statements
    return create_async_engine(
        db_url,
        echo=False,
        # connect_args={"options": f"-c statement_timeout={1000 * 60}"},
        connect_args={"command_timeout": 60},
        pool_size=1,  # Default pool size
        max_overflow=0,  # Default max overflow
        pool_timeout=300,  # Default pool timeout
        pool_recycle=600,  # Default pool recycle
    )
# Process-wide singleton engine, created once at import time and shared by
# every request. Access it via get_engine(); dispose via close_engine().
# assumes DB_URL is set before this module is imported — TODO confirm
DATABASE_URL = os.getenv("DB_URL")
engine: AsyncEngine = create_async_engine(
    DATABASE_URL,
    echo=False,
    # Per-statement timeout (seconds); each orchestrator run may execute
    # many statements, so each one gets up to a minute.
    connect_args={"command_timeout": 60},
    pool_size=1,  # keep a single pooled connection
    max_overflow=0,  # never exceed pool_size
    pool_timeout=300,  # seconds to wait for a free connection
    pool_recycle=600,  # seconds before a pooled connection is recycled
)


def get_engine() -> AsyncEngine:
    """Return the process-wide singleton async database engine."""
    return engine


async def close_engine():
    """Dispose of the singleton engine, releasing all pooled connections."""
    # The module-level singleton is the only engine in play, so dispose it
    # directly rather than going through the accessor.
    await engine.dispose()


async def get(sess, kls, error_if_absent=True, **kwargs):
Expand Down

0 comments on commit 0eff4aa

Please sign in to comment.