Skip to content

Commit

Permalink
feat(datasets): support Polars lazy evaluation (#350)
Browse files Browse the repository at this point in the history
* feat(datasets) add PolarsDataset to support Polars's Lazy API

Signed-off-by: Matthias Roels <[email protected]>

* Fix(datasets): rename PolarsDataSet to PolarsDataSet

Add PolarsDataSet as an alias for PolarsDataset with
deprecation warning.

Signed-off-by: Matthias Roels <[email protected]>

* Fix(datasets): apply ruff linting rules

Signed-off-by: Matthias Roels <[email protected]>

* Fix(datasets): Correct pattern matching when Raising exceptions

Corrected PolarsDataSet to PolarsDataset in the pattern to match
in test_load_missing_file

Signed-off-by: Matthias Roels <[email protected]>

* fix(datasets): clean up PolarsDataset related code

Remove reference to PolarsDataSet as this is not required for new
dataset implementations.

Signed-off-by: Matthias Roels <[email protected]>

* feat(datasets): Rename Polars Datasets to better describe their intent

Signed-off-by: Matthias Roels <[email protected]>

* feat(datasets): clean up LazyPolarsDataset

Signed-off-by: Matthias Roels <[email protected]>

* fix(datasets): increase test coverage for PolarsDataset classes

Signed-off-by: Matthias Roels <[email protected]>

* docs(datasets): add renamed Polars datasets to docs

Signed-off-by: Matthias Roels <[email protected]>

* docs(datasets): Add new polars datasets to release notes

Signed-off-by: Matthias Roels <[email protected]>

* fix(datasets): load_args not properly passed to LazyPolarsDataset.load

Signed-off-by: Matthias Roels <[email protected]>

* docs(datasets): fix spelling error in release notes

Co-authored-by: Merel Theisen <[email protected]>
Signed-off-by: Matthias Roels <[email protected]>

---------

Signed-off-by: Matthias Roels <[email protected]>
Signed-off-by: Matthias Roels <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Co-authored-by: Matthias Roels <[email protected]>
Co-authored-by: Merel Theisen <[email protected]>
  • Loading branch information
3 people authored Oct 20, 2023
1 parent 7140080 commit b44bf0e
Show file tree
Hide file tree
Showing 9 changed files with 741 additions and 57 deletions.
3 changes: 3 additions & 0 deletions kedro-datasets/RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Upcoming Release
## Major features and improvements
* Moved `PartitionedDataSet` and `IncrementalDataSet` from the core Kedro repo to `kedro-datasets` and renamed to `PartitionedDataset` and `IncrementalDataset`.
* Added `polars.LazyPolarsDataset`, a `GenericDataSet` using [polars](https://www.pola.rs/)'s Lazy API.
* Renamed `polars.GenericDataSet` to `polars.EagerPolarsDataset` to better reflect the difference between the two dataset classes.
* Added a deprecation warning when using `polars.GenericDataSet` or `polars.GenericDataset` that these have been renamed to `polars.EagerPolarsDataset`
* Delayed backend connection for `pandas.SQLTableDataset`, `pandas.SQLQueryDataset`, and `snowflake.SnowparkTableDataset`. In practice, this means that a dataset's connection details aren't used (or validated) until the dataset is accessed. On the plus side, the cost of connection isn't incurred regardless of when or whether the dataset is used.

## Bug fixes and other changes
Expand Down
2 changes: 2 additions & 0 deletions kedro-datasets/docs/source/kedro_datasets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ kedro_datasets
kedro_datasets.polars.CSVDataset
kedro_datasets.polars.GenericDataSet
kedro_datasets.polars.GenericDataset
kedro_datasets.polars.EagerPolarsDataset
kedro_datasets.polars.LazyPolarsDataset
kedro_datasets.redis.PickleDataSet
kedro_datasets.redis.PickleDataset
kedro_datasets.snowflake.SnowparkTableDataSet
Expand Down
13 changes: 10 additions & 3 deletions kedro-datasets/kedro_datasets/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@
# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
CSVDataSet: type[CSVDataset]
CSVDataset: Any
GenericDataSet: type[GenericDataset]
GenericDataset: Any
GenericDataSet: type[EagerPolarsDataset]
GenericDataset: type[EagerPolarsDataset]
EagerPolarsDataset: Any
LazyPolarsDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
__name__,
submod_attrs={
"csv_dataset": ["CSVDataSet", "CSVDataset"],
"generic_dataset": ["GenericDataSet", "GenericDataset"],
"eager_polars_dataset": [
"EagerPolarsDataset",
"GenericDataSet",
"GenericDataset",
],
"lazy_polars_dataset": ["LazyPolarsDataset"],
},
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""``GenericDataset`` loads/saves data from/to a data file using an underlying
"""``EagerPolarsDataset`` loads/saves data from/to a data file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the
type of read/write target.
"""
Expand All @@ -16,8 +16,8 @@
from kedro_datasets._io import AbstractVersionedDataset, DatasetError


class GenericDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]):
"""``polars.GenericDataset`` loads/saves data from/to a data file using an underlying
class EagerPolarsDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]):
"""``polars.EagerPolarsDataset`` loads/saves data from/to a data file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the dynamically select the
appropriate type of read/write on a best effort basis.
Expand All @@ -27,7 +27,7 @@ class GenericDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]):
.. code-block:: yaml
cars:
type: polars.GenericDataset
type: polars.EagerPolarsDataset
file_format: parquet
filepath: s3://data/01_raw/company/cars.parquet
load_args:
Expand All @@ -39,12 +39,12 @@ class GenericDataset(AbstractVersionedDataset[pl.DataFrame, pl.DataFrame]):
.. code-block:: pycon
>>> from kedro_datasets.polars import GenericDataset
>>> from kedro_datasets.polars import EagerPolarsDataset
>>> import polars as pl
>>>
>>> data = pl.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> dataset = GenericDataset(filepath="test.parquet", file_format="parquet")
>>> dataset = EagerPolarsDataset(filepath="test.parquet", file_format="parquet")
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.frame_equal(reloaded)
Expand All @@ -64,7 +64,7 @@ def __init__( # noqa: PLR0913
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
):
"""Creates a new instance of ``GenericDataset`` pointing to a concrete data file
"""Creates a new instance of ``EagerPolarsDataset`` pointing to a concrete data file
on a specific filesystem. The appropriate polars load/save methods are dynamically
identified by string matching on a best effort basis.
Expand Down Expand Up @@ -200,7 +200,8 @@ def _invalidate_cache(self) -> None:


_DEPRECATED_CLASSES = {
"GenericDataSet": GenericDataset,
"GenericDataSet": EagerPolarsDataset,
"GenericDataset": EagerPolarsDataset,
}


Expand Down
248 changes: 248 additions & 0 deletions kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
"""``LazyPolarsDataset`` loads/saves data from/to a data file using an underlying
filesystem (e.g.: local, S3, GCS). It uses polars to handle the
type of read/write target.
"""
import logging
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, ClassVar, Dict, Optional, Union

import fsspec
import polars as pl
import pyarrow.dataset as ds
from kedro.io.core import (
AbstractVersionedDataSet,
DatasetError,
Version,
get_filepath_str,
get_protocol_and_path,
)

ACCEPTED_FILE_FORMATS = ["csv", "parquet"]

PolarsFrame = Union[pl.LazyFrame, pl.DataFrame]

logger = logging.getLogger(__name__)


class LazyPolarsDataset(AbstractVersionedDataSet[pl.LazyFrame, PolarsFrame]):
"""``LazyPolarsDataset`` loads/saves data from/to a data file using an
underlying filesystem (e.g.: local, S3, GCS). It uses Polars to handle
the type of read/write target. It uses lazy loading with Polars Lazy API, but it can
save both Lazy and Eager Polars DataFrames.
Example usage for the `YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog_yaml_examples.html>`_:
.. code-block:: yaml
>>> cars:
>>> type: polars.LazyPolarsDataset
>>> filepath: data/01_raw/company/cars.csv
>>> load_args:
>>> sep: ","
>>> parse_dates: False
>>> save_args:
>>> has_header: False
null_value: "somenullstring"
>>>
>>> motorbikes:
>>> type: polars.LazyPolarsDataset
>>> filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
>>> credentials: dev_s3
Example using Python API:
::
>>> from kedro_datasets.polars import LazyPolarsDataset
>>> import polars as pl
>>>
>>> data = pl.DataFrame({'col1': [1, 2], 'col2': [4, 5],
>>> 'col3': [5, 6]})
>>>
>>> data_set = LazyPolarsDataset(filepath="test.csv")
>>> data_set.save(data)
>>> reloaded = data_set.load()
>>> assert data.frame_equal(reloaded)
"""

DEFAULT_LOAD_ARGS: ClassVar[Dict[str, Any]] = {}
DEFAULT_SAVE_ARGS: ClassVar[Dict[str, Any]] = {}

def __init__( # noqa: PLR0913
self,
filepath: str,
file_format: str,
load_args: Optional[Dict[str, Any]] = None,
save_args: Optional[Dict[str, Any]] = None,
version: Version = None,
credentials: Optional[Dict[str, Any]] = None,
fs_args: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Creates a new instance of ``LazyPolarsDataset`` pointing to a concrete
data file on a specific filesystem.
Args:
filepath: Filepath in POSIX format to a file prefixed with a protocol like
`s3://`.
If prefix is not provided, `file` protocol (local filesystem)
will be used.
The prefix should be any protocol supported by ``fsspec``.
Key assumption: The first argument of either load/save method points to
a filepath/buffer/io type location. There are some read/write targets
such as 'clipboard' or 'records' that will fail since they do not take a
filepath like argument.
file_format: String which is used to match the appropriate load/save method
on a best effort basis. For example if 'csv' is passed the
`polars.read_csv` and
`polars.DataFrame.write_csv` methods will be identified. An error will
be raised unless
at least one matching `read_{file_format}` or `write_{file_format}`.
load_args: polars options for loading files.
Here you can find all available arguments:
https://pola-rs.github.io/polars/py-polars/html/reference/io.html
All defaults are preserved.
save_args: Polars options for saving files.
Here you can find all available arguments:
https://pola-rs.github.io/polars/py-polars/html/reference/io.html
All defaults are preserved.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
to pass to the filesystem's `open` method through nested keys
`open_args_load` and `open_args_save`.
Here you can find all available arguments for `open`:
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
All defaults are preserved, except `mode`, which is set to `r` when loading
and to `w` when saving.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
Raises:
DatasetError: Will be raised if at least less than one appropriate
read or write methods are identified.
"""
self._file_format = file_format.lower()

if self._file_format not in ACCEPTED_FILE_FORMATS:
raise DatasetError(
f"'{self._file_format}' is not an accepted format "
f"({ACCEPTED_FILE_FORMATS}) ensure that your 'file_format' parameter "
"has been defined correctly as per the Polars API "
"https://pola-rs.github.io/polars/py-polars/html/reference/io.html"
)

_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}

protocol, path = get_protocol_and_path(filepath, version)
if protocol == "file":
_fs_args.setdefault("auto_mkdir", True)

self._protocol = protocol
self._storage_options = {**_credentials, **_fs_args}
self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

self.metadata = metadata

super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

if "storage_options" in self._save_args or "storage_options" in self._load_args:
logger.warning(
"Dropping 'storage_options' for %s, "
"please specify them under 'fs_args' or 'credentials'.",
self._filepath,
)
self._save_args.pop("storage_options", None)
self._load_args.pop("storage_options", None)

def _describe(self) -> Dict[str, Any]:
return {
"filepath": self._filepath,
"protocol": self._protocol,
"load_args": self._load_args,
"save_args": self._save_args,
"version": self._version,
}

def _load(self) -> pl.LazyFrame:
load_path = str(self._get_load_path())

if self._protocol == "file":
# With local filesystems, we can use Polar's build-in I/O method:
load_method = getattr(pl, f"scan_{self._file_format}", None)
return load_method(load_path, **self._load_args)

# For object storage, we use pyarrow for I/O:
dataset = ds.dataset(
load_path, filesystem=self._fs, format=self._file_format, **self._load_args
)
return pl.scan_pyarrow_dataset(dataset)

def _save(self, data: Union[pl.DataFrame, pl.LazyFrame]) -> None:
save_path = get_filepath_str(self._get_save_path(), self._protocol)

collected_data = None
if isinstance(data, pl.LazyFrame):
collected_data = data.collect()
else:
collected_data = data

# Note: polars does support writing partitioned parquet file
# it is leveraging Arrow to do so, see e.g.
# https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_parquet.html
save_method = getattr(collected_data, f"write_{self._file_format}", None)
if save_method:
buf = BytesIO()
save_method(file=buf, **self._save_args)
with self._fs.open(save_path, mode="wb") as fs_file:
fs_file.write(buf.getvalue())
self._invalidate_cache()
# How the LazyPolarsDataset logic is currently written with
# ACCEPTED_FILE_FORMATS and a check in the `__init__` method,
# this else loop is never reached, hence we exclude it from coverage report
# but leave it in for consistency between the Eager and Lazy classes
else: # pragma: no cover
raise DatasetError(
f"Unable to retrieve 'polars.DataFrame.write_{self._file_format}' "
"method, please ensure that your 'file_format' parameter has been "
"defined correctly as per the Polars API"
"https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html"
)

def _exists(self) -> bool:
try:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
except DatasetError: # pragma: no cover
return False

return self._fs.exists(load_path)

def _release(self) -> None:
super()._release()
self._invalidate_cache()

def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)
1 change: 1 addition & 0 deletions kedro-datasets/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ version = {attr = "kedro_datasets.__version__"}
[tool.coverage.report]
fail_under = 100
show_missing = true
# temporarily ignore kedro_datasets/__init__.py in coverage report
omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*", "kedro_datasets/tensorflow/*", "kedro_datasets/__init__.py", "kedro_datasets/databricks/*"]
exclude_lines = ["pragma: no cover", "raise NotImplementedError", "if TYPE_CHECKING:"]

Expand Down
9 changes: 9 additions & 0 deletions kedro-datasets/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ def _collect_requirements(requires):
[
POLARS, "pyarrow>=4.0", "xlsx2csv>=0.8.0", "deltalake >= 0.6.2"
],
"polars.EagerPolarsDataset":
[
POLARS, "pyarrow>=4.0", "xlsx2csv>=0.8.0", "deltalake >= 0.6.2"
],
"polars.LazyPolarsDataset":
[
# Note: there is no Lazy read Excel option, so we exclude xlsx2csv here.
POLARS, "pyarrow>=4.0", "deltalake >= 0.6.2"
],
}
redis_require = {"redis.PickleDataSet": ["redis~=4.1"]}
snowflake_require = {
Expand Down
Loading

0 comments on commit b44bf0e

Please sign in to comment.