From 2b08b490bb7e5814f29a8cd9a30701c4a78d1643 Mon Sep 17 00:00:00 2001
From: liquidcarbon
Date: Tue, 5 Nov 2024 15:26:25 -0700
Subject: [PATCH] add outputs to Dataset.partition() that are useful for Athena/Glue release

---
 README.md        | 43 ++++++++++++++++++++++---------------------
 affinity.py      | 37 ++++++++++++++++++++++++-------------
 pyproject.toml   |  2 +-
 test_affinity.py | 32 ++++++++++++++++++++------------
 4 files changed, 67 insertions(+), 47 deletions(-)

diff --git a/README.md b/README.md
index 27af0f6..f398580 100644
--- a/README.md
+++ b/README.md
@@ -204,27 +204,28 @@ class PartitionedIsotopeData(af.Dataset):
 url = "https://raw.githubusercontent.com/liquidcarbon/chembiodata/main/isotopes.csv"
 data_from_sql = PartitionedIsotopeData.build(query=f"FROM '{url}'", rename=True)

-paths, partitions = data_from_sql.partition()
-paths[:3], partitions[:3]
-
-# (['s3://myisotopes/z=1/data.csv',
-# 's3://myisotopes/z=2/data.csv',
-# 's3://myisotopes/z=3/data.csv'],
-# [Dataset PartitionedIsotopeData of shape (3, 4)
-# symbol = ['H', 'H', 'H']
-# z = [1, 1, 1]
-# mass = [1.007825, 2.014102, 3.016049]
-# abundance = [0.999885, 0.000115, 0.0],
-# Dataset PartitionedIsotopeData of shape (2, 4)
-# symbol = ['He', 'He']
-# z = [2, 2]
-# mass = [3.016029, 4.002603]
-# abundance = [1e-06, 0.999999],
-# Dataset PartitionedIsotopeData of shape (2, 4)
-# symbol = ['Li', 'Li']
-# z = [3, 3]
-# mass = [6.015123, 7.016003]
-# abundance = [0.0759, 0.9241]])
+names, folders, filepaths, datasets = data_from_sql.partition()
+# This variety of outputs is helpful when populating cloud warehouses,
+# such as Athena/Glue via awswrangler.
+
+names[:3], folders[:3]
+# ([['1'], ['2'], ['3']], ['s3://myisotopes/z=1/', 's3://myisotopes/z=2/', 's3://myisotopes/z=3/'])
+#
+
+filepaths[:3], datasets[:3]
+# (['s3://myisotopes/z=1/data.csv', 's3://myisotopes/z=2/data.csv', 's3://myisotopes/z=3/data.csv'], [Dataset PartitionedIsotopeData of shape (3, 4)
+# symbol = ['H', 'H', 'H']
+# z = [1, 1, 1]
+# mass = [1.007825, 2.014102, 3.016049]
+# abundance = [0.999885, 0.000115, 0.0], Dataset PartitionedIsotopeData of shape (2, 4)
+# symbol = ['He', 'He']
+# z = [2, 2]
+# mass = [3.016029, 4.002603]
+# abundance = [1e-06, 0.999999], Dataset PartitionedIsotopeData of shape (2, 4)
+# symbol = ['Li', 'Li']
+# z = [3, 3]
+# mass = [6.015123, 7.016003]
+# abundance = [0.0759, 0.9241]])
 ```
diff --git a/affinity.py b/affinity.py
index dd6c007..775ef9b 100644
--- a/affinity.py
+++ b/affinity.py
@@ -12,7 +12,7 @@
 import pandas as pd


-def try_import(module: str) -> Optional[object]:
+def try_import(module: str) -> object | None:
     try:
         return import_module(module)
     except ImportError:
@@ -46,6 +46,7 @@ class Location:

     @property
     def path(self) -> str:
+        """Generates paths for writing partitioned data."""
         _path = (
             self.folder.as_posix() if isinstance(self.folder, Path) else self.folder
         ).rstrip("/")
@@ -170,9 +171,7 @@ def __repr__(cls) -> str:
         return "\n".join(_lines)


-class Dataset(metaclass=DatasetMeta):
-    """Base class for typed, annotated datasets."""
-
+class BaseDataset(metaclass=DatasetMeta):
     @classmethod
     def get_scalars(cls):
         return {k: None for k, v in cls.__dict__.items() if isinstance(v, Scalar)}
@@ -185,6 +184,10 @@ def get_vectors(cls):
     def get_dict(cls):
         return dict(cls())

+
+class Dataset(BaseDataset):
+    """Base class for typed, annotated datasets."""
+
     def __init__(self, **fields: Scalar | Vector):
         """Create dataset, dynamically setting field values.
@@ -385,8 +388,12 @@ def to_parquet(self, path, engine="duckdb", **kwargs):
             raise NotImplementedError
         return path

-    def partition(self) -> Tuple[List[str], List[str]]:
-        """Path and format constructed from `LOCATION` attribute."""
+    def partition(self) -> Tuple[List[List[str]], List[str], List[str], List[BaseDataset]]:
+        """Partition names, folders, file paths, and datasets from `LOCATION`.
+
+        This variety of outputs is helpful when populating cloud warehouses,
+        such as Athena/Glue via awswrangler.
+        """
         _file = Path(self.LOCATION.file)
         _stem = _file.stem

@@ -395,13 +402,17 @@ def partition(self) -> Tuple[List[str], List[str]]:
         if not self.LOCATION.partition_by:
             _partitions_iter = zip([""], [self.df])
         else:
             _partitions_iter = self.df.groupby(self.LOCATION.partition_by)
-        paths = []
-        partitions = []
-        for partition, data in _partitions_iter:
-            _path = self.LOCATION.path.format(*partition)
-            paths.append(_path)
-            partitions.append(self.__class__.build(dataframe=data))
-        return paths, partitions
+        names = []
+        folders = []
+        filepaths = []
+        datasets = []
+        for name, data in _partitions_iter:
+            _path = self.LOCATION.path.format(*name)
+            names.append([str(p) for p in name])
+            folders.append(_path.rsplit("/", maxsplit=1)[0] + "/")
+            filepaths.append(_path)
+            datasets.append(self.__class__.build(dataframe=data))
+        return names, folders, filepaths, datasets

 ### Typed scalars and vectors
diff --git a/pyproject.toml b/pyproject.toml
index 91335a1..6e8c12a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

 [project]
 name = "affinity"
-version = "0.4.0"
+version = "0.5.0"
 description = "Module for creating well-documented datasets, with types and annotations."
 authors = [
     { name = "Alex Kislukhin" }
diff --git a/test_affinity.py b/test_affinity.py
index 8d9e063..ce69f57 100644
--- a/test_affinity.py
+++ b/test_affinity.py
@@ -11,6 +11,16 @@
 duckdb.sql("SET python_scan_all_frames=true")


+def test_location_default():
+    loc = af.Location()
+    assert loc.path == "./export.csv"
+
+
+def test_location_partitioned():
+    loc = af.Location(folder="s3://affinity", partition_by=list("ab"))
+    assert loc.path == "s3://affinity/a={}/b={}/export.csv"
+
+
 def test_scalar():
     s = af.ScalarObject("field comment")
     assert repr(s) == "ScalarObject # field comment"
@@ -368,35 +378,33 @@ class IsotopeData(af.Dataset):


 def test_partition():
     class aDataset(af.Dataset):
-        """Delightful data."""
-
         v1 = af.VectorObject(comment="partition")
         v2 = af.VectorI16(comment="int like a three")
         v3 = af.VectorF32(comment="float like a butterfly")

     adata = aDataset(v1=list("aaabbc"), v2=[1, 2, 1, 2, 1, 2], v3=[9, 8, 7, 7, 8, 9])
-    paths, partitions = adata.partition()
-    assert paths[0] == "./aDataset_export.csv"
-    assert partitions[0] == adata
+    names, folders, filepaths, datasets = adata.partition()
+    assert filepaths[0] == "./aDataset_export.csv"
+    assert datasets[0] == adata

     adata.LOCATION.folder = "test_save"
     adata.LOCATION.partition_by = ["v1", "v2"]
-    paths, partitions = adata.partition()
-    assert len(paths) == 5
-    assert [len(p) for p in partitions] == [2, 1, 1, 1, 1]
+    names, folders, filepaths, datasets = adata.partition()
+    assert names == [["a", "1"], ["a", "2"], ["b", "1"], ["b", "2"], ["c", "2"]]
+    assert folders[-1] == "test_save/v1=c/v2=2/"
+    assert len(filepaths) == 5
+    assert [len(p) for p in datasets] == [2, 1, 1, 1, 1]

     class bDataset(af.Dataset):
-        """Delightful data."""
-
         v1 = af.VectorObject(comment="partition")
         v2 = af.VectorI16(comment="int like a three")
         v3 = af.VectorF32(comment="float like a butterfly")
         LOCATION = af.Location(folder="s3://mybucket/affinity/", partition_by=["v1"])

     bdata = bDataset.build(dataframe=adata.df)
-    paths, partitions = bdata.partition()
-    assert paths[0] == "s3://mybucket/affinity/v1=a/export.csv"
+    names, folders, filepaths, datasets = bdata.partition()
+    assert filepaths[0] == "s3://mybucket/affinity/v1=a/export.csv"


 def test_nested_dataset():
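For context, `Location.path` turns each `partition_by` column into a Hive-style `key={}` segment, which `partition()` then fills with `str.format`. A minimal sketch (not part of the diff) using the values asserted in `test_location_partitioned`:

```python
import affinity as af

# each partition_by column becomes a "{column}={}" segment of the path template
loc = af.Location(folder="s3://affinity", partition_by=["a", "b"])
print(loc.path)                   # s3://affinity/a={}/b={}/export.csv

# partition() fills the template with one group's values via str.format
print(loc.path.format("x", "3"))  # s3://affinity/a=x/b=3/export.csv
```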
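The reworked loop in `partition()` relies on pandas yielding a tuple of key values for each group, which is why `names` comes back as a list of lists of strings. A standalone, pandas-only sketch of that behavior:

```python
import pandas as pd

df = pd.DataFrame({"v1": list("aab"), "v2": [1, 2, 2], "v3": [9.0, 8.0, 7.0]})

for name, data in df.groupby(["v1", "v2"]):
    # name is a tuple of group keys, e.g. ("a", 1); partition() stringifies it
    # so the values can fill the "{}" slots of LOCATION.path
    print([str(p) for p in name], len(data))
# ['a', '1'] 1
# ['a', '2'] 1
# ['b', '2'] 1
```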
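And a hedged sketch of the Athena/Glue use case named in the commit message: `folders` and `names` line up one-to-one, which is the shape awswrangler expects for `partitions_values`. The Glue database and table names below are hypothetical, and the sketch assumes the table already exists with matching partition keys (it continues the README's `data_from_sql` example):

```python
import awswrangler as wr

names, folders, filepaths, datasets = data_from_sql.partition()

# write each partition's rows to its Hive-style key under the table's S3 prefix
for filepath, dataset in zip(filepaths, datasets):
    wr.s3.to_csv(dataset.df, path=filepath, index=False)

# register the partitions in the Glue catalog: keys are S3 folders,
# values are the partition values for that folder
wr.catalog.add_csv_partitions(
    database="isotopes_db",  # hypothetical Glue database
    table="isotopes",        # hypothetical Glue table
    partitions_values=dict(zip(folders, names)),
)
```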