Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pagination to alerts and messages endpoints #1186

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions src/codegate/api/v1.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import List, Optional
from typing import Any, Dict, List, Optional
from uuid import UUID

import requests
import structlog
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import StreamingResponse
from fastapi.routing import APIRoute
from pydantic import BaseModel, ValidationError

from codegate.config import API_DEFAULT_PAGE_SIZE, API_MAX_PAGE_SIZE
import codegate.muxing.models as mux_models
from codegate import __version__
from codegate.api import v1_models, v1_processing
Expand Down Expand Up @@ -378,7 +379,11 @@ async def hard_delete_workspace(workspace_name: str):
tags=["Workspaces"],
generate_unique_id_function=uniq_name,
)
async def get_workspace_alerts(
    workspace_name: str,
    page: int = Query(1, ge=1),
    # BUG FIX: the PR wrote Query(..., get=1, ...) — "get" is not a Query
    # kwarg; the intended lower-bound validator is "ge" (greater-or-equal).
    page_size: int = Query(API_DEFAULT_PAGE_SIZE, ge=1, le=API_MAX_PAGE_SIZE),
) -> List[v1_models.AlertConversation]:
    """Get a page of critical alert conversations for a workspace.

    Args:
        workspace_name: Name of the workspace to fetch alerts for.
        page: 1-based page number.
        page_size: Alerts per page, capped at API_MAX_PAGE_SIZE.

    Raises:
        HTTPException: 404 if the workspace does not exist, 500 on any
            unexpected error while reading from the database.
    """
    try:
        ws = await wscrud.get_workspace_by_name(workspace_name)
    except crud.WorkspaceDoesNotExistError:
        # NOTE(review): this clause was collapsed in the diff view and is
        # reconstructed from the endpoint's surrounding pattern — confirm
        # against the full file.
        raise HTTPException(status_code=404, detail="Workspace does not exist")
    except Exception:
        logger.exception("Error while getting workspace")
        raise HTTPException(status_code=500, detail="Internal server error")

    try:
        offset = (page - 1) * page_size
        fetched_alerts = []

        # Deduplication can shrink a batch below page_size, so keep pulling
        # fixed-size batches until the page is full or the table is exhausted.
        while len(fetched_alerts) < page_size:
            alerts_batch = await dbreader.get_alerts_by_workspace(
                ws.id, AlertSeverity.CRITICAL.value, page_size, offset
            )
            if not alerts_batch:
                break

            dedup_alerts = await v1_processing.remove_duplicate_alerts(alerts_batch)
            fetched_alerts.extend(dedup_alerts)
            offset += page_size

        final_alerts = fetched_alerts[:page_size]

        # Only fetch prompt/output rows for the alerts actually returned.
        prompt_ids = list({alert.prompt_id for alert in final_alerts if alert.prompt_id})
        prompts_outputs = await dbreader.get_prompts_with_output(prompt_ids)
        return await v1_processing.parse_get_alert_conversation(
            final_alerts, prompts_outputs
        )
    except Exception:
        # Restores the pre-PR controlled 500 instead of leaking a raw
        # exception out of the endpoint.
        logger.exception("Error while getting alerts and messages")
        raise HTTPException(status_code=500, detail="Internal server error")


@v1.get(
"/workspaces/{workspace_name}/messages",
tags=["Workspaces"],
generate_unique_id_function=uniq_name,
)
async def get_workspace_messages(workspace_name: str) -> List[v1_models.Conversation]:
async def get_workspace_messages(
    workspace_name: str,
    page: int = Query(1, ge=1),
    page_size: int = Query(API_DEFAULT_PAGE_SIZE, ge=1, le=API_MAX_PAGE_SIZE),
) -> List[v1_models.Conversation]:
    """Get a page of messages (conversations) for a workspace.

    Args:
        workspace_name: Name of the workspace to fetch messages for.
        page: 1-based page number.
        page_size: Conversations per page, capped at API_MAX_PAGE_SIZE.

    Raises:
        HTTPException: 404 if the workspace does not exist, 500 on any
            unexpected error while reading from the database.
    """
    try:
        ws = await wscrud.get_workspace_by_name(workspace_name)
    except crud.WorkspaceDoesNotExistError:
        # NOTE(review): this clause was collapsed in the diff view and is
        # reconstructed from the endpoint's surrounding pattern — confirm
        # against the full file.
        raise HTTPException(status_code=404, detail="Workspace does not exist")
    except Exception:
        logger.exception("Error while getting workspace")
        raise HTTPException(status_code=500, detail="Internal server error")

    try:
        offset = (page - 1) * page_size
        fetched_messages = []

        # A DB batch can collapse into fewer conversations after parsing
        # (rows are per alert/output, not per prompt), so keep fetching
        # until the page is full or there are no more rows.
        while len(fetched_messages) < page_size:
            messages_batch = await dbreader.get_prompts_with_output_alerts_usage_by_workspace_id(
                ws.id, AlertSeverity.CRITICAL.value, page_size, offset
            )
            if not messages_batch:
                break
            parsed_conversations, _ = await v1_processing.parse_messages_in_conversations(
                messages_batch
            )
            fetched_messages.extend(parsed_conversations)
            offset += page_size

        return fetched_messages[:page_size]
    except Exception:
        # Restores the pre-PR controlled 500 instead of leaking a raw
        # exception out of the endpoint.
        logger.exception("Error while getting messages")
        raise HTTPException(status_code=500, detail="Internal server error")


@v1.get(
Expand Down
3 changes: 3 additions & 0 deletions src/codegate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
"llamacpp": "./codegate_volume/models", # Default LlamaCpp model path
}

API_DEFAULT_PAGE_SIZE = 50
API_MAX_PAGE_SIZE = 100


@dataclass
class Config:
Expand Down
67 changes: 48 additions & 19 deletions src/codegate/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
import json
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Type
from typing import Dict, List, Optional, Tuple, Type

import structlog
from alembic import command as alembic_command
from alembic.config import Config as AlembicConfig
from pydantic import BaseModel
from sqlalchemy import CursorResult, TextClause, event, text
from sqlalchemy import CursorResult, TextClause, bindparam, event, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import create_async_engine

from codegate.config import API_DEFAULT_PAGE_SIZE
from codegate.db.fim_cache import FimCache
from codegate.db.models import (
ActiveWorkspace,
Expand Down Expand Up @@ -569,7 +570,10 @@ async def _exec_select_conditions_to_pydantic(
raise e
return None

async def get_prompts_with_output(self, workpace_id: str) -> List[GetPromptWithOutputsRow]:
# Review annotations only: the diff lines below are reproduced verbatim
# (including the collapsed-context marker); the interface changed from
# workspace_id-keyed to an explicit list of prompt IDs.
async def get_prompts_with_output(self, prompt_ids: List[str]) -> List[GetPromptWithOutputsRow]:
# Fetch prompts joined with their outputs for the given prompt IDs,
# ordered newest output first. Empty input short-circuits to [].
if not prompt_ids:
# Guard: nothing to fetch — also guarantees prompt_ids is non-empty
# for the expanding bind below.
return []

sql = text(
"""
SELECT
Expand All @@ -583,23 +587,27 @@ async def get_prompts_with_output(self, workpace_id: str) -> List[GetPromptWithO
o.output_cost
FROM prompts p
LEFT JOIN outputs o ON p.id = o.prompt_id
WHERE p.workspace_id = :workspace_id
WHERE (p.id IN :prompt_ids)
ORDER BY o.timestamp DESC
"""
)
conditions = {"workspace_id": workpace_id}
).bindparams(bindparam("prompt_ids", expanding=True))
# "expanding=True" renders the IN clause with one placeholder per list
# element at execution time — the safe, parameterized way to bind a list.

conditions = {"prompt_ids": prompt_ids if prompt_ids else None}
# NOTE(review): the early return above already ensures prompt_ids is
# non-empty, so the "if prompt_ids else None" fallback is dead code —
# conditions = {"prompt_ids": prompt_ids} would suffice.
prompts = await self._exec_select_conditions_to_pydantic(
GetPromptWithOutputsRow, sql, conditions, should_raise=True
)
return prompts

# Review annotations only: diff lines reproduced verbatim (old and new
# interleaved, collapsed-context markers kept).
async def get_prompts_with_output_alerts_usage_by_workspace_id(
self, workspace_id: str, trigger_category: Optional[str] = None
self,
workspace_id: str,
trigger_category: Optional[str] = None,
limit: int = API_DEFAULT_PAGE_SIZE,
offset: int = 0,
) -> List[GetPromptWithOutputsRow]:
"""
Get prompts with their outputs, alerts and token usage by workspace_id.

Now paginated: `limit`/`offset` apply to the joined prompt/output/alert
rows, NOT to distinct prompts — a page of `limit` rows may collapse into
fewer conversations after the grouping loop below. Callers compensate by
looping until their page is full.
"""

sql = text(
"""
SELECT
Expand All @@ -610,20 +618,26 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id(
LEFT JOIN outputs o ON p.id = o.prompt_id
LEFT JOIN alerts a ON p.id = a.prompt_id
WHERE p.workspace_id = :workspace_id
AND (a.trigger_category = :trigger_category OR a.trigger_category is NULL)
ORDER BY o.timestamp DESC, a.timestamp DESC
""" # noqa: E501
)
# If trigger category is None we want to get all alerts
trigger_category = trigger_category if trigger_category else "%"
conditions = {"workspace_id": workspace_id, "trigger_category": trigger_category}
rows: List[IntermediatePromptWithOutputUsageAlerts] = (
conditions = {"workspace_id": workspace_id}
if trigger_category:
# NOTE(review): behavior change vs. the removed query above — the old
# filter kept rows with NULL trigger_category (prompts with no alert),
# while this strict AND turns the LEFT JOIN into inner-join semantics
# when a category is passed, dropping alert-less prompts. Confirm
# this is intended.
sql = text(sql.text + " AND a.trigger_category = :trigger_category")
conditions["trigger_category"] = trigger_category

sql = text(
sql.text + " ORDER BY o.timestamp DESC, a.timestamp DESC LIMIT :limit OFFSET :offset"
)
conditions["limit"] = limit
conditions["offset"] = offset

fetched_rows: List[IntermediatePromptWithOutputUsageAlerts] = (
await self._exec_select_conditions_to_pydantic(
IntermediatePromptWithOutputUsageAlerts, sql, conditions, should_raise=True
)
)
# Group the flat joined rows back into one entry per prompt.
prompts_dict: Dict[str, GetPromptWithOutputsRow] = {}
for row in fetched_rows:
prompt_id = row.prompt_id
if prompt_id not in prompts_dict:
prompts_dict[prompt_id] = GetPromptWithOutputsRow(
Expand Down Expand Up @@ -655,8 +669,22 @@ async def get_prompts_with_output_alerts_usage_by_workspace_id(

return list(prompts_dict.values())


async def _exec_select_count(self, sql_command: str, conditions: dict) -> int:
    """Execute a COUNT query and return its integer result.

    Args:
        sql_command: SQL text expected to yield exactly one scalar row.
        conditions: Bound-parameter values for the query.

    Returns:
        The count, or 0 if execution fails (deliberate best-effort so
        count consumers degrade gracefully instead of crashing).
    """
    async with self._async_db_engine.begin() as conn:
        try:
            result = await conn.execute(text(sql_command), conditions)
            # scalar_one() raises if the query yields zero or multiple
            # rows, which would indicate a malformed COUNT statement.
            return result.scalar_one()
        except Exception:
            # logger.exception captures the traceback; the original used
            # logger.error with an f-string that had no placeholders (F541).
            logger.exception("Failed to execute COUNT query")
            return 0

# Review annotations only: diff lines reproduced verbatim (old and new
# interleaved, collapsed-context marker kept). The method gained
# limit/offset pagination; defaults keep existing callers compatible.
async def get_alerts_by_workspace(
self, workspace_id: str, trigger_category: Optional[str] = None
self,
workspace_id: str,
trigger_category: Optional[str] = None,
limit: int = API_DEFAULT_PAGE_SIZE,
offset: int = 0,
) -> List[Alert]:
# Returns the workspace's alerts newest-first, optionally filtered by
# trigger_category, paginated via LIMIT/OFFSET.
sql = text(
"""
Expand All @@ -679,12 +707,13 @@ async def get_alerts_by_workspace(
sql = text(sql.text + " AND a.trigger_category = :trigger_category")
conditions["trigger_category"] = trigger_category

sql = text(sql.text + " ORDER BY a.timestamp DESC")
# LIMIT/OFFSET are bound as parameters, not interpolated — safe.
sql = text(sql.text + " ORDER BY a.timestamp DESC LIMIT :limit OFFSET :offset")
conditions["limit"] = limit
conditions["offset"] = offset

prompts = await self._exec_select_conditions_to_pydantic(
# New version returns the coroutine result directly instead of binding
# the (misnamed) "prompts" local first — same behavior.
return await self._exec_select_conditions_to_pydantic(
Alert, sql, conditions, should_raise=True
)
return prompts

async def get_workspaces(self) -> List[WorkspaceWithSessionInfo]:
sql = text(
Expand Down