add outputs to Dataset.partition() that are useful for Athena/Glue release

liquidcarbon committed Nov 5, 2024
1 parent c6da828 commit 2b08b49
Showing 4 changed files with 67 additions and 47 deletions.
43 changes: 22 additions & 21 deletions README.md
@@ -204,27 +204,28 @@ class PartitionedIsotopeData(af.Dataset):
url = "https://raw.githubusercontent.com/liquidcarbon/chembiodata/main/isotopes.csv"
data_from_sql = PartitionedIsotopeData.build(query=f"FROM '{url}'", rename=True)

paths, partitions = data_from_sql.partition()
paths[:3], partitions[:3]

# (['s3://myisotopes/z=1/data.csv',
# 's3://myisotopes/z=2/data.csv',
# 's3://myisotopes/z=3/data.csv'],
# [Dataset PartitionedIsotopeData of shape (3, 4)
# symbol = ['H', 'H', 'H']
# z = [1, 1, 1]
# mass = [1.007825, 2.014102, 3.016049]
# abundance = [0.999885, 0.000115, 0.0],
# Dataset PartitionedIsotopeData of shape (2, 4)
# symbol = ['He', 'He']
# z = [2, 2]
# mass = [3.016029, 4.002603]
# abundance = [1e-06, 0.999999],
# Dataset PartitionedIsotopeData of shape (2, 4)
# symbol = ['Li', 'Li']
# z = [3, 3]
# mass = [6.015123, 7.016003]
# abundance = [0.0759, 0.9241]])
names, folders, filepaths, datasets = data_from_sql.partition()
# this variety of outputs is helpful when populating cloud warehouses,
# such as Athena/Glue via awswrangler.

names[:3], folders[:3]
# ([['1'], ['2'], ['3']], ['s3://myisotopes/z=1/', 's3://myisotopes/z=2/', 's3://myisotopes/z=3/'])
#

filepaths[:3], datasets[:3]
# (['s3://myisotopes/z=1/data.csv', 's3://myisotopes/z=2/data.csv', 's3://myisotopes/z=3/data.csv'], [Dataset PartitionedIsotopeData of shape (3, 4)
# symbol = ['H', 'H', 'H']
# z = [1, 1, 1]
# mass = [1.007825, 2.014102, 3.016049]
# abundance = [0.999885, 0.000115, 0.0], Dataset PartitionedIsotopeData of shape (2, 4)
# symbol = ['He', 'He']
# z = [2, 2]
# mass = [3.016029, 4.002603]
# abundance = [1e-06, 0.999999], Dataset PartitionedIsotopeData of shape (2, 4)
# symbol = ['Li', 'Li']
# z = [3, 3]
# mass = [6.015123, 7.016003]
# abundance = [0.0759, 0.9241]])
```
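
The `filepaths` and `datasets` outputs pair up one-to-one, so each partition can be written straight to its target location. A minimal sketch with awswrangler (the bucket comes from the example above; the loop and `index=False` are assumptions for illustration, not part of this release):

```python
# Sketch only: write each partition's dataframe to its S3 path.
# Assumes awswrangler is installed and AWS credentials are configured.
import awswrangler as wr

names, folders, filepaths, datasets = data_from_sql.partition()

for filepath, dataset in zip(filepaths, datasets):
    wr.s3.to_csv(dataset.df, path=filepath, index=False)
```

(`wr.s3.to_parquet` can be used the same way for Parquet-backed tables.)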


37 changes: 24 additions & 13 deletions affinity.py
@@ -12,7 +12,7 @@
import pandas as pd


def try_import(module: str) -> Optional[object]:
def try_import(module: str) -> object | None:
try:
return import_module(module)
except ImportError:
@@ -46,6 +46,7 @@ class Location:

@property
def path(self) -> str:
"""Generates paths for writing partitioned data."""
_path = (
self.folder.as_posix() if isinstance(self.folder, Path) else self.folder
).rstrip("/")
@@ -170,9 +171,7 @@ def __repr__(cls) -> str:
return "\n".join(_lines)


class Dataset(metaclass=DatasetMeta):
"""Base class for typed, annotated datasets."""

class BaseDataset(metaclass=DatasetMeta):
@classmethod
def get_scalars(cls):
return {k: None for k, v in cls.__dict__.items() if isinstance(v, Scalar)}
@@ -185,6 +184,10 @@ def get_vectors(cls):
def get_dict(cls):
return dict(cls())


class Dataset(BaseDataset):
"""Base class for typed, annotated datasets."""

def __init__(self, **fields: Scalar | Vector):
"""Create dataset, dynamically setting field values.
@@ -385,8 +388,12 @@ def to_parquet(self, path, engine="duckdb", **kwargs):
raise NotImplementedError
return path

def partition(self) -> Tuple[List[str], List[str]]:
"""Path and format constructed from `LOCATION` attribute."""
def partition(self) -> Tuple[List[List[str]], List[str], List[str], List[BaseDataset]]:
"""Path and format constructed from `LOCATION` attribute.
Variety of outputs is helpful when populating cloud warehouses,
such as Athena/Glue via awswrangler.
"""

_file = Path(self.LOCATION.file)
_stem = _file.stem
@@ -395,13 +402,17 @@ def partition(self) -> Tuple[List[str], List[str]]:
_partitions_iter = zip([""], [self.df])
else:
_partitions_iter = self.df.groupby(self.LOCATION.partition_by)
paths = []
partitions = []
for partition, data in _partitions_iter:
_path = self.LOCATION.path.format(*partition)
paths.append(_path)
partitions.append(self.__class__.build(dataframe=data))
return paths, partitions
names = []
folders = []
filepaths = []
datasets = []
for name, data in _partitions_iter:
_path = self.LOCATION.path.format(*name)
names.append([str(p) for p in name])
folders.append(_path.rsplit("/", maxsplit=1)[0] + "/")
filepaths.append(_path)
datasets.append(self.__class__.build(dataframe=data))
return names, folders, filepaths, datasets


### Typed scalars and vectors
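Continuing the README example, the `folders`/`names` pairs match the `partitions_values` mapping that awswrangler's Glue catalog helpers expect (S3 folder → partition values as strings). A hedged sketch, assuming the partition CSVs were already uploaded and that a Glue database and table exist (`"mydb"` and `"isotopes"` below are hypothetical names, not part of this commit):

```python
# Sketch only: register already-written CSV partitions in the Glue catalog.
# "mydb" and "isotopes" are placeholder names.
import awswrangler as wr

wr.catalog.add_csv_partitions(
    database="mydb",
    table="isotopes",
    partitions_values=dict(zip(folders, names)),  # {s3 folder: ["1"], ...}
)
```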
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "affinity"
version = "0.4.0"
version = "0.5.0"
description = "Module for creating well-documented datasets, with types and annotations."
authors = [
{ name = "Alex Kislukhin" }
32 changes: 20 additions & 12 deletions test_affinity.py
@@ -11,6 +11,16 @@
duckdb.sql("SET python_scan_all_frames=true")


def test_location_default():
loc = af.Location()
assert loc.path == "./export.csv"


def test_location_partitioned():
loc = af.Location(folder="s3://affinity", partition_by=list("ab"))
assert loc.path == "s3://affinity/a={}/b={}/export.csv"


def test_scalar():
s = af.ScalarObject("field comment")
assert repr(s) == "ScalarObject <class 'object'> # field comment"
@@ -368,35 +378,33 @@ class IsotopeData(af.Dataset):

def test_partition():
class aDataset(af.Dataset):
"""Delightful data."""

v1 = af.VectorObject(comment="partition")
v2 = af.VectorI16(comment="int like a three")
v3 = af.VectorF32(comment="float like a butterfly")

adata = aDataset(v1=list("aaabbc"), v2=[1, 2, 1, 2, 1, 2], v3=[9, 8, 7, 7, 8, 9])

paths, partitions = adata.partition()
assert paths[0] == "./aDataset_export.csv"
assert partitions[0] == adata
names, folders, filepaths, datasets = adata.partition()
assert filepaths[0] == "./aDataset_export.csv"
assert datasets[0] == adata

adata.LOCATION.folder = "test_save"
adata.LOCATION.partition_by = ["v1", "v2"]
paths, partitions = adata.partition()
assert len(paths) == 5
assert [len(p) for p in partitions] == [2, 1, 1, 1, 1]
names, folders, filepaths, datasets = adata.partition()
assert names == [["a", "1"], ["a", "2"], ["b", "1"], ["b", "2"], ["c", "2"]]
assert folders[-1] == "test_save/v1=c/v2=2/"
assert len(filepaths) == 5
assert [len(p) for p in datasets] == [2, 1, 1, 1, 1]

class bDataset(af.Dataset):
"""Delightful data."""

v1 = af.VectorObject(comment="partition")
v2 = af.VectorI16(comment="int like a three")
v3 = af.VectorF32(comment="float like a butterfly")
LOCATION = af.Location(folder="s3://mybucket/affinity/", partition_by=["v1"])

bdata = bDataset.build(dataframe=adata.df)
paths, partitions = bdata.partition()
assert paths[0] == "s3://mybucket/affinity/v1=a/export.csv"
names, folders, filepaths, datasets = bdata.partition()
assert filepaths[0] == "s3://mybucket/affinity/v1=a/export.csv"


def test_nested_dataset():