Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Implement Arrow PyCapsule Interface for Series/DataFrame export #17676

Merged
merged 14 commits into from
Jul 25, 2024
2 changes: 2 additions & 0 deletions crates/polars-arrow/src/ffi/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ impl Drop for ArrowArrayStream {
}
}

unsafe impl Send for ArrowArrayStream {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


impl ArrowArrayStream {
/// Creates an empty [`ArrowArrayStream`] used to import from a producer.
pub fn empty() -> Self {
Expand Down
8 changes: 8 additions & 0 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,14 @@ def __deepcopy__(self, memo: None = None) -> DataFrame:
def _ipython_key_completions_(self) -> list[str]:
return self.columns

def __arrow_c_stream__(self, requested_schema):
"""
Export a DataFrame via the Arrow PyCapsule Interface.

https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html
"""
return self._df.__arrow_c_stream__(requested_schema)

def _repr_html_(self, *, _from_series: bool = False) -> str:
"""
Format output data in HTML for display in Jupyter Notebooks.
Expand Down
8 changes: 8 additions & 0 deletions py-polars/polars/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,14 @@ def __array_ufunc__(
)
raise NotImplementedError(msg)

def __arrow_c_stream__(self, requested_schema):
"""
Export a Series via the Arrow PyCapsule Interface.

https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html
"""
return self._s.__arrow_c_stream__(requested_schema)

def _repr_html_(self) -> str:
"""Format output data in HTML for display in Jupyter Notebooks."""
return self.to_frame()._repr_html_(_from_series=True)
Expand Down
13 changes: 12 additions & 1 deletion py-polars/src/dataframe/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use polars::export::arrow::record_batch::RecordBatch;
use polars_core::export::arrow::datatypes::IntegerType;
use polars_core::utils::arrow::compute::cast::CastOptionsImpl;
use pyo3::prelude::*;
use pyo3::types::{PyList, PyTuple};
use pyo3::types::{PyCapsule, PyList, PyTuple};

use super::*;
use crate::conversion::{ObjectValue, Wrap};
use crate::interop;
use crate::interop::arrow::to_py::dataframe_to_stream;
use crate::prelude::PyCompatLevel;

#[pymethods]
Expand Down Expand Up @@ -130,4 +131,14 @@ impl PyDataFrame {
Ok(rbs)
})
}

#[allow(unused_variables)]
#[pyo3(signature = (requested_schema=None))]
fn __arrow_c_stream__<'py>(
&'py self,
py: Python<'py>,
requested_schema: Option<PyObject>,
) -> PyResult<Bound<'py, PyCapsule>> {
dataframe_to_stream(&self.df, py)
}
}
42 changes: 42 additions & 0 deletions py-polars/src/interop/arrow/to_py.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::ffi::CString;
kylebarron marked this conversation as resolved.
Show resolved Hide resolved

use arrow::ffi;
use arrow::record_batch::RecordBatch;
use polars::datatypes::{CompatLevel, DataType, Field};
use polars::frame::DataFrame;
use polars::prelude::{ArrayRef, ArrowField};
use polars::series::Series;
use polars_core::utils::arrow;
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;

/// Arrow array to Python.
pub(crate) fn to_py_array(
Expand Down Expand Up @@ -49,3 +55,39 @@ pub(crate) fn to_py_rb(

Ok(record.to_object(py))
}

/// Export a series to a C stream via a PyCapsule according to the Arrow PyCapsule Interface
/// https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html
pub(crate) fn series_to_stream<'py>(
series: &'py Series,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
let field = series.field().to_arrow(CompatLevel::oldest());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think this should be newest, otherwise we trigger a copy whereas the consumer should decide if they want to cast to a datatype they can support.

Copy link
Contributor

@ruihe774 ruihe774 Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why requested_schema is not used? I think it instead of CompatLevel should decides what schema should be used (e.g. LargeString or Utf8View). In the future, imo it can replace CompatLevel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why requested_schema is not used?

Does the protocol allow for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why requested_schema is not used?

Does the protocol allow for this?

https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html#schema-requests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, then I agree request_schema should be respected and if none given we can default to newest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's been discussion about this in apache/arrow#39689. To be able to pass in a requested_schema argument, the consumer needs to know the schema of the producer's existing Arrow data. Only then can it know whether it needs to ask the producer to cast to a different type.

I believe I summarized the consensus in apache/arrow#39689 (comment), but while waiting for confirmation, I think it would be best for us to leave requested_schema and schema negotiation to a follow up PR, if that's ok.

let iter = Box::new(series.chunks().clone().into_iter().map(Ok)) as _;
let stream = ffi::export_iterator(iter, field);
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new_bound(py, stream, Some(stream_capsule_name))
}

pub(crate) fn dataframe_to_stream<'py>(
df: &'py DataFrame,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
let schema_fields = df.schema().iter_fields().collect::<Vec<_>>();

let struct_field =
Field::new("", DataType::Struct(schema_fields)).to_arrow(CompatLevel::oldest());
let struct_data_type = struct_field.data_type().clone();

let iter = df
.iter_chunks(CompatLevel::oldest(), false)
.into_iter()
.map(|chunk| {
let arrays = chunk.into_arrays();
let x = arrow::array::StructArray::new(struct_data_type.clone(), arrays, None);
kylebarron marked this conversation as resolved.
Show resolved Hide resolved
Ok(Box::new(x) as Box<dyn arrow::array::Array>)
});
let stream = ffi::export_iterator(Box::new(iter), struct_field);
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new_bound(py, stream, Some(stream_capsule_name))
}
13 changes: 12 additions & 1 deletion py-polars/src/series/export.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use polars_core::prelude::*;
use pyo3::prelude::*;
use pyo3::types::PyList;
use pyo3::types::{PyCapsule, PyList};

use crate::interop::arrow::to_py::series_to_stream;
use crate::prelude::*;
use crate::{interop, PySeries};

Expand Down Expand Up @@ -157,4 +158,14 @@ impl PySeries {
)
})
}

#[allow(unused_variables)]
#[pyo3(signature = (requested_schema=None))]
fn __arrow_c_stream__<'py>(
&'py self,
py: Python<'py>,
requested_schema: Option<PyObject>,
) -> PyResult<Bound<'py, PyCapsule>> {
series_to_stream(&self.series, py)
}
}
25 changes: 25 additions & 0 deletions py-polars/tests/unit/series/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,31 @@ def test_arrow() -> None:
)


def test_pycapsule_interface() -> None:
class PyCapsuleSeriesHolder:
"""
Hold the Arrow C Stream pycapsule.

A class that exposes _only_ the Arrow C Stream interface via Arrow PyCapsules.
This ensures that pyarrow is seeing _only_ the `__arrow_c_stream__` dunder, and
that nothing else (e.g. the dataframe or array interface) is actually being
used.
"""

capsule: tuple[object, object]

def __init__(self, capsule: tuple[object, object]):
self.capsule = capsule

def __arrow_c_stream__(self, requested_schema) -> tuple[object, object]:
return self.capsule

a = pl.Series("a", [1, 2, 3, None])
out = pa.chunked_array(PyCapsuleSeriesHolder(a.__arrow_c_stream__(None)))
out_arr = out.combine_chunks()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same idea here (drop a before doing things with out)

assert out_arr == pa.array([1, 2, 3, None])


def test_get() -> None:
a = pl.Series("a", [1, 2, 3])
pos_idxs = pl.Series("idxs", [2, 0, 1, 0], dtype=pl.Int8)
Expand Down
Loading