Skip to content

Commit

Permalink
Merge event log to workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
oeway committed Oct 10, 2024
1 parent 2de43e3 commit e379b74
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 330 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Hypha Change Log

### 0.20.38

- Support event logging in the workspace: use `log_event` to log events in the workspace and `get_events` to retrieve them. The events are persisted in the SQL database.
- Allow passing workspace and expires_in to the `login` function to generate workspace specific token.
- When using the HTTP endpoint to access a service, you can now pass a workspace-specific token in the `Authorization` HTTP header. (Previously, all services were assumed to be accessed from the same service provider workspace.)

Expand Down
2 changes: 1 addition & 1 deletion hypha/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.20.37.post4"
"version": "0.20.38"
}
9 changes: 1 addition & 8 deletions hypha/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,9 @@ def __init__(
store,
s3_controller,
workspace_bucket="hypha-workspaces",
database_uri=None,
):
"""Set up controller with SQLAlchemy database and S3 for file storage."""
if database_uri is None:
# create an in-memory SQLite database for testing
database_uri = "sqlite+aiosqlite:///:memory:"
logger.warning(
"Using in-memory SQLite database for artifact manager, all data will be lost on restart!!!"
)
self.engine = create_async_engine(database_uri, echo=False)
self.engine = store.get_sql_engine()
self.SessionLocal = async_sessionmaker(
self.engine, expire_on_commit=False, class_=AsyncSession
)
Expand Down
20 changes: 20 additions & 0 deletions hypha/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
UserInfo,
WorkspaceInfo,
)
from sqlalchemy.ext.asyncio import (
create_async_engine,
)
from hypha.core.auth import (
create_scope,
parse_token,
Expand Down Expand Up @@ -87,6 +90,7 @@ def __init__(
public_base_url=None,
local_base_url=None,
redis_uri=None,
database_uri=None,
reconnection_token_life_time=2 * 24 * 60 * 60,
):
"""Initialize the redis store."""
Expand Down Expand Up @@ -122,6 +126,17 @@ def __init__(

logger.info("Server info: %s", self._server_info)

self._database_uri = database_uri
if self._database_uri is None:
database_uri = (
"sqlite+aiosqlite:///:memory:" # In-memory SQLite for testing
)
logger.warning(
"Using in-memory SQLite database for event logging, all data will be lost on restart!"
)

self._sql_engine = create_async_engine(database_uri, echo=False)

if redis_uri and redis_uri.startswith("redis://"):
from redis import asyncio as aioredis

Expand Down Expand Up @@ -149,6 +164,9 @@ def kickout_client(self, workspace: str, client_id: str, code: int, reason: str)
def get_redis(self):
    """Return the redis client held by this store."""
    redis_client = self._redis
    return redis_client

def get_sql_engine(self):
    """Return the async SQL engine held by this store."""
    engine = self._sql_engine
    return engine

def get_event_bus(self):
"""Get the event bus."""
return self._event_bus
Expand Down Expand Up @@ -619,8 +637,10 @@ async def register_workspace_manager(self):
self._event_bus,
self._server_info,
self._manager_id,
self._sql_engine,
self._s3_controller,
self._artifact_manager,
self._logging_service,
)
await manager.setup()
return manager
Expand Down
221 changes: 185 additions & 36 deletions hypha/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@
from hypha_rpc.utils.schema import schema_method
from pydantic import BaseModel, Field

import logging
import sys
from sqlalchemy import (
Column,
String,
Integer,
JSON,
DateTime,
select,
func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
import datetime
from hypha.core import UserInfo, UserPermission # Replace with actual imports


from hypha.core import (
ApplicationArtifact,
RedisRPCConnection,
Expand All @@ -29,12 +46,40 @@
logger = logging.getLogger("workspace")
logger.setLevel(logging.INFO)

Base = declarative_base()


SERVICE_SUMMARY_FIELD = ["id", "name", "type", "description", "config"]

# Ensure the client_id is safe
_allowed_characters = re.compile(r"^[a-zA-Z0-9-_/|*]*$")


# SQLAlchemy model for storing events
class EventLog(Base):
    """SQLAlchemy model for one event logged in a workspace."""

    __tablename__ = "event_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    event_type = Column(String, nullable=False)
    workspace = Column(String, nullable=False)
    user_id = Column(String, nullable=False)
    # The default must be a callable: a bare ``datetime.now(...)`` expression
    # is evaluated once when the class is defined, which would stamp every
    # row with the module import time instead of the insert time.
    timestamp = Column(
        DateTime,
        default=lambda: datetime.datetime.now(datetime.timezone.utc),
        index=True,
    )
    data = Column(JSON, nullable=True)  # Store any additional event metadata

    def to_dict(self):
        """Convert the SQLAlchemy model instance to a dictionary.

        Returns:
            A dict with the keys ``id``, ``event_type``, ``workspace``,
            ``user_id``, ``timestamp`` (ISO 8601 string, or ``None`` when the
            timestamp has not been populated yet) and ``data``.
        """
        # The column default is only applied on insert, so an instance that
        # has not been flushed yet may still carry ``timestamp=None``.
        timestamp = self.timestamp.isoformat() if self.timestamp is not None else None
        return {
            "id": self.id,
            "event_type": self.event_type,
            "workspace": self.workspace,
            "user_id": self.user_id,
            "timestamp": timestamp,
            "data": self.data,
        }


def validate_key_part(key_part: str):
"""Ensure key parts only contain safe characters."""
if not _allowed_characters.match(key_part):
Expand Down Expand Up @@ -64,8 +109,10 @@ def __init__(
event_bus: EventBus,
server_info: dict,
client_id: str,
sql_engine: Optional[str] = None,
s3_controller: Optional[Any] = None,
artifact_manager: Optional[Any] = None,
logging_service: Optional[Any] = None,
):
self._redis = redis
self._initialized = False
Expand All @@ -76,6 +123,18 @@ def __init__(
self._client_id = client_id
self._s3_controller = s3_controller
self._artifact_manager = artifact_manager
self._logging_service = logging_service
self._sql_engine = sql_engine
if self._sql_engine:
self.SessionLocal = async_sessionmaker(
self._sql_engine, expire_on_commit=False, class_=AsyncSession
)
else:
self.SessionLocal = None

async def _get_sql_session(self):
    """Return a new async session for the database.

    Raises:
        RuntimeError: If no session factory exists because the manager was
            created without a SQL engine (``self.SessionLocal`` is ``None``).
    """
    if self.SessionLocal is None:
        # Fail with an explicit message instead of the opaque
        # "'NoneType' object is not callable" TypeError.
        raise RuntimeError(
            "SQL engine is not configured, event logging is not available."
        )
    return self.SessionLocal()

def get_client_id(self):
assert self._client_id, "client id must not be empty."
Expand All @@ -97,9 +156,132 @@ async def setup(
management_service,
{"notify": False},
)
if self._sql_engine:
async with self._sql_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Database tables created successfully.")
self._initialized = True
return rpc

@schema_method
async def log_event(
    self,
    event_type: str = Field(..., description="Event type"),
    data: Optional[dict] = Field(None, description="Additional event data"),
    context: dict = None,
):
    """Log a new event in the caller's workspace, checking permissions.

    The workspace is read from ``context["ws"]`` and the user from
    ``context["user"]``; the event is stored as an ``EventLog`` row.

    Raises:
        ValueError: If ``event_type`` contains a space.
        PermissionError: If the user lacks ``read_write`` permission on the
            workspace.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if " " in event_type:
        raise ValueError("Event type must not contain spaces")
    workspace = context["ws"]
    user_info = UserInfo.model_validate(context["user"])
    if not user_info.check_permission(workspace, UserPermission.read_write):
        raise PermissionError(f"Permission denied for workspace {workspace}")

    session = await self._get_sql_session()
    try:
        # ``session.begin()`` commits when the block exits cleanly and rolls
        # back on error, so no explicit commit is needed inside it.
        async with session.begin():
            session.add(
                EventLog(
                    event_type=event_type,
                    workspace=workspace,
                    user_id=user_info.id,
                    data=data,
                )
            )
        logger.info(
            "Logged event: %s by %s in %s", event_type, user_info.id, workspace
        )
    finally:
        await session.close()

@schema_method
async def get_event_stats(
    self,
    event_type: Optional[str] = Field(None, description="Event type"),
    start_time: Optional[datetime.datetime] = Field(
        None, description="Start time for filtering events"
    ),
    end_time: Optional[datetime.datetime] = Field(
        None, description="End time for filtering events"
    ),
    context: Optional[dict] = None,
):
    """Count events per event type for the calling user in the current workspace.

    Only rows whose workspace equals ``context["ws"]`` and whose user id
    equals the caller's id are counted. ``event_type``, ``start_time`` and
    ``end_time`` narrow the selection further when given (time bounds are
    inclusive).

    Returns:
        A list of dicts, one per event type, with the keys ``event_type``
        and ``count``.

    Raises:
        PermissionError: If the user lacks ``read`` permission on the
            workspace.
    """
    workspace = context["ws"]
    user_info = UserInfo.model_validate(context["user"])
    if not user_info.check_permission(workspace, UserPermission.read):
        raise PermissionError(f"Permission denied for workspace {workspace}")

    session = await self._get_sql_session()
    try:
        async with session.begin():
            # Mandatory scoping first, then the optional narrowing filters.
            criteria = [
                EventLog.workspace == workspace,
                EventLog.user_id == user_info.id,
            ]
            if event_type:
                criteria.append(EventLog.event_type == event_type)
            if start_time:
                criteria.append(EventLog.timestamp >= start_time)
            if end_time:
                criteria.append(EventLog.timestamp <= end_time)

            statement = (
                select(EventLog.event_type, func.count(EventLog.id).label("count"))
                .filter(*criteria)
                .group_by(EventLog.event_type)
            )
            rows = (await session.execute(statement)).fetchall()
            # Each row is turned into a plain dict via its column mapping.
            return [dict(row._mapping) for row in rows]
    finally:
        await session.close()

@schema_method
async def get_events(
    self,
    event_type: Optional[str] = Field(None, description="Event type"),
    start_time: Optional[datetime.datetime] = Field(
        None, description="Start time for filtering events"
    ),
    end_time: Optional[datetime.datetime] = Field(
        None, description="End time for filtering events"
    ),
    context: Optional[dict] = None,
):
    """Return the calling user's events in the current workspace.

    Only rows whose workspace equals ``context["ws"]`` and whose user id
    equals the caller's id are returned. ``event_type``, ``start_time`` and
    ``end_time`` narrow the selection further when given (time bounds are
    inclusive).

    Returns:
        A list of dicts produced by ``EventLog.to_dict()``.

    Raises:
        PermissionError: If the user lacks ``read`` permission on the
            workspace.
    """
    workspace = context["ws"]
    user_info = UserInfo.model_validate(context["user"])
    if not user_info.check_permission(workspace, UserPermission.read):
        raise PermissionError(f"Permission denied for workspace {workspace}")

    session = await self._get_sql_session()
    try:
        async with session.begin():
            # Mandatory scoping first, then the optional narrowing filters.
            criteria = [
                EventLog.workspace == workspace,
                EventLog.user_id == user_info.id,
            ]
            if event_type:
                criteria.append(EventLog.event_type == event_type)
            if start_time:
                criteria.append(EventLog.timestamp >= start_time)
            if end_time:
                criteria.append(EventLog.timestamp <= end_time)

            outcome = await session.execute(select(EventLog).filter(*criteria))
            # scalars() yields EventLog instances rather than row tuples.
            return [record.to_dict() for record in outcome.scalars().all()]
    finally:
        await session.close()

@schema_method
async def get_summary(self, context: Optional[dict] = None) -> dict:
"""Get a summary about the workspace."""
Expand Down Expand Up @@ -976,38 +1158,6 @@ async def log(
self.validate_context(context, permission=UserPermission.read)
logger.info("%s: %s", context["from"], msg)

@schema_method
async def info(
    self, msg: str = Field(..., description="log a message as info"), context=None
):
    """Write an app message to the workspace logger at INFO level."""
    self.validate_context(context, permission=UserPermission.read)
    sender = context["from"]  # presumably the calling client's id
    logger.info("%s: %s", sender, msg)

@schema_method
async def warning(
    self,
    msg: str = Field(..., description="log a message as warning"),
    context=None,
):
    """Write an app message to the workspace logger at WARNING level.

    The context is validated with ``read`` permission before logging; the
    value of ``context["from"]`` is included in the log line.
    """
    self.validate_context(context, permission=UserPermission.read)
    logger.warning("WARNING: %s: %s", context["from"], msg)

@schema_method
async def error(
    self, msg: str = Field(..., description="log an error message"), context=None
):
    """Write an app message to the workspace logger at ERROR level."""
    self.validate_context(context, permission=UserPermission.read)
    sender = context["from"]  # presumably the calling client's id
    logger.error("%s: %s", sender, msg)

@schema_method
async def critical(
    self, msg: str = Field(..., description="log an critical message"), context=None
):
    """Write an app message to the workspace logger at CRITICAL level."""
    self.validate_context(context, permission=UserPermission.read)
    sender = context["from"]  # presumably the calling client's id
    logger.critical("%s: %s", sender, msg)

async def load_workspace_info(self, workspace: str, load=True) -> WorkspaceInfo:
"""Load info of the current workspace from the redis store."""
assert workspace is not None
Expand Down Expand Up @@ -1431,10 +1581,9 @@ def create_service(self, service_id, service_name=None):
},
"echo": self.echo,
"log": self.log,
"info": self.info,
"error": self.error,
"warning": self.warning,
"critical": self.critical,
"log_event": self.log_event,
"get_event_stats": self.get_event_stats,
"get_events": self.get_events,
"register_service": self.register_service,
"unregister_service": self.unregister_service,
"list_workspaces": self.list_workspaces,
Expand Down
Loading

0 comments on commit e379b74

Please sign in to comment.