Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LLM-mode + Port to Magentic #16

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ jobs:
run: |
python -m pip install --upgrade pip
poetry install
- name: Build OpenBB
run: poetry run python -c "import openbb; openbb.build(); print('Done')"
- name: Run Pytest
run : poetry run pytest tests/
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run : poetry run pytest -n 8 tests/
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ pre-commit install
```

### Testing

We are in the process of adding tests.

We use `pytest` as our test-runner:

We use `pytest` as our test runner:
``` sh
pytest tests/
pytest -n 8 tests/
```

905 changes: 0 additions & 905 deletions langchain-tool-retrieval.ipynb

This file was deleted.

406 changes: 0 additions & 406 deletions openbb-agent.ipynb

This file was deleted.

307 changes: 250 additions & 57 deletions openbb_agents/agent.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
import asyncio
import logging
from typing import Optional

from openbb_agents.chains import (
generate_final_response,
from langchain.vectorstores import VectorStore

from .chains import (
agenerate_subquestion_answer,
agenerate_subquestions_from_query,
asearch_tools,
generate_final_answer,
generate_subquestion_answer,
generate_subquestions,
select_tools,
generate_subquestions_from_query,
search_tools,
)
from openbb_agents.models import SubQuestionAgentConfig
from openbb_agents.tools import (
create_tool_index,
get_all_openbb_tools,
map_openbb_routes_to_langchain_tools,
from .models import AnsweredSubQuestion, SubQuestion
from .tools import (
build_openbb_tool_vector_index,
build_vector_index_from_openbb_function_descriptions,
map_name_to_openbb_function_description,
)
from openbb_agents.utils import get_dependencies

from . import VERBOSE
from .utils import get_dependencies

logger = logging.getLogger(__name__)


def openbb_agent(
query: str, openbb_tools: Optional[list[str]] = None, verbose=VERBOSE
) -> str:
def openbb_agent(query: str, openbb_tools: list[str] | None = None) -> str:
"""Answer a query using the OpenBB Agent equipped with tools.
By default all available openbb tools are used. You can have a query
Expand All @@ -33,60 +34,252 @@ def openbb_agent(
----------
query : str
The query you want to have answered.
openbb_tools : optional[list[str]]
Optional. Specify the OpenBB collections or commands that you use to use. If not
openbb_tools : list[Callable]
Optional. Specify the OpenBB functions you want to use. If not
specified, every available OpenBB tool will be used.
Examples
--------
>>> # Use all OpenBB tools to answer the query
>>> openbb_agent("What is the market cap of TSLA?")
>>> openbb_agent("What is the stock price of TSLA?")
>>> # Use only the specified tools to answer the query
>>> openbb_agent("What is the market cap of TSLA?",
... openbb_tools=["/equity/fundamental", "/equity/price/historical"])
>>> openbb_agent("What is the stock price of TSLA?",
... openbb_tools=['.equity.price.quote'])
"""
tool_vector_index = _handle_tool_vector_index(openbb_tools)
subquestions = generate_subquestions_from_query(user_query=query)

subquestion_list = generate_subquestions(query, verbose=verbose)
logger.info("Generated subquestions: %s", subquestion_list)
logger.info("Generated subquestions: %s", subquestions)

if openbb_tools:
tools = map_openbb_routes_to_langchain_tools(openbb_tools)
else:
tools = get_all_openbb_tools()
vector_index = create_tool_index(tools=tools)
answered_subquestions = []
for subquestion in subquestions:
if _is_subquestion_answerable(
subquestion=subquestion, answered_subquestions=answered_subquestions
):
logger.info("Answering subquestion: %s", subquestion)
answered_subquestion = _fetch_tools_and_answer_subquestion(
user_query=query,
subquestion=subquestion,
tool_vector_index=tool_vector_index,
answered_subquestions=answered_subquestions,
)
answered_subquestions.append(answered_subquestion)
else:
logger.info("Skipping unanswerable subquestion: %s", subquestion)
return generate_final_answer(
user_query=query,
answered_subquestions=answered_subquestions,
)


async def aopenbb_agent(query: str, openbb_tools: list[str] | None = None) -> str:
"""Answer a query using the OpenBB Agent equipped with tools.
Async variant of `openbb_agent`.
By default all available openbb tools are used. You can have a query
answered using a smaller subset of OpenBB tools by using the `openbb_tools`
argument.
Parameters
----------
query : str
The query you want to have answered.
openbb_tools : list[Callable]
Optional. Specify the OpenBB functions you want to use. If not
specified, every available OpenBB tool will be used.
Examples
--------
>>> # Use all OpenBB tools to answer the query
>>> openbb_agent("What is the stock price of TSLA?")
>>> # Use only the specified tools to answer the query
>>> openbb_agent("What is the stock price of TSLA?",
... openbb_tools=['.equity.price.quote'])
"""
tool_vector_index = _handle_tool_vector_index(openbb_tools)

subquestions = await agenerate_subquestions_from_query(user_query=query)
answered_subquestions = await _aprocess_subquestions(
user_query=query,
subquestions=subquestions,
tool_vector_index=tool_vector_index,
)

return generate_final_answer(
user_query=query,
answered_subquestions=answered_subquestions,
)


async def _aprocess_subquestions(
user_query: str, subquestions: list[SubQuestion], tool_vector_index: VectorStore
) -> list[AnsweredSubQuestion]:
answered_subquestions = []
for subquestion in subquestion_list.subquestions: # TODO: Do in parallel
# Fetch tool for subquestion
logger.info("Attempting to select tools for: %s", {subquestion.question})
selected_tools = select_tools(
vector_index=vector_index,
tools=tools,
subquestion=subquestion,
answered_subquestions=answered_subquestions,
verbose=verbose,
queued_subquestions = []

tasks = []
while True:
unanswered_subquestions = _get_unanswered_subquestions(
answered_subquestions=answered_subquestions, subquestions=subquestions
)
# TODO: Improve filtering of tools (probably by storing them in a dict)
tool_names = [tool.name for tool in selected_tools.tools]
subquestion_tools = [tool for tool in tools if tool.name in tool_names]
logger.info("Retrieved tool(s): %s", tool_names)

# Then attempt to answer subquestion
answered_subquestion = generate_subquestion_answer(
SubQuestionAgentConfig(
query=query,
subquestion=subquestion,
tools=subquestion_tools,
dependencies=get_dependencies(
answered_subquestions, subquestion
), # TODO: Just do this in gneerate_subquestion_answer
),
verbose=verbose,
logger.info("Pending subquestions: %s", unanswered_subquestions)

new_answerable_subquestions = _get_answerable_subquestions(
subquestions=unanswered_subquestions,
answered_subquestions=answered_subquestions,
)
answered_subquestions.append(answered_subquestion)
logger.info("Answerable subquestions: %s", new_answerable_subquestions)

for subquestion in new_answerable_subquestions:
logger.info("Scheduling subquestion for answer: %s", subquestion)
# Make sure we only submit newly answerable questions (since the
# other ones have been submitted already)
if subquestion not in queued_subquestions:
task = asyncio.create_task(
_afetch_tools_and_answer_subquestion(
user_query=user_query,
subquestion=subquestion,
tool_vector_index=tool_vector_index,
answered_subquestions=answered_subquestions,
)
)
tasks.append(task)
queued_subquestions.append(subquestion)

if not tasks:
break

done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
tasks = [task for task in tasks if not task.done()]

for task in done:
if task.exception():
logger.error("Unexpected error in task: %s", task.exception())
else:
answered_subquestion = task.result()
logger.info("Finished task for subquestion: %s", answered_subquestion)
answered_subquestions.append(answered_subquestion)

return answered_subquestions


def _fetch_tools_and_answer_subquestion(
user_query: str,
subquestion: SubQuestion,
tool_vector_index: VectorStore,
answered_subquestions: list[AnsweredSubQuestion],
) -> AnsweredSubQuestion:
logger.info("Attempting to select tools for: %s", {subquestion.question})
dependencies = get_dependencies(
answered_subquestions=answered_subquestions, subquestion=subquestion
)
tools = search_tools(
subquestion=subquestion,
tool_vector_index=tool_vector_index,
answered_subquestions=dependencies,
)
tool_names = [tool.__name__ for tool in tools]
logger.info("Retrieved tool(s): %s", tool_names)

# Then attempt to answer subquestion
logger.info("Answering subquestion: %s", subquestion.question)
answered_subquestion = generate_subquestion_answer(
user_query=user_query,
subquestion=subquestion,
tools=tools,
dependencies=dependencies,
)

logger.info("Answered subquestion: %s", answered_subquestion.answer)
return answered_subquestion


# Answer final question
return generate_final_response(
query=query, answered_subquestions=answered_subquestions, verbose=verbose
async def _afetch_tools_and_answer_subquestion(
user_query: str,
subquestion: SubQuestion,
tool_vector_index: VectorStore,
answered_subquestions: list[AnsweredSubQuestion],
) -> AnsweredSubQuestion:
logger.info("Attempting to select tools for: %s", {subquestion.question})
dependencies = get_dependencies(
answered_subquestions=answered_subquestions, subquestion=subquestion
)
tools = await asearch_tools(
subquestion=subquestion,
tool_vector_index=tool_vector_index,
answered_subquestions=dependencies,
)
tool_names = [tool.__name__ for tool in tools]
logger.info("Retrieved tool(s): %s", tool_names)

# Then attempt to answer subquestion
logger.info("Answering subquestion: %s", subquestion.question)
answered_subquestion = await agenerate_subquestion_answer(
user_query=user_query,
subquestion=subquestion,
tools=tools,
dependencies=dependencies,
)

logger.info("Answered subquestion: %s", answered_subquestion.answer)
return answered_subquestion


def _get_unanswered_subquestions(
answered_subquestions: list[AnsweredSubQuestion], subquestions: list[SubQuestion]
) -> list[SubQuestion]:
answered_subquestion_ids = [
answered_subquestion.subquestion.id
for answered_subquestion in answered_subquestions
]
return [
subquestion
for subquestion in subquestions
if subquestion.id not in answered_subquestion_ids
]


def _is_subquestion_answerable(
subquestion: SubQuestion, answered_subquestions: list[AnsweredSubQuestion]
) -> bool:
if not subquestion.depends_on:
return True

for id_ in subquestion.depends_on:
if id_ not in [
answered_subquestion.subquestion.id
for answered_subquestion in answered_subquestions
]:
return False
return True


def _get_answerable_subquestions(
subquestions: list[SubQuestion], answered_subquestions: list[AnsweredSubQuestion]
) -> list[SubQuestion]:
return [
subquestion
for subquestion in subquestions
if _is_subquestion_answerable(
subquestion=subquestion, answered_subquestions=answered_subquestions
)
]


def _handle_tool_vector_index(openbb_tools: list[str] | None) -> VectorStore:
if not openbb_tools:
logger.info("Using all available OpenBB tools.")
tool_vector_index = build_openbb_tool_vector_index()
else:
logger.info("Using specified OpenBB tools: %s", openbb_tools)
openbb_function_descriptions = [
map_name_to_openbb_function_description(obb_function_name)
for obb_function_name in openbb_tools
]
tool_vector_index = build_vector_index_from_openbb_function_descriptions(
openbb_function_descriptions
)
return tool_vector_index
Loading
Loading