Fix: Pipeline.run logic #8707
base: main
Conversation
@@ -2821,3 +2821,148 @@ def run(self, replies: List[str]):
            )
        ],
    )

@given("a pipeline that passes outputs that are consumed in cycle to outside the cycle", target_fixture="pipeline_data")
def passes_outputs_outside_cycle():
@tstadel could you check if this test covers the use case that you had with passing the prompt outside the cycle?
The test fails on main and it yields an empty prompt instead of the expected prompt value.
The output from the cycle isn't distributed in the old logic because we are filtering for components that are in the cycle here:
haystack/haystack/core/pipeline/pipeline.py
Line 213 in 167ede1
receivers = [item for item in self._find_receivers_from(name) if item[0] in cycle]
And then we remove any output that was already received by a component in the cycle here:
haystack/haystack/core/pipeline/base.py
Line 889 in 167ede1
to_remove_from_component_result.add(sender_socket.name)
That means that outputs that are received both by components inside and outside the cycle will only go to components inside the cycle.
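As a toy illustration (simplified data structures, not the actual Haystack code), the old logic behaves like `distribute_old` below: it delivers a cycle's outputs only to receivers inside the cycle and then discards them, so receivers outside the cycle never see those outputs.

```python
# Toy model of the bug described above. All names here are invented.

def distribute_old(receivers, cycle):
    """Old behavior: filter receivers to in-cycle components only."""
    delivered = [r for r in receivers if r in cycle]
    # the delivered socket is then removed from the component result,
    # so out-of-cycle receivers get nothing
    return delivered

def distribute_fixed(receivers, cycle):
    """Fixed behavior: deliver to every connected receiver."""
    return list(receivers)

cycle = {"prompt_builder", "llm", "joiner"}
receivers = ["llm", "answer_collector"]  # one inside, one outside the cycle

print(distribute_old(receivers, cycle))    # ['llm']
print(distribute_fixed(receivers, cycle))  # ['llm', 'answer_collector']
```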
Pull Request Test Coverage Report for Build 12910444146

Warning: this coverage report may be inaccurate. This pull request's base commit is no longer the HEAD commit of its target branch, so it includes changes from outside the original pull request, potentially including unrelated coverage changes.

💛 - Coveralls
- otherwise components with dynamic inputs would fail
- mirrors previous behavior
@@ -40,6 +40,72 @@ class Pipeline(PipelineBase):
    Orchestrates component execution according to the execution graph, one after the other.
    """

    def _warn_if_ambiguous_intent(
@Amnah199 I started out with this method to check for ambiguous execution order up front (e.g. the case that I showed for the cycle with 2 prompt builders in the PR description).
The problem is that we don't know for sure that the running order will be ambiguous; we only know that it's possible.
I then thought that we might just check this when actually running the pipeline and only warn if this case occurs (see comment further down).
I think that would be a better approach and I'd skip this one then. What do you think?
next_priority, next_name = priority_queue.peek()

# alternative to _warn_if_ambiguous_intent
if priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST] and next_priority == priority:
Alternative check that would only warn if ambiguity actually occurs.
Based on my current understanding, I prefer this approach; however, let's keep both options for now before making a final decision.
I do have one question: why are we referring to this as an "ambiguity"? In this case, the component that comes first in lexicographical order should execute. Since we also plan to document this for users to clarify the execution order, we can simply notify them that, based on the sorting criteria, if we have, let's say, a `retriever` and a `summarizer` waiting in this particular scenario, then `retriever` will run first. "Ambiguous running order" suggests that we expect the order to change unpredictably in certain cases, which I believe shouldn't happen. Please let me know if I've misunderstood anything.
Yeah, I wasn't sure about the wording here.
The execution order is clear: the component that comes first lexicographically will also execute first. I just thought that this is a pretty implicit mechanism and the user might not be aware of it (many don't read the docs). So this warning is there to draw the user's attention to the fact that equal priorities are resolved by lexicographical sorting, and to prompt them to check whether it was also their "intent" to run the components in that order.
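As a small illustration of that implicit mechanism (toy data, not Haystack internals): with `(priority, name)` tuples in a heap, Python breaks priority ties by comparing the names lexicographically.

```python
import heapq

# Equal priorities (2) are resolved by lexicographical order of the names.
queue = [(2, "summarizer"), (2, "retriever"), (1, "joiner")]
heapq.heapify(queue)

order = []
while queue:
    order.append(heapq.heappop(queue)[1])

print(order)  # ['joiner', 'retriever', 'summarizer']
```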
Co-authored-by: Amna Mubashar <[email protected]>
…tack into fix/pipeline_run
Introducing content tracing for behavioral tests.
    ],
    "top_k": None,
},
},
@mathislucka I am adding these sort of checks with content tracing for component calls. Can you verify if this aligns with what you described?
Yes, that's exactly what I had in mind. 🙌
They would replace the run_order tests but we can add them first and then remove run orders in one go.
elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
    return ComponentPriority.HIGHEST
@mathislucka, could you help clarify our goal behind assigning "highest priority" to components?
If `HIGHEST` means running as soon as all mandatory sockets are ready and at least one greedy socket has inputs, then we should consider setting `only_check_mandatory=True`. I thought about this because, as per my understanding, a "greedy" socket implies the component is willing to run multiple times or to run as soon as partial input is available. Is the goal that we don't block execution due to optional sockets that haven't yet received inputs?
- elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
-     return ComponentPriority.HIGHEST
+ elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs, only_check_mandatory=True):
+     return ComponentPriority.HIGHEST
Yes, I initially did not wait for optional inputs but I checked the existing logic and we wait for optional inputs there, so I did not want to change that.
I think that in practice we don't have any components that receive both a greedy variadic and an optional input.
@davidsbatista is the above case something you can verify in one of your pipelines?
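To make the distinction discussed in this thread concrete, here is a toy sketch (hypothetical socket model, not Haystack's actual signature) of readiness with and without `only_check_mandatory`:

```python
# Hypothetical socket model: whether readiness should consider optional
# sockets that have not yet received inputs.

def are_all_sockets_ready(sockets, inputs, only_check_mandatory=False):
    """sockets: name -> {'mandatory': bool}; inputs: name -> received value."""
    relevant = [
        name for name, sock in sockets.items()
        if sock["mandatory"] or not only_check_mandatory
    ]
    return all(name in inputs for name in relevant)

# A mandatory (greedy) socket with input, plus an optional socket still waiting.
sockets = {"branches": {"mandatory": True}, "hint": {"mandatory": False}}
inputs = {"branches": ["value"]}

print(are_all_sockets_ready(sockets, inputs))                             # False
print(are_all_sockets_ready(sockets, inputs, only_check_mandatory=True))  # True
```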
# Pipeline has no components.
if len(pq) == 0:
    return
@mathislucka, currently this test is failing: an empty list is returned instead of a `PipelineRuntimeError` being raised. The pipeline has no valid entry point and is not executed at all, so we should raise an error.
To address this, we could add a small validation check before entering the execution loop. Below is a suggestion that adds new functions; what do you think?
# check if pipeline is blocked before running
self.validate_pipeline(priority_queue)

# NEW FUNCTIONS TO ADD
@staticmethod
def _is_queue_blocked(pq: FIFOPriorityQueue) -> bool:
    # requires `from copy import deepcopy`
    queue_copy = deepcopy(pq)
    while queue_copy:
        component = queue_copy.peek()
        if component[0] != ComponentPriority.BLOCKED:
            return False
        queue_copy.pop()
    return True

def validate_pipeline(self, priority_queue: FIFOPriorityQueue):
    """
    Validate the pipeline to check if it is blocked or has no entry point.

    :param priority_queue: Priority queue of component names.
    """
    if self._is_queue_blocked(priority_queue):
        raise PipelineRuntimeError(
            "Cannot run pipeline - all components are blocked. This typically happens when:\n"
            "1. Components are missing required inputs\n"
            "2. There is a circular dependency without a valid entry point\n"
            "Check the connections between these components and ensure all required inputs are provided."
        )
Related Issues
Proposed Changes:
The previous `Pipeline.run` logic had several flaws: outputs of the pipeline depended on the insertion order of components or connections, cycles weren't processed in the right order, components ran more times than they should, and the output of a pipeline was not deterministic.

This PR introduces a new `Pipeline.run` logic that relies almost exclusively on the available data to decide when a component should run. It is loosely based on the idea of Kahn Process Networks (a distributed model of computing).
Core Logic
Internally, the `Pipeline.run` method uses a single priority queue to schedule components. Components are popped from the priority queue one at a time until there are no more components that can run; once that is the case, the pipeline returns its output. The queue is re-computed when needed because the outputs produced by components in the pipeline might change priorities. Since pipelines can have cycles, a component can enter the priority queue multiple times.

When can a component run?
A component can run once it passes two conditions: it has received a trigger, and it has all mandatory inputs (i.e. if we call `component.run`, we do not get an exception because of missing positional or keyword arguments).

Triggers
There are three types of triggers that cause a component to run; one of them is user input when `Pipeline.run` is called.

A trigger is "consumed" by the component, meaning that it can only cause the component to run once. For example, for each `Pipeline.run` invocation, the component can only run once because of user input. It could still run again, but it needs to receive a second trigger to do that.

A component does not always run immediately when it receives a trigger; it only runs when it has the highest priority in the priority queue.
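The scheduling loop described so far can be sketched with a toy model (not Haystack's actual implementation): the queue is rebuilt after every run because new outputs can change priorities, and blocked components simply wait for a future rebuild.

```python
import heapq
from enum import IntEnum

class Priority(IntEnum):
    READY = 2    # can run now
    BLOCKED = 9  # conditions to run are not fulfilled

def schedule(components, priority_of):
    """Pop components until none can run; return the order in which they ran."""
    ran = []
    while True:
        # rebuild the queue: produced outputs may have changed priorities
        pq = [(priority_of(name, ran), name) for name in components]
        heapq.heapify(pq)
        priority, name = heapq.heappop(pq)
        if priority == Priority.BLOCKED:
            return ran  # nothing left that can run
        ran.append(name)

def priority_of(name, ran):
    # "b" needs output from "a"; every component runs at most once here.
    if name in ran or (name == "b" and "a" not in ran):
        return Priority.BLOCKED
    return Priority.READY

print(schedule(["b", "a", "c"], priority_of))  # ['a', 'b', 'c']
```

Note that ties between equally ready components fall back to lexicographical order of the names, which is the tie-break discussed in the review comments above.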
Priorities
At a high level, we differentiate between three kinds of components: those that can't run because they don't fulfil the conditions to run, those that can run immediately in any order, and those that we want to run later because they might still receive optional or lazy variadic inputs.
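These tiers can be sketched as follows. The names `HIGHEST`, `DEFER`, `DEFER_LAST`, and `BLOCKED` appear in the PR's diff; the numeric values and the `classify` helper are illustrative assumptions (lower value means scheduled earlier).

```python
from enum import IntEnum

class ComponentPriority(IntEnum):
    HIGHEST = 1      # a greedy variadic socket has input and the rest is ready
    READY = 2        # all mandatory inputs received, can run in any order
    DEFER = 3        # runnable, but may still receive optional inputs
    DEFER_LAST = 4   # runnable, but deferred behind all other DEFERs
    BLOCKED = 5      # conditions to run are not fulfilled

def classify(has_trigger, mandatory_ready, greedy_ready, may_get_optional):
    """Map a component's state to a priority tier (toy logic)."""
    if not (has_trigger and mandatory_ready):
        return ComponentPriority.BLOCKED
    if greedy_ready:
        return ComponentPriority.HIGHEST
    if may_get_optional:
        return ComponentPriority.DEFER
    return ComponentPriority.READY

print(classify(True, True, False, True).name)  # DEFER
```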
Inputs & Outputs
A component can receive inputs and it can produce outputs.
When a component runs, it "consumes" (deletes) its inputs, meaning that these same inputs will not be available in case the component runs another time. Inputs from outside the pipeline are an exception to this rule: they are only consumed when they are passed to a `GreedyVariadic` socket of the component (e.g. `BranchJoiner`). Other inputs from outside the pipeline will always be available to a component, no matter how often it runs.

After a component runs, its outputs are distributed to the connected input sockets of other components in the pipeline. Outputs that are not connected to any other input sockets in the pipeline are returned to the user.
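The consumption rule can be sketched with a toy helper (invented names, not Haystack's internals): regular inputs are deleted when a component runs, while inputs from outside the pipeline persist unless they feed a greedy variadic socket.

```python
def consume_inputs(sockets, from_user, greedy):
    """Return the inputs that remain available for the component's next run."""
    return {
        name: value
        for name, value in sockets.items()
        if name in from_user and name not in greedy  # user inputs persist...
    }                                                # ...unless greedy

sockets = {"template": "...", "documents": ["d1"], "query": "q"}
remaining = consume_inputs(sockets, from_user={"template", "query"}, greedy={"query"})
print(remaining)  # {'template': '...'}
```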
Impact on existing pipelines
Non-Cyclic Pipelines
For non-cyclic pipelines, the execution order of components might change. This does not have any impact on the outputs of a pipeline, except in one condition:
If the pipeline has two branches that are joined by a lazy variadic socket (e.g. `DocumentJoiner`), the order of the joined inputs might change. In the existing pipeline logic, the order is determined by the order of adding components to the pipeline and the order of connecting components. This behavior is not documented anywhere, and the user can't know in which order these components will be executed without studying the underlying implementation in Haystack and NetworkX. This PR introduces a lexicographical sort for these cases; other possibilities could be discussed. We can potentially provide users with utility functions to test whether the output might change when we release the changes from this PR.

Cyclic Pipelines
Cyclic pipelines are affected by several bugs in the current pipeline logic.
Cyclic pipelines might be affected if they meet any of these conditions:
Under these conditions, neither the run order nor the outputs of the pipeline are guaranteed to be deterministic (i.e. the output of the pipeline might change although the code didn't change). Again, it should be possible to provide tooling that helps users understand whether their pipelines are affected.
Open Issues
Cycles without a defined entrypoint
For cycles with more than one component that only have default inputs and that receive inputs from outside the pipeline, there is no defined entrypoint (i.e. we can not know which component in the cycle should run first).
Consider this pipeline:
The use case is that one LLM generates code that is checked by another LLM for correctness; the code is either returned to the first LLM if there is feedback, or returned to the user if the second LLM decides that the code is good enough. The instructions for the "Feedback LLM" and the task for the "Code LLM" are both provided by the user from outside the cycle. Since we have two `PromptBuilder` components in the cycle (configured so that all inputs are optional), both receive an input that triggers them to run at the same time, and both are waiting for exactly one more input (but can run without it), so there is no defined order for which of these components should be executed first.

At least from my understanding, this problem can't be solved purely based on the topology of the pipeline graph or on the available data.
In this implementation, `code_prompt` would run first because `c` comes before `f` in a lexicographical sort. We can document this behavior, but it might still surprise our users. Additional measures could be discussed, such as marking the connection from `code_llm` to `feedback_prompt` as required to solve the problem.

Should cycles run like loops in a programming language?
@tstadel pointed out that components outside of a cycle might receive inputs from the cycle while the cycle is still running. In very few edge cases, this could cause the components outside the cycle to run repeatedly (and in turn triggering other components in the pipeline).
When we think of our pipeline in terms of a distributed model of computing, then this behavior would be expected.
However, if we assume that a cycle in a pipeline works the same way as a loop in a programming language, then the loop should run to completion before we execute any other components.
Consider this pseudo-implementation:
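The referenced pseudo-implementation is not reproduced in this excerpt; a minimal sketch of the "cycle runs to completion" semantics might look like this (all names are invented, not from the PR):

```python
# "Cycles behave like loops": outputs addressed to components outside the
# cycle are buffered and only delivered once the cycle finishes.

def run_with_loop_semantics(run_cycle_step, is_done):
    buffered = {}  # outputs addressed to components outside the cycle
    while not is_done():
        step_outputs = run_cycle_step()
        # overwrite instead of delivering: outside receivers see nothing yet
        buffered.update(step_outputs)
    return buffered  # delivered only after the loop terminates

# Toy cycle: counts up to 3, emitting its latest count to the outside each step.
state = {"count": 0}

def step():
    state["count"] += 1
    return {"outside_receiver": state["count"]}

result = run_with_loop_semantics(step, lambda: state["count"] >= 3)
print(result)  # {'outside_receiver': 3}
```

Under these semantics the outside receiver only ever sees the final value, whereas under the distributed-computing semantics it would run after every step.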
Another option would be to treat the input socket of a component like a FIFO queue, meaning that the outputs would not be overwritten in case components in the cycle provide the output multiple times.
My recommendation would be to follow the distributed model of computing approach and allow a component to run as soon as it receives inputs and is ready to run. The implementation is a lot less complex, especially when we introduce concurrency to our pipeline execution. If the user does not want the component outside the cycle to run before the cycle has fully executed, this is easy to achieve with a different dataflow in the pipeline or by marking edges as required.
How did you test it?
Notes for the reviewer
This is a work in progress.
The change needs to be tested extensively.
My recommendation would be to update the behavioral tests so that we test for the inputs that a component received to run and the pipeline outputs instead of testing for the component execution order.
Using content tracing, we can adapt the existing testing approach by exchanging `expected_run_order` with `expected_component_inputs`. Expected component inputs could be tested like this (pseudo-code):

This is better than the current approach because we don't really want to test the execution order of the components; we only care about how often a component runs, whether it receives the same inputs, and whether the pipeline produces the same outputs.
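The pseudo-code referenced above is not part of this excerpt; a hedged sketch of what such a check could look like (fixture shape and names are invented, not Haystack's actual test code):

```python
def assert_component_inputs(traced_calls, expected_component_inputs):
    """traced_calls: (component_name, inputs) pairs captured via content tracing."""
    actual = {}
    for name, inputs in traced_calls:
        # group by component: one entry per run, so run counts are checked too
        actual.setdefault(name, []).append(inputs)
    assert actual == expected_component_inputs, f"{actual} != {expected_component_inputs}"

traced = [("prompt_builder", {"query": "q"}), ("llm", {"prompt": "p"})]
expected = {"prompt_builder": [{"query": "q"}], "llm": [{"prompt": "p"}]}
assert_component_inputs(traced, expected)  # passes: inputs and run counts match
```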
Currently, the behavioral tests do not test these two behaviors (as demonstrated here):
For real-world use cases, changes in these two behaviors will have an impact on the output of the pipeline.
Adapting the tests allows us to re-use the test suite when we implement pipelines that run components concurrently.
Checklist
I used one of the conventional commit types for my PR title: `fix:`, `feat:`, `build:`, `chore:`, `ci:`, `docs:`, `style:`, `refactor:`, `perf:`, `test:` and added `!` in case the PR includes breaking changes.