Fix: Pipeline.run logic #8707

Draft: wants to merge 50 commits into main from fix/pipeline_run

Commits (50):
9c34772  add component checks (mathislucka, Jan 11, 2025)
fc2f2da  pipeline should run deterministically (mathislucka, Jan 11, 2025)
c28353d  add FIFOQueue (mathislucka, Jan 11, 2025)
64f4afc  add agent tests (mathislucka, Jan 11, 2025)
966552e  add order dependent tests (mathislucka, Jan 11, 2025)
3d0e948  run new tests (mathislucka, Jan 11, 2025)
7664254  remove code that is not needed (mathislucka, Jan 11, 2025)
932718f  test: intermediate from cycle outputs are available outside cycle (mathislucka, Jan 13, 2025)
acce8cd  add tests for component checks (Claude) (mathislucka, Jan 13, 2025)
21c78b8  adapt tests for component checks (o1 review) (mathislucka, Jan 13, 2025)
cf23b32  chore: format (mathislucka, Jan 13, 2025)
54d4f2c  remove tests that aren't needed anymore (mathislucka, Jan 13, 2025)
05ed852  add _calculate_priority tests (mathislucka, Jan 13, 2025)
93f7a9d  revert accidental change in pyproject.toml (mathislucka, Jan 13, 2025)
bbba1b2  test format conversion (mathislucka, Jan 13, 2025)
c00f8c5  adapt to naming convention (mathislucka, Jan 13, 2025)
235aa47  chore: proper docstrings and type hints for PQ (mathislucka, Jan 13, 2025)
99ea5d5  format (mathislucka, Jan 13, 2025)
7577d9b  add more unit tests (mathislucka, Jan 13, 2025)
25c64cd  rm unneeded comments (mathislucka, Jan 13, 2025)
5a1bbd8  test input consumption (mathislucka, Jan 13, 2025)
6820fc3  lint (mathislucka, Jan 13, 2025)
6a9cdcc  Merge branch 'main' into fix/pipeline_run (mathislucka, Jan 13, 2025)
d624e57  fix: docstrings (mathislucka, Jan 13, 2025)
e1912f8  lint (mathislucka, Jan 13, 2025)
acc17b1  format (mathislucka, Jan 13, 2025)
207eaba  format (mathislucka, Jan 13, 2025)
15611fc  fix license header (mathislucka, Jan 13, 2025)
87010dd  fix license header (mathislucka, Jan 13, 2025)
7027572  add component run tests (mathislucka, Jan 13, 2025)
02c82b8  fix: pass correct input format to tracing (mathislucka, Jan 14, 2025)
7e81ff9  fix types (mathislucka, Jan 14, 2025)
b5db015  Merge branch 'main' into fix/pipeline_run (mathislucka, Jan 14, 2025)
d2bee24  format (mathislucka, Jan 14, 2025)
ad16403  format (mathislucka, Jan 14, 2025)
2984fcb  types (mathislucka, Jan 14, 2025)
64a0125  add defaults from Socket instead of signature (mathislucka, Jan 15, 2025)
b2b8adc  fix test names (mathislucka, Jan 15, 2025)
6b25825  still wait for optional inputs on greedy variadic sockets (mathislucka, Jan 15, 2025)
2566cb8  fix format (mathislucka, Jan 15, 2025)
ef8e754  wip: warn for ambiguous running order (mathislucka, Jan 16, 2025)
0252450  wip: alternative warning (mathislucka, Jan 16, 2025)
580e4bd  fix license header (mathislucka, Jan 16, 2025)
fd32f77  Merge branch 'main' of https://github.com/deepset-ai/haystack into fi… (Amnah199, Jan 17, 2025)
9426bbb  make code more readable (mathislucka, Jan 20, 2025)
ad1b0aa  Introduce content tracing to a behavioral test (Amnah199, Jan 20, 2025)
554a6ef  Merge branch 'fix/pipeline_run' of https://github.com/deepset-ai/hays… (Amnah199, Jan 20, 2025)
a38bccb  Fixing linting (Amnah199, Jan 20, 2025)
0ac82b2  Remove debug print statements (Amnah199, Jan 20, 2025)
339d340  Fix tracer tests (Amnah199, Jan 22, 2025)
1 change: 1 addition & 0 deletions test/core/pipeline/features/pipeline_run.feature
@@ -49,6 +49,7 @@ Feature: Pipeline running
 | that has a feedback loop |
 | created in a non-standard order that has a loop |
 | that has an agent with a feedback cycle |
+| that passes outputs that are consumed in cycle to outside the cycle |

 Scenario Outline: Running a bad Pipeline
 Given a pipeline <kind>
145 changes: 145 additions & 0 deletions test/core/pipeline/features/test_run.py
@@ -2821,3 +2821,148 @@ def run(self, replies: List[str]):
            )
        ],
    )

@given("a pipeline that passes outputs that are consumed in cycle to outside the cycle", target_fixture="pipeline_data")
def passes_outputs_outside_cycle():
Member (Author): @tstadel could you check whether this test covers the use case you had with passing the prompt outside the cycle?

Member (Author): The test fails on main: it yields an empty prompt instead of the expected prompt value.

Member (Author): The output from the cycle isn't distributed in the old logic because we filter for components that are in the cycle here:

    receivers = [item for item in self._find_receivers_from(name) if item[0] in cycle]

and then we remove any output that was already received by a component in the cycle here:

    to_remove_from_component_result.add(sender_socket.name)

That means outputs received by components both inside and outside the cycle will only reach the components inside the cycle.
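
To make the failure mode concrete, here is a minimal, self-contained sketch of that distribution step. The function name, arguments, and data shapes are hypothetical stand-ins for illustration, not the real Pipeline internals:

    # Sketch, assuming a simplified model: receivers are
    # (component_name, socket_name) pairs and component_result maps
    # socket names to produced values.
    def distribute_outputs_old_logic(component_result, all_receivers, cycle):
        # Old logic: only receivers inside the cycle are considered.
        receivers = [item for item in all_receivers if item[0] in cycle]

        to_remove_from_component_result = set()
        for receiver_name, socket_name in receivers:
            # ...the value would be delivered to receiver_name here...
            # Once an in-cycle receiver consumed the socket, it is marked
            # for removal from the component's result.
            to_remove_from_component_result.add(socket_name)

        # Sockets consumed inside the cycle are stripped, so receivers
        # outside the cycle connected to the same socket get nothing.
        return {
            socket: value
            for socket, value in component_result.items()
            if socket not in to_remove_from_component_result
        }

    # concatenator.output feeds both joiner (inside the cycle) and
    # answer_builder (outside the cycle); the prompt never leaves the cycle:
    remaining = distribute_outputs_old_logic(
        component_result={"output": "the assembled prompt"},
        all_receivers=[("joiner", "output"), ("answer_builder", "output")],
        cycle={"joiner", "code_prompt", "code_llm", "feedback_prompt", "feedback_llm", "router", "concatenator"},
    )
    assert remaining == {}  # nothing is left for answer_builder.prompt

Under this simplified model the prompt is consumed by the in-cycle receiver and stripped before answer_builder sees it, which matches the empty prompt observed on main.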

    @component
    class FixedGenerator:
        def __init__(self, replies):
            self.replies = replies
            self.idx = 0

        @component.output_types(replies=List[str])
        def run(self, prompt: str):
            # Cycle through the canned replies, wrapping around at the end.
            if self.idx >= len(self.replies):
                self.idx = 0
            replies = [self.replies[self.idx]]
            self.idx += 1

            return {"replies": replies}

    @component
    class AnswerBuilderWithPrompt:
        @component.output_types(answers=List[GeneratedAnswer])
        def run(self, replies: List[str], query: str, prompt: Optional[str] = None) -> Dict[str, Any]:
            answer = GeneratedAnswer(data=replies[0], query=query, documents=[])

            # Surface the prompt that produced the answer, when available.
            if prompt is not None:
                answer.meta["prompt"] = prompt

            return {"answers": [answer]}

    code_prompt_template = "{{task}}"

    feedback_prompt_template = """
Check if this code is valid and can run: {{ code[0] }}
Return "PASS" if it passes and "FAIL" if it fails.
Provide additional feedback on why it fails.
"""

    valid_response = """
def generate_santa_sleigh():
    '''
    Returns ASCII art of Santa Claus on his sleigh with Rudolph leading the way.
    '''
    # implementation goes here.
    return art
"""

    code_llm = FixedGenerator(replies=["invalid code", "invalid code", valid_response])
    code_prompt = PromptBuilder(template=code_prompt_template)

    feedback_llm = FixedGenerator(replies=["FAIL", "FAIL, come on, try again.", "PASS"])
    feedback_prompt = PromptBuilder(template=feedback_prompt_template)

    routes = [
        {
            "condition": "{{ 'FAIL' in replies[0] }}",
            "output": "{{ replies[0] }}",
            "output_name": "fail",
            "output_type": str,
        },
        {
            "condition": "{{ 'PASS' in replies[0] }}",
            "output": "{{ code }}",
            "output_name": "pass",
            "output_type": List[str],
        },
    ]

    router = ConditionalRouter(routes=routes)
    joiner = BranchJoiner(type_=str)
    concatenator = OutputAdapter(
        template="{{code_prompt + '\n' + generated_code[0] + '\n' + feedback}}", output_type=str
    )

    answer_builder = AnswerBuilderWithPrompt()

    pp = Pipeline(max_runs_per_component=100)

    pp.add_component("concatenator", concatenator)
    pp.add_component("code_llm", code_llm)
    pp.add_component("code_prompt", code_prompt)
    pp.add_component("feedback_prompt", feedback_prompt)
    pp.add_component("feedback_llm", feedback_llm)
    pp.add_component("router", router)
    pp.add_component("joiner", joiner)
    pp.add_component("answer_builder", answer_builder)

    pp.connect("concatenator.output", "joiner.value")
    pp.connect("joiner.value", "code_prompt.task")
    pp.connect("code_prompt.prompt", "code_llm.prompt")
    pp.connect("code_prompt.prompt", "concatenator.code_prompt")
    pp.connect("code_llm.replies", "feedback_prompt.code")
    pp.connect("feedback_llm.replies", "router.replies")
    pp.connect("router.fail", "concatenator.feedback")
    pp.connect("feedback_prompt.prompt", "feedback_llm.prompt")
    pp.connect("router.pass", "answer_builder.replies")
    pp.connect("code_llm.replies", "router.code")
    pp.connect("code_llm.replies", "concatenator.generated_code")
    pp.connect("concatenator.output", "answer_builder.prompt")

    task = "Generate code to generate christmas ascii-art"

    expected_prompt = """Generate code to generate christmas ascii-art
invalid code
FAIL
invalid code
FAIL, come on, try again."""

    return (
        pp,
        [
            PipelineRunData(
                inputs={"joiner": {"value": task}, "answer_builder": {"query": task}},
                expected_outputs={
                    "answer_builder": {
                        "answers": [
                            GeneratedAnswer(data=valid_response, query=task, documents=[], meta={"prompt": expected_prompt})
                        ]
                    }
                },
                expected_run_order=[
                    "joiner",
                    "code_prompt",
                    "code_llm",
                    "feedback_prompt",
                    "feedback_llm",
                    "router",
                    "concatenator",
                    "joiner",
                    "code_prompt",
                    "code_llm",
                    "feedback_prompt",
                    "feedback_llm",
                    "router",
                    "concatenator",
                    "joiner",
                    "code_prompt",
                    "code_llm",
                    "feedback_prompt",
                    "feedback_llm",
                    "router",
                    "answer_builder",
                ],
            )
        ],
    )

