From 3451b6fc7a2a382310a4d2ae39093f7c229485b4 Mon Sep 17 00:00:00 2001 From: "Brandon Hancock (bhancock_ai)" <109994880+bhancockio@users.noreply.github.com> Date: Fri, 16 Aug 2024 14:47:28 -0400 Subject: [PATCH] Clean up pipeline (#1187) * Clean up pipeline * Make versioning dynamic in templates * fix .env issues when openai is trying to use invalid keys * Fix type checker issue in pipeline * Fix tests. --- ...e-a-New-CrewAI-Pipeline-Template-Method.md | 136 ++++++++++++++++++ docs/index.md | 15 +- src/crewai/agents/executor.py | 18 +-- src/crewai/cli/templates/crew/pyproject.toml | 3 +- src/crewai/cli/templates/pipeline/README.md | 4 + .../cli/templates/pipeline/pyproject.toml | 2 +- .../templates/pipeline_router/pyproject.toml | 3 +- src/crewai/project/crew_base.py | 35 ++--- src/crewai/project/pipeline_base.py | 2 +- tests/pipeline/test_pipeline.py | 100 +++++-------- 10 files changed, 210 insertions(+), 108 deletions(-) create mode 100644 docs/getting-started/Create-a-New-CrewAI-Pipeline-Template-Method.md diff --git a/docs/getting-started/Create-a-New-CrewAI-Pipeline-Template-Method.md b/docs/getting-started/Create-a-New-CrewAI-Pipeline-Template-Method.md new file mode 100644 index 0000000000..f3859779aa --- /dev/null +++ b/docs/getting-started/Create-a-New-CrewAI-Pipeline-Template-Method.md @@ -0,0 +1,136 @@ +# Creating a CrewAI Pipeline Project + +Welcome to the comprehensive guide for creating a new CrewAI pipeline project. This document will walk you through the steps to create, customize, and run your CrewAI pipeline project, ensuring you have everything you need to get started. + +To learn more about CrewAI pipelines, visit the [CrewAI documentation](https://docs.crewai.com/core-concepts/Pipeline/). 
+ +## Prerequisites + +Before getting started with CrewAI pipelines, make sure that you have installed CrewAI via pip: + +```shell +$ pip install crewai crewai-tools +``` + +The same prerequisites for virtual environments and Code IDEs apply as in regular CrewAI projects. + +## Creating a New Pipeline Project + +To create a new CrewAI pipeline project, you have two options: + +1. For a basic pipeline template: + +```shell +$ crewai create pipeline <project_name> +``` + +2. For a pipeline example that includes a router: + +```shell +$ crewai create pipeline --router <project_name> +``` + +These commands will create a new project folder with the following structure: + +``` +<project_name>/ +├── README.md +├── poetry.lock +├── pyproject.toml +├── src/ +│ └── <project_name>/ +│ ├── __init__.py +│ ├── main.py +│ ├── crews/ +│ │ ├── crew1/ +│ │ │ ├── crew1.py +│ │ │ └── config/ +│ │ │ ├── agents.yaml +│ │ │ └── tasks.yaml +│ │ ├── crew2/ +│ │ │ ├── crew2.py +│ │ │ └── config/ +│ │ │ ├── agents.yaml +│ │ │ └── tasks.yaml +│ ├── pipelines/ +│ │ ├── __init__.py +│ │ ├── pipeline1.py +│ │ └── pipeline2.py +│ └── tools/ +│ ├── __init__.py +│ └── custom_tool.py +└── tests/ +``` + +## Customizing Your Pipeline Project + +To customize your pipeline project, you can: + +1. Modify the crew files in `src/<project_name>/crews/` to define your agents and tasks for each crew. +2. Modify the pipeline files in `src/<project_name>/pipelines/` to define your pipeline structure. +3. Modify `src/<project_name>/main.py` to set up and run your pipelines. +4. Add your environment variables into the `.env` file. 
+ +### Example: Defining a Pipeline + +Here's an example of how to define a pipeline in `src/<project_name>/pipelines/normal_pipeline.py`: + +```python +from crewai import Pipeline +from crewai.project import PipelineBase +from ..crews.normal_crew import NormalCrew + +@PipelineBase +class NormalPipeline: + def __init__(self): + # Initialize crews + self.normal_crew = NormalCrew().crew() + + def create_pipeline(self): + return Pipeline( + stages=[ + self.normal_crew + ] + ) + + async def kickoff(self, inputs): + pipeline = self.create_pipeline() + results = await pipeline.kickoff(inputs) + return results +``` + +### Annotations + +The main annotation you'll use for pipelines is `@PipelineBase`. This annotation is used to decorate your pipeline classes, similar to how `@CrewBase` is used for crews. + +## Installing Dependencies + +To install the dependencies for your project, use Poetry: + +```shell +$ cd <project_name> +$ poetry lock +$ poetry install +``` + +## Running Your Pipeline Project + +To run your pipeline project, use the following command: + +```shell +$ crewai run +``` + +or + +```shell +$ poetry run <project_name> +``` + +This will initialize your pipeline and begin task execution as defined in your `main.py` file. + +## Deploying Your Pipeline Project + +Pipelines can be deployed in the same way as regular CrewAI projects. The easiest way is through [CrewAI+](https://www.crewai.com/crewaiplus), where you can deploy your pipeline in a few clicks. + +Remember, when working with pipelines, you're orchestrating multiple crews to work together in a sequence or parallel fashion. This allows for more complex workflows and information processing tasks. diff --git a/docs/index.md b/docs/index.md index c7f1f3a547..3f8917dac9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,13 +8,20 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. 
By diff --git a/src/crewai/agents/executor.py b/src/crewai/agents/executor.py index 532ceca253..f10ab785e2 100644 --- a/src/crewai/agents/executor.py +++ b/src/crewai/agents/executor.py @@ -1,33 +1,29 @@ import threading import time from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union -import click - +import click from langchain.agents import AgentExecutor from langchain.agents.agent import ExceptionTool from langchain.callbacks.manager import CallbackManagerForChainRun +from langchain.chains.summarize import load_summarize_chain +from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.agents import AgentAction, AgentFinish, AgentStep from langchain_core.exceptions import OutputParserException from langchain_core.tools import BaseTool from langchain_core.utils.input import get_color_mapping from pydantic import InstanceOf -from langchain.text_splitter import RecursiveCharacterTextSplitter -from langchain.chains.summarize import load_summarize_chain - from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin from crewai.agents.tools_handler import ToolsHandler - - from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.utilities import I18N from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededException, ) -from crewai.utilities.training_handler import CrewTrainingHandler from crewai.utilities.logger import Logger +from crewai.utilities.training_handler import CrewTrainingHandler class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin): @@ -213,11 +209,7 @@ def _iter_next_step( yield step return - yield AgentStep( - action=AgentAction("_Exception", str(e), str(e)), - observation=str(e), - ) - return + raise e # If the tool chosen is the finishing tool, then we end and return. 
if isinstance(output, AgentFinish): diff --git a/src/crewai/cli/templates/crew/pyproject.toml b/src/crewai/cli/templates/crew/pyproject.toml index 99ac243a84..1783b351ec 100644 --- a/src/crewai/cli/templates/crew/pyproject.toml +++ b/src/crewai/cli/templates/crew/pyproject.toml @@ -6,7 +6,8 @@ authors = ["Your Name "] [tool.poetry.dependencies] python = ">=3.10,<=3.13" -crewai = { extras = ["tools"], version = "^0.51.0" } +crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" } + [tool.poetry.scripts] {{folder_name}} = "{{folder_name}}.main:run" diff --git a/src/crewai/cli/templates/pipeline/README.md b/src/crewai/cli/templates/pipeline/README.md index 60dc617e9d..3bb1bef6cb 100644 --- a/src/crewai/cli/templates/pipeline/README.md +++ b/src/crewai/cli/templates/pipeline/README.md @@ -15,12 +15,15 @@ pip install poetry Next, navigate to your project directory and install the dependencies: 1. First lock the dependencies and then install them: + ```bash poetry lock ``` + ```bash poetry install ``` + ### Customizing **Add your `OPENAI_API_KEY` into the `.env` file** @@ -49,6 +52,7 @@ The {{name}} Crew is composed of multiple AI agents, each with unique roles, goa ## Support For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI. 
+ - Visit our [documentation](https://docs.crewai.com) - Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai) - [Join our Discord](https://discord.com/invite/X4JWnZnxPb) diff --git a/src/crewai/cli/templates/pipeline/pyproject.toml b/src/crewai/cli/templates/pipeline/pyproject.toml index b86e72c81e..b72cebd6ab 100644 --- a/src/crewai/cli/templates/pipeline/pyproject.toml +++ b/src/crewai/cli/templates/pipeline/pyproject.toml @@ -6,7 +6,7 @@ authors = ["Your Name "] [tool.poetry.dependencies] python = ">=3.10,<=3.13" -crewai = { extras = ["tools"], version = "^0.51.0" } +crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" } asyncio = "*" [tool.poetry.scripts] diff --git a/src/crewai/cli/templates/pipeline_router/pyproject.toml b/src/crewai/cli/templates/pipeline_router/pyproject.toml index f86dc71923..ab00bd3552 100644 --- a/src/crewai/cli/templates/pipeline_router/pyproject.toml +++ b/src/crewai/cli/templates/pipeline_router/pyproject.toml @@ -6,7 +6,8 @@ authors = ["Your Name "] [tool.poetry.dependencies] python = ">=3.10,<=3.13" -crewai = { extras = ["tools"], version = "^0.51.0" } +crewai = { extras = ["tools"], version = ">=0.51.0,<1.0.0" } + [tool.poetry.scripts] {{folder_name}} = "{{folder_name}}.main:main" diff --git a/src/crewai/project/crew_base.py b/src/crewai/project/crew_base.py index 460d4381c1..90db88b7c2 100644 --- a/src/crewai/project/crew_base.py +++ b/src/crewai/project/crew_base.py @@ -1,5 +1,4 @@ import inspect -import os from pathlib import Path from typing import Any, Callable, Dict @@ -15,42 +14,34 @@ class WrappedClass(cls): model_config = ConfigDict(arbitrary_types_allowed=True) is_crew_class: bool = True # type: ignore - base_directory = None - for frame_info in inspect.stack(): - if "site-packages" not in frame_info.filename: - base_directory = Path(frame_info.filename).parent.resolve() - break + # Get the directory of the class being decorated + base_directory = Path(inspect.getfile(cls)).parent 
original_agents_config_path = getattr( cls, "agents_config", "config/agents.yaml" ) - original_tasks_config_path = getattr(cls, "tasks_config", "config/tasks.yaml") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - if self.base_directory is None: - raise Exception( - "Unable to dynamically determine the project's base directory, you must run it from the project's root directory." - ) + agents_config_path = self.base_directory / self.original_agents_config_path + tasks_config_path = self.base_directory / self.original_tasks_config_path - self.agents_config = self.load_yaml( - os.path.join(self.base_directory, self.original_agents_config_path) - ) - - self.tasks_config = self.load_yaml( - os.path.join(self.base_directory, self.original_tasks_config_path) - ) + self.agents_config = self.load_yaml(agents_config_path) + self.tasks_config = self.load_yaml(tasks_config_path) self.map_all_agent_variables() self.map_all_task_variables() @staticmethod - def load_yaml(config_path: str): - with open(config_path, "r") as file: - # parsedContent = YamlParser.parse(file) # type: ignore # Argument 1 to "parse" has incompatible type "TextIOWrapper"; expected "YamlParser" - return yaml.safe_load(file) + def load_yaml(config_path: Path): + try: + with open(config_path, "r") as file: + return yaml.safe_load(file) + except FileNotFoundError: + print(f"File not found: {config_path}") + raise def _get_all_functions(self): return { diff --git a/src/crewai/project/pipeline_base.py b/src/crewai/project/pipeline_base.py index fd109be3b5..3f23403014 100644 --- a/src/crewai/project/pipeline_base.py +++ b/src/crewai/project/pipeline_base.py @@ -11,7 +11,7 @@ def PipelineBase(cls): class WrappedClass(cls): model_config = ConfigDict(arbitrary_types_allowed=True) - is_pipeline_class: bool = True + is_pipeline_class: bool = True # type: ignore def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/tests/pipeline/test_pipeline.py 
b/tests/pipeline/test_pipeline.py index 39f28ab868..c618274f85 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -25,14 +25,20 @@ def _create_mock_crew(name: str, output_json_dict=None, pydantic_output=None): MockCrewClass = type("MockCrew", (MagicMock, Crew), {}) class MockCrew(MockCrewClass): - def __deepcopy__(self, memo): + def __deepcopy__(self): result = MockCrewClass() result.kickoff_async = self.kickoff_async result.name = self.name return result + def copy( + self, + ): + return self + crew = MockCrew() crew.name = name + task_output = TaskOutput( description="Test task", raw="Task output", agent="Test Agent" ) @@ -44,9 +50,15 @@ def __deepcopy__(self, memo): pydantic=pydantic_output, ) - async def async_kickoff(inputs=None): + async def kickoff_async(inputs=None): return crew_output + # Create an AsyncMock for kickoff_async + crew.kickoff_async = AsyncMock(side_effect=kickoff_async) + + # Mock the synchronous kickoff method + crew.kickoff = MagicMock(return_value=crew_output) + # Add more attributes that Procedure might be expecting crew.verbose = False crew.output_log_file = None @@ -56,30 +68,16 @@ async def async_kickoff(inputs=None): crew.config = None crew.cache = True - # # Create a valid Agent instance - mock_agent = Agent( - name="Mock Agent", - role="Mock Role", - goal="Mock Goal", - backstory="Mock Backstory", - allow_delegation=False, - verbose=False, - ) - - # Create a valid Task instance - mock_task = Task( - description="Return: Test output", - expected_output="Test output", - agent=mock_agent, - async_execution=False, - context=None, - ) + # Add non-empty agents and tasks + mock_agent = MagicMock(spec=Agent) + mock_task = MagicMock(spec=Task) + mock_task.agent = mock_agent + mock_task.async_execution = False + mock_task.context = None crew.agents = [mock_agent] crew.tasks = [mock_task] - crew.kickoff_async = AsyncMock(side_effect=async_kickoff) - return crew return _create_mock_crew @@ -115,9 +113,7 @@ 
def __deepcopy__(self, memo): ( "route1" if x.get("score", 0) > 80 - else "route2" - if x.get("score", 0) > 50 - else "default" + else "route2" if x.get("score", 0) > 50 else "default" ), ) ) @@ -477,31 +473,17 @@ async def test_pipeline_with_parallel_stages_end_in_single_stage(mock_crew_facto """ Test that Pipeline correctly handles parallel stages. """ - crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent]) - crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent]) - crew3 = Crew(name="Crew 3", tasks=[task], agents=[agent]) - crew4 = Crew(name="Crew 4", tasks=[task], agents=[agent]) + crew1 = mock_crew_factory(name="Crew 1") + crew2 = mock_crew_factory(name="Crew 2") + crew3 = mock_crew_factory(name="Crew 3") + crew4 = mock_crew_factory(name="Crew 4") pipeline = Pipeline(stages=[crew1, [crew2, crew3], crew4]) input_data = [{"initial": "data"}] pipeline_result = await pipeline.kickoff(input_data) - with patch.object(Crew, "kickoff_async") as mock_kickoff: - mock_kickoff.return_value = CrewOutput( - raw="Test output", - tasks_output=[ - TaskOutput( - description="Test task", raw="Task output", agent="Test Agent" - ) - ], - token_usage=DEFAULT_TOKEN_USAGE, - json_dict=None, - pydantic=None, - ) - pipeline_result = await pipeline.kickoff(input_data) - - mock_kickoff.assert_called_with(inputs={"initial": "data"}) + crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) assert len(pipeline_result) == 1 pipeline_result_1 = pipeline_result[0] @@ -649,33 +631,21 @@ def test_pipeline_invalid_crew(mock_crew_factory): @pytest.mark.asyncio -async def test_pipeline_data_accumulation(): - crew1 = Crew(name="Crew 1", tasks=[task], agents=[agent]) - crew2 = Crew(name="Crew 2", tasks=[task], agents=[agent]) +async def test_pipeline_data_accumulation(mock_crew_factory): + crew1 = mock_crew_factory(name="Crew 1", output_json_dict={"key1": "value1"}) + crew2 = mock_crew_factory(name="Crew 2", output_json_dict={"key2": "value2"}) pipeline = 
Pipeline(stages=[crew1, crew2]) input_data = [{"initial": "data"}] results = await pipeline.kickoff(input_data) - with patch.object(Crew, "kickoff_async") as mock_kickoff: - mock_kickoff.side_effect = [ - CrewOutput( - raw="Test output from Crew 1", - tasks_output=[], - token_usage=DEFAULT_TOKEN_USAGE, - json_dict={"key1": "value1"}, - pydantic=None, - ), - CrewOutput( - raw="Test output from Crew 2", - tasks_output=[], - token_usage=DEFAULT_TOKEN_USAGE, - json_dict={"key2": "value2"}, - pydantic=None, - ), - ] + # Check that crew1 was called with only the initial input + crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"}) - results = await pipeline.kickoff(input_data) + # Check that crew2 was called with the combined input from the initial data and crew1's output + crew2.kickoff_async.assert_called_once_with( + inputs={"initial": "data", "key1": "value1"} + ) # Check the final output assert len(results) == 1