fix(python): make read_parquet() respect rechunk flag when using pyarrow (#16418)

Co-authored-by: Itamar Turner-Trauring <[email protected]>
itamarst and pythonspeed authored May 23, 2024
1 parent 1e44ae4 commit d417334
Showing 3 changed files with 22 additions and 3 deletions.
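For context, a minimal sketch of the user-facing behavior this commit fixes (the in-memory buffer and toy data are illustrative, mirroring the test added below). Previously, rechunk=False was silently ignored whenever use_pyarrow=True, because the flag was never forwarded to the pyarrow code path, so the result was always consolidated into a single chunk:

import io

import polars as pl

# Write a parquet file whose contents round-trip as 3 chunks
# (a 3-chunk frame produces 3 row groups).
df = pl.concat([pl.DataFrame({"a": [1]})] * 3, rechunk=False)
buf = io.BytesIO()
df.write_parquet(buf)
buf.seek(0)

# With this fix, rechunk=False is honored on the pyarrow path too:
out = pl.read_parquet(buf, use_pyarrow=True, rechunk=False)
print(out.n_chunks())  # 3; before this commit the flag was dropped, giving 1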
4 changes: 3 additions & 1 deletion py-polars/polars/io/parquet/functions.py
@@ -160,6 +160,7 @@ def read_parquet(
             storage_options=storage_options,
             pyarrow_options=pyarrow_options,
             memory_map=memory_map,
+            rechunk=rechunk,
         )

     # Read file and bytes inputs using `read_parquet`
@@ -209,6 +210,7 @@ def _read_parquet_with_pyarrow(
     storage_options: dict[str, Any] | None = None,
     pyarrow_options: dict[str, Any] | None = None,
     memory_map: bool = True,
+    rechunk: bool = True,
 ) -> DataFrame:
     pyarrow_parquet = import_optional(
         "pyarrow.parquet",
@@ -228,7 +230,7 @@
         columns=columns,
         **pyarrow_options,
     )
-    return from_arrow(pa_table)  # type: ignore[return-value]
+    return from_arrow(pa_table, rechunk=rechunk)  # type: ignore[return-value]


 def _read_parquet_binary(
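The mechanics of the change above: _read_parquet_with_pyarrow() reads the file into a pyarrow.Table and converts it with from_arrow(), whose rechunk parameter decides whether the table's chunks get consolidated. A minimal sketch of that downstream effect, assuming a hand-built two-batch table:

import pyarrow as pa

import polars as pl

# A pyarrow table made of two record batches, i.e. two chunks per column.
batches = [
    pa.record_batch([pa.array([1])], names=["a"]),
    pa.record_batch([pa.array([2])], names=["a"]),
]
tbl = pa.Table.from_batches(batches)

print(pl.from_arrow(tbl, rechunk=True).n_chunks())   # 1: consolidated
print(pl.from_arrow(tbl, rechunk=False).n_chunks())  # 2: chunking preserved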
4 changes: 2 additions & 2 deletions py-polars/src/dataframe/general.rs
@@ -147,9 +147,9 @@ impl PyDataFrame {
         Ok(df.into())
     }

-    pub fn rechunk(&self) -> Self {
+    pub fn rechunk(&self, py: Python) -> Self {
         let mut df = self.df.clone();
-        df.as_single_chunk_par();
+        py.allow_threads(|| df.as_single_chunk_par());
         df.into()
     }

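A note on the Rust change above: as_single_chunk_par() consolidates every column's chunks in parallel, and wrapping it in py.allow_threads() releases the GIL while that runs, so other Python threads are no longer blocked during a large rechunk. The Python-facing method this backs is DataFrame.rechunk(); a minimal sketch of what it does:

import polars as pl

# concat(rechunk=False) leaves the result as 3 chunks per column.
df = pl.concat([pl.DataFrame({"a": [1]})] * 3, rechunk=False)
print(df.n_chunks())            # 3
print(df.rechunk().n_chunks())  # 1: each column is now a single contiguous chunk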
17 changes: 17 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
@@ -104,6 +104,23 @@ def test_to_from_buffer(
     assert_frame_equal(df, read_df, categorical_as_str=True)


+@pytest.mark.parametrize("use_pyarrow", [True, False])
+@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
+def test_read_parquet_respects_rechunk_16416(
+    use_pyarrow: bool, rechunk_and_expected_chunks: tuple[bool, int]
+) -> None:
+    # Create a dataframe with 3 chunks:
+    df = pl.DataFrame({"a": [1]})
+    df = pl.concat([df, df, df])
+    buf = io.BytesIO()
+    df.write_parquet(buf)
+    buf.seek(0)
+
+    rechunk, expected_chunks = rechunk_and_expected_chunks
+    result = pl.read_parquet(buf, use_pyarrow=use_pyarrow, rechunk=rechunk)
+    assert result.n_chunks() == expected_chunks
+
+
 def test_to_from_buffer_lzo(df: pl.DataFrame) -> None:
     buf = io.BytesIO()
     # Writing lzo compressed parquet files is not supported for now.
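To exercise just the new test locally, something like the following should work (the -k expression is an assumption based on the test name above):

pytest py-polars/tests/unit/io/test_parquet.py -k test_read_parquet_respects_rechunk_16416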
