Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Pipeline.run logic #8707

Draft
wants to merge 50 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
9c34772
add component checks
mathislucka Jan 11, 2025
fc2f2da
pipeline should run deterministically
mathislucka Jan 11, 2025
c28353d
add FIFOQueue
mathislucka Jan 11, 2025
64f4afc
add agent tests
mathislucka Jan 11, 2025
966552e
add order dependent tests
mathislucka Jan 11, 2025
3d0e948
run new tests
mathislucka Jan 11, 2025
7664254
remove code that is not needed
mathislucka Jan 11, 2025
932718f
test: intermediate from cycle outputs are available outside cycle
mathislucka Jan 13, 2025
acce8cd
add tests for component checks (Claude)
mathislucka Jan 13, 2025
21c78b8
adapt tests for component checks (o1 review)
mathislucka Jan 13, 2025
cf23b32
chore: format
mathislucka Jan 13, 2025
54d4f2c
remove tests that aren't needed anymore
mathislucka Jan 13, 2025
05ed852
add _calculate_priority tests
mathislucka Jan 13, 2025
93f7a9d
revert accidental change in pyproject.toml
mathislucka Jan 13, 2025
bbba1b2
test format conversion
mathislucka Jan 13, 2025
c00f8c5
adapt to naming convention
mathislucka Jan 13, 2025
235aa47
chore: proper docstrings and type hints for PQ
mathislucka Jan 13, 2025
99ea5d5
format
mathislucka Jan 13, 2025
7577d9b
add more unit tests
mathislucka Jan 13, 2025
25c64cd
rm unneeded comments
mathislucka Jan 13, 2025
5a1bbd8
test input consumption
mathislucka Jan 13, 2025
6820fc3
lint
mathislucka Jan 13, 2025
6a9cdcc
Merge branch 'main' into fix/pipeline_run
mathislucka Jan 13, 2025
d624e57
fix: docstrings
mathislucka Jan 13, 2025
e1912f8
lint
mathislucka Jan 13, 2025
acc17b1
format
mathislucka Jan 13, 2025
207eaba
format
mathislucka Jan 13, 2025
15611fc
fix license header
mathislucka Jan 13, 2025
87010dd
fix license header
mathislucka Jan 13, 2025
7027572
add component run tests
mathislucka Jan 13, 2025
02c82b8
fix: pass correct input format to tracing
mathislucka Jan 14, 2025
7e81ff9
fix types
mathislucka Jan 14, 2025
b5db015
Merge branch 'main' into fix/pipeline_run
mathislucka Jan 14, 2025
d2bee24
format
mathislucka Jan 14, 2025
ad16403
format
mathislucka Jan 14, 2025
2984fcb
types
mathislucka Jan 14, 2025
64a0125
add defaults from Socket instead of signature
mathislucka Jan 15, 2025
b2b8adc
fix test names
mathislucka Jan 15, 2025
6b25825
still wait for optional inputs on greedy variadic sockets
mathislucka Jan 15, 2025
2566cb8
fix format
mathislucka Jan 15, 2025
ef8e754
wip: warn for ambiguous running order
mathislucka Jan 16, 2025
0252450
wip: alternative warning
mathislucka Jan 16, 2025
580e4bd
fix license header
mathislucka Jan 16, 2025
fd32f77
Merge branch 'main' of https://github.com/deepset-ai/haystack into fi…
Amnah199 Jan 17, 2025
9426bbb
make code more readable
mathislucka Jan 20, 2025
ad1b0aa
Introduce content tracing to a behavioral test
Amnah199 Jan 20, 2025
554a6ef
Merge branch 'fix/pipeline_run' of https://github.com/deepset-ai/hays…
Amnah199 Jan 20, 2025
a38bccb
Fixing linting
Amnah199 Jan 20, 2025
0ac82b2
Remove debug print statements
Amnah199 Jan 20, 2025
339d340
Fix tracer tests
Amnah199 Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
536 changes: 1 addition & 535 deletions haystack/core/pipeline/base.py

Large diffs are not rendered by default.

244 changes: 244 additions & 0 deletions haystack/core/pipeline/component_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List

from haystack.core.component.types import InputSocket, _empty

# Marker that the checks in this module compare against the "value" entry of each socket input.
# An input whose value is this marker is treated as not having delivered a value.
_NO_OUTPUT_PRODUCED = _empty


def can_component_run(component: Dict, inputs: Dict) -> bool:
    """
    Decides whether a component is ready to execute with its current inputs.

    Two conditions have to hold at the same time:
    1. Every mandatory input socket of the component is ready.
    2. The component has at least one trigger.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if both conditions hold, False otherwise.
    """
    # Both checks are always evaluated, in this order, before the results are combined.
    mandatory_inputs_ready = are_all_sockets_ready(component, inputs, only_check_mandatory=True)
    was_triggered = has_any_trigger(component, inputs)

    return mandatory_inputs_ready and was_triggered


def has_any_trigger(component: Dict, inputs: Dict) -> bool:
    """
    Tells whether anything has triggered the component to execute.

    Three kinds of trigger are recognised:
    1. A predecessor delivered input to the component.
    2. Input was supplied from outside the pipeline (e.g. user input).
    3. The component has no connections from other components and `Pipeline.run` was called.

    Triggers 2 and 3 only count while `component["visits"]` is 0, i.e. they can fire for the first
    execution only. Trigger 1 is intended to fire once per delivery as well; that relies on the caller
    consuming predecessor inputs before execution, which is not enforced in this function.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if at least one trigger is present, False otherwise.
    """
    predecessor_triggered = any_predecessors_provided_input(component, inputs)
    # The visit counter is only consulted when the first operand is truthy.
    user_triggered = has_user_input(inputs) and component["visits"] == 0
    standalone_triggered = can_not_receive_inputs_from_pipeline(component) and component["visits"] == 0

    return any((predecessor_triggered, user_triggered, standalone_triggered))


def are_all_sockets_ready(component: Dict, inputs: Dict, only_check_mandatory: bool = False) -> bool:
    """
    Checks if all relevant sockets of a component have enough inputs for the component to execute.

    Which sockets are relevant depends on `only_check_mandatory`:
    - True: only sockets whose `is_mandatory` is truthy are checked.
    - False: sockets that are mandatory or have at least one sender are checked.

    A checked socket counts as ready when either
    - it is lazy variadic and has received at least one input, or
    - it has received all of its expected inputs.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :param only_check_mandatory: If only mandatory sockets should be checked.
    :returns: True if every checked socket is ready (also when there is nothing to check), False otherwise.
    """
    for socket_name, socket in component["input_sockets"].items():
        if only_check_mandatory:
            needs_check = socket.is_mandatory
        else:
            needs_check = socket.is_mandatory or len(socket.senders) > 0
        if not needs_check:
            continue

        socket_inputs = inputs.get(socket_name, [])
        # Parentheses make the intended grouping explicit: (lazy variadic AND any input) OR complete.
        socket_ready = (
            is_socket_lazy_variadic(socket) and any_socket_input_received(socket_inputs)
        ) or has_socket_received_all_inputs(socket, socket_inputs)

        # One unready socket is enough to decide; no need to inspect the remaining ones.
        if not socket_ready:
            return False

    return True


def any_predecessors_provided_input(component: Dict, inputs: Dict) -> bool:
    """
    Tells whether at least one input socket of the component holds a value sent by a predecessor.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True as soon as one socket with a predecessor-provided value is found, False otherwise.
    """
    for socket_name in component["input_sockets"]:
        # Sockets without any recorded inputs are treated as having an empty input list.
        received = inputs.get(socket_name, [])
        if any_socket_value_from_predecessor_received(received):
            return True

    return False


def any_socket_value_from_predecessor_received(socket_inputs: List[Dict[str, Any]]) -> bool:
    """
    Checks if a component socket received input from any predecessors.

    An input counts when its "value" is not the `_NO_OUTPUT_PRODUCED` marker and its "sender" is not None.

    :param socket_inputs: Inputs for the component's socket. Each entry is a dict with "value" and "sender" keys.
    :returns: True if at least one such input exists, False otherwise (including for an empty list).
    """
    # The marker is compared by identity: `!=` would call the value's own comparison method, which can
    # raise or return a non-boolean for array-like values, or misreport for values with custom equality.
    # When sender is None, the input was provided from outside the pipeline.
    return any(inp["value"] is not _NO_OUTPUT_PRODUCED and inp["sender"] is not None for inp in socket_inputs)


def has_user_input(inputs: Dict) -> bool:
    """
    Tells whether any input was supplied from outside the pipeline (e.g. user input).

    An input is considered external when its "sender" entry is None.

    :param inputs: Inputs for the component, mapping socket names to lists of input dicts.
    :returns: True if at least one external input exists, False otherwise.
    """
    for socket_inputs in inputs.values():
        for socket_input in socket_inputs:
            # The trailing truthiness test mirrors the original `any(inp ...)` form.
            if socket_input["sender"] is None and socket_input:
                return True

    return False


def can_not_receive_inputs_from_pipeline(component: Dict) -> bool:
    """
    Tells whether none of the component's input sockets has a sender.

    :param component: Component metadata and the component instance.
    :returns: True if every input socket has an empty `senders` collection (also when the component
        has no input sockets), False otherwise.
    """
    for socket in component["input_sockets"].values():
        if len(socket.senders) != 0:
            return False

    return True


def all_socket_predecessors_executed(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
    """
    Tells whether every component connected to an InputSocket has delivered an entry to it.

    A sender counts as executed when `socket_inputs` contains an entry from it, regardless of the
    entry's value. Entries whose "sender" is None are ignored.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the set of senders seen equals the set of senders of the socket, False otherwise.
    """
    senders_to_wait_for = set(socket.senders)

    senders_seen = set()
    for socket_input in socket_inputs:
        sender = socket_input["sender"]
        if sender is not None:
            senders_seen.add(sender)

    return senders_to_wait_for == senders_seen


def any_socket_input_received(socket_inputs: List[Dict]) -> bool:
    """
    Checks if a socket has received any input from any other components in the pipeline or from outside the pipeline.

    An input counts when its "value" is not the `_NO_OUTPUT_PRODUCED` marker; the sender is not considered.

    :param socket_inputs: Inputs for the socket. Each entry is a dict with a "value" key.
    :returns: True if at least one input carries a value, False otherwise (including for an empty list).
    """
    # The marker is compared by identity: `!=` would call the value's own comparison method, which can
    # raise or return a non-boolean for array-like values, or misreport for values with custom equality.
    return any(inp["value"] is not _NO_OUTPUT_PRODUCED for inp in socket_inputs)


def has_lazy_variadic_socket_received_all_inputs(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
    """
    Checks if a lazy variadic socket has received all expected inputs from other components in the pipeline.

    Only inputs that carry a value (not the `_NO_OUTPUT_PRODUCED` marker) and have a sender other than
    None contribute to the set of senders that delivered.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the set of senders that delivered a value equals the set of senders of the socket.
    """
    expected_senders = set(socket.senders)
    # The marker is compared by identity: `!=` would call the value's own comparison method, which can
    # raise or return a non-boolean for array-like values, or misreport for values with custom equality.
    actual_senders = {
        inp["sender"]
        for inp in socket_inputs
        if inp["value"] is not _NO_OUTPUT_PRODUCED and inp["sender"] is not None
    }

    return expected_senders == actual_senders


def is_socket_lazy_variadic(socket: InputSocket) -> bool:
    """
    Tells whether an InputSocket is variadic without being greedy.

    :param socket: The InputSocket of a component.
    :returns: A truthy result only when `socket.is_variadic` is truthy and `socket.is_greedy` is falsy.
    """
    variadic = socket.is_variadic
    # `is_greedy` is only looked at for variadic sockets.
    return variadic and not socket.is_greedy


def has_socket_received_all_inputs(socket: InputSocket, socket_inputs: List[Dict]) -> bool:
    """
    Checks if a socket has received all expected inputs.

    The rule depends on the socket type:
    - Any socket without inputs is never complete.
    - Non-variadic: complete when the first input carries a value.
    - Greedy variadic: complete when at least one input carries a value.
    - Lazy variadic: complete when every sender of the socket delivered a value.

    "Carries a value" means the input's "value" is not the `_NO_OUTPUT_PRODUCED` marker. The marker is
    compared by identity, because `!=` would call the value's own comparison method, which can raise or
    return a non-boolean for array-like values, or misreport for values with custom equality.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the socket is complete according to the rules above, False otherwise.
    """
    # No inputs received for the socket, it is not filled.
    if not socket_inputs:
        return False

    # The socket is not variadic and the only expected input is complete.
    if not socket.is_variadic:
        return socket_inputs[0]["value"] is not _NO_OUTPUT_PRODUCED

    # The socket is greedy variadic and at least one input was produced, it is complete.
    if socket.is_greedy:
        return any(inp["value"] is not _NO_OUTPUT_PRODUCED for inp in socket_inputs)

    # The socket is lazy variadic; it is complete once all expected inputs were produced.
    return has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)


def all_predecessors_executed(component: Dict, inputs: Dict) -> bool:
    """
    Tells whether, for every input socket of a component, all of the socket's predecessors have executed.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: False as soon as one socket still waits for a predecessor, True otherwise (also when the
        component has no input sockets).
    """
    for socket_name, socket in component["input_sockets"].items():
        # Sockets without any recorded inputs are treated as having an empty input list.
        received = inputs.get(socket_name, [])
        if not all_socket_predecessors_executed(socket, received):
            return False

    return True


def are_all_lazy_variadic_sockets_resolved(component: Dict, inputs: Dict) -> bool:
    """
    Tells whether every lazy variadic socket of a component has reached its final state.

    A lazy variadic socket is resolved when it has received all of its expected inputs, or when all of
    its predecessors have executed. Sockets that are not lazy variadic are ignored.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: False as soon as one unresolved lazy variadic socket is found, True otherwise.
    """
    for socket_name, socket in component["input_sockets"].items():
        if not is_socket_lazy_variadic(socket):
            continue

        received = inputs.get(socket_name, [])
        if has_lazy_variadic_socket_received_all_inputs(socket, received):
            continue
        # Inputs are incomplete; the socket is only resolved if no predecessor is still outstanding.
        if not all_socket_predecessors_executed(socket, received):
            return False

    return True


def is_any_greedy_socket_ready(component: Dict, inputs: Dict) -> bool:
    """
    Tells whether at least one greedy socket of the component has received all of its inputs.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if such a socket exists, False otherwise.
    """
    # Non-greedy sockets short-circuit, so the completeness check only runs for greedy ones.
    return any(
        socket.is_greedy and has_socket_received_all_inputs(socket, inputs.get(socket_name, []))
        for socket_name, socket in component["input_sockets"].items()
    )
Loading
Loading