feat: Support for Arrow PyCapsule interface #23

Merged: 4 commits, Aug 13, 2024
src/quak/__init__.py (6 additions, 1 deletion)

@@ -1,5 +1,6 @@
 """An anywidget for data that talks like a duck."""
 
+from ._util import has_pycapsule_stream_interface
 from ._version import __version__
 from ._widget import Widget
@@ -18,7 +19,11 @@ def format(self, obj, include=None, exclude=None):
         # special case for duckdb relations
         if isinstance(obj, duckdb.DuckDBPyRelation):
             obj = obj.arrow()
-        if is_arrow_ipc(obj) or is_dataframe_api_obj(obj):
+        if (
+            has_pycapsule_stream_interface(obj)
+            or is_arrow_ipc(obj)
+            or is_dataframe_api_obj(obj)
+        ):
             obj = Widget(obj)
         return super().format(obj, include, exclude)
src/quak/_util.py (10 additions, 0 deletions)

@@ -9,6 +9,16 @@
 import pyarrow as pa

+def has_pycapsule_stream_interface(obj: object) -> bool:
+    """
+    Check if an object exports an Arrow C Stream via the Arrow PyCapsule Interface.
+
+    https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
+    """
+    return hasattr(obj, "__arrow_c_stream__")
+
+
 def is_dataframe_api_obj(obj: object) -> DataFrameObject:
     """Check if an object has a dataframe API."""
     method = getattr(obj, "__dataframe__", None)
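As an aside (not part of the diff): the check is plain duck typing on the `__arrow_c_stream__` dunder, so any producer implementing the PyCapsule interface passes. A minimal sketch, assuming a recent pyarrow (14+), whose tables export the dunder:

```python
import pyarrow as pa

tbl = pa.table({"a": [1, 2, 3]})

# pyarrow tables export an Arrow C Stream via the PyCapsule interface...
assert hasattr(tbl, "__arrow_c_stream__")

# ...while arbitrary Python objects do not, so they fall through to the
# IPC / dataframe-protocol checks instead.
assert not hasattr({"a": [1, 2, 3]}, "__arrow_c_stream__")
```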
src/quak/_widget.py (17 additions, 2 deletions)

@@ -6,13 +6,16 @@

 import anywidget
 import duckdb
+import pyarrow as pa
 import traitlets
 
 from ._util import (
     arrow_table_from_dataframe_protocol,
     arrow_table_from_ipc,
     get_columns,
+    has_pycapsule_stream_interface,
     is_arrow_ipc,
+    is_dataframe_api_obj,
     table_to_ipc,
 )
@@ -37,10 +40,22 @@ def __init__(self, data, *, table: str = "df"):
             conn = data
         else:
             conn = duckdb.connect(":memory:")
Review thread on `conn = duckdb.connect(":memory:")`:

@kylebarron (Collaborator, Author):

Do you know if the DuckDB :memory: DB will spill to disk? Or for very large input would it just crash the process?

@manzt (Owner):

> Do you know if the DuckDB :memory: DB will spill to disk? Or for very large input would it just crash the process?

I believe it spills to disk.

@kylebarron (Collaborator, Author):

I thought I was talking to someone recently who asserted that :memory: in particular didn't spill to disk, because it doesn't have a path to store a local database. But I'm not sure.

Comment (passerby):

Hi, just stopping by: in-memory databases will spill to disk. DuckDB will create/use a .tmp folder in the current working directory; a database file is not required to spill to disk.

You can test this out by setting a relatively low memory limit and creating a temp table that exceeds this limit, as sketched below. `SELECT * FROM duckdb_temporary_files()` will show you which temporary files were created to back the temporary table.
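A runnable version of that test (a hypothetical sketch, not from the thread; the table name and sizes are made up, and exact spill behavior may vary by DuckDB version):

```python
import duckdb

conn = duckdb.connect(":memory:")
conn.execute("SET memory_limit = '10MB'")

# Create a temp table comfortably larger than the 10MB limit so DuckDB
# is forced to back it with temporary files on disk.
conn.execute(
    "CREATE TEMP TABLE big AS "
    "SELECT range AS i, repeat('x', 100) AS payload FROM range(1000000)"
)

# Lists the temporary files (in the .tmp folder) backing the temp table.
print(conn.execute("SELECT * FROM duckdb_temporary_files()").fetchall())
```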

-        if is_arrow_ipc(data):
+        if has_pycapsule_stream_interface(data):
+            # NOTE: for now we materialize the input into an in-memory Arrow table,
+            # so that we can perform repeated queries on it. In the future, it may
+            # be better to keep this Arrow stream non-materialized in Python and
+            # create a new DuckDB table from the stream.
+            # arrow_table = pa.RecordBatchReader.from_stream(data)
+            arrow_table = pa.table(data)
Review thread on lines +48 to +49:
@kylebarron (Collaborator, Author), Jul 26, 2024:

This is not ideal, because pa.table materializes the entire stream in memory at once. I tried with the RecordBatchReader, but I think you might need to adjust your queries to actually create a table instead of querying the stream directly. I assume you run multiple queries on the input... and presumably the first query exhausts the input stream, so the second query has no data.

If I uncomment the RecordBatchReader line above instead, I get:

[screenshot: the widget footer reports 11k rows, but the table body renders empty]

Note that it shows 11k rows in the bottom right, but then doesn't display anything. So I think you need to persist the Arrow stream input manually.
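The exhaustion is easy to reproduce in isolation (a minimal sketch, not from the PR, assuming pyarrow 14+ for `RecordBatchReader.from_stream`):

```python
import pyarrow as pa

tbl = pa.table({"x": list(range(11_000))})

# Wrap the table in a stream, as the commented-out line in the diff would.
reader = pa.RecordBatchReader.from_stream(tbl)

print(reader.read_all().num_rows)  # 11000: the first read drains the stream
print(reader.read_all().num_rows)  # 0: a second read finds nothing left
```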

@manzt (Owner), Jul 26, 2024:

Yeah, I figured this is not ideal... I just wasn't sure how to wire up DuckDB to query something consistently. I'm guessing something like CREATE VIEW over the input stream might allow us to register tables that DuckDB can continuously query.
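For a fully materialized Arrow table, the registration route already behaves like a view (a minimal sketch of the idea, not code from the PR): `conn.register` sets up a replacement scan, so each query re-reads the in-memory table without copying it.

```python
import duckdb
import pyarrow as pa

conn = duckdb.connect(":memory:")
tbl = pa.table({"x": [1, 2, 3]})

# Register the in-memory table; DuckDB re-scans it on every query,
# so repeated queries all see the full data.
conn.register("df", tbl)
print(conn.execute("SELECT sum(x) FROM df").fetchone())    # (6,)
print(conn.execute("SELECT count(*) FROM df").fetchone())  # (3,)
```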

@kylebarron (Collaborator, Author), Jul 26, 2024:

I'm not 100% certain, but I don't think you could always use CREATE VIEW here for Arrow streams. Reading https://duckdb.org/docs/api/python/data_ingestion#directly-accessing-dataframes-and-arrow-objects makes me think the first time you query the view, it will exhaust the input stream.

I think we're running into the same issue as in ibis: ibis-project/ibis#9663 (comment). In particular, the presence of the dunder method doesn't tell you whether the input data is already materialized or a stream. If you know that the input data is an in-memory table, then you can call __arrow_c_stream__ multiple times and get the full data each time. In that case you'd prefer to make a DuckDB view, because then you don't need to copy the input data. But if the input is a stream, then you would need to persist it into a full DuckDB table for ongoing queries, as in the sketch below.
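One way to handle the stream case (a hypothetical sketch, not what this PR merged; the helper and registration names are made up): drain the stream once into a real DuckDB table, then run all subsequent queries against that table.

```python
import duckdb
import pyarrow as pa

def persist_stream(conn: duckdb.DuckDBPyConnection, data, table: str) -> None:
    """Drain a one-shot Arrow C Stream into a persistent DuckDB table."""
    reader = pa.RecordBatchReader.from_stream(data)
    # Registering the reader lets DuckDB scan it exactly once; CTAS drains
    # it into a table that can then be queried any number of times.
    conn.register("_arrow_stream", reader)
    conn.execute(f'CREATE TABLE "{table}" AS SELECT * FROM _arrow_stream')
    conn.unregister("_arrow_stream")

conn = duckdb.connect(":memory:")
persist_stream(conn, pa.table({"x": [1, 2, 3]}), "df")
print(conn.execute('SELECT count(*) FROM "df"').fetchone())  # (3,)
```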

+        elif is_arrow_ipc(data):
             arrow_table = arrow_table_from_ipc(data)
-        else:
+        elif is_dataframe_api_obj(data):
             arrow_table = arrow_table_from_dataframe_protocol(data)
+        else:
+            raise ValueError(
+                "input must be a DuckDB connection, DataFrame-like, an Arrow IPC "
+                "table, or an Arrow object exporting the Arrow C Stream interface."
+            )
         conn.register(table, arrow_table)
         self._conn = conn
         super().__init__(