Skip to content

Commit

Permalink
Fix: Metrics Tree doesn't pick up the latest kedro run metrics data
Browse files Browse the repository at this point in the history
This issue occurs when you run kedro viz and kedro run simultaneously in two terminals. After your last kedro run, if you refresh the browser running kedro viz, you will notice that while the plot gets updated with the latest data, the JSON tree with metrics data does not.
Initially, we would get metrics data using the get_load_path function from kedro-core, which loads the most recent version — but this does not happen in the above setting because that function is cached.

So we replicate the kedro-core functionality in the kedro-viz backend so that the latest metrics are loaded.
  • Loading branch information
rashidakanchwala authored Oct 8, 2021
1 parent 1881ad1 commit 97f7276
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 16 deletions.
2 changes: 1 addition & 1 deletion package/features/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def _setup_context_with_venv(context, venv_dir):
"botocore",
"dynaconf==3.1.5",
"PyYAML>=4.2, <6.0",
"click<8.0"
"click<8.0",
],
env=context.env,
)
Expand Down
37 changes: 33 additions & 4 deletions package/kedro_viz/models/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from enum import Enum
from pathlib import Path
from types import FunctionType
from typing import Any, Dict, List, Optional, Set, Union, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union, cast

import pandas as pd
import plotly.express as px
Expand All @@ -49,6 +49,12 @@
from kedro.pipeline.pipeline import TRANSCODING_SEPARATOR, _strip_transcoding
from pandas.core.frame import DataFrame

# only import MetricsDataSet at top-level for type-checking
# so it doesn't blow up if user doesn't have the dataset dependencies installed.
if TYPE_CHECKING: # pragma: no cover
from kedro.extras.datasets.tracking.metrics_dataset import MetricsDataSet


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -514,9 +520,10 @@ def __post_init__(self, data_node: DataNode):
dataset = cast(MetricsDataSet, dataset)
if not dataset._exists() or self.filepath is None:
return
load_path = get_filepath_str(dataset._get_load_path(), dataset._protocol)
with dataset._fs.open(load_path, **dataset._fs_open_args_load) as fs_file:
self.metrics = json.load(fs_file)
metrics = self.load_latest_metrics_data(dataset)
if not metrics:
return
self.metrics = metrics
metrics_data = self.load_metrics_versioned_data(self.filepath)
if not metrics_data:
return
Expand All @@ -528,6 +535,28 @@ def __post_init__(self, data_node: DataNode):
if not data_node.is_free_input:
self.run_command = f'kedro run --to-outputs="{data_node.full_name}"'

@staticmethod
def load_latest_metrics_data(
dataset: "MetricsDataSet",
) -> Optional[Dict[str, float]]:
"""Load data for latest versions of the metrics dataset.
Below operation is also on kedro.io.core -> fetched_latest_load_version()
However it is a cached function and hence cannot be relied upon
Args:
dataset: the latest version of the metrics dataset
Returns:
A dictionary containing json data for the latest version
"""
pattern = str(dataset._get_versioned_path("*"))
version_paths = sorted(dataset._glob_function(pattern), reverse=True)
most_recent = next(
(path for path in version_paths if dataset._exists_function(path)), None
)
if not most_recent:
return None
with dataset._fs.open(most_recent, **dataset._fs_open_args_load) as fs_file:
return json.load(fs_file)

@staticmethod
def load_metrics_versioned_data(
filepath: str, num_versions: int = 10
Expand Down
79 changes: 68 additions & 11 deletions package/tests/test_models/test_graph/test_graph_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
#
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-public-methods
import datetime
import json
import shutil
from pathlib import Path
from textwrap import dedent
from unittest.mock import MagicMock, call, patch
Expand All @@ -35,6 +37,7 @@
import pytest
from kedro.extras.datasets.pandas import CSVDataSet, ParquetDataSet
from kedro.extras.datasets.spark import SparkDataSet
from kedro.extras.datasets.tracking.metrics_dataset import MetricsDataSet
from kedro.io import MemoryDataSet, PartitionedDataSet
from kedro.pipeline.node import node

Expand Down Expand Up @@ -397,14 +400,14 @@ def test_plotly_data_node_dataset_not_exist(self, patched_import):
plotly_node_metadata = DataNodeMetadata(data_node=plotly_data_node)
assert not hasattr(plotly_node_metadata, "plot")

@patch("json.load")
@patch("kedro_viz.models.graph.DataNodeMetadata.load_metrics_versioned_data")
@patch("kedro_viz.models.graph.DataNodeMetadata.load_latest_metrics_data")
@patch("kedro_viz.models.graph.DataNodeMetadata.create_metrics_plot")
def test_metrics_data_node_metadata(
self,
patched_metrics_plot,
patched_latest_metrics,
patched_data_loader,
patched_json_load,
):
mock_metrics_data = {
"recommendations": 0.0009277445547700936,
Expand Down Expand Up @@ -432,8 +435,8 @@ def test_metrics_data_node_metadata(
}
]
}
patched_json_load.return_value = mock_metrics_data
patched_data_loader.return_value = mock_version_data
patched_latest_metrics.return_value = mock_metrics_data
patched_metrics_plot.return_value = mock_plot_data
metrics_data_node = MagicMock()
metrics_data_node.is_plot_node.return_value = False
Expand All @@ -448,22 +451,35 @@ def test_metrics_data_node_metadata_dataset_not_exist(self):
metrics_data_node.is_metric_node.return_value = True
metrics_data_node.kedro_obj._exists.return_value = False
metrics_node_metadata = DataNodeMetadata(data_node=metrics_data_node)
assert not hasattr(metrics_node_metadata, "metric")
assert not hasattr(metrics_node_metadata, "metrics")
assert not hasattr(metrics_node_metadata, "plot")

@patch("json.load")
@patch("kedro_viz.models.graph.DataNodeMetadata.load_latest_metrics_data")
def test_metrics_data_node_metadata_latest_metrics_not_exist(
self,
patched_latest_metrics,
):
patched_latest_metrics.return_value = None
metrics_data_node = MagicMock()
metrics_data_node.is_plot_node.return_value = False
metrics_data_node.is_metric_node.return_value = True
metrics_node_metadata = DataNodeMetadata(data_node=metrics_data_node)
assert not hasattr(metrics_node_metadata, "metrics")
assert not hasattr(metrics_node_metadata, "plot")

@patch("kedro_viz.models.graph.DataNodeMetadata.load_latest_metrics_data")
@patch("kedro_viz.models.graph.DataNodeMetadata.load_metrics_versioned_data")
def test_metrics_data_node_metadata_versioned_dataset_not_exist(
self,
patched_data_loader,
patched_json_load,
patched_latest_metrics,
):
mock_metrics_data = {
"recommendations": 0.0009277445547700936,
"recommended_controls": 0.001159680693462617,
"projected_optimization": 0.0013916168321551402,
}
patched_json_load.return_value = mock_metrics_data
patched_latest_metrics.return_value = mock_metrics_data
patched_data_loader.return_value = {}
metrics_data_node = MagicMock()
metrics_data_node.is_plot_node.return_value = False
Expand All @@ -488,7 +504,7 @@ def test_data_node_metadata_create_metrics_plot(self):
assert "layout" in test_plot

@pytest.fixture
def metrics_filepath(self, tmpdir):
def metrics_filepath(self, tmp_path):
dir_name = ["2021-09-10T09.02.44.245Z", "2021-09-10T09.03.23.733Z"]
filename = "metrics.json"
json_content = [
Expand All @@ -503,15 +519,38 @@ def metrics_filepath(self, tmpdir):
"projected_optimization": 0.30057499608184196,
},
]
source_dir = Path(tmpdir / filename)
source_dir = Path(tmp_path / filename)
for index, directory in enumerate(dir_name):
filepath = Path(source_dir / directory / filename)
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(json.dumps(json_content[index]))
return source_dir

@pytest.fixture
def metrics_filepath_reload(self, tmp_path):
    """Write two versioned metrics files under a temp dir; the directory
    with the later timestamp holds the newer payload. Returns the dataset
    root path (tmp_path / "metrics.json")."""
    version_dirs = ["2021-09-10T09.03.55.245Z", "2021-09-10T09.03.56.733Z"]
    filename = "metrics.json"
    payloads = [
        {
            "recommendations": 0.4,
            "recommended_controls": 0.5,
            "projected_optimization": 0.6,
        },
        {
            "recommendations": 0.7,
            "recommended_controls": 0.8,
            "projected_optimization": 0.9,
        },
    ]
    dataset_root = Path(tmp_path / filename)
    for payload, version in zip(payloads, version_dirs):
        target = Path(dataset_root / version / filename)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(payload))
    return dataset_root

@pytest.fixture
def metrics_filepath_invalid_timestamp(self, tmp_path):
dir_name = ["2021", "2021"]
filename = "metrics.json"
json_content = [
Expand All @@ -526,13 +565,31 @@ def metrics_filepath_invalid_timestamp(self, tmpdir):
"projected_optimization": 0.30057499608184196,
},
]
source_dir = Path(tmpdir / filename)
source_dir = Path(tmp_path / filename)
for index, directory in enumerate(dir_name):
filepath = Path(source_dir / directory / filename)
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(json.dumps(json_content[index]))
return source_dir

def test_load_latest_metrics(self, tmp_path):
    # NOTE: a bare relative path ("temp.json") is used instead of the
    # metrics_filepath fixture as a temporary workaround — the fixture
    # fails on the Windows build. Clean-up is therefore done manually
    # with shutil.rmtree. This will be cleaned up in the future.
    target = "temp.json"
    dataset = MetricsDataSet(filepath=target)
    first_version = {"col1": 1, "col2": 0.23, "col3": 0.002}
    dataset.save(first_version)
    assert DataNodeMetadata.load_latest_metrics_data(dataset) == first_version
    # A second save creates a newer version; the loader must pick it up.
    second_version = {"col1": 3, "col2": 3.23, "col3": 3.002}
    dataset.save(second_version)
    assert DataNodeMetadata.load_latest_metrics_data(dataset) == second_version
    shutil.rmtree(target)

def test_load_latest_metrics_fail(self, mocker, metrics_filepath):
    # If none of the globbed version paths exists on disk, the loader
    # must return None rather than raising.
    versioned_dataset = MetricsDataSet(filepath=f"{metrics_filepath}")
    mocker.patch.object(
        versioned_dataset, "_exists_function", return_value=False
    )
    assert DataNodeMetadata.load_latest_metrics_data(versioned_dataset) is None

def test_load_metrics_versioned_data(self, metrics_filepath):
mock_metrics_json = {
datetime.datetime(2021, 9, 10, 9, 2, 44, 245000): {
Expand Down

0 comments on commit 97f7276

Please sign in to comment.