-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor run methods more into abstract method #4353
Conversation
Signed-off-by: Merel Theisen <[email protected]>
…l runner as well Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
) -> ThreadPoolExecutor | ProcessPoolExecutor: | ||
"""Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor).""" | ||
pass | ||
|
||
@abstractmethod # pragma: no cover |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still an abstractmethod?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because it's still necessary to have _run()
for any Runner class and you can still overwrite this when creating a custom Runner. But now it's also easier to create a custom runner, because the _run() method has more in place to get started from.
) | ||
|
||
self._release_datasets(node, catalog, load_counts, pipeline) | ||
super()._run( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it still needed if it's just using the base method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because _run()
is still an abstract method.
kedro/runner/runner.py
Outdated
from collections import Counter, deque | ||
from concurrent.futures import ( | ||
FIRST_COMPLETED, | ||
ProcessPoolExecutor, | ||
ThreadPoolExecutor, | ||
wait, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this import multiprocessing
as a dependencies? I recalled in the past we have issues with ShelveStore
because even importing the library cause issues on restricted environment like AWS Lambda.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm yeah I think it does.. Do you remember what the problem was with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's amazing how similar now all the runners start to look like, I love it!
Signed-off-by: Merel Theisen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, this makes all the runners look almost identical!
Co-authored-by: Ivan Danov <[email protected]> Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the PR, @merelcht! Great job on unifying and simplifying the runner's code - it looks great to me! I’ve left one comment suggesting a potential complexity optimisation.
|
||
with self._get_executor(max_workers) as pool: | ||
while True: | ||
ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I propose reducing the complexity of that line in a separate PR because this becomes more critical with the unification of runners. As I understand, the previous implementation of the SequentialRunner (most popular runner) simply executed an ordered list of nodes sequentially. However, the current approach performs a full loop over all rest nodes after each executed node, this results in an overall complexity greater than quadratic.
I believe we can achieve linear complexity by maintaining a count of unmet dependencies for each node. As nodes are completed, we decrement the counter for their dependents and mark a node as ready when its counter reaches zero.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point @DimedS. I'll create a new issue for this, so it's not forgotten.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description
Resolves #4290
Development notes
_run
method implementations into the abstract_run
method._get_executor
method, implemented by each runner.ParallelRunner
andThreadRunner
into sharedvalidate_max_workers
method.hook_manager
argument in runners to allow it to beNone
, which is needed for theParallelRunner
, because hook manager can't be serialised.TestSuggestResumeScenario
forSequentialRunner
, because it's now using aThreadPoolExecutor
, the suggestions can vary per run. I've manually created a project with the same pipelines and verified that the suggestions (even if they vary) do always work.TestSuggestResumeScenario
tests forThreadRunner
.Developer Certificate of Origin
We need all contributions to comply with the Developer Certificate of Origin (DCO). All commits must be signed off by including a
Signed-off-by
line in the commit message. See our wiki for guidance.If your PR is blocked due to unsigned commits, then you must follow the instructions under "Rebase the branch" on the GitHub Checks page for your PR. This will retroactively add the sign-off to all unsigned commits and allow the DCO check to pass.
Checklist
RELEASE.md
file