From 435b972172e3751656d5dde79cb061898378831e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Jul 2024 14:21:35 -0400 Subject: [PATCH 01/13] Export Series via pycapsule interface --- py-polars/polars/series/series.py | 8 ++++++++ py-polars/src/interop/arrow/to_py.rs | 18 ++++++++++++++++++ py-polars/src/series/export.rs | 13 ++++++++++++- 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/py-polars/polars/series/series.py b/py-polars/polars/series/series.py index f298b793c9ad..10b9afbcfd23 100644 --- a/py-polars/polars/series/series.py +++ b/py-polars/polars/series/series.py @@ -1401,6 +1401,14 @@ def __array_ufunc__( ) raise NotImplementedError(msg) + def __arrow_c_stream__(self, requested_schema): + """ + Export a Series via the Arrow PyCapsule Interface. + + https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html + """ + return self._s.__arrow_c_stream__(requested_schema) + def _repr_html_(self) -> str: """Format output data in HTML for display in Jupyter Notebooks.""" return self.to_frame()._repr_html_(_from_series=True) diff --git a/py-polars/src/interop/arrow/to_py.rs b/py-polars/src/interop/arrow/to_py.rs index 24769ba5e938..3cd1dc86116f 100644 --- a/py-polars/src/interop/arrow/to_py.rs +++ b/py-polars/src/interop/arrow/to_py.rs @@ -1,9 +1,14 @@ +use std::ffi::CString; + use arrow::ffi; use arrow::record_batch::RecordBatch; +use polars::datatypes::CompatLevel; use polars::prelude::{ArrayRef, ArrowField}; +use polars::series::Series; use polars_core::utils::arrow; use pyo3::ffi::Py_uintptr_t; use pyo3::prelude::*; +use pyo3::types::PyCapsule; /// Arrow array to Python. pub(crate) fn to_py_array( @@ -49,3 +54,16 @@ pub(crate) fn to_py_rb( Ok(record.to_object(py)) } + +/// Export a series to a C stream via a PyCapsule according to the Arrow PyCapsule Interface +/// https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html +pub(crate) fn series_to_stream<'py>( + series: &'py Series, + py: Python<'py>, +) -> PyResult> { + let field = series.field().to_arrow(CompatLevel::oldest()); + let iter = Box::new(series.chunks().clone().into_iter().map(Ok)) as _; + let stream = ffi::export_iterator(iter, field); + let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); + PyCapsule::new_bound(py, stream, Some(stream_capsule_name)) +} diff --git a/py-polars/src/series/export.rs b/py-polars/src/series/export.rs index eb320311e7f8..901050ad74be 100644 --- a/py-polars/src/series/export.rs +++ b/py-polars/src/series/export.rs @@ -1,7 +1,8 @@ use polars_core::prelude::*; use pyo3::prelude::*; -use pyo3::types::PyList; +use pyo3::types::{PyCapsule, PyList}; +use crate::interop::arrow::to_py::series_to_stream; use crate::prelude::*; use crate::{interop, PySeries}; @@ -157,4 +158,14 @@ impl PySeries { ) }) } + + #[allow(unused_variables)] + #[pyo3(signature = (requested_schema=None))] + fn __arrow_c_stream__<'py>( + &'py self, + py: Python<'py>, + requested_schema: Option, + ) -> PyResult> { + series_to_stream(&self.series, py) + } } From 11e45fc27e80f45633e328c692ce83f36516c520 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Jul 2024 15:19:08 -0400 Subject: [PATCH 02/13] Try to add __arrow_c_stream__ on DataFrame --- py-polars/polars/dataframe/frame.py | 8 ++++++++ py-polars/src/dataframe/export.rs | 13 ++++++++++++- py-polars/src/interop/arrow/to_py.rs | 26 +++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index ae1a9260bf65..22d14334ea3b 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -1264,6 +1264,14 @@ def __deepcopy__(self, memo: None = None) -> DataFrame: def _ipython_key_completions_(self) -> list[str]: return self.columns + def __arrow_c_stream__(self, requested_schema): + """ + Export a DataFrame via the Arrow PyCapsule Interface. + + https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html + """ + return self._df.__arrow_c_stream__(requested_schema) + def _repr_html_(self, *, _from_series: bool = False) -> str: """ Format output data in HTML for display in Jupyter Notebooks. diff --git a/py-polars/src/dataframe/export.rs b/py-polars/src/dataframe/export.rs index 0b0d0a4f9020..592a52061c6a 100644 --- a/py-polars/src/dataframe/export.rs +++ b/py-polars/src/dataframe/export.rs @@ -2,11 +2,12 @@ use polars::export::arrow::record_batch::RecordBatch; use polars_core::export::arrow::datatypes::IntegerType; use polars_core::utils::arrow::compute::cast::CastOptionsImpl; use pyo3::prelude::*; -use pyo3::types::{PyList, PyTuple}; +use pyo3::types::{PyCapsule, PyList, PyTuple}; use super::*; use crate::conversion::{ObjectValue, Wrap}; use crate::interop; +use crate::interop::arrow::to_py::dataframe_to_stream; use crate::prelude::PyCompatLevel; #[pymethods] @@ -130,4 +131,14 @@ impl PyDataFrame { Ok(rbs) }) } + + #[allow(unused_variables)] + #[pyo3(signature = (requested_schema=None))] + fn __arrow_c_stream__<'py>( + &'py self, + py: Python<'py>, + requested_schema: Option, + ) -> PyResult> { + dataframe_to_stream(&self.df, py) + } } diff --git a/py-polars/src/interop/arrow/to_py.rs b/py-polars/src/interop/arrow/to_py.rs index 3cd1dc86116f..cfa7f6c2398f 100644 --- a/py-polars/src/interop/arrow/to_py.rs +++ b/py-polars/src/interop/arrow/to_py.rs @@ -2,7 +2,8 @@ use std::ffi::CString; use arrow::ffi; use arrow::record_batch::RecordBatch; -use polars::datatypes::CompatLevel; +use polars::datatypes::{CompatLevel, DataType, Field}; +use polars::frame::DataFrame; use polars::prelude::{ArrayRef, ArrowField}; use polars::series::Series; use polars_core::utils::arrow; @@ -67,3 +68,26 @@ pub(crate) fn series_to_stream<'py>( let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); PyCapsule::new_bound(py, stream, Some(stream_capsule_name)) } + +pub(crate) fn dataframe_to_stream<'py>( + df: &'py DataFrame, + py: Python<'py>, +) -> PyResult> { + let schema_fields = df.schema().iter_fields().collect::>(); + + let struct_field = + Field::new("", DataType::Struct(schema_fields)).to_arrow(CompatLevel::oldest()); + let struct_data_type = struct_field.data_type().clone(); + + let iter = df + .iter_chunks(CompatLevel::oldest(), false) + .into_iter() + .map(|chunk| { + let arrays = chunk.into_arrays(); + let x = arrow::array::StructArray::new(struct_data_type.clone(), arrays, None); + Ok(Box::new(x) as Box) + }); + let stream = ffi::export_iterator(Box::new(iter), struct_field); + let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); + PyCapsule::new_bound(py, stream, Some(stream_capsule_name)) +} From a084a4552460595597bcbdcd6bcf65514ba10e99 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Jul 2024 15:25:26 -0400 Subject: [PATCH 03/13] Add series test --- crates/polars-arrow/src/ffi/stream.rs | 2 ++ py-polars/tests/unit/series/test_series.py | 24 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/crates/polars-arrow/src/ffi/stream.rs b/crates/polars-arrow/src/ffi/stream.rs index 58a0b0785529..b894bc6748ab 100644 --- a/crates/polars-arrow/src/ffi/stream.rs +++ b/crates/polars-arrow/src/ffi/stream.rs @@ -19,6 +19,8 @@ impl Drop for ArrowArrayStream { } } +unsafe impl Send for ArrowArrayStream {} + impl ArrowArrayStream { /// Creates an empty [`ArrowArrayStream`] used to import from a producer. pub fn empty() -> Self { diff --git a/py-polars/tests/unit/series/test_series.py b/py-polars/tests/unit/series/test_series.py index 0eef7af263b1..45cde074d85a 100644 --- a/py-polars/tests/unit/series/test_series.py +++ b/py-polars/tests/unit/series/test_series.py @@ -628,6 +628,30 @@ def test_arrow() -> None: ) +def test_pycapsule_interface() -> None: + class PyCapsuleSeriesHolder: + """ + Hold the Arrow C Stream pycapsule. + + A class that exposes _only_ the Arrow C Stream interface via Arrow PyCapsules. + This ensures that pyarrow is seeing _only_ the `__arrow_c_stream__` dunder, and + that nothing else (e.g. the dataframe or array interface) is actually being + used. + """ + + capsule: tuple[object, object] + + def __init__(self, capsule: tuple[object, object]): + self.capsule = capsule + + def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]: + return self.capsule + + a = pl.Series("a", [1, 2, 3, None]) + out = pa.array(PyCapsuleSeriesHolder(a.__arrow_c_stream__(None))) + assert out == pa.array([1, 2, 3, None]) + + def test_get() -> None: a = pl.Series("a", [1, 2, 3]) pos_idxs = pl.Series("idxs", [2, 0, 1, 0], dtype=pl.Int8) From d9c8a803c7def030e3eab02e40410bcc88a4059e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 16 Jul 2024 15:39:29 -0400 Subject: [PATCH 04/13] Fix Series test --- py-polars/tests/unit/series/test_series.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/py-polars/tests/unit/series/test_series.py b/py-polars/tests/unit/series/test_series.py index 45cde074d85a..693f5d7a4547 100644 --- a/py-polars/tests/unit/series/test_series.py +++ b/py-polars/tests/unit/series/test_series.py @@ -648,8 +648,9 @@ def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]: return self.capsule a = pl.Series("a", [1, 2, 3, None]) - out = pa.array(PyCapsuleSeriesHolder(a.__arrow_c_stream__(None))) - assert out == pa.array([1, 2, 3, None]) + out = pa.chunked_array(PyCapsuleSeriesHolder(a.__arrow_c_stream__(None))) + out_arr = out.combine_chunks() + assert out_arr == pa.array([1, 2, 3, None]) def test_get() -> None: From eb9b960e0046b913efea1e895014a96099e4c3ca Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 17 Jul 2024 13:00:36 -0400 Subject: [PATCH 05/13] Port from R --- py-polars/src/dataframe/export.rs | 5 +- py-polars/src/interop/arrow/to_py.rs | 76 ++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 22 deletions(-) diff --git a/py-polars/src/dataframe/export.rs b/py-polars/src/dataframe/export.rs index 592a52061c6a..f595ac92f02c 100644 --- a/py-polars/src/dataframe/export.rs +++ b/py-polars/src/dataframe/export.rs @@ -135,10 +135,11 @@ impl PyDataFrame { #[allow(unused_variables)] #[pyo3(signature = (requested_schema=None))] fn __arrow_c_stream__<'py>( - &'py self, + &'py mut self, py: Python<'py>, requested_schema: Option, ) -> PyResult> { - dataframe_to_stream(&self.df, py) + self.df.align_chunks(); + dataframe_to_stream(self.df.clone(), py) } } diff --git a/py-polars/src/interop/arrow/to_py.rs b/py-polars/src/interop/arrow/to_py.rs index cfa7f6c2398f..5048e399e988 100644 --- a/py-polars/src/interop/arrow/to_py.rs +++ b/py-polars/src/interop/arrow/to_py.rs @@ -2,11 +2,12 @@ use std::ffi::CString; use arrow::ffi; use arrow::record_batch::RecordBatch; -use polars::datatypes::{CompatLevel, DataType, Field}; +use polars::datatypes::CompatLevel; use polars::frame::DataFrame; use polars::prelude::{ArrayRef, ArrowField}; use polars::series::Series; use polars_core::utils::arrow; +use polars_error::PolarsResult; use pyo3::ffi::Py_uintptr_t; use pyo3::prelude::*; use pyo3::types::PyCapsule; @@ -69,25 +70,60 @@ pub(crate) fn series_to_stream<'py>( PyCapsule::new_bound(py, stream, Some(stream_capsule_name)) } -pub(crate) fn dataframe_to_stream<'py>( - df: &'py DataFrame, - py: Python<'py>, -) -> PyResult> { - let schema_fields = df.schema().iter_fields().collect::>(); - - let struct_field = - Field::new("", DataType::Struct(schema_fields)).to_arrow(CompatLevel::oldest()); - let struct_data_type = struct_field.data_type().clone(); - - let iter = df - .iter_chunks(CompatLevel::oldest(), false) - .into_iter() - .map(|chunk| { - let arrays = chunk.into_arrays(); - let x = arrow::array::StructArray::new(struct_data_type.clone(), arrays, None); - Ok(Box::new(x) as Box) - }); - let stream = ffi::export_iterator(Box::new(iter), struct_field); +pub(crate) fn dataframe_to_stream(df: DataFrame, py: Python) -> PyResult> { + let iter = Box::new(DataFrameStreamIterator::new(df)); + let field = iter.field(); + let stream = ffi::export_iterator(iter, field); let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); PyCapsule::new_bound(py, stream, Some(stream_capsule_name)) } + +pub struct DataFrameStreamIterator { + columns: Vec, + data_type: arrow::datatypes::ArrowDataType, + idx: usize, + n_chunks: usize, +} + +impl DataFrameStreamIterator { + fn new(df: polars::frame::DataFrame) -> Self { + let schema = df.schema().to_arrow(CompatLevel::oldest()); + let data_type = arrow::datatypes::ArrowDataType::Struct(schema.fields); + + Self { + columns: df.get_columns().to_vec(), + data_type, + idx: 0, + n_chunks: df.n_chunks(), + } + } + + fn field(&self) -> ArrowField { + ArrowField::new("", self.data_type.clone(), false) + } +} + +impl Iterator for DataFrameStreamIterator { + type Item = PolarsResult>; + + fn next(&mut self) -> Option { + if self.idx >= self.n_chunks { + None + } else { + // create a batch of the columns with the same chunk no. + let batch_cols = self + .columns + .iter() + .map(|s| s.to_arrow(self.idx, CompatLevel::oldest())) + .collect(); + self.idx += 1; + + let array = arrow::array::StructArray::new( + self.data_type.clone(), + batch_cols, + std::option::Option::None, + ); + Some(std::result::Result::Ok(Box::new(array))) + } + } +} From 59c20480190a8fdc6e28cb67f13c4f2a8e0b818b Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 17 Jul 2024 13:13:13 -0400 Subject: [PATCH 06/13] Add df interop test --- py-polars/tests/unit/interop/test_interop.py | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/py-polars/tests/unit/interop/test_interop.py b/py-polars/tests/unit/interop/test_interop.py index 5dd7f442c905..41bf89ef0a17 100644 --- a/py-polars/tests/unit/interop/test_interop.py +++ b/py-polars/tests/unit/interop/test_interop.py @@ -749,3 +749,31 @@ def test_compat_level(monkeypatch: pytest.MonkeyPatch) -> None: assert len(df.write_ipc_stream(None).getbuffer()) == 544 assert len(df.write_ipc_stream(None, compat_level=oldest).getbuffer()) == 672 assert len(df.write_ipc_stream(None, compat_level=newest).getbuffer()) == 544 + + +def test_df_pycapsule_interface() -> None: + class PyCapsuleStreamHolder: + """ + Hold the Arrow C Stream pycapsule. + + A class that exposes _only_ the Arrow C Stream interface via Arrow PyCapsules. + This ensures that pyarrow is seeing _only_ the `__arrow_c_stream__` dunder, and + that nothing else (e.g. the dataframe or array interface) is actually being + used. + """ + + capsule: tuple[object, object] + + def __init__(self, capsule: tuple[object, object]): + self.capsule = capsule + + def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]: + return self.capsule + + df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + out = pa.table(PyCapsuleStreamHolder(df.__arrow_c_stream__(None))) + assert df.shape == out.shape + assert df.schema.names() == out.schema.names + + df2 = pl.from_arrow(out) + assert df.equals(df2) # type: ignore From 82269319a3dd6cbf4264ae170f0d482d70723599 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 17 Jul 2024 13:39:06 -0400 Subject: [PATCH 07/13] type hints --- py-polars/polars/dataframe/frame.py | 2 +- py-polars/polars/series/series.py | 2 +- py-polars/tests/unit/series/test_series.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 22d14334ea3b..a369f56aaba7 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -1264,7 +1264,7 @@ def __deepcopy__(self, memo: None = None) -> DataFrame: def _ipython_key_completions_(self) -> list[str]: return self.columns - def __arrow_c_stream__(self, requested_schema): + def __arrow_c_stream__(self, requested_schema: object): """ Export a DataFrame via the Arrow PyCapsule Interface. diff --git a/py-polars/polars/series/series.py b/py-polars/polars/series/series.py index 10b9afbcfd23..a47d57bd3f37 100644 --- a/py-polars/polars/series/series.py +++ b/py-polars/polars/series/series.py @@ -1401,7 +1401,7 @@ def __array_ufunc__( ) raise NotImplementedError(msg) - def __arrow_c_stream__(self, requested_schema): + def __arrow_c_stream__(self, requested_schema: object): """ Export a Series via the Arrow PyCapsule Interface. diff --git a/py-polars/tests/unit/series/test_series.py b/py-polars/tests/unit/series/test_series.py index 693f5d7a4547..7a48aa45623f 100644 --- a/py-polars/tests/unit/series/test_series.py +++ b/py-polars/tests/unit/series/test_series.py @@ -644,7 +644,7 @@ class PyCapsuleSeriesHolder: def __init__(self, capsule: tuple[object, object]): self.capsule = capsule - def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]: + def __arrow_c_stream__(self, requested_schema: object) -> tuple[object, object]: return self.capsule a = pl.Series("a", [1, 2, 3, None]) From ab015a18344bf868b8dc335357f1223f5bd4dda4 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 17 Jul 2024 14:17:29 -0400 Subject: [PATCH 08/13] lint --- py-polars/polars/dataframe/frame.py | 2 +- py-polars/polars/series/series.py | 2 +- py-polars/tests/unit/interop/test_interop.py | 9 +++++---- py-polars/tests/unit/series/test_series.py | 6 +++--- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index a369f56aaba7..1246f3c35e69 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -1264,7 +1264,7 @@ def __deepcopy__(self, memo: None = None) -> DataFrame: def _ipython_key_completions_(self) -> list[str]: return self.columns - def __arrow_c_stream__(self, requested_schema: object): + def __arrow_c_stream__(self, requested_schema: object) -> object: """ Export a DataFrame via the Arrow PyCapsule Interface. diff --git a/py-polars/polars/series/series.py b/py-polars/polars/series/series.py index a47d57bd3f37..6a685d46caec 100644 --- a/py-polars/polars/series/series.py +++ b/py-polars/polars/series/series.py @@ -1401,7 +1401,7 @@ def __array_ufunc__( ) raise NotImplementedError(msg) - def __arrow_c_stream__(self, requested_schema: object): + def __arrow_c_stream__(self, requested_schema: object) -> object: """ Export a Series via the Arrow PyCapsule Interface. diff --git a/py-polars/tests/unit/interop/test_interop.py b/py-polars/tests/unit/interop/test_interop.py index 41bf89ef0a17..8ae1b9a3b55e 100644 --- a/py-polars/tests/unit/interop/test_interop.py +++ b/py-polars/tests/unit/interop/test_interop.py @@ -762,12 +762,12 @@ class PyCapsuleStreamHolder: used. """ - capsule: tuple[object, object] + capsule: object - def __init__(self, capsule: tuple[object, object]): + def __init__(self, capsule: object) -> None: self.capsule = capsule - def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]: + def __arrow_c_stream__(self, requested_schema: object) -> object: return self.capsule df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) @@ -776,4 +776,5 @@ def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]: assert df.schema.names() == out.schema.names df2 = pl.from_arrow(out) - assert df.equals(df2) # type: ignore + assert isinstance(df2, pl.DataFrame) + assert df.equals(df2) diff --git a/py-polars/tests/unit/series/test_series.py b/py-polars/tests/unit/series/test_series.py index 7a48aa45623f..ebd0abc23752 100644 --- a/py-polars/tests/unit/series/test_series.py +++ b/py-polars/tests/unit/series/test_series.py @@ -639,12 +639,12 @@ class PyCapsuleSeriesHolder: used. """ - capsule: tuple[object, object] + capsule: object - def __init__(self, capsule: tuple[object, object]): + def __init__(self, capsule: object): self.capsule = capsule - def __arrow_c_stream__(self, requested_schema: object) -> tuple[object, object]: + def __arrow_c_stream__(self, requested_schema: object) -> object: return self.capsule a = pl.Series("a", [1, 2, 3, None]) From b309a57bc53d11fec2e22f9da94d61c6dabd021e Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 22 Jul 2024 22:11:39 -0400 Subject: [PATCH 09/13] Don't hold bare capsules --- py-polars/tests/unit/interop/test_interop.py | 21 ++---------------- py-polars/tests/unit/series/test_series.py | 21 ++---------------- py-polars/tests/unit/utils/pycapsule_utils.py | 22 +++++++++++++++++++ 3 files changed, 26 insertions(+), 38 deletions(-) create mode 100644 py-polars/tests/unit/utils/pycapsule_utils.py diff --git a/py-polars/tests/unit/interop/test_interop.py b/py-polars/tests/unit/interop/test_interop.py index 8ae1b9a3b55e..daad87df371d 100644 --- a/py-polars/tests/unit/interop/test_interop.py +++ b/py-polars/tests/unit/interop/test_interop.py @@ -13,6 +13,7 @@ from polars.exceptions import ComputeError, UnstableWarning from polars.interchange.protocol import CompatLevel from polars.testing import assert_frame_equal, assert_series_equal +from tests.unit.utils.pycapsule_utils import PyCapsuleStreamHolder def test_arrow_list_roundtrip() -> None: @@ -752,26 +753,8 @@ def test_compat_level(monkeypatch: pytest.MonkeyPatch) -> None: def test_df_pycapsule_interface() -> None: - class PyCapsuleStreamHolder: - """ - Hold the Arrow C Stream pycapsule. - - A class that exposes _only_ the Arrow C Stream interface via Arrow PyCapsules. - This ensures that pyarrow is seeing _only_ the `__arrow_c_stream__` dunder, and - that nothing else (e.g. the dataframe or array interface) is actually being - used. - """ - - capsule: object - - def __init__(self, capsule: object) -> None: - self.capsule = capsule - - def __arrow_c_stream__(self, requested_schema: object) -> object: - return self.capsule - df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) - out = pa.table(PyCapsuleStreamHolder(df.__arrow_c_stream__(None))) + out = pa.table(PyCapsuleStreamHolder(df)) assert df.shape == out.shape assert df.schema.names() == out.schema.names diff --git a/py-polars/tests/unit/series/test_series.py b/py-polars/tests/unit/series/test_series.py index ebd0abc23752..71e217607168 100644 --- a/py-polars/tests/unit/series/test_series.py +++ b/py-polars/tests/unit/series/test_series.py @@ -29,6 +29,7 @@ ShapeError, ) from polars.testing import assert_frame_equal, assert_series_equal +from tests.unit.utils.pycapsule_utils import PyCapsuleStreamHolder if TYPE_CHECKING: from zoneinfo import ZoneInfo @@ -629,26 +630,8 @@ def test_arrow() -> None: def test_pycapsule_interface() -> None: - class PyCapsuleSeriesHolder: - """ - Hold the Arrow C Stream pycapsule. - - A class that exposes _only_ the Arrow C Stream interface via Arrow PyCapsules. - This ensures that pyarrow is seeing _only_ the `__arrow_c_stream__` dunder, and - that nothing else (e.g. the dataframe or array interface) is actually being - used. - """ - - capsule: object - - def __init__(self, capsule: object): - self.capsule = capsule - - def __arrow_c_stream__(self, requested_schema: object) -> object: - return self.capsule - a = pl.Series("a", [1, 2, 3, None]) - out = pa.chunked_array(PyCapsuleSeriesHolder(a.__arrow_c_stream__(None))) + out = pa.chunked_array(PyCapsuleStreamHolder(a)) out_arr = out.combine_chunks() assert out_arr == pa.array([1, 2, 3, None]) diff --git a/py-polars/tests/unit/utils/pycapsule_utils.py b/py-polars/tests/unit/utils/pycapsule_utils.py new file mode 100644 index 000000000000..3e7285fc0202 --- /dev/null +++ b/py-polars/tests/unit/utils/pycapsule_utils.py @@ -0,0 +1,22 @@ +from typing import Any + + +class PyCapsuleStreamHolder: + """ + Hold the Arrow C Stream pycapsule. + + A class that exposes _only_ the Arrow C Stream interface via Arrow PyCapsules. + This ensures that pyarrow is seeing _only_ the `__arrow_c_stream__` dunder, and + that nothing else (e.g. the dataframe or array interface) is actually being + used. + + This is used by tests across multiple files. + """ + + arrow_obj: Any + + def __init__(self, arrow_obj: object) -> None: + self.arrow_obj = arrow_obj + + def __arrow_c_stream__(self, requested_schema: object = None) -> object: + return self.arrow_obj.__arrow_c_stream__(requested_schema) From 5f23484062f56a4db35b34b9f80ca4cefabe6a22 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jul 2024 13:37:32 -0400 Subject: [PATCH 10/13] Address most comments --- py-polars/src/interop/arrow/to_py.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/py-polars/src/interop/arrow/to_py.rs b/py-polars/src/interop/arrow/to_py.rs index 5048e399e988..3940bae35800 100644 --- a/py-polars/src/interop/arrow/to_py.rs +++ b/py-polars/src/interop/arrow/to_py.rs @@ -1,5 +1,6 @@ use std::ffi::CString; +use arrow::datatypes::ArrowDataType; use arrow::ffi; use arrow::record_batch::RecordBatch; use polars::datatypes::CompatLevel; @@ -79,16 +80,16 @@ pub(crate) fn dataframe_to_stream(df: DataFrame, py: Python) -> PyResult, - data_type: arrow::datatypes::ArrowDataType, + columns: Vec, + data_type: ArrowDataType, idx: usize, n_chunks: usize, } impl DataFrameStreamIterator { - fn new(df: polars::frame::DataFrame) -> Self { - let schema = df.schema().to_arrow(CompatLevel::oldest()); - let data_type = arrow::datatypes::ArrowDataType::Struct(schema.fields); + fn new(df: DataFrame) -> Self { + let schema = df.schema().to_arrow(CompatLevel::newest()); + let data_type = ArrowDataType::Struct(schema.fields); Self { columns: df.get_columns().to_vec(), @@ -104,7 +105,7 @@ impl DataFrameStreamIterator { } impl Iterator for DataFrameStreamIterator { - type Item = PolarsResult>; + type Item = PolarsResult; fn next(&mut self) -> Option { if self.idx >= self.n_chunks { @@ -114,16 +115,12 @@ impl Iterator for DataFrameStreamIterator { let batch_cols = self .columns .iter() - .map(|s| s.to_arrow(self.idx, CompatLevel::oldest())) + .map(|s| s.to_arrow(self.idx, CompatLevel::newest())) .collect(); self.idx += 1; - let array = arrow::array::StructArray::new( - self.data_type.clone(), - batch_cols, - std::option::Option::None, - ); - Some(std::result::Result::Ok(Box::new(array))) + let array = arrow::array::StructArray::new(self.data_type.clone(), batch_cols, None); + Some(Ok(Box::new(array))) } } } From cc04d92653fb22fd02414f8ad85ad481458d6590 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jul 2024 13:38:29 -0400 Subject: [PATCH 11/13] compat level newest --- py-polars/src/interop/arrow/to_py.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/src/interop/arrow/to_py.rs b/py-polars/src/interop/arrow/to_py.rs index 3940bae35800..17a89ff1eccc 100644 --- a/py-polars/src/interop/arrow/to_py.rs +++ b/py-polars/src/interop/arrow/to_py.rs @@ -64,7 +64,7 @@ pub(crate) fn series_to_stream<'py>( series: &'py Series, py: Python<'py>, ) -> PyResult> { - let field = series.field().to_arrow(CompatLevel::oldest()); + let field = series.field().to_arrow(CompatLevel::newest()); let iter = Box::new(series.chunks().clone().into_iter().map(Ok)) as _; let stream = ffi::export_iterator(iter, field); let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); From c2e2144d8d1b48462a1bb9ea64cb759249cf620c Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jul 2024 13:42:27 -0400 Subject: [PATCH 12/13] Remove clone --- py-polars/src/dataframe/export.rs | 2 +- py-polars/src/interop/arrow/to_py.rs | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/py-polars/src/dataframe/export.rs b/py-polars/src/dataframe/export.rs index f595ac92f02c..cfd6f1406ef2 100644 --- a/py-polars/src/dataframe/export.rs +++ b/py-polars/src/dataframe/export.rs @@ -140,6 +140,6 @@ impl PyDataFrame { requested_schema: Option, ) -> PyResult> { self.df.align_chunks(); - dataframe_to_stream(self.df.clone(), py) + dataframe_to_stream(&self.df, py) } } diff --git a/py-polars/src/interop/arrow/to_py.rs b/py-polars/src/interop/arrow/to_py.rs index 17a89ff1eccc..2581a52f34ce 100644 --- a/py-polars/src/interop/arrow/to_py.rs +++ b/py-polars/src/interop/arrow/to_py.rs @@ -71,7 +71,10 @@ pub(crate) fn series_to_stream<'py>( PyCapsule::new_bound(py, stream, Some(stream_capsule_name)) } -pub(crate) fn dataframe_to_stream(df: DataFrame, py: Python) -> PyResult> { +pub(crate) fn dataframe_to_stream<'py>( + df: &'py DataFrame, + py: Python<'py>, +) -> PyResult> { let iter = Box::new(DataFrameStreamIterator::new(df)); let field = iter.field(); let stream = ffi::export_iterator(iter, field); @@ -87,7 +90,7 @@ pub struct DataFrameStreamIterator { } impl DataFrameStreamIterator { - fn new(df: DataFrame) -> Self { + fn new(df: &DataFrame) -> Self { let schema = df.schema().to_arrow(CompatLevel::newest()); let data_type = ArrowDataType::Struct(schema.fields); From d40f6962d43ffbd3ef99fdc8f1c0e54aca83b889 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jul 2024 13:52:00 -0400 Subject: [PATCH 13/13] longer strings in test --- py-polars/tests/unit/interop/test_interop.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/py-polars/tests/unit/interop/test_interop.py b/py-polars/tests/unit/interop/test_interop.py index daad87df371d..1730a5c6f8bb 100644 --- a/py-polars/tests/unit/interop/test_interop.py +++ b/py-polars/tests/unit/interop/test_interop.py @@ -753,7 +753,13 @@ def test_compat_level(monkeypatch: pytest.MonkeyPatch) -> None: def test_df_pycapsule_interface() -> None: - df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + df = pl.DataFrame( + { + "a": [1, 2, 3], + "b": ["a", "b", "c"], + "c": ["fooooooooooooooooooooo", "bar", "looooooooooooooooong string"], + } + ) out = pa.table(PyCapsuleStreamHolder(df)) assert df.shape == out.shape assert df.schema.names() == out.schema.names