fix: handle partitions with empty table in read_parquet with dataset=True #2983

Open · wants to merge 6 commits into main
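For context, a minimal reproduction of the failure this change addresses might look like the sketch below (the bucket name, prefix, and partition value are illustrative, not taken from this PR):

```python
import awswrangler as wr
import pandas as pd

# Illustrative dataset prefix; the first partition file holds an empty table.
prefix = "s3://my-bucket/dataset/col0=1/"

wr.s3.to_parquet(df=pd.DataFrame({"id": pd.Series([], dtype="string")}), path=f"{prefix}part0.parquet")
wr.s3.to_parquet(df=pd.DataFrame({"id": ["1", "2"]}, dtype="string"), path=f"{prefix}part1.parquet")

# Before this fix, reading the dataset could fail while concatenating the
# per-file tables, because the empty first file drives the schema inference.
df = wr.s3.read_parquet(path="s3://my-bucket/dataset/", dataset=True)
```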
8 changes: 8 additions & 0 deletions awswrangler/s3/_read_parquet.py
@@ -311,6 +311,14 @@ def _read_parquet(
itertools.repeat(schema),
itertools.repeat(decryption_properties),
)
# When the first table in a dataset is empty, the inferred schema may not
# be compatible with the other tables, which raises an exception when
# concatenating them down the line. As a workaround, filter out empty
# tables, unless every table is empty; in that case the schemas are all
# compatible and no filtering is needed.
should_filter_out = any(len(table) > 0 for table in tables)
if should_filter_out:
tables = [table for table in tables if len(table) > 0]
return _utils.table_refs_to_df(tables, kwargs=arrow_kwargs)


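To illustrate the failure mode the comment above describes, here is a minimal sketch (not part of this diff; the null-typed column is an assumption about how an empty table's schema can be inferred):

```python
import pyarrow as pa

# An empty table whose column was inferred as null-typed (assumed), next to a
# non-empty table with a string column.
empty = pa.table({"id": pa.array([], type=pa.null())})
full = pa.table({"id": pa.array(["1", "2"], type=pa.string())})

try:
    # Concatenating tables with mismatched schemas raises ArrowInvalid.
    pa.concat_tables([empty, full])
except pa.ArrowInvalid as exc:
    print(exc)

# Filtering out the empty table, as the fix does, leaves only compatible schemas.
pa.concat_tables([full])
```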
41 changes: 41 additions & 0 deletions tests/unit/test_moto.py
@@ -485,6 +485,47 @@ def test_s3_delete_object_success(moto_s3_client: "S3Client") -> None:
wr.s3.read_parquet(path=path, dataset=True)


@pytest.mark.parametrize("chunked", [True, False])
def test_s3_parquet_empty_table(moto_s3_client: "S3Client", chunked: bool) -> None:
path = "s3://bucket/file.parquet"

r_df = pd.DataFrame({"id": []}, dtype=pd.Int64Dtype())
wr.s3.to_parquet(df=r_df, path=path)

df = wr.s3.read_parquet(path, chunked=chunked)
if chunked:
df = pd.concat(list(df))

pd.testing.assert_frame_equal(r_df, df, check_dtype=True)


def test_s3_dataset_empty_table(moto_s3_client: "S3Client") -> None:
"""Test that a dataset split into multiple parquet files whose first
partition is an empty table still loads properly.
"""
partition_col, partition_val = "col0", "1"
dataset = f"{partition_col}={partition_val}"
s3_key = f"s3://bucket/{dataset}"

dtypes = {"id": "string[python]"}
df1 = pd.DataFrame({"id": []}).astype(dtypes)
df2 = pd.DataFrame({"id": ["1"] * 2}).astype(dtypes)
df3 = pd.DataFrame({"id": ["1"] * 3}).astype(dtypes)

dataframes = [df1, df2, df3]
r_df = pd.concat(dataframes, ignore_index=True)
r_df = r_df.assign(col0=pd.Categorical([partition_val] * len(r_df)))

for i, df in enumerate(dataframes):
wr.s3.to_parquet(
df=df,
path=f"{s3_key}/part{i}.parquet",
)

result_df = wr.s3.read_parquet(path=s3_key, dataset=True)
pd.testing.assert_frame_equal(result_df, r_df, check_dtype=True)


def test_s3_raise_delete_object_exception_success(moto_s3_client: "S3Client") -> None:
path = "s3://bucket/test.parquet"
wr.s3.to_parquet(df=get_df_list(), path=path, index=False, dataset=True, partition_cols=["par0", "par1"])