partition
liquidcarbon committed Nov 5, 2024
1 parent f178d51 commit f9d8aa8
Showing 4 changed files with 127 additions and 49 deletions.
20 changes: 10 additions & 10 deletions README.md
@@ -1,9 +1,9 @@
# Affinity

Affinity makes it easy to create well-annotated datasets from vector data.
What your data means should always travel together with the data.

Affinity is a pythonic dialect of Data Definition Language (DDL). Affinity does not replace any dataframe library, but can be used with any package you like.

## Installation

@@ -25,6 +25,7 @@ class SensorData(af.Dataset):
voltage = af.VectorF64("something we measured (mV)")
is_laser_on = af.VectorBool("are the lights on?")
exp_id = af.ScalarI32("FK to `experiment`")
LOCATION = af.Location(folder="s3://mybucket/affinity", file="raw.parquet", partition_by=["channel"])

# this working concept covers the following:
data = SensorData() # ✅ empty dataset
@@ -35,8 +36,7 @@ data.metadata # ✅ annotations (data dict with column and
data.origin # ✅ creation metadata, some data provenance
data.sql(...) # ✅ run DuckDB SQL query on the dataset
data.to_parquet(...) # ✅ data.metadata -> Parquet metadata
data.to_csv(...) # ⚒️ annotations in the header
data.to_excel(...) # ⚒️ annotations on separate sheet
data.partition() # ✅ get formatted paths and partitioned datasets
```
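For instance, with the `LOCATION` declared above, `partition()` splits the dataset by `channel` and formats one Hive-style path per partition. A minimal sketch (the `build()` arguments and the channel values below are invented for illustration; the path template follows `Location.path` in `affinity.py`):

```python
data = SensorData.build(...)          # suppose the rows span channels 1 and 2
paths, partitions = data.partition()  # one path and one sub-dataset per channel
# paths      == ["s3://mybucket/affinity/channel=1/raw.parquet",
#                "s3://mybucket/affinity/channel=2/raw.parquet"]
# partitions == [SensorData(...), SensorData(...)]   # rows for channel 1 and 2
```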


@@ -51,7 +51,7 @@ The `af.Dataset` is Affinity's `BaseModel`, the base class that defines the beha

## Detailed example: Parquet Round-Trip

Affinity makes class declaration as concise as possible.
All you need to create a data class are typed classes and comments explaining what the fields mean.

#### 1. Declare class
@@ -61,7 +61,7 @@ import affinity as af

class IsotopeData(af.Dataset):
"""NIST Atomic Weights & Isotopic Compositions.[^1]
[^1] https://www.nist.gov/pml/atomic-weights-and-isotopic-compositions-relative-atomic-masses
"""
symbol = af.VectorObject("Element")
@@ -83,7 +83,7 @@ The class attributes are instantiated Vector objects of zero length. Using the

#### 2. Build class instance from querying a CSV

To build the dataset, we use the `IsotopeData.build()` method with the `query` argument. We use DuckDB [FROM-first syntax](https://duckdb.org/docs/sql/query_syntax/from.html#from-first-syntax), with the `rename=True` keyword argument. The fields in the query result are assigned the names and types provided in the class definition. With `rename=False` (default), the source columns must be named exactly like the class attributes. When safe type casting is not possible, an error is raised; an element with z=128 would not fit this dataset.
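A minimal sketch of that call (only the `query` and `rename` keywords are taken from the description above; any other arguments are omitted):

```python
url = "https://raw.githubusercontent.com/liquidcarbon/chembiodata/main/isotopes.csv"
data = IsotopeData.build(query=f"FROM '{url}'", rename=True)  # DuckDB FROM-first query
```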

```python
url = "https://raw.githubusercontent.com/liquidcarbon/chembiodata/main/isotopes.csv"
@@ -129,7 +129,7 @@ pf.schema_arrow
# abundance: double
# -- schema metadata --
# table_comment: 'NIST Atomic Weights & Isotopic Compositions.[^1]

# [' + 97
# symbol: 'Element'
# z: 'Atomic Number (Z)'
@@ -198,7 +198,7 @@ CREATE EXTERNAL TABLE [IF NOT EXISTS]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...) INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[WITH SERDEPROPERTIES (...)]
[LOCATION 's3://amzn-s3-demo-bucket/[folder]/']
[TBLPROPERTIES ( ['has_encrypted_data'='true | false',] ['classification'='aws_glue_classification',] property_name=property_value [, ...] ) ]
@@ -242,4 +242,4 @@ duckdb.sql("FROM 'task.parquet'")
# │ 123.456 │ {'attrs': [adorable, agreeable], 'name': Alice} │ 3 │
# │ 123.456 │ {'attrs': [bland, broke], 'name': Brent} │ 5 │
# └────────────┴─────────────────────────────────────────────────┴───────┘
```
69 changes: 59 additions & 10 deletions affinity.py
@@ -2,9 +2,11 @@
Module for creating well-documented datasets, with types and annotations.
"""

from dataclasses import dataclass, field
from importlib import import_module
from pathlib import Path
from time import time
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, List, Optional, Tuple

import numpy as np
import pandas as pd
@@ -29,6 +31,30 @@ def try_import(module: str) -> Optional[object]:
pa = try_import("pyarrow")
pq = try_import("pyarrow.parquet")

try:
# streaming to/from cloud
pass
except Exception:
pass


@dataclass
class Location:
folder: str | Path = field(default=Path("."))
file: str | Path = field(default="export.csv")
partition_by: List[str] = field(default_factory=list)

@property
def path(self) -> str:
_path = (
self.folder.as_posix() if isinstance(self.folder, Path) else self.folder
).rstrip("/")
for part in self.partition_by:
_path += f"/{part}={{}}"
else:
_path += f"/{self.file}"
return _path
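# illustrative: with the README's Location(folder="s3://mybucket/affinity",
# file="raw.parquet", partition_by=["channel"]), .path evaluates to
# "s3://mybucket/affinity/channel={}/raw.parquet"; Dataset.partition() below
# fills each "{}" placeholder with the corresponding partition value.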


class Descriptor:
"""Base class for scalars and vectors."""
@@ -128,7 +154,13 @@ def __str__(self):


class DatasetMeta(type):
"""Metaclass for custom repr."""
"""Metaclass for universal attributes and custom repr."""

def __new__(cls, name, bases, dct):
new_class = super().__new__(cls, name, bases, dct)
if "LOCATION" not in dct:
new_class.LOCATION = Location(file=f"{name}_export.csv")
return new_class

def __repr__(cls) -> str:
_lines = [cls.__name__]
@@ -141,8 +173,6 @@ def __repr__(cls) -> str:
class Dataset(metaclass=DatasetMeta):
"""Base class for typed, annotated datasets."""

save_to = {"partition": tuple(), "prefix": "", "file": ""}

@classmethod
def get_scalars(cls):
return {k: None for k, v in cls.__dict__.items() if isinstance(v, Scalar)}
@@ -155,7 +185,7 @@ def get_vectors(cls):
def get_dict(cls):
return dict(cls())

def __init__(self, **fields: Union[Scalar, Vector]):
def __init__(self, **fields: Scalar | Vector):
"""Create dataset, dynamically setting field values.
Vectors are initialized first, ensuring all are of equal length.
@@ -331,9 +361,11 @@ def model_dump(self) -> dict:
return self.dict

def to_parquet(self, path, engine="duckdb", **kwargs):
if engine == "arrow":
if engine == "pandas":
self.df.to_parquet(path)
elif engine == "arrow":
pq.write_table(self.arrow, path)
if engine == "duckdb":
elif engine == "duckdb":
kv_metadata = []
for k, v in self.metadata.items():
if isinstance(v, str) and "'" in v:
@@ -349,11 +381,28 @@ def to_parquet(self, path, engine="duckdb", **kwargs):
);""",
**kwargs,
)
else:
raise NotImplementedError
return path

def save(self):
"""Path and format constructed from `save_to` attribute."""
raise NotImplementedError
def partition(self) -> Tuple[List[str], List[str]]:
"""Path and format constructed from `LOCATION` attribute."""

_file = Path(self.LOCATION.file)
_stem = _file.stem
_ext = _file.suffix
if len(self.LOCATION.partition_by) == 0:
_partitions_iter = zip([""], [self.df])
else:
_partitions_iter = self.df.groupby(self.LOCATION.partition_by)
paths = []
partitions = []
for partition, data in _partitions_iter:
_path = self.LOCATION.path.format(*partition)
# paths.append((_path, self.__class__.build(dataframe=data)))
paths.append(_path)
partitions.append(self.__class__.build(dataframe=data))
return paths, partitions


### Typed scalars and vectors
54 changes: 27 additions & 27 deletions pyproject.toml
@@ -1,27 +1,27 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "affinity"
version = "0.3.0"
description = "Module for creating well-documented datasets, with types and annotations."
authors = [
{ name = "Alex Kislukhin" }
]
readme = "README.md"
requires-python = ">=3.11"

dependencies = [
"pandas"
]

[tool.hatch.build]
include = [
"affinity.py",
]

[tool.hatch.build.targets.wheel]

[tool.ruff]
line-length = 88
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "affinity"
version = "0.4.0"
description = "Module for creating well-documented datasets, with types and annotations."
authors = [
{ name = "Alex Kislukhin" }
]
readme = "README.md"
requires-python = ">=3.11"

dependencies = [
"pandas",
]

[tool.hatch.build]
include = [
"affinity.py",
]

[tool.hatch.build.targets.wheel]

[tool.ruff]
line-length = 88
33 changes: 31 additions & 2 deletions test_affinity.py
@@ -361,8 +361,37 @@ class IsotopeData(af.Dataset):
assert data_from_parquet_duckdb == data_from_parquet_arrow


def test_save_default():
pass
def test_partition():
class aDataset(af.Dataset):
"""Delightful data."""

v1 = af.VectorObject(comment="partition")
v2 = af.VectorI16(comment="int like a three")
v3 = af.VectorF32(comment="float like a butterfly")

adata = aDataset(v1=list("aaabbc"), v2=[1, 2, 1, 2, 1, 2], v3=[9, 8, 7, 7, 8, 9])

paths, partitions = adata.partition()
assert paths[0] == "./aDataset_export.csv"
assert partitions[0] == adata

adata.LOCATION.folder = "test_save"
adata.LOCATION.partition_by = ["v1", "v2"]
paths, partitions = adata.partition()
assert len(paths) == 5
assert [len(p) for p in partitions] == [2, 1, 1, 1, 1]

class bDataset(af.Dataset):
"""Delightful data."""

v1 = af.VectorObject(comment="partition")
v2 = af.VectorI16(comment="int like a three")
v3 = af.VectorF32(comment="float like a butterfly")
LOCATION = af.Location(folder="s3://mybucket/affinity/", partition_by=["v1"])

bdata = bDataset.build(dataframe=adata.df)
paths, partitions = bdata.partition()
assert paths[0] == "s3://mybucket/affinity/v1=a/export.csv"


def test_nested_dataset():
