diff --git a/README.md b/README.md
index 37e8a76..4e3c65e 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,9 @@
 # Affinity
 
-Affinity makes it easy to create well-annotated datasets from vector data. 
+Affinity makes it easy to create well-annotated datasets from vector data. What your data means should always travel together with the data.
 
-Affinity is a pythonic dialect of Data Definition Language (DDL). Affinity does not replace any dataframe library, but can be used with any package you like. 
+Affinity is a pythonic dialect of Data Definition Language (DDL). Affinity does not replace any dataframe library, but can be used with any package you like.
 
 ## Installation
 
@@ -25,6 +25,7 @@ class SensorData(af.Dataset):
     voltage = af.VectorF64("something we measured (mV)")
     is_laser_on = af.VectorBool("are the lights on?")
     exp_id = af.ScalarI32("FK to `experiment`")
+    LOCATION = af.Location(folder="s3://mybucket/affinity", file="raw.parquet", partition_by=["channel"])
 
 # this working concept covers the following:
 data = SensorData() # ✅ empty dataset
@@ -35,8 +36,7 @@ data.metadata # ✅ annotations (data dict with column and
 data.origin # ✅ creation metadata, some data provenance
 data.sql(...) # ✅ run DuckDB SQL query on the dataset
 data.to_parquet(...) # ✅ data.metadata -> Parquet metadata
-data.to_csv(...) # ⚒️ annotations in the header
-data.to_excel(...) # ⚒️ annotations on separate sheet
+data.partition() # ✅ get formatted paths and partitioned datasets
 ```
 
 
@@ -51,7 +51,7 @@ The `af.Dataset` is Affinity's `BaseModel`, the base class that defines the beha
 
 ## Detailed example: Parquet Round-Trip
 
-Affinity makes class declaration as concise as possible. 
+Affinity makes class declaration as concise as possible. All you need to create a data class are typed class attributes and comments explaining what the fields mean.
 
 #### 1. Declare class
 
@@ -61,7 +61,7 @@ import affinity as af
 
 class IsotopeData(af.Dataset):
     """NIST Atomic Weights & Isotopic Compositions.[^1]
-    
+
     [^1] https://www.nist.gov/pml/atomic-weights-and-isotopic-compositions-relative-atomic-masses
     """
     symbol = af.VectorObject("Element")
@@ -83,7 +83,7 @@ The class attributes are instantiated Vector objects of zero length. Using the
 
 #### 2. Build class instance from querying a CSV
 
-To build the dataset, we use `IsotopeData.build()` method with `query` argument. We use DuckDB [FROM-first syntax](https://duckdb.org/docs/sql/query_syntax/from.html#from-first-syntax), with `rename=True` keyword argument. The fields in the query result will be assigned names and types provided in the class definition. With `rename=False` (default), the source columns must be named exactly as class attributes. When safe type casting is not possible, an error will be raised; element with z=128 would not fit this dataset. 
+To build the dataset, we use the `IsotopeData.build()` method with the `query` argument. We use DuckDB [FROM-first syntax](https://duckdb.org/docs/sql/query_syntax/from.html#from-first-syntax) with the `rename=True` keyword argument. The fields in the query result will be assigned the names and types provided in the class definition. With `rename=False` (default), the source columns must be named exactly as the class attributes. When safe type casting is not possible, an error will be raised; an element with z=128 would not fit this dataset.
 
 ```python
 url = "https://raw.githubusercontent.com/liquidcarbon/chembiodata/main/isotopes.csv"
@@ -129,7 +129,7 @@ pf.schema_arrow
 # abundance: double
 # -- schema metadata --
 # table_comment: 'NIST Atomic Weights & Isotopic Compositions.[^1]
-    
+
 # [' + 97
 # symbol: 'Element'
 # z: 'Atomic Number (Z)'
@@ -198,7 +198,7 @@ CREATE EXTERNAL TABLE [IF NOT EXISTS]
   [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
   [CLUSTERED BY (col_name, col_name, ...) INTO num_buckets BUCKETS]
   [ROW FORMAT row_format]
-  [STORED AS file_format] 
+  [STORED AS file_format]
   [WITH SERDEPROPERTIES (...)]
   [LOCATION 's3://amzn-s3-demo-bucket/[folder]/']
   [TBLPROPERTIES ( ['has_encrypted_data'='true | false',] ['classification'='aws_glue_classification',] property_name=property_value [, ...] ) ]
@@ -242,4 +242,4 @@ duckdb.sql("FROM 'task.parquet'")
 # │   123.456 │ {'attrs': [adorable, agreeable], 'name': Alice} │     3 │
 # │   123.456 │ {'attrs': [bland, broke], 'name': Brent}        │     5 │
 # └───────────┴─────────────────────────────────────────────────┴───────┘
-```
\ No newline at end of file
+```
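The README quick-start above now pairs a class-level `LOCATION` with `data.partition()`. A minimal sketch of the intended flow, assuming a trimmed-down `SensorData` with a hypothetical `channel` column so that `partition_by=["channel"]` has something to group on:

```python
import affinity as af

class SensorData(af.Dataset):
    """Trimmed-down version of the README example (channel column is assumed)."""

    channel = af.VectorI8("device channel")
    voltage = af.VectorF64("something we measured (mV)")
    LOCATION = af.Location(
        folder="s3://mybucket/affinity", file="raw.parquet", partition_by=["channel"]
    )

data = SensorData(channel=[1, 1, 2], voltage=[0.5, 0.7, 0.9])
paths, partitions = data.partition()
# paths      -> ["s3://mybucket/affinity/channel=1/raw.parquet",
#                "s3://mybucket/affinity/channel=2/raw.parquet"]
# partitions -> [SensorData with 2 rows, SensorData with 1 row]
```

Declaring `LOCATION` on the class keeps the storage layout next to the schema and annotations, which is the point of this change.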
diff --git a/affinity.py b/affinity.py
index 2621536..4930582 100644
--- a/affinity.py
+++ b/affinity.py
@@ -2,9 +2,11 @@
 Module for creating well-documented datasets, with types and annotations.
 """
 
+from dataclasses import dataclass, field
 from importlib import import_module
+from pathlib import Path
 from time import time
-from typing import TYPE_CHECKING, Optional, Union
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 import numpy as np
 import pandas as pd
@@ -29,6 +31,30 @@ def try_import(module: str) -> Optional[object]:
 pa = try_import("pyarrow")
 pq = try_import("pyarrow.parquet")
 
+try:
+    # streaming to/from cloud
+    pass
+except Exception:
+    pass
+
+
+@dataclass
+class Location:
+    folder: str | Path = field(default=Path("."))
+    file: str | Path = field(default="export.csv")
+    partition_by: List[str] = field(default_factory=list)
+
+    @property
+    def path(self) -> str:
+        _path = (
+            self.folder.as_posix() if isinstance(self.folder, Path) else self.folder
+        ).rstrip("/")
+        # hive-style "key={}" placeholders, filled in by Dataset.partition()
+        for part in self.partition_by:
+            _path += f"/{part}={{}}"
+        _path += f"/{self.file}"
+        return _path
+
 
 class Descriptor:
     """Base class for scalars and vectors."""
@@ -128,7 +154,13 @@ def __str__(self):
 
 
 class DatasetMeta(type):
-    """Metaclass for custom repr."""
+    """Metaclass for universal attributes and custom repr."""
+
+    def __new__(cls, name, bases, dct):
+        new_class = super().__new__(cls, name, bases, dct)
+        if "LOCATION" not in dct:
+            new_class.LOCATION = Location(file=f"{name}_export.csv")
+        return new_class
 
     def __repr__(cls) -> str:
         _lines = [cls.__name__]
@@ -141,8 +173,6 @@ def __repr__(cls) -> str:
 class Dataset(metaclass=DatasetMeta):
     """Base class for typed, annotated datasets."""
 
-    save_to = {"partition": tuple(), "prefix": "", "file": ""}
-
     @classmethod
     def get_scalars(cls):
         return {k: None for k, v in cls.__dict__.items() if isinstance(v, Scalar)}
@@ -155,7 +185,7 @@ def get_vectors(cls):
     def get_dict(cls):
         return dict(cls())
 
-    def __init__(self, **fields: Union[Scalar, Vector]):
+    def __init__(self, **fields: Scalar | Vector):
         """Create dataset, dynamically setting field values.
 
         Vectors are initialized first, ensuring all are of equal length.
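The `Location.path` property above collapses folder, hive-style partition placeholders, and file name into a single format string. A few expected values for review; the `y`/`m` partition columns here are hypothetical:

```python
from pathlib import Path
from affinity import Location

Location().path
# -> "./export.csv"  (defaults: folder=Path("."), file="export.csv")

loc = Location(folder="s3://mybucket/affinity", file="raw.parquet", partition_by=["y", "m"])
loc.path
# -> "s3://mybucket/affinity/y={}/m={}/raw.parquet"
loc.path.format(2024, 7)
# -> "s3://mybucket/affinity/y=2024/m=7/raw.parquet"  (as Dataset.partition() does)

Location(folder=Path("data/")).path
# -> "data/export.csv"  (Path folders are normalized via as_posix, trailing slash stripped)
```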
@@ -331,9 +361,11 @@ def model_dump(self) -> dict:
         return self.dict
 
     def to_parquet(self, path, engine="duckdb", **kwargs):
-        if engine == "arrow":
+        if engine == "pandas":
+            self.df.to_parquet(path)
+        elif engine == "arrow":
             pq.write_table(self.arrow, path)
-        if engine == "duckdb":
+        elif engine == "duckdb":
             kv_metadata = []
             for k, v in self.metadata.items():
                 if isinstance(v, str) and "'" in v:
@@ -349,11 +381,24 @@ def to_parquet(self, path, engine="duckdb", **kwargs):
                 );""",
                 **kwargs,
             )
+        else:
+            raise NotImplementedError
         return path
 
-    def save(self):
-        """Path and format constructed from `save_to` attribute."""
-        raise NotImplementedError
+    def partition(self) -> Tuple[List[str], List["Dataset"]]:
+        """Paths and partitioned datasets constructed from `LOCATION` attribute."""
+        if len(self.LOCATION.partition_by) == 0:
+            # single unnamed partition: the entire dataset
+            _partitions_iter = zip([""], [self.df])
+        else:
+            _partitions_iter = self.df.groupby(self.LOCATION.partition_by)
+        paths = []
+        partitions = []
+        for partition, data in _partitions_iter:
+            _path = self.LOCATION.path.format(*partition)
+            paths.append(_path)
+            partitions.append(self.__class__.build(dataframe=data))
+        return paths, partitions
 
 
 ### Typed scalars and vectors
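Note that `partition()` deliberately writes nothing: it returns parallel lists of formatted paths and single-partition datasets, which pair naturally with the `to_parquet()` engines above. A usage sketch with a hypothetical `Events` dataset (target folders are assumed to exist):

```python
import affinity as af

class Events(af.Dataset):
    """Hypothetical two-column dataset."""

    kind = af.VectorObject("event type")
    value = af.VectorF64("payload")
    LOCATION = af.Location(file="events.parquet", partition_by=["kind"])

data = Events(kind=["a", "a", "b"], value=[1.0, 2.0, 3.0])

# one file per partition, at the path encoding the group key
for path, part in zip(*data.partition()):
    part.to_parquet(path)
# intended layout: ./kind=a/events.parquet (2 rows), ./kind=b/events.parquet (1 row)
```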
diff --git a/pyproject.toml b/pyproject.toml
index 770c9ae..08580e8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,27 +1,27 @@
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[project]
-name = "affinity"
-version = "0.3.0"
-description = "Module for creating well-documented datasets, with types and annotations."
-authors = [
-    { name = "Alex Kislukhin" }
-]
-readme = "README.md"
-requires-python = ">=3.11"
-
-dependencies = [
-    "pandas"
-]
-
-[tool.hatch.build]
-include = [
-    "affinity.py",
-]
-
-[tool.hatch.build.targets.wheel]
-
-[tool.ruff]
-line-length = 88
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "affinity"
+version = "0.4.0"
+description = "Module for creating well-documented datasets, with types and annotations."
+authors = [
+    { name = "Alex Kislukhin" }
+]
+readme = "README.md"
+requires-python = ">=3.11"
+
+dependencies = [
+    "pandas",
+]
+
+[tool.hatch.build]
+include = [
+    "affinity.py",
+]
+
+[tool.hatch.build.targets.wheel]
+
+[tool.ruff]
+line-length = 88
diff --git a/test_affinity.py b/test_affinity.py
index 195a40e..8aa1948 100644
--- a/test_affinity.py
+++ b/test_affinity.py
@@ -361,8 +361,37 @@ class IsotopeData(af.Dataset):
     assert data_from_parquet_duckdb == data_from_parquet_arrow
 
 
-def test_save_default():
-    pass
+def test_partition():
+    class aDataset(af.Dataset):
+        """Delightful data."""
+
+        v1 = af.VectorObject(comment="partition")
+        v2 = af.VectorI16(comment="int like a three")
+        v3 = af.VectorF32(comment="float like a butterfly")
+
+    adata = aDataset(v1=list("aaabbc"), v2=[1, 2, 1, 2, 1, 2], v3=[9, 8, 7, 7, 8, 9])
+
+    paths, partitions = adata.partition()
+    assert paths[0] == "./aDataset_export.csv"
+    assert partitions[0] == adata
+
+    adata.LOCATION.folder = "test_save"
+    adata.LOCATION.partition_by = ["v1", "v2"]
+    paths, partitions = adata.partition()
+    assert len(paths) == 5
+    assert [len(p) for p in partitions] == [2, 1, 1, 1, 1]
+
+    class bDataset(af.Dataset):
+        """Delightful data."""
+
+        v1 = af.VectorObject(comment="partition")
+        v2 = af.VectorI16(comment="int like a three")
+        v3 = af.VectorF32(comment="float like a butterfly")
+        LOCATION = af.Location(folder="s3://mybucket/affinity/", partition_by=["v1"])
+
+    bdata = bDataset.build(dataframe=adata.df)
+    paths, partitions = bdata.partition()
+    assert paths[0] == "s3://mybucket/affinity/v1=a/export.csv"
 
 
 def test_nested_dataset():
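For completeness, these are the five paths the grouped case in `test_partition` should produce (assuming pandas' default sorted group order); a candidate follow-up assertion:

```python
assert paths == [
    "test_save/v1=a/v2=1/aDataset_export.csv",  # 2 rows
    "test_save/v1=a/v2=2/aDataset_export.csv",
    "test_save/v1=b/v2=1/aDataset_export.csv",
    "test_save/v1=b/v2=2/aDataset_export.csv",
    "test_save/v1=c/v2=2/aDataset_export.csv",
]
```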