From aaf4a722449b170ca0d72c42a26f1c5f1deaca60 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 7 Jan 2025 12:21:59 +0000
Subject: [PATCH 01/19] Replaced callable check

Signed-off-by: Elena Khaustova
---
 .../kedro_datasets/partitions/partitioned_dataset.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py
index ea2461034..46fc74e78 100644
--- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py
+++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py
@@ -43,6 +43,11 @@ def _grandparent(path: str) -> str:
     return str(grandparent)
 
 
+def _islambda(obj: object):
+    """Check if object is a lambda function."""
+    return callable(obj) and hasattr(obj, "__name__") and obj.__name__ == "<lambda>"
+
+
 class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[], Any]]]):
     """``PartitionedDataset`` loads and saves partitioned file-like data using the
     underlying dataset definition. For filesystem level operations it uses `fsspec`:
@@ -311,7 +316,7 @@ def save(self, data: dict[str, Any]) -> None:
             # join the protocol back since tools like PySpark may rely on it
             kwargs[self._filepath_arg] = self._join_protocol(partition)
             dataset = self._dataset_type(**kwargs)  # type: ignore
-            if callable(partition_data):
+            if _islambda(partition_data):
                 partition_data = partition_data()  # noqa: PLW2901
             dataset.save(partition_data)
         self._invalidate_caches()

From f35b850a7719baf9ac114059d7f7c06ace322952 Mon Sep 17 00:00:00 2001
From: Elena Khaustova
Date: Tue, 7 Jan 2025 12:26:37 +0000
Subject: [PATCH 02/19] Updated lazy_save test

Signed-off-by: Elena Khaustova
---
 .../partitions/test_partitioned_dataset.py | 115 +++++++++---------
 1 file changed, 57 insertions(+), 58 deletions(-)

diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py
index f0126887d..921ae9774 100644
--- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py
+++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py
@@ -61,9 +61,9 @@ class TestPartitionedDatasetLocal:
     def test_repr(self, dataset):
         pds = PartitionedDataset(path="", dataset=dataset)
         assert (
-            repr(pds)
-            == """kedro_datasets.partitions.partitioned_dataset.PartitionedDataset(filepath='', """
-            """dataset='kedro_datasets.pandas.parquet_dataset.ParquetDataset()')"""
+            repr(pds)
+            == """kedro_datasets.partitions.partitioned_dataset.PartitionedDataset(filepath='', """
+            """dataset='kedro_datasets.pandas.parquet_dataset.ParquetDataset()')"""
         )
 
     @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION)
     @pytest.mark.parametrize(
         "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)]
     )
     def test_load(
-        self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts
+        self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts
     ):
         pds = PartitionedDataset(
             path=str(local_csvs), dataset=dataset, filename_suffix=suffix
         )
@@ -108,17 +108,16 @@ def test_lazy_save(self, dataset, local_csvs, suffix):
             path=str(local_csvs), dataset=dataset, filename_suffix=suffix
         )
 
-        def original_data():
-            return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]})
+        original_data = pd.DataFrame({"foo": 42, "bar": ["a", "b", None]})
 
         part_id = "new/data"
-        pds.save({part_id: original_data})
+        pds.save({part_id: lambda: original_data})
 
         assert (local_csvs / "new" / ("data" +
suffix)).is_file() loaded_partitions = pds.load() assert part_id in loaded_partitions reloaded_data = loaded_partitions[part_id]() - assert_frame_equal(reloaded_data, original_data()) + assert_frame_equal(reloaded_data, original_data) def test_save_invalidates_cache(self, local_csvs, mocker): """Test that save calls invalidate partition cache""" @@ -226,7 +225,7 @@ def test_load_args(self, mocker): [({"cred": "common"}, {"cred": "common"}, {"cred": "common"}), (None, {}, {})], ) def test_credentials( - self, mocker, credentials, expected_pds_creds, expected_dataset_creds + self, mocker, credentials, expected_pds_creds, expected_dataset_creds ): mocked_filesystem = mocker.patch("fsspec.filesystem") path = str(Path.cwd()) @@ -277,8 +276,8 @@ def test_invalid_dataset(self, dataset, local_csvs): df_loader() error_message = str(exc_info.value) assert ( - "Either the file is corrupted or this is not a parquet file" - in error_message + "Either the file is corrupted or this is not a parquet file" + in error_message ) assert str(partition) in error_message @@ -287,13 +286,13 @@ def test_invalid_dataset(self, dataset, local_csvs): [ ("UndefinedDatasetType", "Class 'UndefinedDatasetType' not found"), ( - "missing.module.UndefinedDatasetType", - r"Class 'missing\.module\.UndefinedDatasetType' not found", + "missing.module.UndefinedDatasetType", + r"Class 'missing\.module\.UndefinedDatasetType' not found", ), ( - FakeDataset, - r"Dataset type 'tests\.partitions\.test_partitioned_dataset\.FakeDataset' " - r"is invalid\: all dataset types must extend 'AbstractDataset'", + FakeDataset, + r"Dataset type 'tests\.partitions\.test_partitioned_dataset\.FakeDataset' " + r"is invalid\: all dataset types must extend 'AbstractDataset'", ), ({}, "'type' is missing from dataset catalog configuration"), ], @@ -314,13 +313,13 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern): "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)] ) def test_versioned_dataset_save_and_load( - self, - mocker, - filepath_csvs, - dataset_config, - suffix, - expected_num_parts, - partitioned_data_pandas, + self, + mocker, + filepath_csvs, + dataset_config, + suffix, + expected_num_parts, + partitioned_data_pandas, ): """Test that saved and reloaded data matches the original one for the versioned dataset.""" @@ -383,19 +382,19 @@ def test_no_partitions(self, tmpdir): "pds_config,filepath_arg", [ ( - { - "path": str(Path.cwd()), - "dataset": {"type": CSVDataset, "filepath": "fake_path"}, - }, - "filepath", + { + "path": str(Path.cwd()), + "dataset": {"type": CSVDataset, "filepath": "fake_path"}, + }, + "filepath", ), ( - { - "path": str(Path.cwd()), - "dataset": {"type": CSVDataset, "other_arg": "fake_path"}, - "filepath_arg": "other_arg", - }, - "other_arg", + { + "path": str(Path.cwd()), + "dataset": {"type": CSVDataset, "other_arg": "fake_path"}, + "filepath_arg": "other_arg", + }, + "other_arg", ), ], ) @@ -441,38 +440,38 @@ def test_fs_args_log_warning(self, caplog): "pds_config,expected_ds_creds,global_creds", [ ( - {"dataset": "pandas.CSVDataset", "credentials": {"secret": "global"}}, - {"secret": "global"}, - {"secret": "global"}, + {"dataset": "pandas.CSVDataset", "credentials": {"secret": "global"}}, + {"secret": "global"}, + {"secret": "global"}, ), ( - { - "dataset": { - "type": CSVDataset, - "credentials": {"secret": "expected"}, + { + "dataset": { + "type": CSVDataset, + "credentials": {"secret": "expected"}, + }, }, - }, - {"secret": "expected"}, - {}, + {"secret": "expected"}, + {}, ), ( - { - 
"dataset": {"type": CSVDataset, "credentials": None}, - "credentials": {"secret": "global"}, - }, - None, - {"secret": "global"}, + { + "dataset": {"type": CSVDataset, "credentials": None}, + "credentials": {"secret": "global"}, + }, + None, + {"secret": "global"}, ), ( - { - "dataset": { - "type": CSVDataset, - "credentials": {"secret": "expected"}, + { + "dataset": { + "type": CSVDataset, + "credentials": {"secret": "expected"}, + }, + "credentials": {"secret": "global"}, }, - "credentials": {"secret": "global"}, - }, - {"secret": "expected"}, - {"secret": "global"}, + {"secret": "expected"}, + {"secret": "global"}, ), ], ) From 262c059f2ef12441084149db28671cd7e382c357 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 12:44:36 +0000 Subject: [PATCH 03/19] Added test_callable_save Signed-off-by: Elena Khaustova --- .../partitions/test_partitioned_dataset.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 921ae9774..01abf7dc1 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -52,6 +52,10 @@ def filepath_csvs(tmp_path): ] +def original_data_callable(): + return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) + + class FakeDataset: # pylint: disable=too-few-public-methods pass @@ -101,6 +105,22 @@ def test_save(self, dataset, local_csvs, suffix): reloaded_data = loaded_partitions[part_id]() assert_frame_equal(reloaded_data, original_data) + @pytest.mark.parametrize("dataset", ["kedro_datasets.pickle.PickleDataset"]) + @pytest.mark.parametrize("suffix", ["", ".csv"]) + def test_callable_save(self, dataset, local_csvs, suffix): + pds = PartitionedDataset( + path=str(local_csvs), dataset=dataset, filename_suffix=suffix + ) + + part_id = "new/data" + pds.save({part_id: original_data_callable}) + + assert (local_csvs / "new" / ("data" + suffix)).is_file() + loaded_partitions = pds.load() + assert part_id in loaded_partitions + reloaded_data = loaded_partitions[part_id]() + assert reloaded_data == original_data_callable + @pytest.mark.parametrize("dataset", LOCAL_DATASET_DEFINITION) @pytest.mark.parametrize("suffix", ["", ".csv"]) def test_lazy_save(self, dataset, local_csvs, suffix): From e65368d9c4767faa127a6c2a5acfb4304fba263c Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 12:49:49 +0000 Subject: [PATCH 04/19] Fixed lint Signed-off-by: Elena Khaustova --- .../partitions/test_partitioned_dataset.py | 108 +++++++++--------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 01abf7dc1..bc5238727 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -65,9 +65,9 @@ class TestPartitionedDatasetLocal: def test_repr(self, dataset): pds = PartitionedDataset(path="", dataset=dataset) assert ( - repr(pds) - == """kedro_datasets.partitions.partitioned_dataset.PartitionedDataset(filepath='', """ - """dataset='kedro_datasets.pandas.parquet_dataset.ParquetDataset()')""" + repr(pds) + == """kedro_datasets.partitions.partitioned_dataset.PartitionedDataset(filepath='', """ + """dataset='kedro_datasets.pandas.parquet_dataset.ParquetDataset()')""" ) @pytest.mark.parametrize("dataset", 
LOCAL_DATASET_DEFINITION) @@ -75,7 +75,7 @@ def test_repr(self, dataset): "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)] ) def test_load( - self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts + self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts ): pds = PartitionedDataset( path=str(local_csvs), dataset=dataset, filename_suffix=suffix @@ -245,7 +245,7 @@ def test_load_args(self, mocker): [({"cred": "common"}, {"cred": "common"}, {"cred": "common"}), (None, {}, {})], ) def test_credentials( - self, mocker, credentials, expected_pds_creds, expected_dataset_creds + self, mocker, credentials, expected_pds_creds, expected_dataset_creds ): mocked_filesystem = mocker.patch("fsspec.filesystem") path = str(Path.cwd()) @@ -296,8 +296,8 @@ def test_invalid_dataset(self, dataset, local_csvs): df_loader() error_message = str(exc_info.value) assert ( - "Either the file is corrupted or this is not a parquet file" - in error_message + "Either the file is corrupted or this is not a parquet file" + in error_message ) assert str(partition) in error_message @@ -306,13 +306,13 @@ def test_invalid_dataset(self, dataset, local_csvs): [ ("UndefinedDatasetType", "Class 'UndefinedDatasetType' not found"), ( - "missing.module.UndefinedDatasetType", - r"Class 'missing\.module\.UndefinedDatasetType' not found", + "missing.module.UndefinedDatasetType", + r"Class 'missing\.module\.UndefinedDatasetType' not found", ), ( - FakeDataset, - r"Dataset type 'tests\.partitions\.test_partitioned_dataset\.FakeDataset' " - r"is invalid\: all dataset types must extend 'AbstractDataset'", + FakeDataset, + r"Dataset type 'tests\.partitions\.test_partitioned_dataset\.FakeDataset' " + r"is invalid\: all dataset types must extend 'AbstractDataset'", ), ({}, "'type' is missing from dataset catalog configuration"), ], @@ -333,13 +333,13 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern): "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)] ) def test_versioned_dataset_save_and_load( - self, - mocker, - filepath_csvs, - dataset_config, - suffix, - expected_num_parts, - partitioned_data_pandas, + self, + mocker, + filepath_csvs, + dataset_config, + suffix, + expected_num_parts, + partitioned_data_pandas, ): """Test that saved and reloaded data matches the original one for the versioned dataset.""" @@ -402,19 +402,19 @@ def test_no_partitions(self, tmpdir): "pds_config,filepath_arg", [ ( - { - "path": str(Path.cwd()), - "dataset": {"type": CSVDataset, "filepath": "fake_path"}, - }, - "filepath", + { + "path": str(Path.cwd()), + "dataset": {"type": CSVDataset, "filepath": "fake_path"}, + }, + "filepath", ), ( - { - "path": str(Path.cwd()), - "dataset": {"type": CSVDataset, "other_arg": "fake_path"}, - "filepath_arg": "other_arg", - }, - "other_arg", + { + "path": str(Path.cwd()), + "dataset": {"type": CSVDataset, "other_arg": "fake_path"}, + "filepath_arg": "other_arg", + }, + "other_arg", ), ], ) @@ -460,38 +460,38 @@ def test_fs_args_log_warning(self, caplog): "pds_config,expected_ds_creds,global_creds", [ ( - {"dataset": "pandas.CSVDataset", "credentials": {"secret": "global"}}, - {"secret": "global"}, - {"secret": "global"}, + {"dataset": "pandas.CSVDataset", "credentials": {"secret": "global"}}, + {"secret": "global"}, + {"secret": "global"}, ), ( - { - "dataset": { - "type": CSVDataset, - "credentials": {"secret": "expected"}, - }, + { + "dataset": { + "type": CSVDataset, + "credentials": {"secret": "expected"}, }, - {"secret": 
"expected"}, - {}, + }, + {"secret": "expected"}, + {}, ), ( - { - "dataset": {"type": CSVDataset, "credentials": None}, - "credentials": {"secret": "global"}, - }, - None, - {"secret": "global"}, + { + "dataset": {"type": CSVDataset, "credentials": None}, + "credentials": {"secret": "global"}, + }, + None, + {"secret": "global"}, ), ( - { - "dataset": { - "type": CSVDataset, - "credentials": {"secret": "expected"}, - }, - "credentials": {"secret": "global"}, + { + "dataset": { + "type": CSVDataset, + "credentials": {"secret": "expected"}, }, - {"secret": "expected"}, - {"secret": "global"}, + "credentials": {"secret": "global"}, + }, + {"secret": "expected"}, + {"secret": "global"}, ), ], ) From f3388d1153ebd1d41fe18a83e6f3cb041821ac26 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 14:25:32 +0000 Subject: [PATCH 05/19] Fixed docs links Signed-off-by: Elena Khaustova --- kedro-datasets/kedro_datasets/dask/parquet_dataset.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py index 1acfe7cda..c79fcac0a 100644 --- a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py @@ -1,5 +1,6 @@ """``ParquetDataset`` is a dataset used to load and save data to parquet files using Dask dataframe""" + from __future__ import annotations from copy import deepcopy @@ -97,9 +98,9 @@ def __init__( # noqa: PLR0913 filepath: Filepath in POSIX format to a parquet file parquet collection or the directory of a multipart parquet. load_args: Additional loading options `dask.dataframe.read_parquet`: - https://docs.dask.org/en/latest/generated/dask.dataframe.read_parquet.html + https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html save_args: Additional saving options for `dask.dataframe.to_parquet`: - https://docs.dask.org/en/latest/generated/dask.dataframe.to_parquet.html + https://docs.dask.org/en/stable/generated/dask.dataframe.to_parquet.html credentials: Credentials required to get access to the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Optional parameters to the backend file system driver: From 61903eb24f7397d5e4600df04d4d09a69981872b Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 14:33:49 +0000 Subject: [PATCH 06/19] Fixed all docs links Signed-off-by: Elena Khaustova --- kedro-datasets/kedro_datasets/dask/parquet_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py index c79fcac0a..b3a81c632 100644 --- a/kedro-datasets/kedro_datasets/dask/parquet_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/parquet_dataset.py @@ -15,7 +15,7 @@ class ParquetDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]): """``ParquetDataset`` loads and saves data to parquet file(s). It uses Dask remote data services to handle the corresponding load and save operations: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html Example usage for the `YAML API `_: @@ -104,7 +104,7 @@ def __init__( # noqa: PLR0913 credentials: Credentials required to get access to the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. 
fs_args: Optional parameters to the backend file system driver: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. """ From 51d62ef39c302ecd42539d7e89254279712b0227 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 14:44:09 +0000 Subject: [PATCH 07/19] Updated release notes Signed-off-by: Elena Khaustova --- kedro-datasets/RELEASE.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index a477dca5e..0b65a84ec 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,6 +1,11 @@ # Upcoming Release ## Major features and improvements + + ## Bug fixes and other changes + +- Made `PartitionedDataset` accept only lamda functions for lazy saving and ignore other callable objects. + ## Breaking Changes ## Community contributions From f49a1f888ab8c1587864800da521b3831d03162f Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 14:50:30 +0000 Subject: [PATCH 08/19] Fixed all docs links Signed-off-by: Elena Khaustova --- kedro-datasets/kedro_datasets/dask/csv_dataset.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kedro-datasets/kedro_datasets/dask/csv_dataset.py b/kedro-datasets/kedro_datasets/dask/csv_dataset.py index 053da6b00..b82bff15e 100644 --- a/kedro-datasets/kedro_datasets/dask/csv_dataset.py +++ b/kedro-datasets/kedro_datasets/dask/csv_dataset.py @@ -1,5 +1,6 @@ """``CSVDataset`` is a dataset used to load and save data to CSV files using Dask dataframe""" + from __future__ import annotations from copy import deepcopy @@ -13,7 +14,7 @@ class CSVDataset(AbstractDataset[dd.DataFrame, dd.DataFrame]): """``CSVDataset`` loads and saves data to comma-separated value file(s). It uses Dask remote data services to handle the corresponding load and save operations: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html Example usage for the `YAML API `_: @@ -67,13 +68,13 @@ def __init__( # noqa: PLR0913 filepath: Filepath in POSIX format to a CSV file CSV collection or the directory of a multipart CSV. load_args: Additional loading options `dask.dataframe.read_csv`: - https://docs.dask.org/en/latest/generated/dask.dataframe.read_csv.html + https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html save_args: Additional saving options for `dask.dataframe.to_csv`: - https://docs.dask.org/en/latest/generated/dask.dataframe.to_csv.html + https://docs.dask.org/en/stable/generated/dask.dataframe.to_csv.html credentials: Credentials required to get access to the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. fs_args: Optional parameters to the backend file system driver: - https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters + https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html#optional-parameters metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
""" From 3144ba8fbfc55161a68b7e5145b67e10073caff0 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 7 Jan 2025 15:20:55 +0000 Subject: [PATCH 09/19] Fixed typo Signed-off-by: Elena Khaustova --- kedro-datasets/RELEASE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 0b65a84ec..a10c846b6 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -4,7 +4,7 @@ ## Bug fixes and other changes -- Made `PartitionedDataset` accept only lamda functions for lazy saving and ignore other callable objects. +- Made `PartitionedDataset` accept only lambda functions for lazy saving and ignore other callable objects. ## Breaking Changes ## Community contributions From b1d0a1d0dcf804d356082d5c91da6d4b2a4cc963 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Tue, 14 Jan 2025 16:40:52 +0000 Subject: [PATCH 10/19] Added argument to disable lazy saving Signed-off-by: Elena Khaustova --- .../partitions/partitioned_dataset.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index 46fc74e78..8b2cd1e59 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -43,11 +43,6 @@ def _grandparent(path: str) -> str: return str(grandparent) -def _islambda(obj: object): - """Check if object is a lambda function.""" - return callable(obj) and hasattr(obj, "__name__") and obj.__name__ == "" - - class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[], Any]]]): """``PartitionedDataset`` loads and saves partitioned file-like data using the underlying dataset definition. For filesystem level operations it uses `fsspec`: @@ -157,6 +152,7 @@ def __init__( # noqa: PLR0913 load_args: dict[str, Any] | None = None, fs_args: dict[str, Any] | None = None, overwrite: bool = False, + save_lazily: bool = True, metadata: dict[str, Any] | None = None, ) -> None: """Creates a new instance of ``PartitionedDataset``. @@ -196,6 +192,8 @@ def __init__( # noqa: PLR0913 fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). overwrite: If True, any existing partitions will be removed. + save_lazily: If True, lazy saving is enabled. Meaning that if callable object is passed, + the partition’s data will not be materialised until it is time to write. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
@@ -211,6 +209,7 @@ def __init__( # noqa: PLR0913 self._overwrite = overwrite self._protocol = infer_storage_options(self._path)["protocol"] self._partition_cache: Cache = Cache(maxsize=1) + self._save_lazily = save_lazily self.metadata = metadata dataset = dataset if isinstance(dataset, dict) else {"type": dataset} @@ -306,7 +305,7 @@ def load(self) -> dict[str, Callable[[], Any]]: return partitions - def save(self, data: dict[str, Any]) -> None: + def save(self, data: dict[str, Any], lazy: bool = True) -> None: if self._overwrite and self._filesystem.exists(self._normalized_path): self._filesystem.rm(self._normalized_path, recursive=True) @@ -316,7 +315,7 @@ def save(self, data: dict[str, Any]) -> None: # join the protocol back since tools like PySpark may rely on it kwargs[self._filepath_arg] = self._join_protocol(partition) dataset = self._dataset_type(**kwargs) # type: ignore - if _islambda(partition_data): + if callable(partition_data) and (self._save_lazily or lazy): partition_data = partition_data() # noqa: PLW2901 dataset.save(partition_data) self._invalidate_caches() From e2755c370aa2f9772af6360255ce4ae6b304abed Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 11:43:48 +0000 Subject: [PATCH 11/19] Removed save function argument Signed-off-by: Elena Khaustova --- .../kedro_datasets/partitions/partitioned_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index 8b2cd1e59..8327ecd10 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -305,7 +305,7 @@ def load(self) -> dict[str, Callable[[], Any]]: return partitions - def save(self, data: dict[str, Any], lazy: bool = True) -> None: + def save(self, data: dict[str, Any]) -> None: if self._overwrite and self._filesystem.exists(self._normalized_path): self._filesystem.rm(self._normalized_path, recursive=True) @@ -315,7 +315,7 @@ def save(self, data: dict[str, Any], lazy: bool = True) -> None: # join the protocol back since tools like PySpark may rely on it kwargs[self._filepath_arg] = self._join_protocol(partition) dataset = self._dataset_type(**kwargs) # type: ignore - if callable(partition_data) and (self._save_lazily or lazy): + if callable(partition_data) and self._save_lazily: partition_data = partition_data() # noqa: PLW2901 dataset.save(partition_data) self._invalidate_caches() From c49c73089ded66ddd8d9bfba19fde4509c50b1c8 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 12:23:32 +0000 Subject: [PATCH 12/19] Updated unit test Signed-off-by: Elena Khaustova --- kedro-datasets/tests/partitions/test_partitioned_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index bc5238727..1e1ed88e2 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -109,7 +109,7 @@ def test_save(self, dataset, local_csvs, suffix): @pytest.mark.parametrize("suffix", ["", ".csv"]) def test_callable_save(self, dataset, local_csvs, suffix): pds = PartitionedDataset( - path=str(local_csvs), dataset=dataset, filename_suffix=suffix + path=str(local_csvs), dataset=dataset, filename_suffix=suffix, save_lazily=False ) part_id = "new/data" 
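Reviewer note: a minimal sketch of how the `save_lazily` flag introduced in this series is expected to behave once applied. The paths, partition keys and the choice of `pandas.CSVDataset` / `kedro_datasets.pickle.PickleDataset` below are illustrative assumptions for the example only, not part of the diff.

    import pandas as pd

    from kedro_datasets.partitions import PartitionedDataset


    def build_partition() -> pd.DataFrame:
        return pd.DataFrame({"foo": [42], "bar": ["a"]})


    # Default behaviour (save_lazily=True): a callable passed as partition data
    # is invoked at save time, so the materialised DataFrame is what gets written.
    lazy_pds = PartitionedDataset(
        path="data/partitions",  # illustrative path
        dataset="pandas.CSVDataset",
        filename_suffix=".csv",
    )
    lazy_pds.save({"part_1": build_partition})  # writes data/partitions/part_1.csv

    # With save_lazily=False the callable itself is handed to the underlying
    # dataset unchanged, which is only useful for datasets that can serialise
    # callables, e.g. PickleDataset as exercised by test_callable_save above.
    eager_pds = PartitionedDataset(
        path="data/partitions_pickled",  # illustrative path
        dataset="kedro_datasets.pickle.PickleDataset",
        save_lazily=False,
    )
    eager_pds.save({"part_1": build_partition})  # pickles the function object itself
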
From 3df72136495c6853310f3f5485b46748fbed318c Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 12:26:38 +0000 Subject: [PATCH 13/19] Fixed lint Signed-off-by: Elena Khaustova --- kedro-datasets/tests/partitions/test_partitioned_dataset.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 1e1ed88e2..2e90802da 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -109,7 +109,10 @@ def test_save(self, dataset, local_csvs, suffix): @pytest.mark.parametrize("suffix", ["", ".csv"]) def test_callable_save(self, dataset, local_csvs, suffix): pds = PartitionedDataset( - path=str(local_csvs), dataset=dataset, filename_suffix=suffix, save_lazily=False + path=str(local_csvs), + dataset=dataset, + filename_suffix=suffix, + save_lazily=False, ) part_id = "new/data" From f6bc42bc480a2179be9558d828d493844ddd94f2 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 14:58:20 +0000 Subject: [PATCH 14/19] Updated related docs Signed-off-by: Elena Khaustova --- .../kedro_datasets/partitions/partitioned_dataset.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index 8327ecd10..6c60ccdcb 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -69,6 +69,7 @@ class PartitionedDataset(AbstractDataset[dict[str, Any], dict[str, Callable[[], sep: '\\t' index: true filename_suffix: '.dat' + save_lazily: True Example usage for the `Python API >> # This will create a folder `df_with_partition` and save multiple files >>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc. @@ -192,8 +194,10 @@ def __init__( # noqa: PLR0913 fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). overwrite: If True, any existing partitions will be removed. - save_lazily: If True, lazy saving is enabled. Meaning that if callable object is passed, - the partition’s data will not be materialised until it is time to write. + save_lazily: If True, lazy saving is enabled. Meaning that if callable object is passed + as data to save, the partition’s data will not be materialised until it is time to write. + Lazy saving example: + https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. 
From 8365e1005a94e76cb2fb35bdf50a171a25231a4c Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 15:12:20 +0000 Subject: [PATCH 15/19] Revert test changes Signed-off-by: Elena Khaustova --- .../tests/partitions/test_partitioned_dataset.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 2e90802da..9a49d3bb8 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -131,16 +131,17 @@ def test_lazy_save(self, dataset, local_csvs, suffix): path=str(local_csvs), dataset=dataset, filename_suffix=suffix ) - original_data = pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) + def original_data(): + return pd.DataFrame({"foo": 42, "bar": ["a", "b", None]}) part_id = "new/data" - pds.save({part_id: lambda: original_data}) + pds.save({part_id: original_data}) assert (local_csvs / "new" / ("data" + suffix)).is_file() loaded_partitions = pds.load() assert part_id in loaded_partitions reloaded_data = loaded_partitions[part_id]() - assert_frame_equal(reloaded_data, original_data) + assert_frame_equal(reloaded_data, original_data()) def test_save_invalidates_cache(self, local_csvs, mocker): """Test that save calls invalidate partition cache""" From a9d294f5076c27575e1a8e3b080dccaf3480053e Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 15:26:43 +0000 Subject: [PATCH 16/19] Updated baseline Signed-off-by: Elena Khaustova --- .secrets.baseline | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index ce3799e06..c18f3f6f1 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -129,7 +129,7 @@ "filename": "kedro-datasets/kedro_datasets/dask/parquet_dataset.py", "hashed_secret": "6e1d66a1596528c308e601c10aa0b92d53606ab9", "is_verified": false, - "line_number": 71 + "line_number": 72 } ], "kedro-datasets/kedro_datasets/pandas/sql_dataset.py": [ @@ -340,35 +340,35 @@ "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "76f747de912e8682e29a23cb506dd5bf0de080d2", "is_verified": false, - "line_number": 415 + "line_number": 438 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "9027cc5a2c1321de60a2d71ccde6229d1152d6d3", "is_verified": false, - "line_number": 416 + "line_number": 439 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "5dcbdf371f181b9b7a41a4be7be70f8cbee67da7", "is_verified": false, - "line_number": 452 + "line_number": 475 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "727d8ff68b6b550f2cf6e737b3cad5149c65fe5b", "is_verified": false, - "line_number": 503 + "line_number": 526 }, { "type": "Secret Keyword", "filename": "kedro-datasets/tests/partitions/test_partitioned_dataset.py", "hashed_secret": "adb5fabe51f5b45e83fdd91b71c92156fec4a63e", "is_verified": false, - "line_number": 523 + "line_number": 546 } ], "kedro-datasets/tests/plotly/test_html_dataset.py": [ @@ -490,5 +490,5 @@ } ] }, - "generated_at": "2025-01-13T16:27:46Z" + "generated_at": "2025-01-15T15:25:24Z" } From ff2867c7999fe864261e7ae8e1349cbf0201a1e8 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 16:44:06 +0000 
Subject: [PATCH 17/19] Updated release notes Signed-off-by: Elena Khaustova --- kedro-datasets/RELEASE.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 9616c15c6..c1404d9b7 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -3,10 +3,8 @@ ## Bug fixes and other changes +- Added a parameter to enable/disable lazy saving for `PartitionedDataset`. - Replaced `trufflehog` with `detect-secrets` for detecting secrets within a code base. - -## Bug fixes and other changes - - Fix polars.CSVDataset `save` method on Windows using `utf-8` as default encoding. ## Breaking Changes From e4278e215448dfcbee8fa9efbeca53ccdd57a278 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 15 Jan 2025 16:45:21 +0000 Subject: [PATCH 18/19] Updated release notes Signed-off-by: Elena Khaustova --- kedro-datasets/RELEASE.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index c1404d9b7..820388766 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,10 +1,11 @@ # Upcoming Release ## Major features and improvements -## Bug fixes and other changes - - Added a parameter to enable/disable lazy saving for `PartitionedDataset`. - Replaced `trufflehog` with `detect-secrets` for detecting secrets within a code base. + +## Bug fixes and other changes + - Fix polars.CSVDataset `save` method on Windows using `utf-8` as default encoding. ## Breaking Changes From 32ebe5668938611e40325e3f3a936ba905652e06 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Fri, 17 Jan 2025 11:42:38 +0000 Subject: [PATCH 19/19] Updated docstrings Signed-off-by: Elena Khaustova --- .../kedro_datasets/partitions/partitioned_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index 6c60ccdcb..cf1069b1a 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -194,8 +194,8 @@ def __init__( # noqa: PLR0913 fs_args: Extra arguments to pass into underlying filesystem class constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). overwrite: If True, any existing partitions will be removed. - save_lazily: If True, lazy saving is enabled. Meaning that if callable object is passed - as data to save, the partition’s data will not be materialised until it is time to write. + save_lazily: Parameter to enable/disable lazy saving, the default is True. Meaning that if callable object + is passed as data to save, the partition’s data will not be materialised until it is time to write. Lazy saving example: https://docs.kedro.org/en/stable/data/kedro_io.html#partitioned-dataset-lazy-saving metadata: Any arbitrary metadata.