Skip to content

Commit

Permalink
Adding multi thread execution
Browse files Browse the repository at this point in the history
  • Loading branch information
joaomdmoura committed Feb 4, 2024
1 parent 5628bcc commit 05dda59
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 162 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
[![GitHub Repo stars](https://img.shields.io/github/stars/joaomdmoura/crewAI)](https://github.com/joaomdmoura/crewAI)
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)

<!-- [![Discord](https://img.shields.io/discord/1192246288507474000)](https://discord.com/invite/X4JWnZnxPb) -->

</div>

## Table of contents
Expand Down Expand Up @@ -134,7 +132,7 @@ print("######################")
print(result)
```

In addition to the sequential process, you can use the hierarchical process, which automatically assigns a manager to the defined crew to properly coordinate the planning and execution of tasks through delegation and validation of results. See more about the processes [here](./docs/core-concepts/Managing-Processes.md).
In addition to the sequential process, you can use the hierarchical process, which automatically assigns a manager to the defined crew to properly coordinate the planning and execution of tasks through delegation and validation of results. [See more about the processes here](https://docs.crewai.com/core-concepts/Managing-Processes/).

## Key Features

Expand Down
153 changes: 0 additions & 153 deletions docs/stylesheets/output.css
Original file line number Diff line number Diff line change
Expand Up @@ -544,171 +544,18 @@ video {
--tw-backdrop-sepia: ;
}

.m-3 {
margin: 0.75rem;
}

.m-0 {
margin: 0px;
}

.m-auto {
margin: auto;
}

.my-3 {
margin-top: 0.75rem;
margin-bottom: 0.75rem;
}

.mb-3 {
margin-bottom: 0.75rem;
}

.mt-5 {
margin-top: 1.25rem;
}

.mb-5 {
margin-bottom: 1.25rem;
}

.mr-10 {
margin-right: 2.5rem;
}

.ml-10 {
margin-left: 2.5rem;
}

.mb-10 {
margin-bottom: 2.5rem;
}

.hidden {
display: none;
}

.w-1\/2 {
width: 50%;
}

.w-1\/5 {
width: 20%;
}

.w-2\/3 {
width: 66.666667%;
}

.transform {
transform: translate(var(--tw-translate-x), var(--tw-translate-y)) rotate(var(--tw-rotate)) skewX(var(--tw-skew-x)) skewY(var(--tw-skew-y)) scaleX(var(--tw-scale-x)) scaleY(var(--tw-scale-y));
}

.rounded {
border-radius: 0.25rem;
}

.border-2 {
border-width: 2px;
}

.border-solid {
border-style: solid;
}

.border-red-500 {
--tw-border-opacity: 1;
border-color: rgb(239 68 68 / var(--tw-border-opacity));
}

.border-red-800 {
--tw-border-opacity: 1;
border-color: rgb(153 27 27 / var(--tw-border-opacity));
}

.bg-red-500 {
--tw-bg-opacity: 1;
background-color: rgb(239 68 68 / var(--tw-bg-opacity));
}

.bg-red-300 {
--tw-bg-opacity: 1;
background-color: rgb(252 165 165 / var(--tw-bg-opacity));
}

.bg-red-400 {
--tw-bg-opacity: 1;
background-color: rgb(248 113 113 / var(--tw-bg-opacity));
}

.p-5 {
padding: 1.25rem;
}

.p-2 {
padding: 0.5rem;
}

.p-0 {
padding: 0px;
}

.px-2 {
padding-left: 0.5rem;
padding-right: 0.5rem;
}

.py-1 {
padding-top: 0.25rem;
padding-bottom: 0.25rem;
}

.font-bold {
font-weight: 700;
}

.leading-10 {
line-height: 2.5rem;
}

.leading-3 {
line-height: .75rem;
}

.text-white {
--tw-text-opacity: 1;
color: rgb(255 255 255 / var(--tw-text-opacity));
}

.text-red-900 {
--tw-text-opacity: 1;
color: rgb(127 29 29 / var(--tw-text-opacity));
}

.text-red-800 {
--tw-text-opacity: 1;
color: rgb(153 27 27 / var(--tw-text-opacity));
}

.shadow-sm {
--tw-shadow: 0 1px 2px 0 rgb(0 0 0 / 0.05);
--tw-shadow-colored: 0 1px 2px 0 var(--tw-shadow-color);
box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow);
}

.shadow-md {
--tw-shadow: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1);
--tw-shadow-colored: 0 4px 6px -1px var(--tw-shadow-color), 0 2px 4px -2px var(--tw-shadow-color);
box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow);
}

.shadow-lg {
--tw-shadow: 0 10px 15px -3px rgb(0 0 0 / 0.1), 0 4px 6px -4px rgb(0 0 0 / 0.1);
--tw-shadow-colored: 0 10px 15px -3px var(--tw-shadow-color), 0 4px 6px -4px var(--tw-shadow-color);
box-shadow: var(--tw-ring-offset-shadow, 0 0 #0000), var(--tw-ring-shadow, 0 0 #0000), var(--tw-shadow);
}

.transition {
transition-property: color, background-color, border-color, text-decoration-color, fill, stroke, opacity, box-shadow, transform, filter, -webkit-backdrop-filter;
transition-property: color, background-color, border-color, text-decoration-color, fill, stroke, opacity, box-shadow, transform, filter, backdrop-filter;
Expand Down
4 changes: 3 additions & 1 deletion src/crewai/crew.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ def _run_sequential_process(self) -> str:
self._logger.log("debug", f"Working Agent: {role}")
self._logger.log("info", f"Starting Task: {task.description}")

task_output = task.execute(context=task_output)
output = task.execute(context=task_output)
if not task.async_execution:
task_output = output

role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"[{role}] Task output: {task_output}\n\n")
Expand Down
36 changes: 31 additions & 5 deletions src/crewai/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import uuid
from typing import Any, List, Optional

Expand All @@ -12,8 +13,12 @@
class Task(BaseModel):
"""Class that represent a task to be executed."""

class Config:
arbitrary_types_allowed = True

__hash__ = object.__hash__ # type: ignore
i18n: I18N = I18N()
thread: threading.Thread = None
description: str = Field(description="Description of the actual task.")
callback: Optional[Any] = Field(
description="Callback to be executed after the task is completed.", default=None
Expand All @@ -29,6 +34,10 @@ class Task(BaseModel):
description="Other tasks that will have their output used as context for this task.",
default=None,
)
async_execution: Optional[bool] = Field(
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
Expand Down Expand Up @@ -71,12 +80,29 @@ def execute(self, agent: Agent | None = None, context: Optional[str] = None) ->
)

if self.context:
context = "\n".join([task.output.result for task in self.context])

result = self.agent.execute_task(
task=self._prompt(), context=context, tools=self.tools
)
context = []
for task in self.context:
if task.async_execution:
task.thread.join()
context.append(task.output.result)
context = "\n".join(context)

if self.async_execution:
self.thread = threading.Thread(
target=self._execute, args=(agent, self._prompt(), context, self.tools)
)
self.thread.start()
else:
result = self._execute(
agent=agent,
task_prompt=self._prompt(),
context=context,
tools=self.tools,
)
return result

def _execute(self, agent, task_prompt, context, tools):
result = agent.execute_task(task=task_prompt, context=context, tools=tools)
self.output = TaskOutput(description=self.description, result=result)
self.callback(self.output) if self.callback else None
return result
Expand Down
48 changes: 48 additions & 0 deletions tests/crew_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,51 @@ def get_final_answer(numbers) -> float:
captured = capsys.readouterr()
assert "Max RPM reached, waiting for next minute to start." in captured.out
moveon.assert_called()


def test_async_task_execution():
import threading
from unittest.mock import patch

from crewai.tasks.task_output import TaskOutput

list_ideas = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 important events.",
agent=researcher,
async_execution=True,
)
list_important_history = Task(
description="Research the history of AI and give me the 5 most important events that shaped the technology.",
expected_output="Bullet point list of 5 important events.",
agent=researcher,
async_execution=True,
)
write_article = Task(
description="Write an article about the history of AI and its most important events.",
expected_output="A 4 paragraph article about AI.",
agent=writer,
context=[list_ideas, list_important_history],
)

crew = Crew(
agents=[researcher, writer],
process=Process.sequential,
tasks=[list_ideas, list_important_history, write_article],
)

with patch.object(Agent, "execute_task") as execute:
execute.return_value = "ok"
with patch.object(threading.Thread, "start") as start:
thread = threading.Thread(target=lambda: None, args=()).start()
start.return_value = thread
with patch.object(threading.Thread, "join", wraps=thread.join()) as join:
list_ideas.output = TaskOutput(
description="A 4 paragraph article about AI.", result="ok"
)
list_important_history.output = TaskOutput(
description="A 4 paragraph article about AI.", result="ok"
)
crew.kickoff()
start.assert_called()
join.assert_called()
20 changes: 20 additions & 0 deletions tests/task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,23 @@ def test_execute_with_agent():
with patch.object(Agent, "execute_task", return_value="ok") as execute:
task.execute(agent=researcher)
execute.assert_called_once_with(task=task._prompt(), context=None, tools=[])


def test_async_execution():
researcher = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
allow_delegation=False,
)

task = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 interesting ideas.",
async_execution=True,
agent=researcher,
)

with patch.object(Agent, "execute_task", return_value="ok") as execute:
task.execute(agent=researcher)
execute.assert_called_once_with(task=task._prompt(), context=None, tools=[])

0 comments on commit 05dda59

Please sign in to comment.