From 91cfb63e2bddc5727dbf0e12cbec993ca5d2d919 Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Tue, 8 Oct 2024 14:35:45 -0500 Subject: [PATCH 1/9] Adds retry logic to backend via tenacity --- agent/agent/chain.py | 12 +++++-- api/pyproject.toml | 7 ++-- api/routers/chat.py | 78 ++++++++++++++++++++++++---------------- api/utils/retry_utils.py | 27 ++++++++++++++ 4 files changed, 88 insertions(+), 36 deletions(-) create mode 100644 api/utils/retry_utils.py diff --git a/agent/agent/chain.py b/agent/agent/chain.py index 47198b6..83fbd02 100644 --- a/agent/agent/chain.py +++ b/agent/agent/chain.py @@ -1,5 +1,6 @@ from os import getenv from typing import List +from api.utils.retry_utils import perform_openai_operation from openai import AzureOpenAI from dotenv import load_dotenv @@ -76,7 +77,7 @@ def history(self) -> str: history_str += f"THOUGHT: {past_thoughts[message.id]}\n" return history_str - def stream(self): + def _stream(self): completion = self.openai.chat.completions.create( model=getenv("AZURE_OPENAI_DEPLOYMENT", "placeholder"), messages=[self.template(), {"role": "user", "content": self.user_input}], @@ -86,6 +87,10 @@ def stream(self): if len(chunk.choices) > 0: yield chunk.choices[0].delta.content or "" + def stream(self): + def stream(self): + return perform_openai_operation(self._stream) + class RespondCall(HonchoCall): def __init__(self, *args, thought, **kwargs): @@ -121,7 +126,7 @@ def history(self) -> List[dict]: history_list.append({"role": "assistant", "content": message.content}) return history_list - def stream(self): + def _stream(self): completion = self.openai.chat.completions.create( model=getenv("AZURE_OPENAI_DEPLOYMENT", "placeholder"), messages=self.template(), @@ -130,3 +135,6 @@ def stream(self): for chunk in completion: if len(chunk.choices) > 0: yield chunk.choices[0].delta.content or "" + + def stream(self): + return perform_openai_operation(self._stream) diff --git a/api/pyproject.toml b/api/pyproject.toml index 44874d8..75f3a7a 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -2,9 +2,7 @@ name = "api" version = "0.6.0" description = "The REST API Implementation of Tutor-GPT" -authors = [ - {name = "Plastic Labs", email = "hello@plasticlabs.ai"}, -] +authors = [{ name = "Plastic Labs", email = "hello@plasticlabs.ai" }] requires-python = ">=3.11" dependencies = [ "fastapi[standard]>=0.112.2", @@ -12,8 +10,9 @@ dependencies = [ "honcho-ai>=0.0.14", "python-dotenv>=1.0.1", "agent", + "tenacity>=8.2.0", ] [tool.uv.sources] # agent = { path = "../agent", editable = true } -agent = {workspace=true} +agent = { workspace = true } diff --git a/api/routers/chat.py b/api/routers/chat.py index 128c433..bb3e6e3 100644 --- a/api/routers/chat.py +++ b/api/routers/chat.py @@ -1,3 +1,4 @@ +from api.utils.retry_utils import perform_db_operation, perform_openai_operation from fastapi import APIRouter from fastapi.responses import StreamingResponse @@ -17,54 +18,71 @@ async def stream( user = honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id) def convo_turn(): - thought_stream = ThinkCall( - user_input=inp.message, - app_id=app.id, - user_id=user.id, - session_id=str(inp.conversation_id), - honcho=honcho, - ).stream() + thought_stream = perform_openai_operation( + ThinkCall, + user_input=inp.message, + app_id=app.id, + user_id=user.id, + session_id=str(inp.conversation_id), + honcho=honcho + ).stream() thought = "" for chunk in thought_stream: thought += chunk yield chunk yield "❀" - response_stream = RespondCall( - user_input=inp.message, - 
thought=thought, - app_id=app.id, - user_id=user.id, - session_id=str(inp.conversation_id), - honcho=honcho, - ).stream() + response_stream = perform_openai_operation( + RespondCall, + user_input=inp.message, + thought=thought, + app_id=app.id, + user_id=user.id, + session_id=str(inp.conversation_id), + honcho=honcho + ).stream() response = "" for chunk in response_stream: response += chunk yield chunk yield "❀" - new_message = honcho.apps.users.sessions.messages.create( - is_user=True, - session_id=str(inp.conversation_id), - app_id=app.id, - user_id=user.id, - content=inp.message, - ) - honcho.apps.users.sessions.metamessages.create( + perform_db_operation( + honcho.apps.users.sessions.messages.create, + is_user=True, + session_id=str(inp.conversation_id), + app_id=app.id, + user_id=user.id, + content=inp.message, + ) + new_ai_message = perform_db_operation( + honcho.apps.users.sessions.messages.create, + is_user=False, + session_id=str(inp.conversation_id), + app_id=app.id, + user_id=user.id, + content=response, + ) + perform_db_operation(honcho.apps.users.sessions.metamessages.create, app_id=app.id, session_id=str(inp.conversation_id), user_id=user.id, - message_id=new_message.id, + message_id=new_ai_message.id, metamessage_type="thought", content=thought, ) - honcho.apps.users.sessions.messages.create( - is_user=False, - session_id=str(inp.conversation_id), + return StreamingResponse(convo_turn()) + +@router.get("/thought/{message_id}") +async def get_thought(conversation_id: str, message_id: str, user_id: str): + user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id) + thought = perform_db_operation( + honcho.apps.users.sessions.metamessages.list, + session_id=conversation_id, app_id=app.id, user_id=user.id, - content=response, + message_id=message_id, + metamessage_type="thought" ) - - return StreamingResponse(convo_turn()) + # In practice, there should only be one thought per message + return {"thought": thought.items[0].content if thought.items else None} diff --git a/api/utils/retry_utils.py b/api/utils/retry_utils.py new file mode 100644 index 0000000..eaf8569 --- /dev/null +++ b/api/utils/retry_utils.py @@ -0,0 +1,27 @@ +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type + +# Database (honcho) retry decorator +def db_retry_decorator(): + return retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=2, max=10), + retry=retry_if_exception_type((ConnectionError, TimeoutError)), + reraise=True + ) + +# Third-party (OpenAI) retry decorator +def openai_retry_decorator(): + return retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=60), + retry=retry_if_exception_type((ConnectionError, TimeoutError, openai.error.RateLimitError)), + reraise=True + ) + +@db_retry_decorator() +def perform_db_operation(func, *args, **kwargs): + return func(*args, **kwargs) + +@openai_retry_decorator() +def perform_openai_operation(func, *args, **kwargs): + return func(*args, **kwargs) From 039911866b6c327b1347ce50fd0951cce26568b7 Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Tue, 8 Oct 2024 14:38:34 -0500 Subject: [PATCH 2/9] Adds python metadata directory to ignore. 
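Context for the retry helpers added in PATCH 1/9 above: `perform_db_operation` and `perform_openai_operation` are plain pass-through wrappers, so the tenacity policies apply to whatever callable is handed in. The sketch below is illustrative only — the stand-in functions and values are hypothetical, and note that `api/utils/retry_utils.py` as committed still needs an `import openai` for the `openai.error.RateLimitError` reference to resolve.

    from api.utils.retry_utils import perform_db_operation, perform_openai_operation

    def save_message():
        # Stand-in for a Honcho SDK call such as
        # honcho.apps.users.sessions.messages.create(...).
        return "saved"

    def open_completion_stream():
        # Stand-in for ThinkCall(...).stream() / RespondCall(...).stream().
        yield "chunk"

    # Honcho-style call: at most 3 attempts on ConnectionError/TimeoutError,
    # with exponential backoff between attempts.
    perform_db_operation(save_message)

    # OpenAI-style call: at most 5 attempts with longer waits. Because the
    # wrapped callable only *creates* the generator, the retry covers that
    # creation step; exceptions raised while iterating the stream are not
    # retried by this wrapper.
    for chunk in perform_openai_operation(open_completion_stream):
        print(chunk, end="")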
--- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 3ffdc5c..0129f94 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ __pycache__/ *.pyc *.pyo .python-version +api/api.egg-info/* # Visual Studio Code .vscode/ From 645f602afe1447ab86d1c172697e0b268ee09f5d Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Tue, 8 Oct 2024 15:37:30 -0500 Subject: [PATCH 3/9] Adds sentry logging --- api/utils/retry_utils.py | 79 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/api/utils/retry_utils.py b/api/utils/retry_utils.py index eaf8569..a211d27 100644 --- a/api/utils/retry_utils.py +++ b/api/utils/retry_utils.py @@ -1,27 +1,92 @@ +import sentry_sdk from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type +import logging -# Database (honcho) retry decorator +logger = logging.getLogger(__name__) + +class RateLimitError(Exception): + """Custom exception for rate limit errors""" + pass + +def is_rate_limit_error(exception): + if isinstance(exception, Exception): + error_message = str(exception).lower() + return 'rate limit' in error_message or 'too many requests' in error_message + return False + +def log_retry(retry_state): + """Log retry attempts and capture exceptions in Sentry""" + exception = retry_state.outcome.exception() + if exception: + if is_rate_limit_error(exception): + logger.warning(f"Rate limit hit in {retry_state.fn.__name__}: attempt {retry_state.attempt_number}") + sentry_sdk.capture_message( + f"OpenAI Rate Limit Hit", + level="warning", + extras={ + "function": retry_state.fn.__name__, + "attempt": retry_state.attempt_number, + "exception": str(exception) + } + ) + else: + sentry_sdk.capture_exception(exception) + + logger.warning(f"Retrying {retry_state.fn.__name__}: attempt {retry_state.attempt_number} due to {exception}") + sentry_sdk.add_breadcrumb( + category="retry", + message=f"Retrying {retry_state.fn.__name__}", + level="warning", + data={ + "attempt": retry_state.attempt_number, + "exception": str(exception) + } + ) + +# honcho retry decorator def db_retry_decorator(): return retry( stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=2, max=10), + wait=wait_exponential(multiplier=1, min=1, max=10), retry=retry_if_exception_type((ConnectionError, TimeoutError)), + before_sleep=log_retry, reraise=True ) # Third-party (OpenAI) retry decorator +def rate_limit_retry_decorator(): + return retry( + stop=stop_after_attempt(5), # More attempts for rate limits + wait=wait_exponential(multiplier=2, min=4, max=40), # Longer waits + retry=is_rate_limit_error, + before_sleep=log_retry, + reraise=True + ) + def openai_retry_decorator(): return retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=4, max=60), - retry=retry_if_exception_type((ConnectionError, TimeoutError, openai.error.RateLimitError)), + stop=stop_after_attempt(3), # Fewer attempts for other errors + wait=wait_exponential(multiplier=1, min=2, max=15), + retry=lambda e: not is_rate_limit_error(e), # Retry if it's NOT a rate limit error + before_sleep=log_retry, reraise=True ) @db_retry_decorator() def perform_db_operation(func, *args, **kwargs): - return func(*args, **kwargs) + try: + return func(*args, **kwargs) + except Exception as e: + sentry_sdk.capture_exception(e) + raise @openai_retry_decorator() +@rate_limit_retry_decorator() def perform_openai_operation(func, *args, **kwargs): - return func(*args, **kwargs) + try: + return func(*args, 
**kwargs) + except Exception as e: + if is_rate_limit_error(e): + raise RateLimitError(str(e)) from e + sentry_sdk.capture_exception(e) + raise From 5b272da008d382b4e36cb163b6ad01f1203aff19 Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Thu, 10 Oct 2024 12:31:12 -0500 Subject: [PATCH 4/9] Move retry logic from API into Bloom client. --- agent/agent/chain.py | 14 +- api/routers/chat.py | 157 +++++++++++---------- api/utils/retry_utils.py | 92 ------------ www/package.json | 2 + www/pnpm-lock.yaml | 17 +++ www/utils/api.ts | 295 +++++++++++++++++++++++++-------------- www/utils/retryUtils.ts | 72 ++++++++++ 7 files changed, 367 insertions(+), 282 deletions(-) delete mode 100644 api/utils/retry_utils.py create mode 100644 www/utils/retryUtils.ts diff --git a/agent/agent/chain.py b/agent/agent/chain.py index e133967..e36492b 100644 --- a/agent/agent/chain.py +++ b/agent/agent/chain.py @@ -1,6 +1,5 @@ from os import getenv from typing import List -from api.utils.retry_utils import perform_openai_operation from openai import AzureOpenAI from dotenv import load_dotenv @@ -45,7 +44,7 @@ def template(self) -> dict[str, str]: system = ( { "role": "system", - "content": f"""You are Bloom, a subversive-minded learning companion. Your job is to employ your theory of mind skills to predict the user’s mental state. + "content": f"""You are Bloom, a subversive-minded learning companion. Your job is to employ your theory of mind skills to predict the user's mental state. Generate a thought that makes a prediction about the user's needs given current dialogue and also lists other pieces of data that would help improve your prediction previous commentary: {self.history}""", }, @@ -83,7 +82,7 @@ def history(self) -> str: continue return history_str - def _stream(self): + def stream(self): completion = self.openai.chat.completions.create( model=getenv("AZURE_OPENAI_DEPLOYMENT", "placeholder"), messages=[self.template(), {"role": "user", "content": self.user_input}], @@ -93,10 +92,6 @@ def _stream(self): if len(chunk.choices) > 0: yield chunk.choices[0].delta.content or "" - def stream(self): - def stream(self): - return perform_openai_operation(self._stream) - class RespondCall(HonchoCall): def __init__(self, *args, thought, **kwargs): @@ -132,7 +127,7 @@ def history(self) -> List[dict]: history_list.append({"role": "assistant", "content": message.content}) return history_list - def _stream(self): + def stream(self): completion = self.openai.chat.completions.create( model=getenv("AZURE_OPENAI_DEPLOYMENT", "placeholder"), messages=self.template(), @@ -141,6 +136,3 @@ def _stream(self): for chunk in completion: if len(chunk.choices) > 0: yield chunk.choices[0].delta.content or "" - - def stream(self): - return perform_openai_operation(self._stream) diff --git a/api/routers/chat.py b/api/routers/chat.py index c463e4e..59d5544 100644 --- a/api/routers/chat.py +++ b/api/routers/chat.py @@ -1,6 +1,5 @@ -from api.utils.retry_utils import perform_db_operation, perform_openai_operation -from fastapi import APIRouter -from fastapi.responses import StreamingResponse +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse, JSONResponse from api import schemas from api.dependencies import app, honcho @@ -11,81 +10,93 @@ @router.post("/stream") -async def stream( - inp: schemas.ConversationInput, -): - """Stream the response too the user, currently only used by the Web UI and has integration to be able to use Honcho is not anonymous""" - user = 
honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id) +async def stream(inp: schemas.ConversationInput): + """Stream the response to the user, currently only used by the Web UI and has integration to be able to use Honcho if not anonymous""" + try: + user = honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id) - def convo_turn(): - thought_stream = perform_openai_operation( - ThinkCall, - user_input=inp.message, - app_id=app.id, - user_id=user.id, - session_id=str(inp.conversation_id), - honcho=honcho, - ).stream() - thought = "" - for chunk in thought_stream: - thought += chunk - yield chunk + def convo_turn(): + thought_stream = ThinkCall( + user_input=inp.message, + app_id=app.id, + user_id=user.id, + session_id=str(inp.conversation_id), + honcho=honcho, + ).stream() + thought = "" + for chunk in thought_stream: + thought += chunk + yield chunk - yield "❀" - response_stream = perform_openai_operation( - RespondCall, - user_input=inp.message, - thought=thought, - app_id=app.id, - user_id=user.id, - session_id=str(inp.conversation_id), - honcho=honcho, - ).stream() - response = "" - for chunk in response_stream: - response += chunk - yield chunk - yield "❀" + yield "❀" + response_stream = RespondCall( + user_input=inp.message, + thought=thought, + app_id=app.id, + user_id=user.id, + session_id=str(inp.conversation_id), + honcho=honcho, + ).stream() + response = "" + for chunk in response_stream: + response += chunk + yield chunk + yield "❀" - perform_db_operation( - honcho.apps.users.sessions.messages.create, - is_user=True, - session_id=str(inp.conversation_id), - app_id=app.id, - user_id=user.id, - content=inp.message, - ) - new_ai_message = perform_db_operation( - honcho.apps.users.sessions.messages.create, - is_user=False, - session_id=str(inp.conversation_id), - app_id=app.id, - user_id=user.id, - content=response, - ) - perform_db_operation( - honcho.apps.users.sessions.metamessages.create, - app_id=app.id, - session_id=str(inp.conversation_id), - user_id=user.id, - message_id=new_ai_message.id, - metamessage_type="thought", - content=thought, - ) + honcho.apps.users.sessions.messages.create( + is_user=True, + session_id=str(inp.conversation_id), + app_id=app.id, + user_id=user.id, + content=inp.message, + ) + new_ai_message = honcho.apps.users.sessions.messages.create( + is_user=False, + session_id=str(inp.conversation_id), + app_id=app.id, + user_id=user.id, + content=response, + ) + honcho.apps.users.sessions.metamessages.create( + app_id=app.id, + session_id=str(inp.conversation_id), + user_id=user.id, + message_id=new_ai_message.id, + metamessage_type="thought", + content=thought, + ) - return StreamingResponse(convo_turn()) + return StreamingResponse(convo_turn()) + except Exception as e: + # Log the error here if needed + if "rate limit" in str(e).lower(): + return JSONResponse( + status_code=429, + content={"error": "rate_limit_exceeded", "message": str(e)} + ) + else: + return JSONResponse( + status_code=500, + content={"error": "internal_server_error", "message": str(e)} + ) @router.get("/thought/{message_id}") async def get_thought(conversation_id: str, message_id: str, user_id: str): - user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id) - thought = perform_db_operation( - honcho.apps.users.sessions.metamessages.list, - session_id=conversation_id, - app_id=app.id, - user_id=user.id, - message_id=message_id, - metamessage_type="thought", - ) - # In practice, there should only be one thought per message - return {"thought": 
thought.items[0].content if thought.items else None} + try: + user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id) + thought = honcho.apps.users.sessions.metamessages.list( + session_id=conversation_id, + app_id=app.id, + user_id=user.id, + message_id=message_id, + metamessage_type="thought", + ) + # In practice, there should only be one thought per message + return {"thought": thought.items[0].content if thought.items else None} + except Exception as e: + # Log the error here if needed + return JSONResponse( + status_code=500, + content={"error": "internal_server_error", "message": str(e)} + ) diff --git a/api/utils/retry_utils.py b/api/utils/retry_utils.py deleted file mode 100644 index a211d27..0000000 --- a/api/utils/retry_utils.py +++ /dev/null @@ -1,92 +0,0 @@ -import sentry_sdk -from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type -import logging - -logger = logging.getLogger(__name__) - -class RateLimitError(Exception): - """Custom exception for rate limit errors""" - pass - -def is_rate_limit_error(exception): - if isinstance(exception, Exception): - error_message = str(exception).lower() - return 'rate limit' in error_message or 'too many requests' in error_message - return False - -def log_retry(retry_state): - """Log retry attempts and capture exceptions in Sentry""" - exception = retry_state.outcome.exception() - if exception: - if is_rate_limit_error(exception): - logger.warning(f"Rate limit hit in {retry_state.fn.__name__}: attempt {retry_state.attempt_number}") - sentry_sdk.capture_message( - f"OpenAI Rate Limit Hit", - level="warning", - extras={ - "function": retry_state.fn.__name__, - "attempt": retry_state.attempt_number, - "exception": str(exception) - } - ) - else: - sentry_sdk.capture_exception(exception) - - logger.warning(f"Retrying {retry_state.fn.__name__}: attempt {retry_state.attempt_number} due to {exception}") - sentry_sdk.add_breadcrumb( - category="retry", - message=f"Retrying {retry_state.fn.__name__}", - level="warning", - data={ - "attempt": retry_state.attempt_number, - "exception": str(exception) - } - ) - -# honcho retry decorator -def db_retry_decorator(): - return retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((ConnectionError, TimeoutError)), - before_sleep=log_retry, - reraise=True - ) - -# Third-party (OpenAI) retry decorator -def rate_limit_retry_decorator(): - return retry( - stop=stop_after_attempt(5), # More attempts for rate limits - wait=wait_exponential(multiplier=2, min=4, max=40), # Longer waits - retry=is_rate_limit_error, - before_sleep=log_retry, - reraise=True - ) - -def openai_retry_decorator(): - return retry( - stop=stop_after_attempt(3), # Fewer attempts for other errors - wait=wait_exponential(multiplier=1, min=2, max=15), - retry=lambda e: not is_rate_limit_error(e), # Retry if it's NOT a rate limit error - before_sleep=log_retry, - reraise=True - ) - -@db_retry_decorator() -def perform_db_operation(func, *args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - sentry_sdk.capture_exception(e) - raise - -@openai_retry_decorator() -@rate_limit_retry_decorator() -def perform_openai_operation(func, *args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - if is_rate_limit_error(e): - raise RateLimitError(str(e)) from e - sentry_sdk.capture_exception(e) - raise diff --git a/www/package.json b/www/package.json index 68372e5..a62bb90 100644 --- 
a/www/package.json +++ b/www/package.json @@ -25,6 +25,7 @@ "react-toggle-dark-mode": "^1.1.1", "rehype-katex": "^7.0.1", "remark-math": "^6.0.0", + "retry": "^0.13.1", "sharp": "^0.32.6", "stripe": "^16.11.0", "sweetalert2": "^11.14.0", @@ -36,6 +37,7 @@ "@types/react": "18.2.21", "@types/react-dom": "18.2.7", "@types/react-syntax-highlighter": "^15.5.13", + "@types/retry": "^0.12.5", "@types/uuid": "^9.0.8", "autoprefixer": "10.4.15", "encoding": "^0.1.13", diff --git a/www/pnpm-lock.yaml b/www/pnpm-lock.yaml index d3200d4..5aba9eb 100644 --- a/www/pnpm-lock.yaml +++ b/www/pnpm-lock.yaml @@ -56,6 +56,9 @@ importers: remark-math: specifier: ^6.0.0 version: 6.0.0 + retry: + specifier: ^0.13.1 + version: 0.13.1 sharp: specifier: ^0.32.6 version: 0.32.6 @@ -84,6 +87,9 @@ importers: '@types/react-syntax-highlighter': specifier: ^15.5.13 version: 15.5.13 + '@types/retry': + specifier: ^0.12.5 + version: 0.12.5 '@types/uuid': specifier: ^9.0.8 version: 9.0.8 @@ -1642,6 +1648,9 @@ packages: '@types/react@18.2.21': resolution: {integrity: sha512-neFKG/sBAwGxHgXiIxnbm3/AAVQ/cMRS93hvBpg8xYRbeQSPVABp9U2bRnPf0iI4+Ucdv3plSxKK+3CW2ENJxA==} + '@types/retry@0.12.5': + resolution: {integrity: sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw==} + '@types/scheduler@0.23.0': resolution: {integrity: sha512-YIoDCTH3Af6XM5VuwGG/QL/CJqga1Zm3NkU3HZ4ZHK2fRMPYP1VczsTUqtsf43PH/iJNVlPHAo2oWX7BSdB2Hw==} @@ -4448,6 +4457,10 @@ packages: resolution: {integrity: sha512-l+sSefzHpj5qimhFSE5a8nufZYAM3sBSVMAPtYkmC+4EH2anSGaEMXSD0izRQbu9nfyQ9y5JrVmp7E8oZrUjvA==} engines: {node: '>=8'} + retry@0.13.1: + resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==} + engines: {node: '>= 4'} + reusify@1.0.4: resolution: {integrity: sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==} engines: {iojs: '>=1.0.0', node: '>=0.10.0'} @@ -7358,6 +7371,8 @@ snapshots: '@types/scheduler': 0.23.0 csstype: 3.1.3 + '@types/retry@0.12.5': {} + '@types/scheduler@0.23.0': {} '@types/shimmer@1.2.0': {} @@ -10821,6 +10836,8 @@ snapshots: onetime: 5.1.2 signal-exit: 3.0.7 + retry@0.13.1: {} + reusify@1.0.4: {} rimraf@2.6.3: diff --git a/www/utils/api.ts b/www/utils/api.ts index 22788c0..e74067a 100644 --- a/www/utils/api.ts +++ b/www/utils/api.ts @@ -1,5 +1,8 @@ +import { Reaction } from "@/components/messagebox"; +import { retryDBOperation, retryOpenAIOperation } from "./retryUtils"; + const defaultMessage: Message = { - text: `I'm your Aristotelian learning companion — here to help you follow your curiosity in whatever direction you like. My engineering makes me extremely receptive to your needs and interests. You can reply normally, and I’ll always respond!\n\nIf I'm off track, just say so!\n\nNeed to leave or just done chatting? Let me know! I’m conversational by design so I’ll say goodbye 😊.`, + text: `I'm your Aristotelian learning companion — here to help you follow your curiosity in whatever direction you like. My engineering makes me extremely receptive to your needs and interests. You can reply normally, and I'll always respond!\n\nIf I'm off track, just say so!\n\nNeed to leave or just done chatting? Let me know! 
I'm conversational by design so I'll say goodbye 😊.`, isUser: false, id: "", }; @@ -30,65 +33,71 @@ export class Conversation { } async getMessages() { - const req = await fetch( - `${this.api.url}/api/messages?` + - new URLSearchParams({ - conversation_id: this.conversationId, - user_id: this.api.userId, - }), - ); - const { messages: rawMessages } = await req.json(); - // console.log(rawMessages); - if (!rawMessages) return []; - const messages = rawMessages.map((rawMessage: any) => { - return { - text: rawMessage.data.content, - isUser: rawMessage.type === "human", - id: rawMessage.id, - }; - }); + return retryDBOperation(async () => { + const req = await fetch( + `${this.api.url}/api/messages?` + + new URLSearchParams({ + conversation_id: this.conversationId, + user_id: this.api.userId, + }), + ); + const { messages: rawMessages } = await req.json(); + if (!rawMessages) return []; + const messages = rawMessages.map((rawMessage: any) => { + return { + text: rawMessage.data.content, + isUser: rawMessage.type === "human", + id: rawMessage.id, + }; + }); - return messages; + return messages; + }); } async setName(name: string) { if (!name || name === this.name) return; - await fetch(`${this.api.url}/api/conversations/update`, { - method: "POST", - body: JSON.stringify({ - conversation_id: this.conversationId, - user_id: this.api.userId, - name, - }), - headers: { - "Content-Type": "application/json", - }, + await retryDBOperation(async () => { + await fetch(`${this.api.url}/api/conversations/update`, { + method: "POST", + body: JSON.stringify({ + conversation_id: this.conversationId, + user_id: this.api.userId, + name, + }), + headers: { + "Content-Type": "application/json", + }, + }); + this.name = name; }); - this.name = name; } async delete() { - await fetch( - `${this.api.url}/api/conversations/delete?user_id=${this.api.userId}&conversation_id=${this.conversationId}`, - ).then((res) => res.json()); + await retryDBOperation(async () => { + await fetch( + `${this.api.url}/api/conversations/delete?user_id=${this.api.userId}&conversation_id=${this.conversationId}`, + ).then((res) => res.json()); + }); } async chat(message: string) { - const req = await fetch(`${this.api.url}/api/stream`, { - method: "POST", - body: JSON.stringify({ - conversation_id: this.conversationId, - user_id: this.api.userId, - message, - }), - headers: { - "Content-Type": "application/json", - }, - }); + return retryOpenAIOperation(async () => { + const req = await fetch(`${this.api.url}/api/stream`, { + method: "POST", + body: JSON.stringify({ + conversation_id: this.conversationId, + user_id: this.api.userId, + message, + }), + headers: { + "Content-Type": "application/json", + }, + }); - const reader = req.body?.pipeThrough(new TextDecoderStream()).getReader()!; - return reader; + return req.body?.pipeThrough(new TextDecoderStream()).getReader()!; + }); } } @@ -107,83 +116,157 @@ export class API { } async new() { - const req = await fetch( - `${this.url}/api/conversations/insert?user_id=${this.userId}`, - ); - const { conversation_id } = await req.json(); - return new Conversation({ - api: this, - name: "", - conversationId: conversation_id, + return retryDBOperation(async () => { + const req = await fetch( + `${this.url}/api/conversations/insert?user_id=${this.userId}`, + ); + const { conversation_id } = await req.json(); + return new Conversation({ + api: this, + name: "", + conversationId: conversation_id, + }); }); } async getConversations() { - const req = await fetch( - 
`${this.url}/api/conversations/get?user_id=${this.userId}`, - ); - const { conversations }: { conversations: RawConversation[] } = - await req.json(); - - if (conversations.length === 0) { - return [await this.new()]; - } - return conversations.map( - (conversation) => - new Conversation({ - api: this, - name: conversation.name, - conversationId: conversation.conversation_id, - }), - ); + return retryDBOperation(async () => { + const req = await fetch( + `${this.url}/api/conversations/get?user_id=${this.userId}`, + ); + const { conversations }: { conversations: RawConversation[] } = + await req.json(); + + if (conversations.length === 0) { + return [await this.new()]; + } + return conversations.map( + (conversation) => + new Conversation({ + api: this, + name: conversation.name, + conversationId: conversation.conversation_id, + }), + ); + }); } async getMessagesByConversation(conversationId: string) { - const req = await fetch( - `${this.url}/api/messages?` + - new URLSearchParams({ - conversation_id: conversationId, - user_id: this.userId, - }), - ); - const { messages: rawMessages } = await req.json(); - // console.log(rawMessages); - if (!rawMessages) return []; - const messages: Message[] = rawMessages.map((rawMessage: any) => { - return { - text: rawMessage.content, - isUser: rawMessage.isUser, - id: rawMessage.id, - }; - }); + return retryDBOperation(async () => { + const req = await fetch( + `${this.url}/api/messages?` + + new URLSearchParams({ + conversation_id: conversationId, + user_id: this.userId, + }), + ); + const { messages: rawMessages } = await req.json(); + if (!rawMessages) return []; + const messages: Message[] = rawMessages.map((rawMessage: any) => { + return { + text: rawMessage.content, + isUser: rawMessage.isUser, + id: rawMessage.id, + }; + }); - return [defaultMessage, ...messages]; + return [defaultMessage, ...messages]; + }); } async getThoughtById( conversationId: string, messageId: string, ): Promise { - try { - const response = await fetch( - `${this.url}/api/thought/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}`, - { - method: "GET", - headers: { - "Content-Type": "application/json", + return retryDBOperation(async () => { + try { + const response = await fetch( + `${this.url}/api/thought/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}`, + { + method: "GET", + headers: { + "Content-Type": "application/json", + }, }, - }, - ); + ); + + if (!response.ok) { + throw new Error("Failed to fetch thought"); + } + + const data = await response.json(); + return data.thought; + } catch (error) { + console.error("Error fetching thought:", error); + return null; + } + }); + } + + async addReaction( + conversationId: string, + messageId: string, + reaction: Exclude, + ): Promise<{ status: string }> { + return retryDBOperation(async () => { + try { + const response = await fetch( + `${this.url}/api/reaction/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}&reaction=${reaction}`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }, + ); + + if (!response.ok) { + throw new Error("Failed to add reaction"); + } - if (!response.ok) { - throw new Error("Failed to fetch thought"); + return await response.json(); + } catch (error) { + console.error("Error adding reaction:", error); + throw error; } + }); + } - const data = await response.json(); - return data.thought; - } catch (error) { - console.error("Error fetching thought:", error); - return null; - } + async getReaction( + 
conversationId: string, + messageId: string, + ): Promise<{ reaction: Reaction }> { + return retryDBOperation(async () => { + try { + const response = await fetch( + `${this.url}/api/reaction/${messageId}?user_id=${this.userId}&conversation_id=${conversationId}`, + { + method: "GET", + headers: { + "Content-Type": "application/json", + }, + }, + ); + + if (!response.ok) { + throw new Error("Failed to get reaction"); + } + + const data = await response.json(); + + // Validate the reaction + if ( + data.reaction !== null && + !["thumbs_up", "thumbs_down"].includes(data.reaction) + ) { + throw new Error("Invalid reaction received from server"); + } + + return data as { reaction: Reaction }; + } catch (error) { + console.error("Error getting reaction:", error); + throw error; + } + }); } } diff --git a/www/utils/retryUtils.ts b/www/utils/retryUtils.ts new file mode 100644 index 0000000..492d369 --- /dev/null +++ b/www/utils/retryUtils.ts @@ -0,0 +1,72 @@ +import retry from "retry"; +import { captureException, captureMessage } from "@sentry/nextjs"; + +interface RetryOptions { + retries: number; + factor: number; + minTimeout: number; + maxTimeout: number; +} + +const dbOptions: RetryOptions = { + retries: 3, + factor: 1.5, + minTimeout: 1000, + maxTimeout: 10000, +}; + +const openAIOptions: RetryOptions = { + retries: 5, + factor: 2, + minTimeout: 4000, + maxTimeout: 60000, +}; + +function isRateLimitError(error: any): boolean { + return error?.response?.data?.error === "rate_limit_exceeded"; +} + +function retryOperation( + operation: () => Promise, + options: RetryOptions, + isOpenAI: boolean, +): Promise { + return new Promise((resolve, reject) => { + const retryOperation = retry.operation(options); + + retryOperation.attempt(async (currentAttempt) => { + try { + const result = await operation(); + resolve(result); + } catch (error: any) { + if (isOpenAI && isRateLimitError(error)) { + captureMessage("OpenAI Rate Limit Hit", { + level: "warning", + extra: { + attempt: currentAttempt, + error: error.message, + }, + }); + } else { + captureException(error); + } + + if (retryOperation.retry(error)) { + return; + } + + reject(retryOperation.mainError()); + } + }); + }); +} + +export function retryDBOperation(operation: () => Promise): Promise { + return retryOperation(operation, dbOptions, false); +} + +export function retryOpenAIOperation( + operation: () => Promise, +): Promise { + return retryOperation(operation, openAIOptions, true); +} From 318cdc7ba0d9819e64e201015a8d9de8c107888c Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Thu, 10 Oct 2024 12:37:33 -0500 Subject: [PATCH 5/9] Cleanup messagebox params and page component loading. 
--- www/app/page.tsx | 32 +++++++++++++++++++------------- www/components/messagebox.tsx | 10 +++++----- www/utils/api.ts | 2 +- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/www/app/page.tsx b/www/app/page.tsx index 9c62cdb..34c0bff 100644 --- a/www/app/page.tsx +++ b/www/app/page.tsx @@ -6,9 +6,6 @@ import dynamic from "next/dynamic"; import banner from "@/public/bloom2x1.svg"; import darkBanner from "@/public/bloom2x1dark.svg"; -import MessageBox from "@/components/messagebox"; -import Sidebar from "@/components/sidebar"; -import MarkdownWrapper from "@/components/markdownWrapper"; import { DarkModeSwitch } from "react-toggle-dark-mode"; import { FaLightbulb, FaPaperPlane, FaBars } from "react-icons/fa"; import Swal from "sweetalert2"; @@ -22,7 +19,15 @@ import { getSubscription } from "@/utils/supabase/queries"; import { API } from "@/utils/api"; import { createClient } from "@/utils/supabase/client"; -const Thoughts = dynamic(() => import("@/components/thoughts")); +const Thoughts = dynamic(() => import("@/components/thoughts"), { + ssr: false, +}); +const MessageBox = dynamic(() => import("@/components/messagebox"), { + ssr: false, +}); +const Sidebar = dynamic(() => import("@/components/sidebar"), { + ssr: false, +}); const URL = process.env.NEXT_PUBLIC_API_URL; @@ -79,11 +84,9 @@ export default function Home() { const sub = await getSubscription(supabase); setIsSubscribed(!!sub); } - })(); }, [supabase, posthog, userId]); - useEffect(() => { const messageContainer = messageContainerRef.current; if (!messageContainer) return; @@ -204,7 +207,6 @@ export default function Home() { isThinking = false; continue; } - console.log(value) setThought((prev) => prev + value); } else { if (value.includes("❀")) { @@ -300,8 +302,7 @@ export default function Home() { isUser={message.isUser} userId={userId} URL={URL} - messageId={message.id} - text={message.text} + message={message} loading={messagesLoading} conversationId={conversationId} setThought={setThought} @@ -310,7 +311,7 @@ export default function Home() { )) || ( { diff --git a/www/components/messagebox.tsx b/www/components/messagebox.tsx index 39ea3a4..8ad74df 100644 --- a/www/components/messagebox.tsx +++ b/www/components/messagebox.tsx @@ -10,21 +10,21 @@ interface MessageBoxProps { isUser?: boolean; userId?: string; URL?: string; - messageId?: string; conversationId?: string; - text: string; + message: { text: string; id: string }; loading?: boolean; isThoughtsOpen?: boolean; setIsThoughtsOpen: (isOpen: boolean) => void; setThought: (thought: string) => void; } +export type Reaction = "thumbs_up" | "thumbs_down" | null; + export default function MessageBox({ isUser, userId, URL, - messageId, - text, + message, loading = false, setIsThoughtsOpen, conversationId, @@ -32,7 +32,7 @@ export default function MessageBox({ }: MessageBoxProps) { const [isThoughtLoading, setIsThoughtLoading] = useState(false); const [error, setError] = useState(null); - + const { id: messageId, text } = message; const shouldShowButtons = messageId !== ""; const handleFetchThought = async () => { diff --git a/www/utils/api.ts b/www/utils/api.ts index e74067a..652ce3a 100644 --- a/www/utils/api.ts +++ b/www/utils/api.ts @@ -1,4 +1,4 @@ -import { Reaction } from "@/components/messagebox"; +import { type Reaction } from "@/components/messagebox"; import { retryDBOperation, retryOpenAIOperation } from "./retryUtils"; const defaultMessage: Message = { From f7e0cb121c6b6eaa07aaf11b943adbf9702c9cf8 Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Fri, 
11 Oct 2024 14:20:57 -0500 Subject: [PATCH 6/9] Removes tenacity from /api/ deps. --- api/pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/api/pyproject.toml b/api/pyproject.toml index 75f3a7a..1f268ec 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -10,7 +10,6 @@ dependencies = [ "honcho-ai>=0.0.14", "python-dotenv>=1.0.1", "agent", - "tenacity>=8.2.0", ] [tool.uv.sources] From 84ff08c769505394268f4a09546169b3736e2a11 Mon Sep 17 00:00:00 2001 From: Ben Lopata <42045469+bLopata@users.noreply.github.com> Date: Tue, 15 Oct 2024 09:36:05 -0500 Subject: [PATCH 7/9] Fix code scanning alert no. 10: Information exposure through an exception Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- api/routers/chat.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/api/routers/chat.py b/api/routers/chat.py index 59d5544..632b477 100644 --- a/api/routers/chat.py +++ b/api/routers/chat.py @@ -68,16 +68,18 @@ def convo_turn(): return StreamingResponse(convo_turn()) except Exception as e: - # Log the error here if needed + # Log the error here + import logging + logging.error("An error occurred: %s", str(e)) if "rate limit" in str(e).lower(): return JSONResponse( status_code=429, - content={"error": "rate_limit_exceeded", "message": str(e)} + content={"error": "rate_limit_exceeded", "message": "Rate limit exceeded. Please try again later."} ) else: return JSONResponse( status_code=500, - content={"error": "internal_server_error", "message": str(e)} + content={"error": "internal_server_error", "message": "An internal server error has occurred."} ) @@ -95,8 +97,10 @@ async def get_thought(conversation_id: str, message_id: str, user_id: str): # In practice, there should only be one thought per message return {"thought": thought.items[0].content if thought.items else None} except Exception as e: - # Log the error here if needed + # Log the error here + import logging + logging.error("An error occurred: %s", str(e)) return JSONResponse( status_code=500, - content={"error": "internal_server_error", "message": str(e)} + content={"error": "internal_server_error", "message": "An internal server error has occurred."} ) From c044b58531ec57b51cee61a930f4d0784c9b29cc Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Wed, 16 Oct 2024 13:24:49 -0500 Subject: [PATCH 8/9] Background task encapsulation for honcho SDK operations --- api/routers/chat.py | 119 ++++++++++++++++++++++++-------------------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/api/routers/chat.py b/api/routers/chat.py index 632b477..6341db2 100644 --- a/api/routers/chat.py +++ b/api/routers/chat.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, BackgroundTasks from fastapi.responses import StreamingResponse, JSONResponse from api import schemas @@ -6,71 +6,58 @@ from agent.chain import ThinkCall, RespondCall +import logging + router = APIRouter(prefix="/api", tags=["chat"]) @router.post("/stream") -async def stream(inp: schemas.ConversationInput): +async def stream(inp: schemas.ConversationInput, background_tasks: BackgroundTasks): """Stream the response to the user, currently only used by the Web UI and has integration to be able to use Honcho if not anonymous""" try: user = honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id) def convo_turn(): - thought_stream = ThinkCall( - user_input=inp.message, - app_id=app.id, - user_id=user.id, - 
session_id=str(inp.conversation_id), - honcho=honcho, - ).stream() thought = "" - for chunk in thought_stream: - thought += chunk - yield chunk - - yield "❀" - response_stream = RespondCall( - user_input=inp.message, - thought=thought, - app_id=app.id, - user_id=user.id, - session_id=str(inp.conversation_id), - honcho=honcho, - ).stream() response = "" - for chunk in response_stream: - response += chunk - yield chunk - yield "❀" + try: + thought_stream = ThinkCall( + user_input=inp.message, + app_id=app.id, + user_id=user.id, + session_id=str(inp.conversation_id), + honcho=honcho, + ).stream() + for chunk in thought_stream: + thought += chunk + yield chunk - honcho.apps.users.sessions.messages.create( - is_user=True, - session_id=str(inp.conversation_id), - app_id=app.id, - user_id=user.id, - content=inp.message, - ) - new_ai_message = honcho.apps.users.sessions.messages.create( - is_user=False, - session_id=str(inp.conversation_id), - app_id=app.id, - user_id=user.id, - content=response, - ) - honcho.apps.users.sessions.metamessages.create( - app_id=app.id, - session_id=str(inp.conversation_id), - user_id=user.id, - message_id=new_ai_message.id, - metamessage_type="thought", - content=thought, + yield "❀" + response_stream = RespondCall( + user_input=inp.message, + thought=thought, + app_id=app.id, + user_id=user.id, + session_id=str(inp.conversation_id), + honcho=honcho, + ).stream() + for chunk in response_stream: + response += chunk + yield chunk + yield "❀" + except Exception as e: + logging.error(f"Error during streaming: {str(e)}") + yield f"Error: {str(e)}" + return + + background_tasks.add_task( + create_messages_and_metamessages, + app.id, user.id, inp.conversation_id, inp.message, thought, response ) return StreamingResponse(convo_turn()) except Exception as e: - # Log the error here - import logging - logging.error("An error occurred: %s", str(e)) + logging.error(f"An error occurred: {str(e)}") if "rate limit" in str(e).lower(): return JSONResponse( status_code=429, @@ -97,10 +84,36 @@ async def get_thought(conversation_id: str, message_id: str, user_id: str): # In practice, there should only be one thought per message return {"thought": thought.items[0].content if thought.items else None} except Exception as e: - # Log the error here - import logging - logging.error("An error occurred: %s", str(e)) + logging.error(f"An error occurred: {str(e)}") return JSONResponse( status_code=500, content={"error": "internal_server_error", "message": "An internal server error has occurred."} ) + + +def create_messages_and_metamessages(app_id, user_id, conversation_id, user_message, thought, ai_response): + try: + honcho.apps.users.sessions.messages.create( + is_user=True, + session_id=str(conversation_id), + app_id=app_id, + user_id=user_id, + content=user_message, + ) + new_ai_message = honcho.apps.users.sessions.messages.create( + is_user=False, + session_id=str(conversation_id), + app_id=app_id, + user_id=user_id, + content=ai_response, + ) + honcho.apps.users.sessions.metamessages.create( + app_id=app_id, + session_id=str(conversation_id), + user_id=user_id, + message_id=new_ai_message.id, + metamessage_type="thought", + content=thought, + ) + except Exception as e: + logging.error(f"Error in background task: {str(e)}") From 42f503cc1625eb7ba9f5f0860955710811203c2f Mon Sep 17 00:00:00 2001 From: Ben Lopata Date: Fri, 18 Oct 2024 10:45:24 -0500 Subject: [PATCH 9/9] Wraps honcho operations in promise handling. 
--- api/routers/chat.py | 63 ++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/api/routers/chat.py b/api/routers/chat.py index 6341db2..700e18b 100644 --- a/api/routers/chat.py +++ b/api/routers/chat.py @@ -12,12 +12,11 @@ @router.post("/stream") -async def stream(inp: schemas.ConversationInput, background_tasks: BackgroundTasks): - """Stream the response to the user, currently only used by the Web UI and has integration to be able to use Honcho if not anonymous""" +async def stream(inp: schemas.ConversationInput): try: user = honcho.apps.users.get_or_create(app_id=app.id, name=inp.user_id) - def convo_turn(): + async def convo_turn(): thought = "" response = "" try: @@ -50,8 +49,7 @@ def convo_turn(): yield f"Error: {str(e)}" return - background_tasks.add_task( - create_messages_and_metamessages, + await create_messages_and_metamessages( app.id, user.id, inp.conversation_id, inp.message, thought, response ) @@ -69,45 +67,24 @@ def convo_turn(): content={"error": "internal_server_error", "message": "An internal server error has occurred."} ) - -@router.get("/thought/{message_id}") -async def get_thought(conversation_id: str, message_id: str, user_id: str): - try: - user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id) - thought = honcho.apps.users.sessions.metamessages.list( - session_id=conversation_id, - app_id=app.id, - user_id=user.id, - message_id=message_id, - metamessage_type="thought", - ) - # In practice, there should only be one thought per message - return {"thought": thought.items[0].content if thought.items else None} - except Exception as e: - logging.error(f"An error occurred: {str(e)}") - return JSONResponse( - status_code=500, - content={"error": "internal_server_error", "message": "An internal server error has occurred."} - ) - - -def create_messages_and_metamessages(app_id, user_id, conversation_id, user_message, thought, ai_response): +async def create_messages_and_metamessages(app_id, user_id, conversation_id, user_message, thought, ai_response): try: - honcho.apps.users.sessions.messages.create( + # These operations will use the DB layer's built-in retry logic + await honcho.apps.users.sessions.messages.create( is_user=True, session_id=str(conversation_id), app_id=app_id, user_id=user_id, content=user_message, ) - new_ai_message = honcho.apps.users.sessions.messages.create( + new_ai_message = await honcho.apps.users.sessions.messages.create( is_user=False, session_id=str(conversation_id), app_id=app_id, user_id=user_id, content=ai_response, ) - honcho.apps.users.sessions.metamessages.create( + await honcho.apps.users.sessions.metamessages.create( app_id=app_id, session_id=str(conversation_id), user_id=user_id, @@ -116,4 +93,26 @@ def create_messages_and_metamessages(app_id, user_id, conversation_id, user_mess content=thought, ) except Exception as e: - logging.error(f"Error in background task: {str(e)}") + logging.error(f"Error in create_messages_and_metamessages: {str(e)}") + raise # Re-raise the exception to be handled by the caller + + +@router.get("/thought/{message_id}") +async def get_thought(conversation_id: str, message_id: str, user_id: str): + try: + user = honcho.apps.users.get_or_create(app_id=app.id, name=user_id) + thought = honcho.apps.users.sessions.metamessages.list( + session_id=conversation_id, + app_id=app.id, + user_id=user.id, + message_id=message_id, + metamessage_type="thought", + ) + # In practice, there should only be one thought per message + return {"thought": 
thought.items[0].content if thought.items else None} + except Exception as e: + logging.error(f"An error occurred: {str(e)}") + return JSONResponse( + status_code=500, + content={"error": "internal_server_error", "message": "An internal server error has occurred."} + )
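For reference, the `/api/stream` endpoint implemented above emits the thought tokens, a "❀" marker, the response tokens, and a closing "❀"; the Web UI in `www/utils/api.ts` reads the stream and splits on that marker. A minimal consumer sketch follows — it assumes the API is served locally on port 8000, the IDs are placeholders, and `requests` is used purely for illustration (it is not a dependency of this repo):

    import requests

    payload = {
        "user_id": "demo-user",                                     # placeholder
        "conversation_id": "00000000-0000-0000-0000-000000000000",  # placeholder
        "message": "Why is the sky blue?",
    }

    with requests.post("http://localhost:8000/api/stream", json=payload, stream=True) as resp:
        # A JSON 429 body is returned instead of a stream when a rate limit is reported.
        resp.raise_for_status()
        resp.encoding = resp.encoding or "utf-8"
        buffer = ""
        for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
            buffer += chunk

    # Stream layout: <thought> ❀ <response> ❀
    thought, response = buffer.split("❀")[:2]
    print("THOUGHT:", thought.strip())
    print("RESPONSE:", response.strip())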