From 05dda59cf60124e9bd0eb5d3137a85d31d8153fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Sat, 3 Feb 2024 23:23:38 -0800 Subject: [PATCH] Adding multi thread execution --- README.md | 4 +- docs/stylesheets/output.css | 153 ------------------------------------ src/crewai/crew.py | 4 +- src/crewai/task.py | 36 +++++++-- tests/crew_test.py | 48 +++++++++++ tests/task_test.py | 20 +++++ 6 files changed, 103 insertions(+), 162 deletions(-) diff --git a/README.md b/README.md index 17d1fcd3ed..b47dc35e29 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,6 @@ [![GitHub Repo stars](https://img.shields.io/github/stars/joaomdmoura/crewAI)](https://github.com/joaomdmoura/crewAI) [![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) - - ## Table of contents @@ -134,7 +132,7 @@ print("######################") print(result) ``` -In addition to the sequential process, you can use the hierarchical process, which automatically assigns a manager to the defined crew to properly coordinate the planning and execution of tasks through delegation and validation of results. See more about the processes [here](./docs/core-concepts/Managing-Processes.md). +In addition to the sequential process, you can use the hierarchical process, which automatically assigns a manager to the defined crew to properly coordinate the planning and execution of tasks through delegation and validation of results. [See more about the processes here](https://docs.crewai.com/core-concepts/Managing-Processes/). ## Key Features diff --git a/docs/stylesheets/output.css b/docs/stylesheets/output.css index 60e26386aa..a5d1b09c4c 100644 --- a/docs/stylesheets/output.css +++ b/docs/stylesheets/output.css @@ -544,171 +544,18 @@ video { --tw-backdrop-sepia: ; } -.m-3 { - margin: 0.75rem; -} - -.m-0 { - margin: 0px; -} - -.m-auto { - margin: auto; -} - -.my-3 { - margin-top: 0.75rem; - margin-bottom: 0.75rem; -} - -.mb-3 { - margin-bottom: 0.75rem; -} - -.mt-5 { - margin-top: 1.25rem; -} - -.mb-5 { - margin-bottom: 1.25rem; -} - -.mr-10 { - margin-right: 2.5rem; -} - -.ml-10 { - margin-left: 2.5rem; -} - .mb-10 { margin-bottom: 2.5rem; } -.hidden { - display: none; -} - -.w-1\/2 { - width: 50%; -} - -.w-1\/5 { - width: 20%; -} - -.w-2\/3 { - width: 66.666667%; -} - .transform { transform: translate(var(--tw-translate-x), var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) scaleY(var(--tw-scale-y)); } -.rounded { - border-radius: 0.25rem; -} - -.border-2 { - border-width: 2px; -} - -.border-solid { - border-style: solid; -} - -.border-red-500 { - --tw-border-opacity: 1; - border-color: rgb(239 68 68 / var(--tw-border-opacity)); -} - -.border-red-800 { - --tw-border-opacity: 1; - border-color: rgb(153 27 27 / var(--tw-border-opacity)); -} - -.bg-red-500 { - --tw-bg-opacity: 1; - background-color: rgb(239 68 68 / var(--tw-bg-opacity)); -} - -.bg-red-300 { - --tw-bg-opacity: 1; - background-color: rgb(252 165 165 / var(--tw-bg-opacity)); -} - -.bg-red-400 { - --tw-bg-opacity: 1; - background-color: rgb(248 113 113 / var(--tw-bg-opacity)); -} - -.p-5 { - padding: 1.25rem; -} - -.p-2 { - padding: 0.5rem; -} - -.p-0 { - padding: 0px; -} - -.px-2 { - padding-left: 0.5rem; - padding-right: 0.5rem; -} - -.py-1 { - padding-top: 0.25rem; - padding-bottom: 0.25rem; -} - -.font-bold { - font-weight: 700; -} - -.leading-10 { - line-height: 2.5rem; -} - .leading-3 { line-height: .75rem; } -.text-white { - --tw-text-opacity: 1; - color: rgb(255 255 255 / var(--tw-text-opacity)); -} - -.text-red-900 { - --tw-text-opacity: 1; - color: rgb(127 29 29 / var(--tw-text-opacity)); -} - -.text-red-800 { - --tw-text-opacity: 1; - color: rgb(153 27 27 / var(--tw-text-opacity)); -} - -.shadow-sm { - --tw-shadow: 0 1px 2px 0 rgb(0 0 0 / 0.05); - --tw-shadow-colored: 0 1px 2px 0 var(--tw-shadow-color); - box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow); -} - -.shadow-md { - --tw-shadow: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1); - --tw-shadow-colored: 0 4px 6px -1px var(--tw-shadow-color), 0 2px 4px -2px var(--tw-shadow-color); - box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow); -} - -.shadow-lg { - --tw-shadow: 0 10px 15px -3px rgb(0 0 0 / 0.1), 0 4px 6px -4px rgb(0 0 0 / 0.1); - --tw-shadow-colored: 0 10px 15px -3px var(--tw-shadow-color), 0 4px 6px -4px var(--tw-shadow-color); - box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow); -} - .transition { transition-property: color, background-color, border-color, text-decoration-color, fill, stroke, opacity, box-shadow, transform, filter, -webkit-backdrop-filter; transition-property: color, background-color, border-color, text-decoration-color, fill, stroke, opacity, box-shadow, transform, filter, backdrop-filter; diff --git a/src/crewai/crew.py b/src/crewai/crew.py index b41e8a81e1..2962e227ff 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -161,7 +161,9 @@ def _run_sequential_process(self) -> str: self._logger.log("debug", f"Working Agent: {role}") self._logger.log("info", f"Starting Task: {task.description}") - task_output = task.execute(context=task_output) + output = task.execute(context=task_output) + if not task.async_execution: + task_output = output role = task.agent.role if task.agent is not None else "None" self._logger.log("debug", f"[{role}] Task output: {task_output}\n\n") diff --git a/src/crewai/task.py b/src/crewai/task.py index d2df856a75..20119ddff1 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -1,3 +1,4 @@ +import threading import uuid from typing import Any, List, Optional @@ -12,8 +13,12 @@ class Task(BaseModel): """Class that represent a task to be executed.""" + class Config: + arbitrary_types_allowed = True + __hash__ = object.__hash__ # type: ignore i18n: I18N = I18N() + thread: threading.Thread = None description: str = Field(description="Description of the actual task.") callback: Optional[Any] = Field( description="Callback to be executed after the task is completed.", default=None @@ -29,6 +34,10 @@ class Task(BaseModel): description="Other tasks that will have their output used as context for this task.", default=None, ) + async_execution: Optional[bool] = Field( + description="Whether the task should be executed asynchronously or not.", + default=False, + ) output: Optional[TaskOutput] = Field( description="Task output, it's final result after being executed", default=None ) @@ -71,12 +80,29 @@ def execute(self, agent: Agent | None = None, context: Optional[str] = None) -> ) if self.context: - context = "\n".join([task.output.result for task in self.context]) - - result = self.agent.execute_task( - task=self._prompt(), context=context, tools=self.tools - ) + context = [] + for task in self.context: + if task.async_execution: + task.thread.join() + context.append(task.output.result) + context = "\n".join(context) + + if self.async_execution: + self.thread = threading.Thread( + target=self._execute, args=(agent, self._prompt(), context, self.tools) + ) + self.thread.start() + else: + result = self._execute( + agent=agent, + task_prompt=self._prompt(), + context=context, + tools=self.tools, + ) + return result + def _execute(self, agent, task_prompt, context, tools): + result = agent.execute_task(task=task_prompt, context=context, tools=tools) self.output = TaskOutput(description=self.description, result=result) self.callback(self.output) if self.callback else None return result diff --git a/tests/crew_test.py b/tests/crew_test.py index f1c3cda72b..5992762209 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -337,3 +337,51 @@ def get_final_answer(numbers) -> float: captured = capsys.readouterr() assert "Max RPM reached, waiting for next minute to start." in captured.out moveon.assert_called() + + +def test_async_task_execution(): + import threading + from unittest.mock import patch + + from crewai.tasks.task_output import TaskOutput + + list_ideas = Task( + description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.", + expected_output="Bullet point list of 5 important events.", + agent=researcher, + async_execution=True, + ) + list_important_history = Task( + description="Research the history of AI and give me the 5 most important events that shaped the technology.", + expected_output="Bullet point list of 5 important events.", + agent=researcher, + async_execution=True, + ) + write_article = Task( + description="Write an article about the history of AI and its most important events.", + expected_output="A 4 paragraph article about AI.", + agent=writer, + context=[list_ideas, list_important_history], + ) + + crew = Crew( + agents=[researcher, writer], + process=Process.sequential, + tasks=[list_ideas, list_important_history, write_article], + ) + + with patch.object(Agent, "execute_task") as execute: + execute.return_value = "ok" + with patch.object(threading.Thread, "start") as start: + thread = threading.Thread(target=lambda: None, args=()).start() + start.return_value = thread + with patch.object(threading.Thread, "join", wraps=thread.join()) as join: + list_ideas.output = TaskOutput( + description="A 4 paragraph article about AI.", result="ok" + ) + list_important_history.output = TaskOutput( + description="A 4 paragraph article about AI.", result="ok" + ) + crew.kickoff() + start.assert_called() + join.assert_called() diff --git a/tests/task_test.py b/tests/task_test.py index efabb72417..1e0ef1b15c 100644 --- a/tests/task_test.py +++ b/tests/task_test.py @@ -116,3 +116,23 @@ def test_execute_with_agent(): with patch.object(Agent, "execute_task", return_value="ok") as execute: task.execute(agent=researcher) execute.assert_called_once_with(task=task._prompt(), context=None, tools=[]) + + +def test_async_execution(): + researcher = Agent( + role="Researcher", + goal="Make the best research and analysis on content about AI and AI agents", + backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.", + allow_delegation=False, + ) + + task = Task( + description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.", + expected_output="Bullet point list of 5 interesting ideas.", + async_execution=True, + agent=researcher, + ) + + with patch.object(Agent, "execute_task", return_value="ok") as execute: + task.execute(agent=researcher) + execute.assert_called_once_with(task=task._prompt(), context=None, tools=[])