-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: Pipeline.run logic #8707
base: main
Are you sure you want to change the base?
Fix: Pipeline.run logic #8707
Changes from 1 commit
9c34772
fc2f2da
c28353d
64f4afc
966552e
3d0e948
7664254
932718f
acce8cd
21c78b8
cf23b32
54d4f2c
05ed852
93f7a9d
bbba1b2
c00f8c5
235aa47
99ea5d5
7577d9b
25c64cd
5a1bbd8
6820fc3
6a9cdcc
d624e57
e1912f8
acc17b1
207eaba
15611fc
87010dd
7027572
02c82b8
7e81ff9
b5db015
d2bee24
ad16403
2984fcb
64a0125
b2b8adc
6b25825
2566cb8
ef8e754
0252450
580e4bd
fd32f77
9426bbb
ad1b0aa
554a6ef
a38bccb
0ac82b2
339d340
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,7 +1,7 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]> | ||||||||||||||||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||||||||||||||||
# SPDX-License-Identifier: Apache-2.0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
import warnings | ||||||||||||||||||||||||||||||||||||||||||||||||||||
from copy import deepcopy | ||||||||||||||||||||||||||||||||||||||||||||||||||||
from enum import IntEnum | ||||||||||||||||||||||||||||||||||||||||||||||||||||
from typing import Any, Dict, List, Mapping, Optional, Set, Tuple, Union | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -40,6 +40,72 @@ class Pipeline(PipelineBase): | |||||||||||||||||||||||||||||||||||||||||||||||||||
Orchestrates component execution according to the execution graph, one after the other. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
def _warn_if_ambiguous_intent( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
self, inputs: Dict[str, Any], component_names: List[str], receivers: Dict[str, Any] | ||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
Issues warnings if the running order of the pipeline is potentially ambiguous. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
We simulate a full pass through the pipeline where all components produce outputs. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
At every step, we check if more than one component is waiting for optional inputs. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
If two components wait for optional input with the same priority, the user intention for the execution | ||||||||||||||||||||||||||||||||||||||||||||||||||||
order of these components is not clear. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
A warning does not mean that the running order must be ambiguous when real data flows through the pipeline. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
Depending on the users data and business logic, the running order might still be clear, but we can not check | ||||||||||||||||||||||||||||||||||||||||||||||||||||
for this before running the pipeline. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
:param inputs: The inputs to the pipeline. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
:param component_names: Names of all components in the pipeline. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
:param receivers: The receivers for each component in the pipeline. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
inp_cpy = deepcopy(inputs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
remaining_components = set(component_names) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
pq = self._fill_queue(component_names, inp_cpy) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# Pipeline has no components. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(pq) == 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mathislucka, currently this test is failing as an empty list is returned instead of
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
while True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
candidate = pq.pop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# We don't have any components left that could run. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if candidate is None or candidate[0] == ComponentPriority.BLOCKED: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
priority, component_name = candidate | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# The queue is empty so the next component can't have the same priority as the current component. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(pq) == 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# We get the next component and its priority to check if the current component and the next component are | ||||||||||||||||||||||||||||||||||||||||||||||||||||
# both waiting for inputs with the same priority. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
next_prio, next_name = pq.peek() | ||||||||||||||||||||||||||||||||||||||||||||||||||||
if priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST] and next_prio == priority: | ||||||||||||||||||||||||||||||||||||||||||||||||||||
msg = ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
f"Ambiguous running order: Components '{component_name}' and '{next_name}' are waiting for " | ||||||||||||||||||||||||||||||||||||||||||||||||||||
f"optional inputs at the same time. Component '{component_name}' executes first." | ||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
warnings.warn(msg) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# We simulate output distribution for the current component by filling all its output sockets. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
comp_with_metadata = self._get_component_with_graph_metadata(component_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
component_outputs = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||
socket_name: "simulation" for socket_name in comp_with_metadata["output_sockets"].keys() | ||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||
comp_receivers = receivers[component_name] | ||||||||||||||||||||||||||||||||||||||||||||||||||||
_, inp_cpy = self._write_component_outputs( | ||||||||||||||||||||||||||||||||||||||||||||||||||||
component_name, component_outputs, inp_cpy, comp_receivers, set() | ||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# We need to remove the component that we just checked so that we don't get into an infinite loop. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
remaining_components.remove(component_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
# We re-prioritize the queue to capture if any components changed priority after simulating a run for | ||||||||||||||||||||||||||||||||||||||||||||||||||||
# the current component. | ||||||||||||||||||||||||||||||||||||||||||||||||||||
pq = self._fill_queue(remaining_components, inp_cpy) | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||||||||||||||||||||||||||
def _add_missing_input_defaults(component_inputs: Dict[str, Any], component_input_sockets: Dict[str, InputSocket]): | ||||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Amnah199 I started out with this method to check for ambiguous execution order up front (e.g. the case that I showed for the cycle with 2 prompt builders in the PR description).
The problem is, that we don't know for sure that the running order will be ambiguous, we just know that it's possible.
I then thought that we might just check this when actually running the pipeline and only warn if this case occurs (see comment further down).
I think that would be a better approach and I'd skip this one then. What do you think?