Improve resume pipeline suggestion (kedro-org#1795)
* Add _find_first_persistent_ancestors and stubs for supporting functions.

* Add body to _enumerate_parents.

* Add function to check persistence of node outputs.

* Modify _suggest_resume_scenario to use _find_first_persistent_ancestors

* Pass catalog to self._suggest_resume_scenario

* Track and return all ancestor nodes that must be re-run during DFS.

* Integrate DFS with original _suggest_resume_scenario.

* Implement backwards-DFS strategy on all boundary nodes.

* Switch to multi-node start BFS approach to finding persistent ancestors.

* Add a useful error message if no nodes ran.

* Add docstrings to new functions.

* Add catalog argument to self._suggest_resume_scenario

* Modify exception_fn to allow it to take multiple arguments

* Add test for AbstractRunner._suggest_resume_scenario

* Add docstring for _suggest_resume_scenario

* Improve formatting

* Move new functions out of AbstractRunner

* Remove bare except

* Fix broad except clause

* Access datasets __dict__ using vars()

* Sort imports

* Improve resume message

* Add a space to resume suggestion message

* Modify DFS logic to eliminate possible queue duplicates

* Modify catalog.datasets to catalog._data_sets w/ disabled linter warning

* Move all pytest fixtures to conftest.py

* Modify all instances of Pipeline to pipeline

* Fix typo in the name of TestSequentialRunnerBranchedPipeline

* Remove spurious assert in save of persistent_dataset_catalog

* Replace instantiations of Pipeline with pipeline

* Modify test_suggest_resume_scenario fixture to use node names

* Add disable=unused-argument to _save

* Remove resume suggestion for ParallelRunner

* Remove spurious try / except

Signed-off-by: Jannic Holzer <[email protected]>
Signed-off-by: nickolasrm <[email protected]>
jmholzer authored and nickolasrm committed Oct 26, 2022
1 parent 0bd618c commit 2c819a5
Showing 6 changed files with 249 additions and 88 deletions.
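
In short: SequentialRunner and ThreadRunner now pass the catalog to _suggest_resume_scenario, which no longer points at the first unfinished nodes directly. Instead it walks backwards through the pipeline, breadth-first, until it reaches nodes whose inputs are all persisted, and offers those as --from-nodes targets (ParallelRunner drops the suggestion entirely). Below is a minimal, self-contained sketch of that backward search; the toy parent map, the persistent_inputs set and the helper names are illustrative stand-ins for the real _find_persistent_ancestors / _enumerate_parents / _has_persistent_inputs added to kedro/runner/runner.py, not Kedro API.

from collections import deque

def find_persistent_ancestors(failed_nodes, parents_of, has_persistent_inputs):
    """Walk up the dependency graph (BFS) from the unfinished nodes and
    collect the nearest ancestors whose inputs are all persisted."""
    ancestors_to_run = set()
    queue, visited = deque(failed_nodes), set(failed_nodes)
    while queue:
        current = queue.popleft()
        if has_persistent_inputs(current):
            ancestors_to_run.add(current)
            continue  # no need to look further up this branch
        for parent in parents_of(current):
            if parent not in visited:
                visited.add(parent)
                queue.append(parent)
    return ancestors_to_run

# Toy graph mirroring the two_branches_crossed_pipeline fixture below:
# node1_A, node1_B -> node2 -> node3_A, node3_B -> node4_A, node4_B.
parents = {
    "node1_A": [], "node1_B": [],
    "node2": ["node1_A", "node1_B"],
    "node3_A": ["node2"], "node3_B": ["node2"],
    "node4_A": ["node3_A"], "node4_B": ["node3_B"],
}
# Only the ds0_* and ds2_* datasets are persisted in the fixture catalog,
# so only node1_* and node3_* have exclusively persistent inputs.
persistent_inputs = {"node1_A", "node1_B", "node3_A", "node3_B"}

# If node2 fails, its inputs (ds1_*) only ever lived in memory, so the
# search walks up to node1_A and node1_B:
print(find_persistent_ancestors(
    {"node2"},
    lambda n: parents[n],
    lambda n: n in persistent_inputs,
))  # prints {'node1_A', 'node1_B'} (in some order)
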
kedro/runner/parallel_runner.py (6 changes: 1 addition & 5 deletions)
@@ -332,11 +332,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression
break # pragma: no cover
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
try:
node = future.result()
except Exception:
self._suggest_resume_scenario(pipeline, done_nodes)
raise
node = future.result()
done_nodes.add(node)

# Decrement load counts, and release any datasets we
kedro/runner/runner.py (119 changes: 106 additions & 13 deletions)
@@ -4,19 +4,20 @@

import logging
from abc import ABC, abstractmethod
from collections import deque
from concurrent.futures import (
ALL_COMPLETED,
Future,
ThreadPoolExecutor,
as_completed,
wait,
)
from typing import Any, Dict, Iterable
from typing import Any, Dict, Iterable, List, Set

from pluggy import PluginManager

from kedro.framework.hooks.manager import _NullPluginManager
from kedro.io import AbstractDataSet, DataCatalog
from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

@@ -157,31 +158,123 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
Returns:
An instance of an implementation of ``AbstractDataSet`` to be
used for all unregistered datasets.
"""
pass

def _suggest_resume_scenario(
self, pipeline: Pipeline, done_nodes: Iterable[Node]
self,
pipeline: Pipeline,
done_nodes: Iterable[Node],
catalog: DataCatalog,
) -> None:
"""
Suggest a command to the user to resume a run after it fails.
The run should be started from the point closest to the failure
for which persisted input exists.
Args:
pipeline: the ``Pipeline`` of the run.
done_nodes: the ``Node``s that executed successfully.
catalog: the ``DataCatalog`` of the run.
"""
remaining_nodes = set(pipeline.nodes) - set(done_nodes)

postfix = ""
if done_nodes:
node_names = (n.name for n in remaining_nodes)
resume_p = pipeline.only_nodes(*node_names)

start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs())
start_node_names = (n.name for n in start_p.nodes)

# find the nearest persistent ancestors of the nodes in start_p
start_p_persistent_ancestors = _find_persistent_ancestors(
pipeline, start_p.nodes, catalog
)

start_node_names = (n.name for n in start_p_persistent_ancestors)
postfix += f" --from-nodes \"{','.join(start_node_names)}\""

self._logger.warning(
"There are %d nodes that have not run.\n"
"You can resume the pipeline run by adding the following "
"argument to your previous command:\n%s",
len(remaining_nodes),
postfix,
)
if not postfix:
self._logger.warning(
"No nodes ran. Repeat the previous command to attempt a new run."
)
else:
self._logger.warning(
"There are %d nodes that have not run.\n"
"You can resume the pipeline run from the nearest nodes with "
"persisted inputs by adding the following "
"argument to your previous command:\n%s",
len(remaining_nodes),
postfix,
)


def _find_persistent_ancestors(
pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog
) -> Set[Node]:
"""Breadth-first search approach to finding the complete set of
persistent ancestors of an iterable of ``Node``s. Persistent
ancestors exclusively have persisted ``Dataset``s as inputs.
Args:
pipeline: the ``Pipeline`` to find ancestors in.
children: the iterable containing ``Node``s to find ancestors of.
catalog: the ``DataCatalog`` of the run.
Returns:
A set containing first persistent ancestors of the given
``Node``s.
"""
ancestor_nodes_to_run = set()
queue, visited = deque(children), set(children)
while queue:
current_node = queue.popleft()
if _has_persistent_inputs(current_node, catalog):
ancestor_nodes_to_run.add(current_node)
continue
for parent in _enumerate_parents(pipeline, current_node):
if parent in visited:
continue
visited.add(parent)
queue.append(parent)
return ancestor_nodes_to_run


def _enumerate_parents(pipeline: Pipeline, child: Node) -> List[Node]:
"""For a given ``Node``, returns a list containing the direct parents
of that ``Node`` in the given ``Pipeline``.
Args:
pipeline: the ``Pipeline`` to search for direct parents in.
child: the ``Node`` to find parents of.
Returns:
A list of all ``Node``s that are direct parents of ``child``.
"""
parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs)
return parent_pipeline.nodes


def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool:
"""Check if a ``Node`` exclusively has persisted Datasets as inputs.
If at least one input is a ``MemoryDataSet``, return False.
Args:
node: the ``Node`` to check the inputs of.
catalog: the ``DataCatalog`` of the run.
Returns:
True if the ``Node`` being checked exclusively has inputs that
are not ``MemoryDataSet``, else False.
"""
for node_input in node.inputs:
# pylint: disable=protected-access
if isinstance(catalog._data_sets[node_input], MemoryDataSet):
return False
return True


def run_node(
kedro/runner/sequential_runner.py (2 changes: 1 addition & 1 deletion)
@@ -70,7 +70,7 @@ def _run(
run_node(node, catalog, hook_manager, self._is_async, session_id)
done_nodes.add(node)
except Exception:
self._suggest_resume_scenario(pipeline, done_nodes)
self._suggest_resume_scenario(pipeline, done_nodes, catalog)
raise

# decrement load counts and release any data sets we've finished with
kedro/runner/thread_runner.py (2 changes: 1 addition & 1 deletion)
@@ -131,7 +131,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression
try:
node = future.result()
except Exception:
self._suggest_resume_scenario(pipeline, done_nodes)
self._suggest_resume_scenario(pipeline, done_nodes, catalog)
raise
done_nodes.add(node)
self._logger.info("Completed node: %s", node.name)
tests/runner/conftest.py (95 changes: 87 additions & 8 deletions)
@@ -1,9 +1,10 @@
from random import random

import pandas as pd
import pytest

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet
from kedro.pipeline import node, pipeline


def source():
@@ -22,7 +23,7 @@ def fan_in(*args):
return args


def exception_fn(arg):
def exception_fn(*args):
raise Exception("test exception")


@@ -35,14 +36,58 @@ def return_not_serialisable(arg): # pylint: disable=unused-argument
return lambda x: x


def multi_input_list_output(arg1, arg2):
return [arg1, arg2]


@pytest.fixture
def conflicting_feed_dict(pandas_df_feed_dict):
ds1 = MemoryDataSet({"data": 0})
ds3 = pandas_df_feed_dict["ds3"]
return {"ds1": ds1, "ds3": ds3}


@pytest.fixture
def pandas_df_feed_dict():
pandas_df = pd.DataFrame({"Name": ["Alex", "Bob"], "Age": [15, 25]})
return {"ds3": pandas_df}


@pytest.fixture
def catalog():
return DataCatalog()


@pytest.fixture
def memory_catalog():
ds1 = MemoryDataSet({"data": 42})
ds2 = MemoryDataSet([1, 2, 3, 4, 5])
return DataCatalog({"ds1": ds1, "ds2": ds2})


@pytest.fixture
def persistent_dataset_catalog():
def _load():
return 0

# pylint: disable=unused-argument
def _save(arg):
pass

persistent_dataset = LambdaDataSet(load=_load, save=_save)
return DataCatalog(
{
"ds0_A": persistent_dataset,
"ds0_B": persistent_dataset,
"ds2_A": persistent_dataset,
"ds2_B": persistent_dataset,
}
)


@pytest.fixture
def fan_out_fan_in():
return Pipeline(
return pipeline(
[
node(identity, "A", "B"),
node(identity, "B", "C"),
@@ -56,7 +101,7 @@ def fan_out_fan_in():
@pytest.fixture
def branchless_no_input_pipeline():
"""The pipeline runs in the order A->B->C->D->E."""
return Pipeline(
return pipeline(
[
node(identity, "D", "E", name="node1"),
node(identity, "C", "D", name="node2"),
@@ -69,7 +114,7 @@ def branchless_no_input_pipeline():

@pytest.fixture
def branchless_pipeline():
return Pipeline(
return pipeline(
[
node(identity, "ds1", "ds2", name="node1"),
node(identity, "ds2", "ds3", name="node2"),
@@ -79,11 +124,45 @@ def branchless_pipeline():

@pytest.fixture
def saving_result_pipeline():
return Pipeline([node(identity, "ds", "dsX")])
return pipeline([node(identity, "ds", "dsX")])


@pytest.fixture
def saving_none_pipeline():
return Pipeline(
return pipeline(
[node(random, None, "A"), node(return_none, "A", "B"), node(identity, "B", "C")]
)


@pytest.fixture
def unfinished_outputs_pipeline():
return pipeline(
[
node(identity, dict(arg="ds4"), "ds8", name="node1"),
node(sink, "ds7", None, name="node2"),
node(multi_input_list_output, ["ds3", "ds4"], ["ds6", "ds7"], name="node3"),
node(identity, "ds2", "ds5", name="node4"),
node(identity, "ds1", "ds4", name="node5"),
]
) # Outputs: ['ds8', 'ds5', 'ds6'] == ['ds1', 'ds2', 'ds3']


@pytest.fixture
def two_branches_crossed_pipeline():
"""A ``Pipeline`` with an X-shape (two branches with one common node)"""
return pipeline(
[
node(identity, "ds0_A", "ds1_A", name="node1_A"),
node(identity, "ds0_B", "ds1_B", name="node1_B"),
node(
multi_input_list_output,
["ds1_A", "ds1_B"],
["ds2_A", "ds2_B"],
name="node2",
),
node(identity, "ds2_A", "ds3_A", name="node3_A"),
node(identity, "ds2_B", "ds3_B", name="node3_B"),
node(identity, "ds3_A", "ds4_A", name="node4_A"),
node(identity, "ds3_B", "ds4_B", name="node4_B"),
]
)
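
An illustrative scenario (not part of the diff): with the two_branches_crossed_pipeline and persistent_dataset_catalog fixtures above, only the ds0_* and ds2_* datasets are persisted. If node4_A fails, its input ds3_A only ever existed in memory, so the previous behaviour would have suggested resuming from a node whose inputs are already lost; the new search walks back to node3_A, whose input ds2_A is persisted, and the warning suggests roughly --from-nodes "node3_A" (plus node3_B if that branch had not finished either) instead of the failed node itself.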