Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Pipeline.run logic #8707

Draft
wants to merge 50 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9c34772
add component checks
mathislucka Jan 11, 2025
fc2f2da
pipeline should run deterministically
mathislucka Jan 11, 2025
c28353d
add FIFOQueue
mathislucka Jan 11, 2025
64f4afc
add agent tests
mathislucka Jan 11, 2025
966552e
add order dependent tests
mathislucka Jan 11, 2025
3d0e948
run new tests
mathislucka Jan 11, 2025
7664254
remove code that is not needed
mathislucka Jan 11, 2025
932718f
test: intermediate from cycle outputs are available outside cycle
mathislucka Jan 13, 2025
acce8cd
add tests for component checks (Claude)
mathislucka Jan 13, 2025
21c78b8
adapt tests for component checks (o1 review)
mathislucka Jan 13, 2025
cf23b32
chore: format
mathislucka Jan 13, 2025
54d4f2c
remove tests that aren't needed anymore
mathislucka Jan 13, 2025
05ed852
add _calculate_priority tests
mathislucka Jan 13, 2025
93f7a9d
revert accidental change in pyproject.toml
mathislucka Jan 13, 2025
bbba1b2
test format conversion
mathislucka Jan 13, 2025
c00f8c5
adapt to naming convention
mathislucka Jan 13, 2025
235aa47
chore: proper docstrings and type hints for PQ
mathislucka Jan 13, 2025
99ea5d5
format
mathislucka Jan 13, 2025
7577d9b
add more unit tests
mathislucka Jan 13, 2025
25c64cd
rm unneeded comments
mathislucka Jan 13, 2025
5a1bbd8
test input consumption
mathislucka Jan 13, 2025
6820fc3
lint
mathislucka Jan 13, 2025
6a9cdcc
Merge branch 'main' into fix/pipeline_run
mathislucka Jan 13, 2025
d624e57
fix: docstrings
mathislucka Jan 13, 2025
e1912f8
lint
mathislucka Jan 13, 2025
acc17b1
format
mathislucka Jan 13, 2025
207eaba
format
mathislucka Jan 13, 2025
15611fc
fix license header
mathislucka Jan 13, 2025
87010dd
fix license header
mathislucka Jan 13, 2025
7027572
add component run tests
mathislucka Jan 13, 2025
02c82b8
fix: pass correct input format to tracing
mathislucka Jan 14, 2025
7e81ff9
fix types
mathislucka Jan 14, 2025
b5db015
Merge branch 'main' into fix/pipeline_run
mathislucka Jan 14, 2025
d2bee24
format
mathislucka Jan 14, 2025
ad16403
format
mathislucka Jan 14, 2025
2984fcb
types
mathislucka Jan 14, 2025
64a0125
add defaults from Socket instead of signature
mathislucka Jan 15, 2025
b2b8adc
fix test names
mathislucka Jan 15, 2025
6b25825
still wait for optional inputs on greedy variadic sockets
mathislucka Jan 15, 2025
2566cb8
fix format
mathislucka Jan 15, 2025
ef8e754
wip: warn for ambiguous running order
mathislucka Jan 16, 2025
0252450
wip: alternative warning
mathislucka Jan 16, 2025
580e4bd
fix license header
mathislucka Jan 16, 2025
fd32f77
Merge branch 'main' of https://github.com/deepset-ai/haystack into fi…
Amnah199 Jan 17, 2025
9426bbb
make code more readable
mathislucka Jan 20, 2025
ad1b0aa
Introduce content tracing to a behavioral test
Amnah199 Jan 20, 2025
554a6ef
Merge branch 'fix/pipeline_run' of https://github.com/deepset-ai/hays…
Amnah199 Jan 20, 2025
a38bccb
Fixing linting
Amnah199 Jan 20, 2025
0ac82b2
Remove debug print statements
Amnah199 Jan 20, 2025
339d340
Fix tracer tests
Amnah199 Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import warnings
from copy import deepcopy
from enum import IntEnum
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union
Expand Down Expand Up @@ -40,6 +40,72 @@ class Pipeline(PipelineBase):
Orchestrates component execution according to the execution graph, one after the other.
"""

def _warn_if_ambiguous_intent(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Amnah199 I started out with this method to check for ambiguous execution order up front (e.g. the case that I showed for the cycle with 2 prompt builders in the PR description).

The problem is, that we don't know for sure that the running order will be ambiguous, we just know that it's possible.

I then thought that we might just check this when actually running the pipeline and only warn if this case occurs (see comment further down).

I think that would be a better approach and I'd skip this one then. What do you think?

self, inputs: Dict[str, Any], component_names: List[str], receivers: Dict[str, Any]
) -> None:
"""
Issues warnings if the running order of the pipeline is potentially ambiguous.

We simulate a full pass through the pipeline where all components produce outputs.
At every step, we check if more than one component is waiting for optional inputs.
If two components wait for optional input with the same priority, the user intention for the execution
order of these components is not clear.
A warning does not mean that the running order must be ambiguous when real data flows through the pipeline.
Depending on the users data and business logic, the running order might still be clear, but we can not check
for this before running the pipeline.

:param inputs: The inputs to the pipeline.
:param component_names: Names of all components in the pipeline.
:param receivers: The receivers for each component in the pipeline.
"""
inp_cpy = deepcopy(inputs)
remaining_components = set(component_names)
pq = self._fill_queue(component_names, inp_cpy)

# Pipeline has no components.
if len(pq) == 0:
return

Copy link
Contributor

@Amnah199 Amnah199 Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mathislucka, currently this test is failing as an empty list is returned instead of PipelineRuntimeError. The pipeline has no valid entry point and is not executed at all so we should raise an error.
To address this, we could add a small validation check before entering the execution loop. Below is a suggestion which adds new functions—what do you think?

Suggested change
# check if pipeline is blocked before running
self.validate_pipeline(priority_queue)
# NEW FUNCTIONS TO ADD
@staticmethod
def _is_queue_blocked(pq: FIFOPriorityQueue) -> bool:
queue_copy = deepcopy(pq)
while queue_copy:
component = queue_copy.peek()
if component[0] != ComponentPriority.BLOCKED:
return False
queue_copy.pop()
return True
def validate_pipeline(self, priority_queue: FIFOPriorityQueue):
"""
Validate the pipeline to check if it is blocked or has no entry point
:param priority_queue: Priority queue of component names.
"""
if self._is_queue_blocked(priority_queue):
raise PipelineRuntimeError("Cannot run pipeline - all components are blocked. This typically happens when:\n"
"1. Components are missing required inputs\n"
"2. There is a circular dependency without a valid entry point\n"
"Check the connections between these components and ensure all required inputs are provided.")

while True:
candidate = pq.pop()

# We don't have any components left that could run.
if candidate is None or candidate[0] == ComponentPriority.BLOCKED:
return

priority, component_name = candidate

# The queue is empty so the next component can't have the same priority as the current component.
if len(pq) == 0:
return

# We get the next component and its priority to check if the current component and the next component are
# both waiting for inputs with the same priority.
next_prio, next_name = pq.peek()
if priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST] and next_prio == priority:
msg = (
f"Ambiguous running order: Components '{component_name}' and '{next_name}' are waiting for "
f"optional inputs at the same time. Component '{component_name}' executes first."
)
warnings.warn(msg)

# We simulate output distribution for the current component by filling all its output sockets.
comp_with_metadata = self._get_component_with_graph_metadata(component_name)
component_outputs = {
socket_name: "simulation" for socket_name in comp_with_metadata["output_sockets"].keys()
}
comp_receivers = receivers[component_name]
_, inp_cpy = self._write_component_outputs(
component_name, component_outputs, inp_cpy, comp_receivers, set()
)

# We need to remove the component that we just checked so that we don't get into an infinite loop.
remaining_components.remove(component_name)

# We re-prioritize the queue to capture if any components changed priority after simulating a run for
# the current component.
pq = self._fill_queue(remaining_components, inp_cpy)

@staticmethod
def _add_missing_input_defaults(component_inputs: Dict[str, Any], component_input_sockets: Dict[str, InputSocket]):
"""
Expand Down
Loading