Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Pipeline.run logic #8707

Draft
wants to merge 50 commits into
base: main
Choose a base branch
from
Draft

Fix: Pipeline.run logic #8707

wants to merge 50 commits into from

Conversation

mathislucka
Copy link
Member

@mathislucka mathislucka commented Jan 11, 2025

Related Issues

Proposed Changes:

The previous Pipeline.run logic had several flaws. Outputs of the pipeline depended on the insertion order of components or connections, cycles weren't processed in the right order, components ran more times than they should, and the output of a pipeline was not deterministic.

This PR introduces a new Pipeline.run logic that relies almost exclusively on the available data to decide when a component should run.

It is loosely based on the idea of Kahn Process Networks (a distributed model of computing).

Core Logic
Internally, the Pipeline.run method uses a single priority queue to schedule components to run. Components are popped from the priority queue one at a time until there are no more components that can run. Once there are no more components that can run, the pipeline returns an output. The queue is re-computed when needed because the produced outputs of the components in the pipeline might change priorities. Since pipelines can have cycles, the component can enter the priority queue multiple times.

When can a component run?
A component can run once it passes two conditions:

  1. All mandatory inputs are available (i.e. if we call component.run we do not get an exception because of missing positional or keyword arguments).
  2. The component was triggered to run.

Triggers
There are three types of triggers that cause a component to run:

  1. The component receives input from outside the pipeline (e.g. user input)
  2. The component receives input from another component in the pipeline
  3. The component does not have incoming connections to any other component in the pipeline and Pipeline.run is called

A trigger is "consumed" by the component, meaning that it can only cause it to run once. For example, for each Pipeline.run invocation, the component can only run once because of user input. It could still run again, but it needs to receive a second trigger to do that.

A component does not always run immediately when it receives a trigger, it only runs when it has highest priority in the priority queue.

Priorities
At a high level, we differentiate between components that can't run, because they don't fulfil the conditions to run. Components that can run immediately in any order, and components that we want to run later because they might still receive optional or lazy variadic inputs.

Inputs & Outputs
A component can receive inputs and it can produce outputs.

When a component runs, it "consumes" (deletes) its inputs, meaning that these same inputs will not be available in case the component runs another time. Inputs from outside the pipeline are an exception to this rule. They are only consumed when they are passed to a GreedyVariadic socket of the component (e.g. BranchJoiner). Other inputs from outside the pipeline will always be available to a component, no matter how often it runs.

After a component ran, its outputs are distributed to the connected input sockets of other components in the pipeline. Outputs that are not connected to any other input sockets in the pipeline, are returned to the user.

Impact on existing pipelines

Non-Cyclic Pipelines
For non-cyclic pipelines, the execution order of components might change. This does not have any impact on the outputs of a pipeline, except in one condition:

If the pipeline has two branches that are joined by a lazy variadic socket (e.g. DocumentJoiner), the order of the joined inputs might change. In the existing pipeline logic, the order is determined by the order of adding components to the pipeline and the order of connecting components. This behavior is not documented anywhere and the user can't know in which order these components will be executed without studying the underlying implementation in Haystack and NetworkX. This PR introduces a lexicographical sort for these cases, other possibilities could be discussed. We can potentially provide the users with utility functions to test if the output might change, when we release the changes from this PR.

Cyclic Pipelines
Cyclic pipelines are affected by several bugs in the current pipeline logic.
Cyclic pipelines might be affected if they meet any of these conditions:

  • pipelines with more than one optional or greedy variadic edge in the cycle (e.g. PromptBuilder, BranchJoiner)
  • pipelines with two cycles that share an optional or greedy variadic edge
  • ... (more?)

For these conditions, neither the run order nor the outputs of the pipeline might be deterministic (i.e. the output of the pipeline might change although the code didn't change). Again, it should be possible to provide tooling that helps users understand if their pipelines are affected.

Open Issues

Cycles without a defined entrypoint
For cycles with more than one component that only have default inputs and that receive inputs from outside the pipeline, there is no defined entrypoint (i.e. we can not know which component in the cycle should run first).

Consider this pipeline:
image

The use case is that one LLM generates code that is checked by another LLM for correctness and then either returned to the first LLM if it has feedback or returned to the user, if the second LLM decides that the code is good enough. The instructions for the "Feedback LLM" and the task for the "Code LLM" are both provided by the user from outside the cycle. Since we have 2 PromptBuilder components in the cycle (configured so that all inputs are optional) and they both receive an input that triggers them to run at the same time, and they are both waiting for exactly one more input (but can run without it), there is no defined order for which of these components should be executed first.

At least from my understanding, this problem can't be solved purely based on the topology of the pipeline graph or on the available data.

In this implementation, the code_prompt would run first because c comes before f in a lexicographical sort. We can document this behavior but it might still surprise our users. Additional measures could be:

  • test for this condition and log a warning
  • test for this condition and raise an exception (the user could make the edge from code_llm to feedback_prompt required to solve the problem)
  • give users another way to specify the running order for components that could run at the same time when the running order could affect the outputs of the pipeline (might be complex)

Should cycles run like loops in a programming language?

@tstadel pointed out that components outside of a cycle might receive inputs from the cycle while the cycle is still running. In very few edge cases, this could cause the components outside the cycle to run repeatedly (and in turn triggering other components in the pipeline).

When we think of our pipeline in terms of a distributed model of computing, then this behavior would be expected.

However, if we assume that a cycle in a pipeline works the same way as a loop in a programming language, then the loop should run to completion before we execute any other components.

Consider this pseudo-implementation:

outside_output = None
for component in cycle:
  if component.output_receiver == 'outside_output':
    outside_output = component.output

# continue with the last value that was set for outside_output

Another option would be to treat the input socket of a component like a FIFO queue, meaning that the outputs would not be overwritten in case components in the cycle provide the output multiple times.

outside_output = []
for component in cycle:
  if component.output_receiver == 'outside_output':
    outside_output.append(component.output)

# receiving component will run as many times as outputs were appended to outside_output

My recommendation would be to follow the distributed model of computing approach and allow a component to run as soon as it receives inputs and is ready to run. The implementation is a lot less complex, especially so, when we introduce concurrency to our pipeline execution. If the user does not want the component outside the cycle to run before the cycle has fully executed, it is easy to achieve that by a different dataflow in the pipeline or by marking edges as required.

How did you test it?

  • behavioral tests
  • some tests that are currently failing are expected to fail, because we are testing the running order of the components, which is not actually the behavior that we want to test

Notes for the reviewer

This is a work in progress.

The change needs to be tested extensively.

My recommendation would be to update the behavioral tests so that we test for the inputs that a component received to run and the pipeline outputs instead of testing for the component execution order.

Using content tracing, we can adapt the existing testing approach by exchanging expected_run_order with expected_component_inputs. Expected component inputs could be tested like this (pseudo-code):

expected_component_inputs = {('<component_name>', '<visits>'): {...inputs}}

for key, inputs in actual_inputs.items():
   assert inputs == expected_component_inputs.get(key)

This is better than the current approach because we don't really want to test the execution order of the components, we only care about how often a component runs, if it has the same inputs, and if the pipeline has the same outputs.

Currently, the behavioral tests do not test these two behaviors (as demonstrated here):

  • how often a component runs
  • if it has the same inputs

For real-world use cases, changes in these two behaviors will have an impact on the output of the pipeline.

Adapting the tests allows us to re-use the test suite when we implement pipelines that run components concurrently.

Checklist

  • I have read the contributors guidelines and the code of conduct
  • I have updated the related issue with new insights and changes
  • I added unit tests and updated the docstrings
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I documented my code
  • I ran pre-commit hooks and fixed any issue

@@ -2821,3 +2821,148 @@ def run(self, replies: List[str]):
)
],
)

@given("a pipeline that passes outputs that are consumed in cycle to outside the cycle", target_fixture="pipeline_data")
def passes_outputs_outside_cycle():
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tstadel could you check if this test covers the use case that you had with passing the prompt outside the cycle?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test fails on main and it yields an empty prompt instead of the expected prompt value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output from the cycle isn't distributed in the old logic because we are filtering for components that are in the cycle here:

receivers = [item for item in self._find_receivers_from(name) if item[0] in cycle]

And then we remove any output that was already received by a component in the cycle here:

to_remove_from_component_result.add(sender_socket.name)

That means that outputs that are received both by components inside and outside the cycle will only go to components inside the cycle.

@coveralls
Copy link
Collaborator

coveralls commented Jan 14, 2025

Pull Request Test Coverage Report for Build 12910444146

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 39 unchanged lines in 4 files lost coverage.
  • Overall coverage increased (+1.2%) to 92.547%

Files with Coverage Reduction New Missed Lines %
core/pipeline/utils.py 3 91.43%
components/validators/json_schema.py 8 88.73%
core/pipeline/pipeline.py 13 92.31%
core/pipeline/base.py 15 93.51%
Totals Coverage Status
Change from base Build 12829583920: 1.2%
Covered Lines: 8878
Relevant Lines: 9593

💛 - Coveralls

@@ -40,6 +40,72 @@ class Pipeline(PipelineBase):
Orchestrates component execution according to the execution graph, one after the other.
"""

def _warn_if_ambiguous_intent(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Amnah199 I started out with this method to check for ambiguous execution order up front (e.g. the case that I showed for the cycle with 2 prompt builders in the PR description).

The problem is, that we don't know for sure that the running order will be ambiguous, we just know that it's possible.

I then thought that we might just check this when actually running the pipeline and only warn if this case occurs (see comment further down).

I think that would be a better approach and I'd skip this one then. What do you think?

next_priority, next_name = priority_queue.peek()

# alternative to _warn_if_ambiguous_intent
if priority in [ComponentPriority.DEFER, ComponentPriority.DEFER_LAST] and next_priority == priority:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative check that would only warn if ambiguity actually occurs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my current understanding, I prefer this approach; however, let’s keep both options for now before making a final decision.

I do have one question: why are we referring to this as an "ambiguity"? In this case, the component that comes first in lexicographical order should execute. Since we also plan to document this for users to clarify the execution order, we can simply notify them that, based on the sorting criteria, if we have lets say a retriever and a summarizer waiting in this particular scenario, then retriever will run first. "Ambiguous running order" suggests that we expect the order to unpredictably change in certain cases, which I believe shouldn’t happen. Please let me know if I’ve misunderstood anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't sure about the wording here.

The execution order is clear, the component that comes first lexicographically will also execute first. I just thought that this is a pretty implicit mechanism and the user might not be aware of it (many don't read the docs). So this warning is there to pull the users attention to the fact that the same priority is resolved by using lexicographical sorting to check if it was also the user's "intent" to run the components in that order.

Co-authored-by: Amna Mubashar <[email protected]>
],
"top_k": None,
},
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mathislucka I am adding these sort of checks with content tracing for component calls. Can you verify if this aligns with what you described?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's exactly what I had in mind. 🙌

They would replace the run_order tests but we can add them first and then remove run orders in one go.

Comment on lines +276 to +277
elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
return ComponentPriority.HIGHEST
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mathislucka, could you help clarify our goal behind assigning “highest priority” to components?

If HIGHEST means running as soon as all mandatory sockets are ready and at least one greedy socket has inputs, then we should consider setting only_check_mandatory=True. I thought about this because as per my understanding a “greedy” socket implies the component is willing to run multiple times or run as soon as partial input is available. Is the goal that we don’t block execution due to optional sockets that haven’t yet received inputs?

Suggested change
elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs):
return ComponentPriority.HIGHEST
elif is_any_greedy_socket_ready(component, inputs) and are_all_sockets_ready(component, inputs, only_check_mandatory=True):
return ComponentPriority.HIGHEST

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I initially did not wait for optional inputs but I checked the existing logic and we wait for optional inputs there, so I did not want to change that.

I think that in practice we don't have any components that receive both a greedy variadic and an optional input.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidsbatista is the above case something you can verify in one of your pipelines?

@julian-risch julian-risch added this to the 2.10.0 milestone Jan 22, 2025
# Pipeline has no components.
if len(pq) == 0:
return

Copy link
Contributor

@Amnah199 Amnah199 Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mathislucka, currently this test is failing as an empty list is returned instead of PipelineRuntimeError. The pipeline has no valid entry point and is not executed at all so we should raise an error.
To address this, we could add a small validation check before entering the execution loop. Below is a suggestion which adds new functions—what do you think?

Suggested change
# check if pipeline is blocked before running
self.validate_pipeline(priority_queue)
# NEW FUNCTIONS TO ADD
@staticmethod
def _is_queue_blocked(pq: FIFOPriorityQueue) -> bool:
queue_copy = deepcopy(pq)
while queue_copy:
component = queue_copy.peek()
if component[0] != ComponentPriority.BLOCKED:
return False
queue_copy.pop()
return True
def validate_pipeline(self, priority_queue: FIFOPriorityQueue):
"""
Validate the pipeline to check if it is blocked or has no entry point
:param priority_queue: Priority queue of component names.
"""
if self._is_queue_blocked(priority_queue):
raise PipelineRuntimeError("Cannot run pipeline - all components are blocked. This typically happens when:\n"
"1. Components are missing required inputs\n"
"2. There is a circular dependency without a valid entry point\n"
"Check the connections between these components and ensure all required inputs are provided.")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Cycle detection removes same edge multiple times
4 participants