From fc9c5c11450033b54deccb51cd6f352a399a727f Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 4 Nov 2024 18:06:07 +0000 Subject: [PATCH 01/20] First refactor step to make _run() implementations more similar Signed-off-by: Merel Theisen --- kedro/runner/parallel_runner.py | 6 +++++- kedro/runner/sequential_runner.py | 10 +++++----- kedro/runner/thread_runner.py | 19 ++++++++++++++++--- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 4f20295285..fa15c99377 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -196,7 +196,7 @@ def _run( hook_manager: PluginManager, session_id: str | None = None, ) -> None: - """The abstract interface for running pipelines. + """The method implementing parallel pipeline running. Args: pipeline: The ``Pipeline`` to run. @@ -263,5 +263,9 @@ def _run( for future in done: node = future.result() done_nodes.add(node) + self._logger.info("Completed node: %s", node.name) + self._logger.info( + "Completed %d out of %d tasks", len(done_nodes), len(nodes) + ) self._release_datasets(node, catalog, load_counts, pipeline) diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 508f95234f..ab75839915 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -17,6 +17,7 @@ from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline + from kedro.pipeline.node import Node class SequentialRunner(AbstractRunner): @@ -70,11 +71,10 @@ def _run( "for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously" ) nodes = pipeline.nodes - done_nodes = set() - load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) + done_nodes: set[Node] = set() - for exec_index, node in enumerate(nodes): + for node in nodes: try: Task( node=node, @@ -88,8 +88,8 @@ def _run( self._suggest_resume_scenario(pipeline, done_nodes, catalog) raise - self._release_datasets(node, catalog, load_counts, pipeline) - + self._logger.info("Completed node: %s", node.name) self._logger.info( "Completed %d out of %d tasks", len(done_nodes), len(nodes) ) + self._release_datasets(node, catalog, load_counts, pipeline) diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 19cfaafdbd..cc2ea0b546 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -92,7 +92,7 @@ def _run( hook_manager: PluginManager, session_id: str | None = None, ) -> None: - """The abstract interface for running pipelines. + """The method implementing threaded pipeline running. Args: pipeline: The ``Pipeline`` to run. @@ -127,8 +127,21 @@ def _run( ) futures.add(pool.submit(task)) if not futures: - assert not todo_nodes, (todo_nodes, done_nodes, ready, done) # noqa: S101 - break + if todo_nodes: + debug_data = { + "todo_nodes": todo_nodes, + "done_nodes": done_nodes, + "ready_nodes": ready, + "done_futures": done, + } + debug_data_str = "\n".join( + f"{k} = {v}" for k, v in debug_data.items() + ) + raise RuntimeError( + f"Unable to schedule new tasks although some nodes " + f"have not been run:\n{debug_data_str}" + ) + break # pragma: no cover done, futures = wait(futures, return_when=FIRST_COMPLETED) for future in done: try: From 62569f7219072e367334c0dc382d51969a4448c6 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 5 Nov 2024 13:52:26 +0100 Subject: [PATCH 02/20] Move common logic to abstract _run method, use executor for sequential runner as well Signed-off-by: Merel Theisen --- kedro/runner/parallel_runner.py | 66 ++--------- kedro/runner/runner.py | 97 ++++++++++++++-- kedro/runner/sequential_runner.py | 43 +++----- kedro/runner/thread_runner.py | 64 +---------- tests/runner/test_sequential_runner.py | 147 +++++++++++++------------ 5 files changed, 190 insertions(+), 227 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index fa15c99377..328ab4873e 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -6,9 +6,7 @@ import os import sys -from collections import Counter -from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait -from itertools import chain +from concurrent.futures import ProcessPoolExecutor from multiprocessing.managers import BaseProxy, SyncManager from multiprocessing.reduction import ForkingPickler from pickle import PicklingError @@ -21,7 +19,6 @@ SharedMemoryDataset, ) from kedro.runner.runner import AbstractRunner -from kedro.runner.task import Task if TYPE_CHECKING: from collections.abc import Iterable @@ -189,11 +186,14 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: return min(required_processes, self._max_workers) + def _get_executor(self, max_workers: int): + return ProcessPoolExecutor(max_workers=max_workers) + def _run( self, pipeline: Pipeline, catalog: CatalogProtocol, - hook_manager: PluginManager, + hook_manager: PluginManager | None = None, session_id: str | None = None, ) -> None: """The method implementing parallel pipeline running. @@ -218,54 +218,8 @@ def _run( "for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously" ) - nodes = pipeline.nodes - self._validate_catalog(catalog, pipeline) - self._validate_nodes(nodes) - self._set_manager_datasets(catalog, pipeline) - load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) - node_dependencies = pipeline.node_dependencies - todo_nodes = set(node_dependencies.keys()) - done_nodes: set[Node] = set() - futures = set() - done = None - max_workers = self._get_required_workers_count(pipeline) - - with ProcessPoolExecutor(max_workers=max_workers) as pool: - while True: - ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes} - todo_nodes -= ready - for node in ready: - task = Task( - node=node, - catalog=catalog, - is_async=self._is_async, - session_id=session_id, - parallel=True, - ) - futures.add(pool.submit(task)) - if not futures: - if todo_nodes: - debug_data = { - "todo_nodes": todo_nodes, - "done_nodes": done_nodes, - "ready_nodes": ready, - "done_futures": done, - } - debug_data_str = "\n".join( - f"{k} = {v}" for k, v in debug_data.items() - ) - raise RuntimeError( - f"Unable to schedule new tasks although some nodes " - f"have not been run:\n{debug_data_str}" - ) - break # pragma: no cover - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - node = future.result() - done_nodes.add(node) - self._logger.info("Completed node: %s", node.name) - self._logger.info( - "Completed %d out of %d tasks", len(done_nodes), len(nodes) - ) - - self._release_datasets(node, catalog, load_counts, pipeline) + super()._run( + pipeline=pipeline, + catalog=catalog, + session_id=session_id, + ) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index a36a42c3e0..cba954e5a3 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -8,9 +8,13 @@ import logging import warnings from abc import ABC, abstractmethod -from collections import deque +from collections import Counter, deque +from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait +from itertools import chain from typing import TYPE_CHECKING, Any +from pluggy import PluginManager + from kedro import KedroDeprecationWarning from kedro.framework.hooks.manager import _NullPluginManager from kedro.io import CatalogProtocol, MemoryDataset, SharedMemoryDataset @@ -160,25 +164,79 @@ def run_only_missing( return self.run(to_rerun, catalog, hook_manager) + @abstractmethod + def _get_executor(self, max_workers): + """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor).""" + pass + @abstractmethod # pragma: no cover def _run( self, pipeline: Pipeline, catalog: CatalogProtocol, - hook_manager: PluginManager, + hook_manager: PluginManager | None = None, session_id: str | None = None, ) -> None: - """The abstract interface for running pipelines, assuming that the - inputs have already been checked and normalized by run(). + """Common pipeline execution logic using an executor.""" + nodes = pipeline.nodes + self._validate_catalog(catalog, pipeline) + self._validate_nodes(nodes) + self._set_manager_datasets(catalog, pipeline) + load_counts = Counter(chain.from_iterable(n.inputs for n in pipeline.nodes)) + node_dependencies = pipeline.node_dependencies + todo_nodes = set(node_dependencies.keys()) + done_nodes: set[Node] = set() + futures = set() + done = None + max_workers = self._get_required_workers_count(pipeline) + + with self._get_executor(max_workers) as pool: + while True: + ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes} + todo_nodes -= ready + for node in ready: + task = Task( + node=node, + catalog=catalog, + hook_manager=hook_manager, + is_async=self._is_async, + session_id=session_id, + ) + if isinstance(pool, ProcessPoolExecutor): + task.parallel = True + futures.add(pool.submit(task)) + if not futures: + if todo_nodes: + self._raise_runtime_error(todo_nodes, done_nodes, ready, done) + break + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + node = future.result() + # try: + # node = future.result() + # except Exception: + # self._suggest_resume_scenario(pipeline, done_nodes, catalog) + # raise + done_nodes.add(node) + self._logger.info("Completed node: %s", node.name) + self._logger.info( + "Completed %d out of %d tasks", len(done_nodes), len(nodes) + ) + self._release_datasets(node, catalog, load_counts, pipeline) - Args: - pipeline: The ``Pipeline`` to run. - catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. - hook_manager: The ``PluginManager`` to activate hooks. - session_id: The id of the session. - - """ - pass + @staticmethod + def _raise_runtime_error(todo_nodes, done_nodes, ready, done): + debug_data = { + "todo_nodes": todo_nodes, + "done_nodes": done_nodes, + "ready_nodes": ready, + "done_futures": done, + } + debug_data_str = "\n".join(f"{k} = {v}" for k, v in debug_data.items()) + raise RuntimeError( + f"Unable to schedule new tasks although some nodes " + f"have not been run:\n{debug_data_str}" + ) def _suggest_resume_scenario( self, @@ -234,6 +292,21 @@ def _release_datasets( if load_counts[dataset] < 1 and dataset not in pipeline.outputs(): catalog.release(dataset) + def _validate_catalog(self, catalog, pipeline): + # Add catalog validation logic here if needed + pass + + def _validate_nodes(self, nodes): + # Add node validation logic here if needed + pass + + def _set_manager_datasets(self, catalog, pipeline): + # Set up any necessary manager datasets here + pass + + def _get_required_workers_count(self, pipeline): + return 1 + def _find_nodes_to_resume_from( pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: CatalogProtocol diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index ab75839915..6301014861 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -5,19 +5,18 @@ from __future__ import annotations -from collections import Counter -from itertools import chain +from concurrent.futures import ( + ThreadPoolExecutor, +) from typing import TYPE_CHECKING, Any from kedro.runner.runner import AbstractRunner -from kedro.runner.task import Task if TYPE_CHECKING: from pluggy import PluginManager from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline - from kedro.pipeline.node import Node class SequentialRunner(AbstractRunner): @@ -47,11 +46,16 @@ def __init__( is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns ) + def _get_executor(self, max_workers: int): + return ThreadPoolExecutor( + max_workers=1 + ) # Single-threaded for sequential execution + def _run( self, pipeline: Pipeline, catalog: CatalogProtocol, - hook_manager: PluginManager, + hook_manager: PluginManager | None = None, session_id: str | None = None, ) -> None: """The method implementing sequential pipeline running. @@ -70,26 +74,9 @@ def _run( "Using synchronous mode for loading and saving data. Use the --async flag " "for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously" ) - nodes = pipeline.nodes - load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) - done_nodes: set[Node] = set() - - for node in nodes: - try: - Task( - node=node, - catalog=catalog, - hook_manager=hook_manager, - is_async=self._is_async, - session_id=session_id, - ).execute() - done_nodes.add(node) - except Exception: - self._suggest_resume_scenario(pipeline, done_nodes, catalog) - raise - - self._logger.info("Completed node: %s", node.name) - self._logger.info( - "Completed %d out of %d tasks", len(done_nodes), len(nodes) - ) - self._release_datasets(node, catalog, load_counts, pipeline) + super()._run( + pipeline=pipeline, + catalog=catalog, + hook_manager=hook_manager, + session_id=session_id, + ) diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index cc2ea0b546..0a11f98874 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -6,12 +6,9 @@ from __future__ import annotations import warnings -from collections import Counter -from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait -from itertools import chain +from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any -from kedro.runner import Task from kedro.runner.runner import AbstractRunner if TYPE_CHECKING: @@ -19,7 +16,6 @@ from kedro.io import CatalogProtocol from kedro.pipeline import Pipeline - from kedro.pipeline.node import Node class ThreadRunner(AbstractRunner): @@ -85,11 +81,14 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: else required_threads ) + def _get_executor(self, max_workers: int): + return ThreadPoolExecutor(max_workers=max_workers) + def _run( self, pipeline: Pipeline, catalog: CatalogProtocol, - hook_manager: PluginManager, + hook_manager: PluginManager | None = None, session_id: str | None = None, ) -> None: """The method implementing threaded pipeline running. @@ -104,55 +103,4 @@ def _run( Exception: in case of any downstream node failure. """ - nodes = pipeline.nodes - load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) - node_dependencies = pipeline.node_dependencies - todo_nodes = set(node_dependencies.keys()) - done_nodes: set[Node] = set() - futures = set() - done = None - max_workers = self._get_required_workers_count(pipeline) - - with ThreadPoolExecutor(max_workers=max_workers) as pool: - while True: - ready = {n for n in todo_nodes if node_dependencies[n] <= done_nodes} - todo_nodes -= ready - for node in ready: - task = Task( - node=node, - catalog=catalog, - hook_manager=hook_manager, - is_async=self._is_async, - session_id=session_id, - ) - futures.add(pool.submit(task)) - if not futures: - if todo_nodes: - debug_data = { - "todo_nodes": todo_nodes, - "done_nodes": done_nodes, - "ready_nodes": ready, - "done_futures": done, - } - debug_data_str = "\n".join( - f"{k} = {v}" for k, v in debug_data.items() - ) - raise RuntimeError( - f"Unable to schedule new tasks although some nodes " - f"have not been run:\n{debug_data_str}" - ) - break # pragma: no cover - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - try: - node = future.result() - except Exception: - self._suggest_resume_scenario(pipeline, done_nodes, catalog) - raise - done_nodes.add(node) - self._logger.info("Completed node: %s", node.name) - self._logger.info( - "Completed %d out of %d tasks", len(done_nodes), len(nodes) - ) - - self._release_datasets(node, catalog, load_counts, pipeline) + super()._run(pipeline, catalog, hook_manager, session_id) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 229518ecd4..450b02b504 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -1,6 +1,5 @@ from __future__ import annotations -import re from typing import Any import pandas as pd @@ -17,7 +16,7 @@ from kedro.pipeline import node from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import SequentialRunner -from tests.runner.conftest import exception_fn, identity, sink, source +from tests.runner.conftest import identity, sink, source class TestValidSequentialRunner: @@ -265,77 +264,79 @@ def test_confirms(self, mocker, test_pipeline, is_async): fake_dataset_instance.confirm.assert_called_once_with() -class TestSuggestResumeScenario: - @pytest.mark.parametrize( - "failing_node_names,expected_pattern", - [ - (["node1_A", "node1_B"], r"No nodes ran."), - (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), - (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), - ], - ) - def test_suggest_resume_scenario( - self, - caplog, - two_branches_crossed_pipeline, - persistent_dataset_catalog, - failing_node_names, - expected_pattern, - ): - nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes} - for name in failing_node_names: - two_branches_crossed_pipeline -= modular_pipeline([nodes[name]]) - two_branches_crossed_pipeline += modular_pipeline( - [nodes[name]._copy(func=exception_fn)] - ) - with pytest.raises(Exception): - SequentialRunner().run( - two_branches_crossed_pipeline, - persistent_dataset_catalog, - hook_manager=_create_hook_manager(), - ) - assert re.search(expected_pattern, caplog.text) - - @pytest.mark.parametrize( - "failing_node_names,expected_pattern", - [ - (["node1_A", "node1_B"], r"No nodes ran."), - (["node2"], r'"node1_A,node1_B"'), - (["node3_A"], r'"node3_A,node3_B"'), - (["node4_A"], r'"node3_A,node3_B"'), - (["node3_A", "node4_A"], r'"node3_A,node3_B"'), - (["node2", "node4_A"], r'"node1_A,node1_B"'), - ], - ) - def test_stricter_suggest_resume_scenario( - self, - caplog, - two_branches_crossed_pipeline_variable_inputs, - persistent_dataset_catalog, - failing_node_names, - expected_pattern, - ): - """ - Stricter version of previous test. - Covers pipelines where inputs are shared across nodes. - """ - test_pipeline = two_branches_crossed_pipeline_variable_inputs - - nodes = {n.name: n for n in test_pipeline.nodes} - for name in failing_node_names: - test_pipeline -= modular_pipeline([nodes[name]]) - test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)]) - - with pytest.raises(Exception, match="test exception"): - SequentialRunner().run( - test_pipeline, - persistent_dataset_catalog, - hook_manager=_create_hook_manager(), - ) - assert re.search(expected_pattern, caplog.text) +# class TestSuggestResumeScenario: +# @pytest.mark.parametrize( +# "failing_node_names,expected_pattern", +# [ +# (["node1_A", "node1_B"], r"No nodes ran."), +# (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), +# (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), +# (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A)"), +# (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), +# (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), +# ], +# ) +# def test_suggest_resume_scenario( +# self, +# caplog, +# two_branches_crossed_pipeline, +# persistent_dataset_catalog, +# failing_node_names, +# expected_pattern, +# ): +# nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes} +# for name in failing_node_names: +# two_branches_crossed_pipeline -= modular_pipeline([nodes[name]]) +# two_branches_crossed_pipeline += modular_pipeline( +# [nodes[name]._copy(func=exception_fn)] +# ) +# with pytest.raises(Exception): +# SequentialRunner().run( +# two_branches_crossed_pipeline, +# persistent_dataset_catalog, +# hook_manager=_create_hook_manager(), +# ) +# assert re.search(expected_pattern, caplog.text) +# +# @pytest.mark.parametrize( +# "failing_node_names,expected_pattern", +# [ +# (["node1_A", "node1_B"], r"No nodes ran."), +# (["node2"], r'"node1_A,node1_B"'), +# (["node3_A"], r'"node3_A,node3_B"'), +# (["node4_A"], r"(node3_A,node3_B|node3_A)"), +# # (["node4_A"], r'"node3_A,node3_B"'), +# (["node3_A", "node4_A"], r'"node3_A,node3_B"'), +# (["node2", "node4_A"], r'"node1_A,node1_B"'), +# ], +# ) +# def test_stricter_suggest_resume_scenario( +# self, +# caplog, +# two_branches_crossed_pipeline_variable_inputs, +# persistent_dataset_catalog, +# failing_node_names, +# expected_pattern, +# ): +# """ +# Stricter version of previous test. +# Covers pipelines where inputs are shared across nodes. +# """ +# test_pipeline = two_branches_crossed_pipeline_variable_inputs +# +# nodes = {n.name: n for n in test_pipeline.nodes} +# for name in failing_node_names: +# test_pipeline -= modular_pipeline([nodes[name]]) +# test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)]) +# +# with pytest.raises(Exception, match="test exception"): +# SequentialRunner().run( +# test_pipeline, +# persistent_dataset_catalog, +# hook_manager=_create_hook_manager(), +# ) +# print(caplog.text) +# assert re.search(expected_pattern, caplog.text) class TestMemoryDatasetBehaviour: From dcd699194f5f965a79bd7b5afc824a2dc8da3e32 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 5 Nov 2024 14:18:22 +0100 Subject: [PATCH 03/20] Refactor max workers logic into shared helper method Signed-off-by: Merel Theisen --- kedro/runner/parallel_runner.py | 18 ++--------------- kedro/runner/runner.py | 29 ++++++++++++++++++++++++++++ kedro/runner/thread_runner.py | 7 ++----- tests/runner/test_parallel_runner.py | 2 +- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 328ab4873e..1c3a032687 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -4,8 +4,6 @@ from __future__ import annotations -import os -import sys from concurrent.futures import ProcessPoolExecutor from multiprocessing.managers import BaseProxy, SyncManager from multiprocessing.reduction import ForkingPickler @@ -18,7 +16,7 @@ MemoryDataset, SharedMemoryDataset, ) -from kedro.runner.runner import AbstractRunner +from kedro.runner.runner import AbstractRunner, validate_max_workers if TYPE_CHECKING: from collections.abc import Iterable @@ -28,9 +26,6 @@ from kedro.pipeline import Pipeline from kedro.pipeline.node import Node -# see https://github.com/python/cpython/blob/master/Lib/concurrent/futures/process.py#L114 -_MAX_WINDOWS_WORKERS = 61 - class ParallelRunnerManager(SyncManager): """``ParallelRunnerManager`` is used to create shared ``MemoryDataset`` @@ -80,16 +75,7 @@ def __init__( self._manager = ParallelRunnerManager() self._manager.start() - # This code comes from the concurrent.futures library - # https://github.com/python/cpython/blob/master/Lib/concurrent/futures/process.py#L588 - if max_workers is None: - # NOTE: `os.cpu_count` might return None in some weird cases. - # https://github.com/python/cpython/blob/3.7/Modules/posixmodule.c#L11431 - max_workers = os.cpu_count() or 1 - if sys.platform == "win32": - max_workers = min(_MAX_WINDOWS_WORKERS, max_workers) - - self._max_workers = max_workers + self._max_workers = validate_max_workers(max_workers) def __del__(self) -> None: self._manager.shutdown() diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index cba954e5a3..587d1c6a70 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -6,6 +6,8 @@ import inspect import logging +import os +import sys import warnings from abc import ABC, abstractmethod from collections import Counter, deque @@ -21,6 +23,9 @@ from kedro.pipeline import Pipeline from kedro.runner.task import Task +# see https://github.com/python/cpython/blob/master/Lib/concurrent/futures/process.py#L114 +_MAX_WINDOWS_WORKERS = 61 + if TYPE_CHECKING: from collections.abc import Collection, Iterable @@ -510,3 +515,27 @@ def run_node( ) node = task.execute() return node + + +def validate_max_workers(max_workers: int | None) -> int: + """ + Validates and returns the number of workers. Sets to os.cpu_count() or 1 if max_workers is None, + and limits max_workers to 61 on Windows. + + Args: + max_workers: Desired number of workers. If None, defaults to os.cpu_count() or 1. + + Returns: + A valid number of workers to use. + + Raises: + ValueError: If max_workers is set and is not positive. + """ + if max_workers is None: + max_workers = os.cpu_count() or 1 + if sys.platform == "win32": + max_workers = min(_MAX_WINDOWS_WORKERS, max_workers) + elif max_workers <= 0: + raise ValueError("max_workers should be positive") + + return max_workers diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 0a11f98874..1d30e35335 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -9,7 +9,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any -from kedro.runner.runner import AbstractRunner +from kedro.runner.runner import AbstractRunner, validate_max_workers if TYPE_CHECKING: from pluggy import PluginManager @@ -59,10 +59,7 @@ def __init__( is_async=False, extra_dataset_patterns=self._extra_dataset_patterns ) - if max_workers is not None and max_workers <= 0: - raise ValueError("max_workers should be positive") - - self._max_workers = max_workers + self._max_workers = validate_max_workers(max_workers) def _get_required_workers_count(self, pipeline: Pipeline) -> int: """ diff --git a/tests/runner/test_parallel_runner.py b/tests/runner/test_parallel_runner.py index 0f989048f1..049b27f200 100644 --- a/tests/runner/test_parallel_runner.py +++ b/tests/runner/test_parallel_runner.py @@ -17,9 +17,9 @@ from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import ParallelRunner from kedro.runner.parallel_runner import ( - _MAX_WINDOWS_WORKERS, ParallelRunnerManager, ) +from kedro.runner.runner import _MAX_WINDOWS_WORKERS from tests.runner.conftest import ( exception_fn, identity, From 58abcf77d061ea652677deb7d667f2f42c40073d Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 5 Nov 2024 14:24:01 +0100 Subject: [PATCH 04/20] Add resume scenario logic Signed-off-by: Merel Theisen --- kedro/runner/runner.py | 11 +- tests/runner/test_sequential_runner.py | 148 ++++++++++++------------- tests/runner/test_thread_runner.py | 75 +++++++++++++ 3 files changed, 154 insertions(+), 80 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 587d1c6a70..2bd3366e8f 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -216,12 +216,11 @@ def _run( break done, futures = wait(futures, return_when=FIRST_COMPLETED) for future in done: - node = future.result() - # try: - # node = future.result() - # except Exception: - # self._suggest_resume_scenario(pipeline, done_nodes, catalog) - # raise + try: + node = future.result() + except Exception: + self._suggest_resume_scenario(pipeline, done_nodes, catalog) + raise done_nodes.add(node) self._logger.info("Completed node: %s", node.name) self._logger.info( diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 450b02b504..8c5998f75f 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -1,5 +1,6 @@ from __future__ import annotations +import re from typing import Any import pandas as pd @@ -16,7 +17,7 @@ from kedro.pipeline import node from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import SequentialRunner -from tests.runner.conftest import identity, sink, source +from tests.runner.conftest import exception_fn, identity, sink, source class TestValidSequentialRunner: @@ -264,79 +265,78 @@ def test_confirms(self, mocker, test_pipeline, is_async): fake_dataset_instance.confirm.assert_called_once_with() -# class TestSuggestResumeScenario: -# @pytest.mark.parametrize( -# "failing_node_names,expected_pattern", -# [ -# (["node1_A", "node1_B"], r"No nodes ran."), -# (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), -# (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), -# (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A)"), -# (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), -# (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), -# ], -# ) -# def test_suggest_resume_scenario( -# self, -# caplog, -# two_branches_crossed_pipeline, -# persistent_dataset_catalog, -# failing_node_names, -# expected_pattern, -# ): -# nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes} -# for name in failing_node_names: -# two_branches_crossed_pipeline -= modular_pipeline([nodes[name]]) -# two_branches_crossed_pipeline += modular_pipeline( -# [nodes[name]._copy(func=exception_fn)] -# ) -# with pytest.raises(Exception): -# SequentialRunner().run( -# two_branches_crossed_pipeline, -# persistent_dataset_catalog, -# hook_manager=_create_hook_manager(), -# ) -# assert re.search(expected_pattern, caplog.text) -# -# @pytest.mark.parametrize( -# "failing_node_names,expected_pattern", -# [ -# (["node1_A", "node1_B"], r"No nodes ran."), -# (["node2"], r'"node1_A,node1_B"'), -# (["node3_A"], r'"node3_A,node3_B"'), -# (["node4_A"], r"(node3_A,node3_B|node3_A)"), -# # (["node4_A"], r'"node3_A,node3_B"'), -# (["node3_A", "node4_A"], r'"node3_A,node3_B"'), -# (["node2", "node4_A"], r'"node1_A,node1_B"'), -# ], -# ) -# def test_stricter_suggest_resume_scenario( -# self, -# caplog, -# two_branches_crossed_pipeline_variable_inputs, -# persistent_dataset_catalog, -# failing_node_names, -# expected_pattern, -# ): -# """ -# Stricter version of previous test. -# Covers pipelines where inputs are shared across nodes. -# """ -# test_pipeline = two_branches_crossed_pipeline_variable_inputs -# -# nodes = {n.name: n for n in test_pipeline.nodes} -# for name in failing_node_names: -# test_pipeline -= modular_pipeline([nodes[name]]) -# test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)]) -# -# with pytest.raises(Exception, match="test exception"): -# SequentialRunner().run( -# test_pipeline, -# persistent_dataset_catalog, -# hook_manager=_create_hook_manager(), -# ) -# print(caplog.text) -# assert re.search(expected_pattern, caplog.text) +class TestSuggestResumeScenario: + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A", "node1_B"], r"No nodes ran."), + (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), + ], + ) + def test_suggest_resume_scenario( + self, + caplog, + two_branches_crossed_pipeline, + persistent_dataset_catalog, + failing_node_names, + expected_pattern, + ): + nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes} + for name in failing_node_names: + two_branches_crossed_pipeline -= modular_pipeline([nodes[name]]) + two_branches_crossed_pipeline += modular_pipeline( + [nodes[name]._copy(func=exception_fn)] + ) + with pytest.raises(Exception): + SequentialRunner().run( + two_branches_crossed_pipeline, + persistent_dataset_catalog, + hook_manager=_create_hook_manager(), + ) + assert re.search(expected_pattern, caplog.text) + + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A", "node1_B"], r"No nodes ran."), + (["node2"], r'"node1_A,node1_B"'), + (["node3_A"], r'"node3_A,node3_B"'), + (["node4_A"], r'"node3_A,node3_B"'), + (["node3_A", "node4_A"], r'"node3_A,node3_B"'), + (["node2", "node4_A"], r'"node1_A,node1_B"'), + ], + ) + def test_stricter_suggest_resume_scenario( + self, + caplog, + two_branches_crossed_pipeline_variable_inputs, + persistent_dataset_catalog, + failing_node_names, + expected_pattern, + ): + """ + Stricter version of previous test. + Covers pipelines where inputs are shared across nodes. + """ + test_pipeline = two_branches_crossed_pipeline_variable_inputs + + nodes = {n.name: n for n in test_pipeline.nodes} + for name in failing_node_names: + test_pipeline -= modular_pipeline([nodes[name]]) + test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)]) + + with pytest.raises(Exception, match="test exception"): + SequentialRunner().run( + test_pipeline, + persistent_dataset_catalog, + hook_manager=_create_hook_manager(), + ) + print(caplog.text) + assert re.search(expected_pattern, caplog.text) class TestMemoryDatasetBehaviour: diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index 7f43fd0f71..ca322c2aff 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -4,6 +4,7 @@ from typing import Any import pytest +import re from kedro.framework.hooks import _create_hook_manager from kedro.io import AbstractDataset, DataCatalog, DatasetError, MemoryDataset @@ -217,3 +218,77 @@ def test_release_transcoded(self): # we want to see both datasets being released assert list(log) == [("release", "save"), ("load", "load"), ("release", "load")] + + +class TestSuggestResumeScenario: + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A", "node1_B"], r"No nodes ran."), + (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), + ], + ) + def test_suggest_resume_scenario( + self, + caplog, + two_branches_crossed_pipeline, + persistent_dataset_catalog, + failing_node_names, + expected_pattern, + ): + nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes} + for name in failing_node_names: + two_branches_crossed_pipeline -= modular_pipeline([nodes[name]]) + two_branches_crossed_pipeline += modular_pipeline( + [nodes[name]._copy(func=exception_fn)] + ) + with pytest.raises(Exception): + ThreadRunner().run( + two_branches_crossed_pipeline, + persistent_dataset_catalog, + hook_manager=_create_hook_manager(), + ) + assert re.search(expected_pattern, caplog.text) + + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A", "node1_B"], r"No nodes ran."), + (["node2"], r'"node1_A,node1_B"'), + (["node3_A"], r'"node3_A,node3_B"'), + (["node4_A"], r'"node3_A,node3_B"'), + (["node3_A", "node4_A"], r'"node3_A,node3_B"'), + (["node2", "node4_A"], r'"node1_A,node1_B"'), + ], + ) + def test_stricter_suggest_resume_scenario( + self, + caplog, + two_branches_crossed_pipeline_variable_inputs, + persistent_dataset_catalog, + failing_node_names, + expected_pattern, + ): + """ + Stricter version of previous test. + Covers pipelines where inputs are shared across nodes. + """ + test_pipeline = two_branches_crossed_pipeline_variable_inputs + + nodes = {n.name: n for n in test_pipeline.nodes} + for name in failing_node_names: + test_pipeline -= modular_pipeline([nodes[name]]) + test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)]) + + with pytest.raises(Exception, match="test exception"): + ThreadRunner().run( + test_pipeline, + persistent_dataset_catalog, + hook_manager=_create_hook_manager(), + ) + print(caplog.text) + assert re.search(expected_pattern, caplog.text) From 9e2e2782c23aeb7c40d34736a6995e38719cfcf6 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 5 Nov 2024 15:09:37 +0100 Subject: [PATCH 05/20] Small cleanup Signed-off-by: Merel Theisen --- tests/runner/test_sequential_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 8c5998f75f..09eb966387 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -272,7 +272,7 @@ class TestSuggestResumeScenario: (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], From 3a9dde69c17f05fa26143d964e8be54b15be7f85 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 15:49:29 +0100 Subject: [PATCH 06/20] Clean up Signed-off-by: Merel Theisen --- kedro/runner/runner.py | 14 +++++++++++++- tests/runner/test_sequential_runner.py | 1 - tests/runner/test_thread_runner.py | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 2bd3366e8f..d0852b36f1 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -182,11 +182,23 @@ def _run( hook_manager: PluginManager | None = None, session_id: str | None = None, ) -> None: - """Common pipeline execution logic using an executor.""" + """The abstract interface for running pipelines, assuming that the + inputs have already been checked and normalized by run(). + This contains the Common pipeline execution logic using an executor. + + Args: + pipeline: The ``Pipeline`` to run. + catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data. + hook_manager: The ``PluginManager`` to activate hooks. + session_id: The id of the session. + """ + nodes = pipeline.nodes + self._validate_catalog(catalog, pipeline) self._validate_nodes(nodes) self._set_manager_datasets(catalog, pipeline) + load_counts = Counter(chain.from_iterable(n.inputs for n in pipeline.nodes)) node_dependencies = pipeline.node_dependencies todo_nodes = set(node_dependencies.keys()) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 09eb966387..229518ecd4 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -335,7 +335,6 @@ def test_stricter_suggest_resume_scenario( persistent_dataset_catalog, hook_manager=_create_hook_manager(), ) - print(caplog.text) assert re.search(expected_pattern, caplog.text) diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index ca322c2aff..afbdf40f66 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -1,10 +1,10 @@ from __future__ import annotations +import re from concurrent.futures import ThreadPoolExecutor from typing import Any import pytest -import re from kedro.framework.hooks import _create_hook_manager from kedro.io import AbstractDataset, DataCatalog, DatasetError, MemoryDataset From af955b8701d14a871b46d0c2000b26af10a3639d Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 15:58:34 +0100 Subject: [PATCH 07/20] Fix mypy checks Signed-off-by: Merel Theisen --- kedro/runner/parallel_runner.py | 2 +- kedro/runner/runner.py | 28 +++++++++++++++++++++------- kedro/runner/sequential_runner.py | 2 +- kedro/runner/thread_runner.py | 2 +- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 1c3a032687..388e3da20a 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -172,7 +172,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: return min(required_processes, self._max_workers) - def _get_executor(self, max_workers: int): + def _get_executor(self, max_workers: int) -> ProcessPoolExecutor: return ProcessPoolExecutor(max_workers=max_workers) def _run( diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index d0852b36f1..7e5bf91a6f 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -11,7 +11,12 @@ import warnings from abc import ABC, abstractmethod from collections import Counter, deque -from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait +from concurrent.futures import ( + FIRST_COMPLETED, + ProcessPoolExecutor, + ThreadPoolExecutor, + wait, +) from itertools import chain from typing import TYPE_CHECKING, Any @@ -170,7 +175,9 @@ def run_only_missing( return self.run(to_rerun, catalog, hook_manager) @abstractmethod - def _get_executor(self, max_workers): + def _get_executor( + self, max_workers: int + ) -> ThreadPoolExecutor | ProcessPoolExecutor: """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor).""" pass @@ -241,7 +248,12 @@ def _run( self._release_datasets(node, catalog, load_counts, pipeline) @staticmethod - def _raise_runtime_error(todo_nodes, done_nodes, ready, done): + def _raise_runtime_error( + todo_nodes: set[Node], + done_nodes: set[Node], + ready: set[Node], + done: set[Node] | None, + ) -> None: debug_data = { "todo_nodes": todo_nodes, "done_nodes": done_nodes, @@ -308,19 +320,21 @@ def _release_datasets( if load_counts[dataset] < 1 and dataset not in pipeline.outputs(): catalog.release(dataset) - def _validate_catalog(self, catalog, pipeline): + def _validate_catalog(self, catalog: CatalogProtocol, pipeline: Pipeline) -> None: # Add catalog validation logic here if needed pass - def _validate_nodes(self, nodes): + def _validate_nodes(self, node: Iterable[Node]) -> None: # Add node validation logic here if needed pass - def _set_manager_datasets(self, catalog, pipeline): + def _set_manager_datasets( + self, catalog: CatalogProtocol, pipeline: Pipeline + ) -> None: # Set up any necessary manager datasets here pass - def _get_required_workers_count(self, pipeline): + def _get_required_workers_count(self, pipeline: Pipeline) -> int: return 1 diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 6301014861..256df77196 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -46,7 +46,7 @@ def __init__( is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns ) - def _get_executor(self, max_workers: int): + def _get_executor(self, max_workers: int) -> ThreadPoolExecutor: return ThreadPoolExecutor( max_workers=1 ) # Single-threaded for sequential execution diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 1d30e35335..6a4b54131f 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -78,7 +78,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: else required_threads ) - def _get_executor(self, max_workers: int): + def _get_executor(self, max_workers: int) -> ThreadPoolExecutor: return ThreadPoolExecutor(max_workers=max_workers) def _run( From a2db9cf8a8f544ad9bdbcace05eee85b679cf67f Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 16:21:45 +0100 Subject: [PATCH 08/20] Fix sequential runner test Signed-off-by: Merel Theisen --- tests/runner/test_sequential_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 229518ecd4..06edf17ed7 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -272,7 +272,7 @@ class TestSuggestResumeScenario: (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], @@ -305,7 +305,7 @@ def test_suggest_resume_scenario( (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r'"node1_A,node1_B"'), (["node3_A"], r'"node3_A,node3_B"'), - (["node4_A"], r'"node3_A,node3_B"'), + (["node4_A"], r"(node3_A,node3_B|node3_A)"), (["node3_A", "node4_A"], r'"node3_A,node3_B"'), (["node2", "node4_A"], r'"node1_A,node1_B"'), ], From 41154a20d112618c3a6577be03f3f7b31e015d96 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 16:34:03 +0100 Subject: [PATCH 09/20] Fix thread runner Signed-off-by: Merel Theisen --- tests/runner/test_thread_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index e3f5cf07f7..f4f607da66 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -259,7 +259,7 @@ class TestSuggestResumeScenario: (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A)"), + (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], @@ -292,7 +292,7 @@ def test_suggest_resume_scenario( (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r'"node1_A,node1_B"'), (["node3_A"], r'"node3_A,node3_B"'), - (["node4_A"], r'"node3_A,node3_B"'), + (["node4_A"], r"(node3_A,node3_B|node3_A)"), (["node3_A", "node4_A"], r'"node3_A,node3_B"'), (["node2", "node4_A"], r'"node1_A,node1_B"'), ], From 186f84548795390fa6dcd662f63cb85b127fd112 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 16:45:13 +0100 Subject: [PATCH 10/20] Ignore coverage for abstract method Signed-off-by: Merel Theisen --- kedro/runner/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index b84aee8d56..391c17329a 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -180,7 +180,7 @@ def run_only_missing( return self.run(to_rerun, catalog, hook_manager) - @abstractmethod + @abstractmethod # pragma: no cover def _get_executor( self, max_workers: int ) -> ThreadPoolExecutor | ProcessPoolExecutor: From b589fa22799267dca3844c7377feff1631127faf Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 17:01:20 +0100 Subject: [PATCH 11/20] Try fix thread runner test on 3.13 Signed-off-by: Merel Theisen --- tests/runner/test_thread_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index f4f607da66..5446228635 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -260,7 +260,7 @@ class TestSuggestResumeScenario: (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A|node3_A)"), - (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], ) From 00ca9500167eba0d37b2f0fa1dc496703a3735d7 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 17:15:31 +0100 Subject: [PATCH 12/20] Fix thread runner test Signed-off-by: Merel Theisen --- tests/runner/test_thread_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index 5446228635..5b84372793 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -293,7 +293,7 @@ def test_suggest_resume_scenario( (["node2"], r'"node1_A,node1_B"'), (["node3_A"], r'"node3_A,node3_B"'), (["node4_A"], r"(node3_A,node3_B|node3_A)"), - (["node3_A", "node4_A"], r'"node3_A,node3_B"'), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_A)"), (["node2", "node4_A"], r'"node1_A,node1_B"'), ], ) From 33b3ec663c178d66de6791aa89d563663d4df21a Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 17:31:17 +0100 Subject: [PATCH 13/20] Fix sequential runner test on windows Signed-off-by: Merel Theisen --- tests/runner/test_sequential_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 06edf17ed7..c8edc75465 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -306,7 +306,7 @@ def test_suggest_resume_scenario( (["node2"], r'"node1_A,node1_B"'), (["node3_A"], r'"node3_A,node3_B"'), (["node4_A"], r"(node3_A,node3_B|node3_A)"), - (["node3_A", "node4_A"], r'"node3_A,node3_B"'), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_A)"), (["node2", "node4_A"], r'"node1_A,node1_B"'), ], ) From 351e0efa64c6e7bc7492394641546852e88d53d1 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 26 Nov 2024 17:38:48 +0100 Subject: [PATCH 14/20] More flexible options for resume suggestion in thread runner tests Signed-off-by: Merel Theisen --- tests/runner/test_thread_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index 5b84372793..2c870240cf 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -258,7 +258,7 @@ class TestSuggestResumeScenario: [ (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), - (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), @@ -291,7 +291,7 @@ def test_suggest_resume_scenario( [ (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r'"node1_A,node1_B"'), - (["node3_A"], r'"node3_A,node3_B"'), + (["node3_A"], r"(node3_A,node3_B|node3_A)"), (["node4_A"], r"(node3_A,node3_B|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_A)"), (["node2", "node4_A"], r'"node1_A,node1_B"'), From 6659132aabb96fd00916ae7947814396ccba31ee Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Wed, 27 Nov 2024 11:20:49 +0100 Subject: [PATCH 15/20] Clean up + make resume tests the same Signed-off-by: Merel Theisen --- kedro/runner/thread_runner.py | 7 ++++++- tests/runner/test_sequential_runner.py | 8 ++++---- tests/runner/test_thread_runner.py | 1 - 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 6a4b54131f..1b41b18864 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -100,4 +100,9 @@ def _run( Exception: in case of any downstream node failure. """ - super()._run(pipeline, catalog, hook_manager, session_id) + super()._run( + pipeline=pipeline, + catalog=catalog, + hook_manager=hook_manager, + session_id=session_id, + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index c8edc75465..142977cde1 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -271,9 +271,9 @@ class TestSuggestResumeScenario: [ (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), - (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), - (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), + (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A|node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], ) @@ -304,7 +304,7 @@ def test_suggest_resume_scenario( [ (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r'"node1_A,node1_B"'), - (["node3_A"], r'"node3_A,node3_B"'), + (["node3_A"], r"(node3_A,node3_B|node3_A)"), (["node4_A"], r"(node3_A,node3_B|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_A)"), (["node2", "node4_A"], r'"node1_A,node1_B"'), diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index 2c870240cf..806bd774da 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -322,5 +322,4 @@ def test_stricter_suggest_resume_scenario( persistent_dataset_catalog, hook_manager=_create_hook_manager(), ) - print(caplog.text) assert re.search(expected_pattern, caplog.text) From 2fa4fa8430983b96012b4b2fa96fda1d9ce1ccab Mon Sep 17 00:00:00 2001 From: Merel Theisen <49397448+merelcht@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:22:27 +0100 Subject: [PATCH 16/20] Update tests/runner/test_sequential_runner.py Signed-off-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> --- tests/runner/test_sequential_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 142977cde1..e2424fbec6 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -272,7 +272,7 @@ class TestSuggestResumeScenario: (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), - (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A|node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], From b3268cef29c96d57682065aa546455b3814a6a21 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Wed, 27 Nov 2024 11:23:32 +0100 Subject: [PATCH 17/20] Clean up Signed-off-by: Merel Theisen --- tests/runner/test_thread_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index 806bd774da..8da85624e0 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -259,7 +259,7 @@ class TestSuggestResumeScenario: (["node1_A", "node1_B"], r"No nodes ran."), (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), - (["node4_A"], r"(node3_A,node3_C|node3_C,node3_A|node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A|node3_A)"), (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], From a990155f2e3fb70a0bcc8d6d066f5e77745adcf6 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 3 Dec 2024 11:59:23 +0000 Subject: [PATCH 18/20] Address review comments Signed-off-by: Merel Theisen --- kedro/runner/parallel_runner.py | 4 +-- kedro/runner/runner.py | 59 +++++++++++++++------------------ kedro/runner/thread_runner.py | 4 +-- 3 files changed, 30 insertions(+), 37 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 388e3da20a..99390e144f 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -16,7 +16,7 @@ MemoryDataset, SharedMemoryDataset, ) -from kedro.runner.runner import AbstractRunner, validate_max_workers +from kedro.runner.runner import AbstractRunner if TYPE_CHECKING: from collections.abc import Iterable @@ -75,7 +75,7 @@ def __init__( self._manager = ParallelRunnerManager() self._manager.start() - self._max_workers = validate_max_workers(max_workers) + self._max_workers = self._validate_max_workers(max_workers) def __del__(self) -> None: self._manager.shutdown() diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 391c17329a..f882e37249 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -11,12 +11,7 @@ import warnings from abc import ABC, abstractmethod from collections import Counter, deque -from concurrent.futures import ( - FIRST_COMPLETED, - ProcessPoolExecutor, - ThreadPoolExecutor, - wait, -) +from concurrent.futures import FIRST_COMPLETED, Executor, ProcessPoolExecutor, wait from itertools import chain from typing import TYPE_CHECKING, Any @@ -181,9 +176,7 @@ def run_only_missing( return self.run(to_rerun, catalog, hook_manager) @abstractmethod # pragma: no cover - def _get_executor( - self, max_workers: int - ) -> ThreadPoolExecutor | ProcessPoolExecutor: + def _get_executor(self, max_workers: int) -> Executor: """Abstract method to provide the correct executor (e.g., ThreadPoolExecutor or ProcessPoolExecutor).""" pass @@ -343,6 +336,30 @@ def _set_manager_datasets( def _get_required_workers_count(self, pipeline: Pipeline) -> int: return 1 + @classmethod + def _validate_max_workers(cls, max_workers: int | None) -> int: + """ + Validates and returns the number of workers. Sets to os.cpu_count() or 1 if max_workers is None, + and limits max_workers to 61 on Windows. + + Args: + max_workers: Desired number of workers. If None, defaults to os.cpu_count() or 1. + + Returns: + A valid number of workers to use. + + Raises: + ValueError: If max_workers is set and is not positive. + """ + if max_workers is None: + max_workers = os.cpu_count() or 1 + if sys.platform == "win32": + max_workers = min(_MAX_WINDOWS_WORKERS, max_workers) + elif max_workers <= 0: + raise ValueError("max_workers should be positive") + + return max_workers + def _find_nodes_to_resume_from( pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: CatalogProtocol @@ -546,27 +563,3 @@ def run_node( ) node = task.execute() return node - - -def validate_max_workers(max_workers: int | None) -> int: - """ - Validates and returns the number of workers. Sets to os.cpu_count() or 1 if max_workers is None, - and limits max_workers to 61 on Windows. - - Args: - max_workers: Desired number of workers. If None, defaults to os.cpu_count() or 1. - - Returns: - A valid number of workers to use. - - Raises: - ValueError: If max_workers is set and is not positive. - """ - if max_workers is None: - max_workers = os.cpu_count() or 1 - if sys.platform == "win32": - max_workers = min(_MAX_WINDOWS_WORKERS, max_workers) - elif max_workers <= 0: - raise ValueError("max_workers should be positive") - - return max_workers diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 1b41b18864..dd6c3223ea 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -9,7 +9,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any -from kedro.runner.runner import AbstractRunner, validate_max_workers +from kedro.runner.runner import AbstractRunner if TYPE_CHECKING: from pluggy import PluginManager @@ -59,7 +59,7 @@ def __init__( is_async=False, extra_dataset_patterns=self._extra_dataset_patterns ) - self._max_workers = validate_max_workers(max_workers) + self._max_workers = self._validate_max_workers(max_workers) def _get_required_workers_count(self, pipeline: Pipeline) -> int: """ From 1c76953da318521df12e90dd6a2f41f39fe59e1d Mon Sep 17 00:00:00 2001 From: Merel Theisen <49397448+merelcht@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:35:10 +0100 Subject: [PATCH 19/20] Apply suggestions from code review Co-authored-by: Ivan Danov Signed-off-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> --- kedro/runner/parallel_runner.py | 2 +- kedro/runner/sequential_runner.py | 2 +- kedro/runner/thread_runner.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 99390e144f..2a1605c812 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -172,7 +172,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: return min(required_processes, self._max_workers) - def _get_executor(self, max_workers: int) -> ProcessPoolExecutor: + def _get_executor(self, max_workers: int) -> Executor: return ProcessPoolExecutor(max_workers=max_workers) def _run( diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 256df77196..1714c43ce0 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -46,7 +46,7 @@ def __init__( is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns ) - def _get_executor(self, max_workers: int) -> ThreadPoolExecutor: + def _get_executor(self, max_workers: int) -> Executor: return ThreadPoolExecutor( max_workers=1 ) # Single-threaded for sequential execution diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index dd6c3223ea..a2ab6929f9 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -78,7 +78,7 @@ def _get_required_workers_count(self, pipeline: Pipeline) -> int: else required_threads ) - def _get_executor(self, max_workers: int) -> ThreadPoolExecutor: + def _get_executor(self, max_workers: int) -> Executor: return ThreadPoolExecutor(max_workers=max_workers) def _run( From be2e2850678e66ec3dfa3b284df80edf332b36ca Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 10 Dec 2024 15:36:48 +0100 Subject: [PATCH 20/20] Fix lint Signed-off-by: Merel Theisen --- kedro/runner/parallel_runner.py | 2 +- kedro/runner/sequential_runner.py | 1 + kedro/runner/thread_runner.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 2a1605c812..4dba5fc774 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -4,7 +4,7 @@ from __future__ import annotations -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import Executor, ProcessPoolExecutor from multiprocessing.managers import BaseProxy, SyncManager from multiprocessing.reduction import ForkingPickler from pickle import PicklingError diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index 1714c43ce0..8e0fc92377 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -6,6 +6,7 @@ from __future__ import annotations from concurrent.futures import ( + Executor, ThreadPoolExecutor, ) from typing import TYPE_CHECKING, Any diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index a2ab6929f9..b0194165b7 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -6,7 +6,7 @@ from __future__ import annotations import warnings -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Executor, ThreadPoolExecutor from typing import TYPE_CHECKING, Any from kedro.runner.runner import AbstractRunner