# aws-parquet

`aws-parquet` is a toolkit that enables working with parquet datasets on AWS. It handles AWS S3 reads/writes, AWS Glue catalog updates, and AWS Athena queries, with the goal of providing a simple and intuitive interface for creating and managing parquet datasets on AWS.
`aws-parquet` makes use of the following tools:

- awswrangler as an AWS SDK for pandas
- pandera for pandas-based data validation (see the short sketch after this list)
- typeguard and pydantic for runtime type checking
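For readers new to pandera, here is a minimal standalone sketch of what pandas-based data validation looks like; the schema and data below are purely illustrative and are not part of the `aws-parquet` API:

```python
import pandas as pd
import pandera as pa

# A hypothetical schema, for illustration only.
schema = pa.DataFrameSchema({
    "col1": pa.Column(int, pa.Check.ge(0)),
    "col3": pa.Column(float),
})

# validate() raises a SchemaError on violations and otherwise
# returns the validated DataFrame.
validated = schema.validate(pd.DataFrame({"col1": [1, 2], "col3": [0.5, 1.5]}))
```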
`aws-parquet` provides a `ParquetDataset` class that enables the following operations:
- create a parquet dataset that will get registered in AWS Glue
- append new data to the dataset and update the AWS Glue catalog
- read a partition of the dataset and perform proper schema validation and type casting
- overwrite data in the dataset after performing proper schema validation and type casting
- delete a partition of the dataset and update the AWS Glue catalog
- query the dataset using AWS Athena
Using pip:

```bash
pip install aws_parquet
```
Create a parquet dataset that will get registered in AWS Glue:

```python
import os

from aws_parquet import ParquetDataset
import pandas as pd
import pandera as pa
from pandera.typing import Series

# define your pandera schema model
class MyDatasetSchemaModel(pa.SchemaModel):
    col1: Series[int] = pa.Field(nullable=False, ge=0, lt=10)
    col2: Series[pa.DateTime]
    col3: Series[float]

# configuration
database = "default"
bucket_name = os.environ["AWS_S3_BUCKET"]
table_name = "foo_bar"
path = f"s3://{bucket_name}/{table_name}/"
partition_cols = ["col1", "col2"]
schema = MyDatasetSchemaModel.to_schema()

# create the dataset
dataset = ParquetDataset(
    database=database,
    table=table_name,
    partition_cols=partition_cols,
    path=path,
    pandera_schema=schema,
)

dataset.create()
```
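To double-check that the table actually landed in the Glue catalog, awswrangler's catalog helpers can be used directly; this is a small sketch against awswrangler's public API, not part of `aws-parquet` itself:

```python
import awswrangler as wr

# Should be True once dataset.create() has registered the table in Glue.
print(wr.catalog.does_table_exist(database="default", table="foo_bar"))
```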
Append new data to the dataset:

```python
df = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [1.0, 2.0, 3.0]
})

dataset.update(df)
```
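Because the dataset carries a pandera schema, `update` can reject invalid frames before anything is written. A hedged sketch of the failure mode, assuming the underlying pandera `SchemaError` surfaces unchanged (the `ge=0, lt=10` bound on `col1` comes from the schema model above):

```python
import pandera as pa

bad = pd.DataFrame({
    "col1": [42],  # violates the lt=10 bound declared on col1
    "col2": ["2021-01-04"],
    "col3": [7.0],
})

try:
    dataset.update(bad)
except pa.errors.SchemaError:
    # Assumption: the pandera validation error propagates as-is.
    print("validation rejected the frame before writing")
```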
Read a partition of the dataset:

```python
df = dataset.read({"col2": "2021-01-01"})
```
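Two follow-ups on `read`, both going slightly beyond the example above and hedged accordingly: the returned frame is validated and type-cast against the pandera schema (per the feature list earlier), so `col2` should come back as a datetime column even though it was appended as strings; and since the partition filter is a dict, filtering on multiple partition keys presumably follows the same shape used by `delete` further down:

```python
# The partition read is validated/cast against the pandera schema,
# so col2 should come back as datetime64 rather than strings.
partition = dataset.read({"col2": "2021-01-01"})
print(partition.dtypes)

# Hypothetical multi-key filter, mirroring the dict shape delete() accepts.
both_keys = dataset.read({"col1": 1, "col2": "2021-01-01"})
```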
Overwrite data in the dataset:

```python
df_overwrite = pd.DataFrame({
    "col1": [1, 2, 3],
    "col2": ["2021-01-01", "2021-01-02", "2021-01-03"],
    "col3": [4.0, 5.0, 6.0]
})

dataset.update(df_overwrite, overwrite=True)
```
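To sanity-check the result, read one of the partitions back with the `read` call from above; the expectation here assumes `overwrite=True` replaces the previously appended data for these partitions:

```python
check = dataset.read({"col2": "2021-01-01"})
# Assumption: after the overwrite this partition holds col3 == 4.0
# instead of the 1.0 written by the initial append.
print(check["col3"].tolist())
```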
Query the dataset using AWS Athena:

```python
df = dataset.query("SELECT col1 FROM foo_bar")
```
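Since `query` runs through Athena, standard SQL beyond a simple projection should work too; an illustrative aggregate against the same Glue-registered table name:

```python
# Hypothetical aggregate; any SQL that Athena accepts should be fair game.
counts = dataset.query("SELECT col1, COUNT(*) AS n FROM foo_bar GROUP BY col1")
```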
Delete a partition of the dataset:

```python
dataset.delete({"col1": 1, "col2": "2021-01-01"})
```
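As with `create`, the Glue catalog can be inspected to confirm the partition removal; again a sketch via awswrangler's public catalog API rather than `aws-parquet`:

```python
import awswrangler as wr

# Maps each remaining partition's S3 location to its values; the
# deleted (col1=1, col2=2021-01-01) partition should no longer appear.
print(wr.catalog.get_partitions(database="default", table="foo_bar"))
```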
Delete the dataset in its entirety:

```python
dataset.delete()
```