feat: structlog parseable #3312

Merged · 9 commits · Oct 3, 2024

Changes from all commits
7 changes: 7 additions & 0 deletions .env.example
@@ -64,6 +64,13 @@ BACKEND_URL=http://localhost:5050
 EMBEDDING_DIM=1536
 DEACTIVATE_STRIPE=true
 
+
+# PARSEABLE LOGGING
+USE_PARSEABLE=False
+PARSEABLE_STREAM_NAME=quivr-api
+PARSEABLE_URL=<change-me>
+PARSEABLE_AUTH=<change-me>
+
 #RESEND
 RESEND_API_KEY=<change-me>
 [email protected]
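PARSEABLE_AUTH is consumed by the new ParseableLogHandler below as an HTTP Basic credential, i.e. the base64 encoding of user:password. A minimal sketch of producing the value; the make_parseable_auth helper is illustrative and not part of the PR:

    import base64

    def make_parseable_auth(username: str, password: str) -> str:
        """Return the base64-encoded 'user:password' pair expected in PARSEABLE_AUTH."""
        return base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")

    # e.g. make_parseable_auth("admin", "admin") -> "YWRtaW46YWRtaW4="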
2 changes: 1 addition & 1 deletion .gitignore
@@ -80,7 +80,7 @@ paulgraham.py
 .env_test
 supabase/seed-airwallex.sql
 airwallexpayouts.py
-application.log
+**/application.log*
 backend/celerybeat-schedule.db
 
 backend/application.log.*
255 changes: 225 additions & 30 deletions backend/api/quivr_api/logger.py
@@ -1,45 +1,240 @@
 import logging
 import os
+import queue
+import sys
+import threading
 from logging.handlers import RotatingFileHandler
+from typing import List
 
-from colorlog import (
-    ColoredFormatter,
-)
+import orjson
+import requests
+import structlog
+
+from quivr_api.models.settings import parseable_settings
 
-def get_logger(logger_name, log_file="application.log"):
-    log_level = os.getenv("LOG_LEVEL", "WARNING").upper()
-    logger = logging.getLogger(logger_name)
-    logger.setLevel(log_level)
-    logger.propagate = False  # Prevent log propagation to avoid double logging
+# Thread-safe queue for log messages
+log_queue = queue.Queue()
+stop_log_queue = threading.Event()
 
-    formatter = logging.Formatter(
-        "[%(levelname)s] %(name)s [%(filename)s:%(lineno)d]: %(message)s"
-    )
 
-    color_formatter = ColoredFormatter(
-        "%(log_color)s[%(levelname)s]%(reset)s %(name)s [%(filename)s:%(lineno)d]: %(message)s",
-        log_colors={
-            "DEBUG": "cyan",
-            "INFO": "green",
-            "WARNING": "yellow",
-            "ERROR": "red",
-            "CRITICAL": "red,bg_white",
-        },
-        reset=True,
-        style="%",
-    )
+class ParseableLogHandler(logging.Handler):
+    def __init__(
+        self,
+        base_parseable_url: str,
+        auth_token: str,
+        stream_name: str,
+        batch_size: int = 10,
+        flush_interval: float = 1,
+    ):
+        super().__init__()
+        self.base_url = base_parseable_url
+        self.stream_name = stream_name
+        self.url = self.base_url + self.stream_name
+        self.batch_size = batch_size
+        self.flush_interval = flush_interval
+        self._worker_thread = threading.Thread(target=self._process_log_queue)
+        self._worker_thread.daemon = True
+        self._worker_thread.start()
+        self.headers = {
+            "Authorization": f"Basic {auth_token}",  # base64 encoding of user:password
+            "Content-Type": "application/json",
+        }
+
+    def emit(self, record: logging.LogRecord):
+        # FIXME (@AmineDiro): This ping-pong of serialization/deserialization is a
+        # limitation of the logging formatter. The formatter should return a 'str'
+        # for the logger to print.
+        if isinstance(record.msg, str):
+            return
+        elif isinstance(record.msg, dict):
+            logger_name = record.msg.get("logger", None)
+            if logger_name and (
+                logger_name.startswith("quivr_api.access")
+                or logger_name.startswith("quivr_api.error")
+            ):
+                url = record.msg.get("url", None)
+                # Filter out healthz requests
+                if url and "healthz" not in url:
+                    fmt = orjson.loads(self.format(record))
+                    log_queue.put(fmt)
+        else:
+            return
+
+    def _process_log_queue(self):
+        """Background thread that processes the log queue and sends logs to Parseable."""
+        logs_batch = []
+        while not stop_log_queue.is_set():
+            try:
+                # Collect logs for batch processing
+                log_data = log_queue.get(timeout=self.flush_interval)
+                logs_batch.append(log_data)
+
+                # Send logs if batch size is reached
+                if len(logs_batch) >= self.batch_size:
+                    self._send_logs_to_parseable(logs_batch)
+                    logs_batch.clear()
+
+            except queue.Empty:
+                # If the queue is empty, send any remaining logs
+                if logs_batch:
+                    self._send_logs_to_parseable(logs_batch)
+                    logs_batch.clear()
+
+    def _send_logs_to_parseable(self, logs: List[dict]):
+        payload = orjson.dumps(logs)
+        try:
+            response = requests.post(self.url, headers=self.headers, data=payload)
+            if response.status_code != 200:
+                print(f"Failed to send logs to Parseable server: {response.text}")
+        except Exception as e:
+            print(f"Error sending logs to Parseable: {e}")
+
+    def stop(self):
+        """Stop the background worker thread and process any remaining logs."""
+        stop_log_queue.set()
+        self._worker_thread.join()
+        # Process remaining logs before shutting down
+        remaining_logs = list(log_queue.queue)
+        if remaining_logs:
+            self._send_logs_to_parseable(remaining_logs)
+
+
+def extract_from_record(_, __, event_dict):
+    """
+    Extract thread and process names and add them to the event dict.
+    """
+    record = event_dict["_record"]
+    event_dict["thread_name"] = record.threadName
+    event_dict["process_name"] = record.processName
+    return event_dict
+
+
+def drop_http_context(_, __, event_dict):
+    """
+    Keep only the core logging keys, dropping the HTTP context from the event dict.
+    """
+    keys = ["msg", "logger", "level", "timestamp", "exc_info"]
+    return {k: event_dict.get(k, None) for k in keys}
 
-    console_handler = logging.StreamHandler()
-    console_handler.setFormatter(color_formatter)
 
+def setup_logger(
+    log_file="application.log", send_log_server: bool = parseable_settings.use_parseable
+):
+    # Shared processors
+    shared_processors = [
+        structlog.contextvars.merge_contextvars,
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.add_logger_name,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        structlog.processors.TimeStamper(fmt="iso"),
+        structlog.processors.StackInfoRenderer(),
+        structlog.processors.UnicodeDecoder(),
+        structlog.processors.EventRenamer("msg"),
+    ]
+    structlog.configure(
+        processors=shared_processors
+        + [
+            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+        ],
+        # Use standard logging compatible logger
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        wrapper_class=structlog.stdlib.BoundLogger,
+        # Use Python's logging configuration
+        cache_logger_on_first_use=True,
+    )
+    # Set formatters
+    plain_fmt = structlog.stdlib.ProcessorFormatter(
+        foreign_pre_chain=shared_processors,
+        processors=[
+            extract_from_record,
+            structlog.processors.format_exc_info,
+            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+            structlog.dev.ConsoleRenderer(
+                colors=False, exception_formatter=structlog.dev.plain_traceback
+            ),
+        ],
+    )
+    color_fmt = structlog.stdlib.ProcessorFormatter(
+        processors=[
+            drop_http_context,
+            structlog.dev.ConsoleRenderer(
+                colors=True,
+                exception_formatter=structlog.dev.RichTracebackFormatter(
+                    show_locals=False
+                ),
+            ),
+        ],
+        foreign_pre_chain=shared_processors,
+    )
+    parseable_fmt = structlog.stdlib.ProcessorFormatter(
+        processors=[
+            # TODO: Which one gets us the better debug experience?
+            # structlog.processors.ExceptionRenderer(
+            #     exception_formatter=structlog.tracebacks.ExceptionDictTransformer(
+            #         show_locals=False
+            #     )
+            # ),
+            structlog.processors.format_exc_info,
+            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+            structlog.processors.JSONRenderer(),
+        ],
+        foreign_pre_chain=shared_processors
+        + [
+            structlog.processors.CallsiteParameterAdder(
+                {
+                    structlog.processors.CallsiteParameter.FUNC_NAME,
+                    structlog.processors.CallsiteParameter.LINENO,
+                }
+            ),
+        ],
+    )
 
+    # Set handlers
+    console_handler = logging.StreamHandler(sys.stdout)
     file_handler = RotatingFileHandler(
         log_file, maxBytes=5000000, backupCount=5
     )  # 5MB file
-    file_handler.setFormatter(formatter)
+    console_handler.setFormatter(color_fmt)
+    file_handler.setFormatter(plain_fmt)
+    handlers: list[logging.Handler] = [console_handler, file_handler]
+    if (
+        send_log_server
+        and parseable_settings.parseable_url is not None
+        and parseable_settings.parseable_auth is not None
+        and parseable_settings.parseable_stream_name
+    ):
+        parseable_handler = ParseableLogHandler(
+            auth_token=parseable_settings.parseable_auth,
+            base_parseable_url=parseable_settings.parseable_url,
+            stream_name=parseable_settings.parseable_stream_name,
+        )
+        parseable_handler.setFormatter(parseable_fmt)
+        handlers.append(parseable_handler)
+
+    # Configure logger
+    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
+    root_logger = logging.getLogger()
+    root_logger.setLevel(log_level)
+    root_logger.handlers = []
+    for handler in handlers:
+        root_logger.addHandler(handler)
+
+    _clear_uvicorn_logger()
+
+
+def _clear_uvicorn_logger():
+    for _log in ["uvicorn", "uvicorn.error"]:
+        # Clear the log handlers for uvicorn loggers, and enable propagation
+        # so the messages are caught by our root logger and formatted correctly
+        # by structlog
+        logging.getLogger(_log).handlers.clear()
+        logging.getLogger(_log).propagate = True
+    logging.getLogger("uvicorn.access").handlers.clear()
+    logging.getLogger("uvicorn.access").propagate = False
+
+
+def get_logger(_name: str | None = None):
+    assert structlog.is_configured()
+    return structlog.get_logger()
 
-    if not logger.handlers:
-        logger.addHandler(console_handler)
-        logger.addHandler(file_handler)
 
-    return logger
+setup_logger()
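For orientation, a short usage sketch that is not part of the diff: setup_logger() runs once at import time and installs the handlers on the root logger, so application code only calls get_logger and logs key-value pairs. The event names and fields below are illustrative.

    from quivr_api.logger import get_logger

    logger = get_logger(__name__)

    # Every handler on the root logger sees this event: colored console output,
    # plain text in application.log, and (when enabled) the Parseable handler,
    # which forwards only "quivr_api.access"/"quivr_api.error" events carrying
    # a non-healthz url field (see ParseableLogHandler.emit above).
    logger.info("upload_started", file_name="report.pdf", size_bytes=123456)

    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception("upload_failed", file_name="report.pdf")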
30 changes: 14 additions & 16 deletions backend/api/quivr_api/main.py
@@ -4,14 +4,15 @@
 import litellm
 import sentry_sdk
 from dotenv import load_dotenv  # type: ignore
-from fastapi import FastAPI, HTTPException, Request
-from fastapi.responses import HTMLResponse, JSONResponse
+from fastapi import FastAPI, Request
+from fastapi.responses import HTMLResponse
 from pyinstrument import Profiler
 from sentry_sdk.integrations.fastapi import FastApiIntegration
 from sentry_sdk.integrations.starlette import StarletteIntegration
 
-from quivr_api.logger import get_logger
+from quivr_api.logger import get_logger, stop_log_queue
 from quivr_api.middlewares.cors import add_cors_middleware
+from quivr_api.middlewares.logging_middleware import LoggingMiddleware
 from quivr_api.modules.analytics.controller.analytics_routes import analytics_router
 from quivr_api.modules.api_key.controller import api_key_router
 from quivr_api.modules.assistant.controller import assistant_router
@@ -27,7 +28,6 @@
 from quivr_api.modules.user.controller import user_router
 from quivr_api.routes.crawl_routes import crawl_router
 from quivr_api.routes.subscription_routes import subscription_router
-from quivr_api.utils import handle_request_validation_error
 from quivr_api.utils.telemetry import maybe_send_telemetry
 
 load_dotenv()
@@ -37,11 +37,11 @@
 logging.getLogger("httpx").setLevel(logging.WARNING)
 logging.getLogger("LiteLLM").setLevel(logging.WARNING)
 logging.getLogger("litellm").setLevel(logging.WARNING)
-get_logger("quivr_core")
 litellm.set_verbose = False  # type: ignore
 
 
-logger = get_logger(__name__)
+get_logger("uvicorn")
+get_logger("uvicorn.access")
+get_logger("quivr_core")
+logger = get_logger("quivr-api")
 
 
 def before_send(event, hint):
@@ -72,6 +72,9 @@ def before_send(event, hint):
 app = FastAPI()
 add_cors_middleware(app)
 
+app.add_middleware(LoggingMiddleware)
+
+
 app.include_router(brain_router)
 app.include_router(chat_router)
 app.include_router(crawl_router)
@@ -106,15 +109,10 @@ async def profile_request(request: Request, call_next):
     return await call_next(request)
 
 
-@app.exception_handler(HTTPException)
-async def http_exception_handler(_, exc):
-    return JSONResponse(
-        status_code=exc.status_code,
-        content={"detail": exc.detail},
-    )
+@app.on_event("shutdown")
+def shutdown_event():
+    stop_log_queue.set()
 
-handle_request_validation_error(app)
 
 if os.getenv("TELEMETRY_ENABLED") == "true":
     logger.info("Telemetry enabled, we use telemetry to collect anonymous usage data.")
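The diff wires up app.add_middleware(LoggingMiddleware), but middlewares/logging_middleware.py itself is not included in this excerpt. A minimal sketch of what such a middleware could look like, assuming a Starlette BaseHTTPMiddleware that feeds the "quivr_api.access" logger which ParseableLogHandler.emit filters on; the event and field names are guesses, not the PR's actual implementation:

    import time

    import structlog
    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.requests import Request

    access_logger = structlog.stdlib.get_logger("quivr_api.access")


    class LoggingMiddleware(BaseHTTPMiddleware):
        async def dispatch(self, request: Request, call_next):
            # Start each request with a clean context, then bind request-scoped data
            structlog.contextvars.clear_contextvars()
            structlog.contextvars.bind_contextvars(url=str(request.url))
            start = time.perf_counter()
            response = await call_next(request)
            access_logger.info(
                "http_request",
                url=str(request.url),
                method=request.method,
                status_code=response.status_code,
                duration_s=time.perf_counter() - start,
            )
            return response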
10 changes: 9 additions & 1 deletion backend/api/quivr_api/middlewares/auth/auth_bearer.py
@@ -1,6 +1,7 @@
 import os
 from typing import Optional
 
+import structlog
 from fastapi import Depends, HTTPException, Request
 from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
 
@@ -13,6 +14,8 @@
 
 api_key_service = ApiKeyService()
 
+logger = structlog.stdlib.get_logger("quivr_api.access")
+
 
 class AuthBearer(HTTPBearer):
     def __init__(self, auto_error: bool = True):
@@ -66,5 +69,10 @@ def get_test_user(self) -> UserIdentity:
 auth_bearer = AuthBearer()
 
 
-def get_current_user(user: UserIdentity = Depends(auth_bearer)) -> UserIdentity:
+async def get_current_user(user: UserIdentity = Depends(auth_bearer)) -> UserIdentity:
+    # Due to the context switch in the FastAPI executor we can't read this id back later,
+    # so we bind it as a contextvar to keep the information if an exception is raised.
+    # https://www.structlog.org/en/stable/contextvars.html
+    structlog.contextvars.bind_contextvars(client_id=user.id)
+    logger.info("Authentication success")
     return user
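Because merge_contextvars is the first shared processor in logger.py, the client_id bound here is merged into every later event in the same request context. A small illustration with an assumed, abbreviated output shape:

    import structlog

    logger = structlog.stdlib.get_logger("quivr_api.access")

    structlog.contextvars.bind_contextvars(client_id="user-1234")
    logger.info("Authentication success")
    # JSON-rendered event (abbreviated, assumed shape):
    # {"msg": "Authentication success", "client_id": "user-1234",
    #  "logger": "quivr_api.access", "level": "info", "timestamp": "..."}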