diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py
index ee744d5871..f19846b71b 100644
--- a/kedro/runner/parallel_runner.py
+++ b/kedro/runner/parallel_runner.py
@@ -332,11 +332,7 @@ def _run(  # pylint: disable=too-many-locals,useless-suppression
                     break  # pragma: no cover
                 done, futures = wait(futures, return_when=FIRST_COMPLETED)
                 for future in done:
-                    try:
-                        node = future.result()
-                    except Exception:
-                        self._suggest_resume_scenario(pipeline, done_nodes)
-                        raise
+                    node = future.result()
                     done_nodes.add(node)
 
                     # Decrement load counts, and release any datasets we
diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 79ffd10d48..70f6b127e8 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -4,6 +4,7 @@
 
 import logging
 from abc import ABC, abstractmethod
+from collections import deque
 from concurrent.futures import (
     ALL_COMPLETED,
     Future,
@@ -11,12 +12,12 @@
     as_completed,
     wait,
 )
-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, List, Set
 
 from pluggy import PluginManager
 
 from kedro.framework.hooks.manager import _NullPluginManager
-from kedro.io import AbstractDataSet, DataCatalog
+from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
 from kedro.pipeline import Pipeline
 from kedro.pipeline.node import Node
 
@@ -157,31 +158,124 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
         Returns:
             An instance of an implementation of ``AbstractDataSet``
             to be used for all unregistered datasets.
-
         """
         pass
 
     def _suggest_resume_scenario(
-        self, pipeline: Pipeline, done_nodes: Iterable[Node]
+        self,
+        pipeline: Pipeline,
+        done_nodes: Iterable[Node],
+        catalog: DataCatalog,
     ) -> None:
+        """
+        Suggest a command to the user to resume a run after it fails.
+        The run should be started from the point closest to the failure
+        for which persisted input exists.
+
+        Args:
+            pipeline: the ``Pipeline`` of the run.
+            done_nodes: the ``Node``s that executed successfully.
+            catalog: the ``DataCatalog`` of the run.
+
+        """
         remaining_nodes = set(pipeline.nodes) - set(done_nodes)
 
         postfix = ""
         if done_nodes:
             node_names = (n.name for n in remaining_nodes)
             resume_p = pipeline.only_nodes(*node_names)
             start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs())
-            start_node_names = (n.name for n in start_p.nodes)
+
+            # find the nearest persistent ancestors of the nodes in start_p
+            start_p_persistent_ancestors = _find_persistent_ancestors(
+                pipeline, start_p.nodes, catalog
+            )
+
+            start_node_names = (n.name for n in start_p_persistent_ancestors)
             postfix += f" --from-nodes \"{','.join(start_node_names)}\""
 
-        self._logger.warning(
-            "There are %d nodes that have not run.\n"
-            "You can resume the pipeline run by adding the following "
-            "argument to your previous command:\n%s",
-            len(remaining_nodes),
-            postfix,
-        )
+        if not postfix:
+            self._logger.warning(
+                "No nodes ran. Repeat the previous command to attempt a new run."
+            )
+        else:
+            self._logger.warning(
+                "There are %d nodes that have not run.\n"
+                "You can resume the pipeline run from the nearest nodes with "
+                "persisted inputs by adding the following "
+                "argument to your previous command:\n%s",
+                len(remaining_nodes),
+                postfix,
+            )
+
+
+def _find_persistent_ancestors(
+    pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog
+) -> Set[Node]:
+    """Breadth-first search approach to finding the complete set of
+    persistent ancestors of an iterable of ``Node``s. Persistent
+    ancestors exclusively have persisted ``Dataset``s as inputs.
+
+    Args:
+        pipeline: the ``Pipeline`` to find ancestors in.
+        children: the iterable containing ``Node``s to find ancestors of.
+        catalog: the ``DataCatalog`` of the run.
+
+    Returns:
+        A set containing the first persistent ancestors of the given
+        ``Node``s.
+
+    """
+    ancestor_nodes_to_run = set()
+    queue, visited = deque(children), set(children)
+    while queue:
+        current_node = queue.popleft()
+        if _has_persistent_inputs(current_node, catalog):
+            ancestor_nodes_to_run.add(current_node)
+            continue
+        for parent in _enumerate_parents(pipeline, current_node):
+            if parent in visited:
+                continue
+            visited.add(parent)
+            queue.append(parent)
+    return ancestor_nodes_to_run
+
+
+def _enumerate_parents(pipeline: Pipeline, child: Node) -> List[Node]:
+    """For a given ``Node``, returns a list containing the direct parents
+    of that ``Node`` in the given ``Pipeline``.
+
+    Args:
+        pipeline: the ``Pipeline`` to search for direct parents in.
+        child: the ``Node`` to find parents of.
+
+    Returns:
+        A list of all ``Node``s that are direct parents of ``child``.
+
+    """
+    parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs)
+    return parent_pipeline.nodes
+
+
+def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool:
+    """Check if a ``Node`` exclusively has persisted Datasets as inputs.
+    If at least one input is a ``MemoryDataSet``, return False.
+
+    Args:
+        node: the ``Node`` to check the inputs of.
+        catalog: the ``DataCatalog`` of the run.
+
+    Returns:
+        True if the ``Node`` being checked exclusively has inputs that
+        are not ``MemoryDataSet``, else False.
+
+    """
+    for node_input in node.inputs:
+        # pylint: disable=protected-access
+        if isinstance(catalog._data_sets[node_input], MemoryDataSet):
+            return False
+    return True
 
 
 def run_node(
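
The three helpers above do the heavy lifting: `_find_persistent_ancestors` walks the pipeline backwards from the failed nodes, `_enumerate_parents` expands one level of the graph at a time, and `_has_persistent_inputs` decides where the walk can stop. A minimal sketch of the traversal on a toy pipeline (illustrative names; `_find_persistent_ancestors` is a private helper introduced by this branch, so this is for review only, not a supported API):

    from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet
    from kedro.pipeline import node, pipeline
    from kedro.runner.runner import _find_persistent_ancestors

    def identity(x):
        return x

    # "raw" is persisted (any non-memory dataset works); "intermediate" is not.
    catalog = DataCatalog(
        {
            "raw": LambdaDataSet(load=lambda: 0, save=lambda data: None),
            "intermediate": MemoryDataSet(),
        }
    )
    p = pipeline(
        [
            node(identity, "raw", "intermediate", name="first"),
            node(identity, "intermediate", "final", name="second"),
        ]
    )

    # "second" failed; its input only ever lived in memory, so the search
    # climbs one level to "first", whose sole input is persisted.
    failed = p.only_nodes("second").nodes
    assert {n.name for n in _find_persistent_ancestors(p, failed, catalog)} == {"first"}
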
diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py
index dcad9324fe..6a9becb868 100644
--- a/kedro/runner/sequential_runner.py
+++ b/kedro/runner/sequential_runner.py
@@ -70,7 +70,7 @@ def _run(
                 run_node(node, catalog, hook_manager, self._is_async, session_id)
                 done_nodes.add(node)
             except Exception:
-                self._suggest_resume_scenario(pipeline, done_nodes)
+                self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                 raise
 
             # decrement load counts and release any data sets we've finished with
diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py
index 57136f67e0..6e34484860 100644
--- a/kedro/runner/thread_runner.py
+++ b/kedro/runner/thread_runner.py
@@ -131,7 +131,7 @@ def _run(  # pylint: disable=too-many-locals,useless-suppression
                 try:
                     node = future.result()
                 except Exception:
-                    self._suggest_resume_scenario(pipeline, done_nodes)
+                    self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                     raise
                 done_nodes.add(node)
                 self._logger.info("Completed node: %s", node.name)
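
With the catalog now threaded through to the runners, the warning a user sees on failure points at nodes that can actually be re-run. Given the message format added above, a failed run would log something like the following (hypothetical node names):

    There are 2 nodes that have not run.
    You can resume the pipeline run from the nearest nodes with persisted inputs by adding the following argument to your previous command:
     --from-nodes "node3_A"

so the resume command becomes, for example, `kedro run --from-nodes "node3_A"`.
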
diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py
index ddde22059a..26d46516de 100644
--- a/tests/runner/conftest.py
+++ b/tests/runner/conftest.py
@@ -1,9 +1,10 @@
 from random import random
 
+import pandas as pd
 import pytest
 
-from kedro.io import DataCatalog
-from kedro.pipeline import Pipeline, node
+from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet
+from kedro.pipeline import node, pipeline
 
 
 def source():
@@ -22,7 +23,7 @@ def fan_in(*args):
     return args
 
 
-def exception_fn(arg):
+def exception_fn(*args):
     raise Exception("test exception")
 
 
@@ -35,14 +36,58 @@ def return_not_serialisable(arg):  # pylint: disable=unused-argument
     return lambda x: x
 
 
+def multi_input_list_output(arg1, arg2):
+    return [arg1, arg2]
+
+
+@pytest.fixture
+def conflicting_feed_dict(pandas_df_feed_dict):
+    ds1 = MemoryDataSet({"data": 0})
+    ds3 = pandas_df_feed_dict["ds3"]
+    return {"ds1": ds1, "ds3": ds3}
+
+
+@pytest.fixture
+def pandas_df_feed_dict():
+    pandas_df = pd.DataFrame({"Name": ["Alex", "Bob"], "Age": [15, 25]})
+    return {"ds3": pandas_df}
+
+
 @pytest.fixture
 def catalog():
     return DataCatalog()
 
 
+@pytest.fixture
+def memory_catalog():
+    ds1 = MemoryDataSet({"data": 42})
+    ds2 = MemoryDataSet([1, 2, 3, 4, 5])
+    return DataCatalog({"ds1": ds1, "ds2": ds2})
+
+
+@pytest.fixture
+def persistent_dataset_catalog():
+    def _load():
+        return 0
+
+    # pylint: disable=unused-argument
+    def _save(arg):
+        pass
+
+    persistent_dataset = LambdaDataSet(load=_load, save=_save)
+    return DataCatalog(
+        {
+            "ds0_A": persistent_dataset,
+            "ds0_B": persistent_dataset,
+            "ds2_A": persistent_dataset,
+            "ds2_B": persistent_dataset,
+        }
+    )
+
+
 @pytest.fixture
 def fan_out_fan_in():
-    return Pipeline(
+    return pipeline(
         [
             node(identity, "A", "B"),
             node(identity, "B", "C"),
@@ -56,7 +101,7 @@ def fan_out_fan_in():
 
 
 @pytest.fixture
 def branchless_no_input_pipeline():
     """The pipeline runs in the order A->B->C->D->E."""
-    return Pipeline(
+    return pipeline(
         [
             node(identity, "D", "E", name="node1"),
             node(identity, "C", "D", name="node2"),
@@ -69,7 +114,7 @@
 
 @pytest.fixture
 def branchless_pipeline():
-    return Pipeline(
+    return pipeline(
         [
             node(identity, "ds1", "ds2", name="node1"),
             node(identity, "ds2", "ds3", name="node2"),
@@ -79,11 +124,45 @@
 
 @pytest.fixture
 def saving_result_pipeline():
-    return Pipeline([node(identity, "ds", "dsX")])
+    return pipeline([node(identity, "ds", "dsX")])
 
 
 @pytest.fixture
 def saving_none_pipeline():
-    return Pipeline(
+    return pipeline(
         [node(random, None, "A"), node(return_none, "A", "B"), node(identity, "B", "C")]
     )
+
+
+@pytest.fixture
+def unfinished_outputs_pipeline():
+    return pipeline(
+        [
+            node(identity, dict(arg="ds4"), "ds8", name="node1"),
+            node(sink, "ds7", None, name="node2"),
+            node(multi_input_list_output, ["ds3", "ds4"], ["ds6", "ds7"], name="node3"),
+            node(identity, "ds2", "ds5", name="node4"),
+            node(identity, "ds1", "ds4", name="node5"),
+        ]
+    )  # Outputs: ['ds8', 'ds5', 'ds6'] == ['ds1', 'ds2', 'ds3']
+
+
+@pytest.fixture
+def two_branches_crossed_pipeline():
+    """A ``Pipeline`` with an X-shape (two branches with one common node)"""
+    return pipeline(
+        [
+            node(identity, "ds0_A", "ds1_A", name="node1_A"),
+            node(identity, "ds0_B", "ds1_B", name="node1_B"),
+            node(
+                multi_input_list_output,
+                ["ds1_A", "ds1_B"],
+                ["ds2_A", "ds2_B"],
+                name="node2",
+            ),
+            node(identity, "ds2_A", "ds3_A", name="node3_A"),
+            node(identity, "ds2_B", "ds3_B", name="node3_B"),
+            node(identity, "ds3_A", "ds4_A", name="node4_A"),
+            node(identity, "ds3_B", "ds4_B", name="node4_B"),
+        ]
+    )
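
For orientation, `two_branches_crossed_pipeline` and `persistent_dataset_catalog` combine into the following topology, where only the `ds0_*` and `ds2_*` datasets are persisted and every other dataset falls back to an implicit `MemoryDataSet`:

    ds0_A -> node1_A -> ds1_A --\                /--> ds2_A -> node3_A -> ds3_A -> node4_A -> ds4_A
                                 >---- node2 ---<
    ds0_B -> node1_B -> ds1_B --/                \--> ds2_B -> node3_B -> ds3_B -> node4_B -> ds4_B

`LambdaDataSet` is a deliberately minimal stand-in for "anything persisted", since `_has_persistent_inputs` only tests that an input is not a `MemoryDataSet`. A quick check of that contract (sketch; touches a private helper for illustration):

    from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet
    from kedro.pipeline import node
    from kedro.runner.runner import _has_persistent_inputs

    catalog = DataCatalog(
        {
            "persisted": LambdaDataSet(load=lambda: 0, save=lambda data: None),
            "in_memory": MemoryDataSet(42),
        }
    )
    assert _has_persistent_inputs(node(lambda x: x, "persisted", "out_a", name="a"), catalog)
    assert not _has_persistent_inputs(node(lambda x: x, "in_memory", "out_b", name="b"), catalog)
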
diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py
index 6bcc01a139..0b014ec0f0 100644
--- a/tests/runner/test_sequential_runner.py
+++ b/tests/runner/test_sequential_runner.py
@@ -1,43 +1,14 @@
+import re
 from typing import Any, Dict
 
 import pandas as pd
 import pytest
 
 from kedro.framework.hooks import _create_hook_manager
-from kedro.io import (
-    AbstractDataSet,
-    DataCatalog,
-    DataSetError,
-    LambdaDataSet,
-    MemoryDataSet,
-)
-from kedro.pipeline import Pipeline, node
+from kedro.io import AbstractDataSet, DataCatalog, DataSetError, LambdaDataSet
+from kedro.pipeline import node, pipeline
 from kedro.runner import SequentialRunner
-from tests.runner.conftest import identity, sink, source
-
-
-@pytest.fixture
-def memory_catalog():
-    ds1 = MemoryDataSet({"data": 42})
-    ds2 = MemoryDataSet([1, 2, 3, 4, 5])
-    return DataCatalog({"ds1": ds1, "ds2": ds2})
-
-
-@pytest.fixture
-def pandas_df_feed_dict():
-    pandas_df = pd.DataFrame({"Name": ["Alex", "Bob"], "Age": [15, 25]})
-    return {"ds3": pandas_df}
-
-
-@pytest.fixture
-def conflicting_feed_dict(pandas_df_feed_dict):
-    ds1 = MemoryDataSet({"data": 0})
-    ds3 = pandas_df_feed_dict["ds3"]
-    return {"ds1": ds1, "ds3": ds3}
-
-
-def multi_input_list_output(arg1, arg2):
-    return [arg1, arg2]
+from tests.runner.conftest import exception_fn, identity, sink, source
 
 
 class TestValidSequentialRunner:
@@ -105,21 +76,8 @@ def _save(arg):
     assert output == {}
 
 
-@pytest.fixture
-def unfinished_outputs_pipeline():
-    return Pipeline(
-        [
-            node(identity, dict(arg="ds4"), "ds8", name="node1"),
-            node(sink, "ds7", None, name="node2"),
-            node(multi_input_list_output, ["ds3", "ds4"], ["ds6", "ds7"], name="node3"),
-            node(identity, "ds2", "ds5", name="node4"),
-            node(identity, "ds1", "ds4", name="node5"),
-        ]
-    )  # Outputs: ['ds8', 'ds5', 'ds6'] == ['ds1', 'ds2', 'ds3']
-
-
 @pytest.mark.parametrize("is_async", [False, True])
-class TestSeqentialRunnerBranchedPipeline:
+class TestSequentialRunnerBranchedPipeline:
     def test_input_seq(
         self,
         is_async,
@@ -189,7 +147,7 @@ def _describe(self) -> Dict[str, Any]:
 class TestSequentialRunnerRelease:
     def test_dont_release_inputs_and_outputs(self, is_async):
         log = []
-        pipeline = Pipeline(
+        test_pipeline = pipeline(
             [node(identity, "in", "middle"), node(identity, "middle", "out")]
         )
         catalog = DataCatalog(
@@ -199,14 +157,14 @@ def test_dont_release_inputs_and_outputs(self, is_async):
                 "out": LoggingDataSet(log, "out"),
             }
         )
-        SequentialRunner(is_async=is_async).run(pipeline, catalog)
+        SequentialRunner(is_async=is_async).run(test_pipeline, catalog)
 
         # we don't want to see release in or out in here
         assert log == [("load", "in"), ("load", "middle"), ("release", "middle")]
 
     def test_release_at_earliest_opportunity(self, is_async):
         log = []
-        pipeline = Pipeline(
+        test_pipeline = pipeline(
             [
                 node(source, None, "first"),
                 node(identity, "first", "second"),
@@ -219,7 +177,7 @@
                 "second": LoggingDataSet(log, "second"),
             }
         )
-        SequentialRunner(is_async=is_async).run(pipeline, catalog)
+        SequentialRunner(is_async=is_async).run(test_pipeline, catalog)
 
         # we want to see "release first" before "load second"
         assert log == [
@@ -231,7 +189,7 @@
 
     def test_count_multiple_loads(self, is_async):
         log = []
-        pipeline = Pipeline(
+        test_pipeline = pipeline(
             [
                 node(source, None, "dataset"),
                 node(sink, "dataset", None, name="bob"),
                 node(sink, "dataset", None, name="fred"),
             ]
         )
         catalog = DataCatalog({"dataset": LoggingDataSet(log, "dataset")})
-        SequentialRunner(is_async=is_async).run(pipeline, catalog)
+        SequentialRunner(is_async=is_async).run(test_pipeline, catalog)
 
         # we want to the release after both the loads
         assert log == [("load", "dataset"), ("load", "dataset"), ("release", "dataset")]
 
     def test_release_transcoded(self, is_async):
         log = []
-        pipeline = Pipeline(
+        test_pipeline = pipeline(
             [node(source, None, "ds@save"), node(sink, "ds@load", None)]
         )
         catalog = DataCatalog(
             {
                 "ds@save": LoggingDataSet(log, "save"),
                 "ds@load": LoggingDataSet(log, "load"),
             }
         )
 
-        SequentialRunner(is_async=is_async).run(pipeline, catalog)
+        SequentialRunner(is_async=is_async).run(test_pipeline, catalog)
 
         # we want to see both datasets being released
         assert log == [("release", "save"), ("load", "load"), ("release", "load")]
 
     @pytest.mark.parametrize(
-        "pipeline",
+        "test_pipeline",
         [
-            Pipeline([node(identity, "ds1", "ds2", confirms="ds1")]),
-            Pipeline(
+            pipeline([node(identity, "ds1", "ds2", confirms="ds1")]),
+            pipeline(
                 [
                     node(identity, "ds1", "ds2"),
                     node(identity, "ds2", None, confirms="ds1"),
                 ]
@@ -273,8 +231,43 @@
             ),
         ],
     )
-    def test_confirms(self, mocker, pipeline, is_async):
+    def test_confirms(self, mocker, test_pipeline, is_async):
         fake_dataset_instance = mocker.Mock()
         catalog = DataCatalog(data_sets={"ds1": fake_dataset_instance})
-        SequentialRunner(is_async=is_async).run(pipeline, catalog)
+        SequentialRunner(is_async=is_async).run(test_pipeline, catalog)
         fake_dataset_instance.confirm.assert_called_once_with()
+
+
+@pytest.mark.parametrize(
+    "failing_node_names,expected_pattern",
+    [
+        (["node1_A"], r"No nodes ran."),
+        (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"),
+        (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"),
+        (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"),
+        (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"),
+        (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"),
+    ],
+)
+class TestSuggestResumeScenario:
+    def test_suggest_resume_scenario(
+        self,
+        caplog,
+        two_branches_crossed_pipeline,
+        persistent_dataset_catalog,
+        failing_node_names,
+        expected_pattern,
+    ):
+        nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes}
+        for name in failing_node_names:
+            two_branches_crossed_pipeline -= pipeline([nodes[name]])
+            two_branches_crossed_pipeline += pipeline(
+                [nodes[name]._copy(func=exception_fn)]
+            )
+        with pytest.raises(Exception):
+            SequentialRunner().run(
+                two_branches_crossed_pipeline,
+                persistent_dataset_catalog,
+                hook_manager=_create_hook_manager(),
+            )
+        assert re.search(expected_pattern, caplog.text)
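
Reading one parametrized case end to end: when `node4_A` is swapped for a failing function, its input `ds3_A` only ever existed in memory, so the suggestion walks back to `node3_A` (and to `node3_B` for the other, not-yet-run branch), whose inputs `ds2_A`/`ds2_B` are persisted. The same can be checked against the helper directly (sketch; it mirrors the fixture shapes above, registers the in-memory datasets that the runner would otherwise create as defaults, and uses a private API for illustration only):

    from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet
    from kedro.pipeline import node, pipeline
    from kedro.runner.runner import _find_persistent_ancestors

    def identity(x):
        return x

    def multi_input_list_output(arg1, arg2):
        return [arg1, arg2]

    catalog = DataCatalog(
        {
            name: LambdaDataSet(load=lambda: 0, save=lambda data: None)
            for name in ("ds0_A", "ds0_B", "ds2_A", "ds2_B")
        }
    )
    # The runner registers defaults for unregistered datasets; do it by hand here.
    for name in ("ds1_A", "ds1_B", "ds3_A", "ds3_B", "ds4_A", "ds4_B"):
        catalog.add(name, MemoryDataSet())

    p = pipeline(
        [
            node(identity, "ds0_A", "ds1_A", name="node1_A"),
            node(identity, "ds0_B", "ds1_B", name="node1_B"),
            node(
                multi_input_list_output,
                ["ds1_A", "ds1_B"],
                ["ds2_A", "ds2_B"],
                name="node2",
            ),
            node(identity, "ds2_A", "ds3_A", name="node3_A"),
            node(identity, "ds2_B", "ds3_B", name="node3_B"),
            node(identity, "ds3_A", "ds4_A", name="node4_A"),
            node(identity, "ds3_B", "ds4_B", name="node4_B"),
        ]
    )

    # node4_A failed, node4_B never ran: same inputs the runner would see.
    remaining = p.only_nodes("node4_A", "node4_B")
    start = remaining.only_nodes_with_inputs(*remaining.inputs())
    ancestors = _find_persistent_ancestors(p, start.nodes, catalog)
    assert {n.name for n in ancestors} == {"node3_A", "node3_B"}
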