diff --git a/pyproject.toml b/pyproject.toml
index 6053086..03c4267 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,7 @@ classifiers = [
     "Programming Language :: Python :: Implementation :: PyPy",
 ]
 dynamic = ["version"]
-dependencies = ["pyarrow", "typer", "deltalake"]
+dependencies = ["pyarrow", "typer"]
 
 [project.optional-dependencies]
 dev = ["ruff", "ipython"]
diff --git a/python/falsa/app.py b/python/falsa/app.py
index ed56fec..dd5f865 100644
--- a/python/falsa/app.py
+++ b/python/falsa/app.py
@@ -1,6 +1,9 @@
+import json
 import random
+import shutil
 from enum import Enum
 from pathlib import Path
+from uuid import uuid4
 
 import pyarrow as pa
 import typer
@@ -19,6 +22,8 @@
     JoinSmallGenerator,
 )
 
+from .utils import generate_delta_log
+
 help_str = """
 [bold][green]H2O db-like-benchmark data generation.[/green][/bold]\n
 [italic][red]This implementation is unofficial![/red][/italic]
@@ -61,12 +66,8 @@ class Format(str, Enum):
     def pprint(self):
         print(f"An output format is [green]{self.value}[/green]")
 
-        if self is Format.DELTA:
-            print("\n[red]Warning![/red]Batch writes are not supported for Delta!")
-            print("The whole dataset will be materialized first!")
-        else:
-            print("\nBatch mode is supported.")
-            print("In case of memory problems you can try to reduce a [green]batch_size[/green].")
+        print("\nBatch mode is supported.")
+        print("In case of memory problems you can try to reduce a [green]batch_size[/green].")
 
         print()
 
@@ -131,9 +132,10 @@ def _create_filename(ds_type: str, n: int, k: int, nas: int, fmt: Format) -> str
 def _clear_prev_if_exists(fp: Path, fmt: Format) -> None:
     if fp.exists():
         # All is file, delta is directory
-        # Delta delete dir by itself.
         if fmt is not Format.DELTA:
             fp.unlink()
+        else:
+            shutil.rmtree(fp, ignore_errors=True)
 
 
 @app.command(help="Create H2O GroupBy Dataset")
@@ -143,12 +145,10 @@ def groupby(
     k: Annotated[int, typer.Option(help="An amount of keys (groups)")] = 100,
     nas: Annotated[int, typer.Option(min=0, max=100, help="A percentage of NULLS")] = 0,
     seed: Annotated[int, typer.Option(min=0, help="A seed of the generation")] = 42,
-    batch_size: Annotated[
-        int, typer.Option(min=0, help="A batch-size (in rows)")
-    ] = 5_000_000,
+    batch_size: Annotated[int, typer.Option(min=0, help="A batch-size (in rows)")] = 5_000_000,
     data_format: Annotated[
         Format,
-        typer.Option(help="An output format for generated data. DELTA requires materialization of the whole data!"),
+        typer.Option(help="An output format for generated data."),
     ] = Format.CSV,
 ):
     gb = GroupByGenerator(size._to(), k, nas, seed, batch_size)
@@ -196,7 +196,14 @@ def groupby(
     writer.close()
 
     if data_format is Format.DELTA:
-        write_deltalake(output_filepath, data=gb.iter_batches(), schema=schema)
+        output_filepath.mkdir(parents=True)
+        delta_file_pq = output_filepath.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema)
+        for batch in track(gb.iter_batches(), total=len(gb.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_filepath, schema)
 
 
 @app.command(help="Create three H2O join datasets")
@@ -206,12 +213,10 @@ def join(
     k: Annotated[int, typer.Option(help="An amount of keys (groups)")] = 10,
     nas: Annotated[int, typer.Option(min=0, max=100, help="A percentage of NULLS")] = 0,
     seed: Annotated[int, typer.Option(min=0, help="A seed of the generation")] = 42,
-    batch_size: Annotated[
-        int, typer.Option(min=0, help="A batch-size (in rows)")
-    ] = 5_000_000,
+    batch_size: Annotated[int, typer.Option(min=0, help="A batch-size (in rows)")] = 5_000_000,
     data_format: Annotated[
         Format,
-        typer.Option(help="An output format for generated data. DELTA requires materialization of the whole data!"),
+        typer.Option(help="An output format for generated data."),
     ] = Format.CSV,
 ):
     random.seed(seed)
@@ -298,7 +303,14 @@ def join(
     writer.close()
 
     if data_format is Format.DELTA:
-        write_deltalake(output_small, data=join_small.iter_batches(), schema=schema_small)
+        output_small.mkdir(parents=True)
+        delta_file_pq = output_small.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema_small)
+        for batch in track(join_small.iter_batches(), total=len(join_small.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_small, schema_small)
 
     print()
     print("An [bold]MEDIUM[/bold] data [green]schema[/green] is the following:")
@@ -320,7 +332,14 @@ def join(
     writer.close()
 
     if data_format is Format.DELTA:
-        write_deltalake(output_medium, data=join_medium.iter_batches(), schema=schema_medium)
+        output_medium.mkdir(parents=True)
+        delta_file_pq = output_medium.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema_medium)
+        for batch in track(join_medium.iter_batches(), total=len(join_medium.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_medium, schema_medium)
 
     print()
     print("An [bold]BIG[/bold] data [green]schema[/green] is the following:")
@@ -342,7 +361,14 @@ def join(
     writer.close()
 
     if data_format is Format.DELTA:
-        write_deltalake(output_big, data=join_big.iter_batches(), schema=schema_big)
+        output_big.mkdir(parents=True)
+        delta_file_pq = output_big.joinpath("data.parquet")
+        writer = parquet.ParquetWriter(where=delta_file_pq, schema=schema_big)
+        for batch in track(join_big.iter_batches(), total=len(join_big.batches)):
+            writer.write_batch(batch)
+
+        writer.close()
+        generate_delta_log(output_big, schema_big)
 
 
 def entry_point() -> None:
diff --git a/python/falsa/utils.py b/python/falsa/utils.py
new file mode 100644
index 0000000..09aa061
--- /dev/null
+++ b/python/falsa/utils.py
@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+import json
+import time
+from pathlib import Path
+from uuid import uuid4
+
+from pyarrow import Schema
+
+PA_2_DELTA_DTYPES = {
+    "int32": "integer",
+    "int64": "long",
+}
+
+
+def generate_delta_log(output_filepath: Path, schema: Schema) -> None:
+    """Generate a delta-log from existing parquet files and the given schema."""
+    file_len = 20
+    delta_dir = output_filepath.joinpath("_delta_log")
+    delta_dir.mkdir(exist_ok=True)
+    add_meta_log = "0" * file_len + ".json"
+
+    with open(delta_dir.joinpath(add_meta_log), "w") as meta_log:
+        jsons = []
+        # Generate "metaData"
+        jsons.append(
+            json.dumps(
+                {
+                    "metaData": {
+                        "id": uuid4().__str__(),
+                        "format": {
+                            "provider": "parquet",
+                            "options": {},
+                        },
+                        "schemaString": json.dumps(
+                            {
+                                "type": "struct",
+                                "fields": [
+                                    {
+                                        "name": field.name,
+                                        "type": PA_2_DELTA_DTYPES.get(field.type.__str__(), field.type.__str__()),
+                                        "nullable": field.nullable,
+                                        "metadata": {},
+                                    }
+                                    for field in schema
+                                ],
+                            }
+                        ),
+                        "configuration": {},
+                        "partitionColumns": [],
+                    }
+                }
+            )
+        )
+        # Generate "add"
+        for pp in output_filepath.glob("*.parquet"):
+            jsons.append(
+                json.dumps(
+                    {
+                        "add": {
+                            "path": pp.relative_to(output_filepath).__str__(),
+                            "partitionValues": {},
+                            "size": pp.stat().st_size,
+                            "modificationTime": int(time.time() * 1000),
+                            "dataChange": True,
+                        }
+                    }
+                )
+            )
+
+        # Generate "protocol"
+        jsons.append(json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}))
+        meta_log.write("\n".join(jsons))
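
Note: below is a minimal sketch of how the hand-written commit can be read back without the removed deltalake dependency, to sanity-check what generate_delta_log produces. The table directory name is hypothetical; the layout (a single data.parquet next to _delta_log/00000000000000000000.json) is the one created by the commands above.

import json
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

table_path = Path("groupby_output.delta")  # hypothetical output directory

# generate_delta_log writes a single commit file named with 20 zero-padded digits.
log_file = table_path / "_delta_log" / ("0" * 20 + ".json")

# Each line of the commit is one standalone JSON action: metaData, add, or protocol.
actions = [json.loads(line) for line in log_file.read_text().splitlines()]
data_files = [a["add"]["path"] for a in actions if "add" in a]

# Read only the files the log declares and stitch them into a single table.
table = pa.concat_tables([pq.read_table(table_path / p) for p in data_files])
print(table.schema)

Readers that follow the Delta protocol (Spark, delta-rs, etc.) should be able to resolve the same snapshot from these three actions, since the log pins minReaderVersion to 1.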