[PYTHON] Add a recipe for sending mixed schema data over a single Flight stream #353

Open · wants to merge 1 commit into base: main
python/requirements.txt (2 additions, 2 deletions):

@@ -1,5 +1,5 @@
 Sphinx>=4.0.2
-pyarrow==15.0.2
+pyarrow==16.1.0
 pandas>=1.2.5
 opentelemetry-api>=1.0.0
-opentelemetry-sdk>=1.0.0
+opentelemetry-sdk>=1.0.0
python/source/flight.rst (123 additions, 1 deletion):

@@ -967,4 +967,126 @@ middleware, and one in the server middleware.
    :hide:

    # Shutdown the server
    server.shutdown()

Sending Mixed Schema Data over a Single Flight Stream
=====================================================

In some applications it is desirable to transmit record batches or tables with
*different* schemas over a single Arrow Flight stream. The IPC stream
specification does not allow multiple schemas within one stream, but this
limitation can be worked around by embedding the mixed schema data within a
:class:`pyarrow.UnionArray`.
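
With this approach the stream carries a single union-typed column whose children are
the per-schema struct types. For reference, here is a sketch of the combined type used
in the example below (:meth:`pyarrow.UnionArray.from_dense` constructs it implicitly):

.. code-block:: python

    import pyarrow as pa

    # One dense-union column; each child struct corresponds to one of
    # the original schemas.
    union_type = pa.dense_union([
        pa.field("animals", pa.struct([("n_legs", pa.int64()),
                                       ("animals", pa.string())])),
        pa.field("dates", pa.struct([("year", pa.int64()),
                                     ("month", pa.int64()),
                                     ("day", pa.int64())])),
    ])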

The following example shows how to transmit two record batches with different schemas
over a single Flight stream.

.. testcode::

    import pyarrow as pa

    # Create record batches with different schemas
    n_legs = pa.array([2, 2, 4, 4, 5, 100])
    animals = pa.array(["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"])
    animals_recbatch = pa.RecordBatch.from_arrays(
        [n_legs, animals], names=["n_legs", "animals"],
    )

    years = pa.array([2020, 2022, 2021, 2022])
    months = pa.array([3, 5, 7, 9])
    days = pa.array([1, 5, 9, 13])
    dates_recbatch = pa.RecordBatch.from_arrays(
        [years, months, days], names=["year", "month", "day"],
    )

    # Convert the record batches to struct arrays
    animals_array = animals_recbatch.to_struct_array()
    dates_array = dates_recbatch.to_struct_array()

    # Create a UnionArray containing the animals data followed by the dates data
    n_animals, n_dates = animals_recbatch.num_rows, dates_recbatch.num_rows
    types = pa.array(([0] * n_animals) + ([1] * n_dates), type=pa.int8())
    offsets = pa.array(list(range(n_animals)) + list(range(n_dates)), type=pa.int32())
    union_array = pa.UnionArray.from_dense(
        types, offsets, [animals_array, dates_array], ['animals', 'dates'],
    )

    # Finally create a single column RecordBatch from the UnionArray
    union_recbatch = pa.RecordBatch.from_arrays([union_array], ['animals_dates_union'])

.. testcode::
    :hide:

    print(union_recbatch)

.. testoutput::
    :hide:

    pyarrow.RecordBatch
    animals_dates_union: dense_union<animals: struct<n_legs: int64, animals: string>=0, dates: struct<year: int64, month: int64, day: int64>=1>
      child 0, animals: struct<n_legs: int64, animals: string>
        child 0, n_legs: int64
        child 1, animals: string
      child 1, dates: struct<year: int64, month: int64, day: int64>
        child 0, year: int64
        child 1, month: int64
        child 2, day: int64
    ----
    animals_dates_union: -- is_valid: all not null
    -- type_ids: [0,0,0,0,0,0,1,1,1,1]
    -- value_offsets: [0,1,2,3,4,5,0,1,2,3]
    -- child 0 type: struct<n_legs: int64, animals: string>
      -- is_valid: all not null
      -- child 0 type: int64
        [2,2,4,4,5,100]
      -- child 1 type: string
        ["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"]
    -- child 1 type: struct<year: int64, month: int64, day: int64>
      -- is_valid: all not null
      -- child 0 type: int64
        [2020,2022,2021,2022]
      -- child 1 type: int64
        [3,5,7,9]
      -- child 2 type: int64
        [1,5,9,13]
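
In a dense union each logical row is resolved in two steps: the type code selects the
child array, and the value offset indexes into that child. As a small illustrative
check (assuming the ``type_code`` and ``value`` accessors of
:class:`pyarrow.UnionScalar`), row 7 above is the second ``dates`` entry:

.. code-block:: python

    row = union_recbatch.column(0)[7]  # a pyarrow.UnionScalar
    print(row.type_code)               # 1, i.e. the 'dates' child
    print(row.value)                   # the underlying struct scalar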

We can then use :meth:`pyarrow.flight.FlightStreamWriter.write_batch` to write the
"mixed schema" record batch to the Flight stream, as sketched below.
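
For instance, a client could upload the batch over a single ``do_put`` stream. This is
an illustrative, untested sketch: the server location and descriptor path are
placeholder values, not part of the tested recipe.

.. code-block:: python

    import pyarrow.flight as flight

    # Connect to a hypothetical Flight server and open one upload stream
    client = flight.connect("grpc://localhost:8815")
    descriptor = flight.FlightDescriptor.for_path("animals_dates")
    writer, _ = client.do_put(descriptor, union_recbatch.schema)
    writer.write_batch(union_recbatch)
    writer.close()

On the receiving side we can reconstruct the original record batches as follows.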

.. testcode::
    :hide:

    chunk = (union_recbatch, None)

.. testcode::

    import pyarrow as pa

    # Unpack chunk from FlightStreamReader.read_chunk()
    data, metadata = chunk
    union_array = data.column(0)
    animals_recbatch = pa.RecordBatch.from_struct_array(union_array.field(0))
    dates_recbatch = pa.RecordBatch.from_struct_array(union_array.field(1))

.. testcode::
    :hide:

    print(animals_recbatch)
    print(dates_recbatch)

.. testoutput::
    :hide:

    pyarrow.RecordBatch
    n_legs: int64
    animals: string
    ----
    n_legs: [2,2,4,4,5,100]
    animals: ["Flamingo","Parrot","Dog","Horse","Brittle stars","Centipede"]
    pyarrow.RecordBatch
    year: int64
    month: int64
    day: int64
    ----
    year: [2020,2022,2021,2022]
    month: [3,5,7,9]
    day: [1,5,9,13]
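
Note that :meth:`pyarrow.RecordBatch.from_struct_array` can be applied directly to each
child here because every child's rows were appended in one contiguous run (all animals,
then all dates). If rows with different schemas may be interleaved, a helper along the
following lines can demultiplex by type code; ``split_union`` is a name invented for
this sketch.

.. code-block:: python

    import pyarrow as pa

    def split_union(union_array):
        """Rebuild one RecordBatch per child type of a dense union column."""
        utype = union_array.type
        row_codes = union_array.type_codes.to_pylist()
        row_offsets = union_array.offsets.to_pylist()
        batches = {}
        for child_index, code in enumerate(utype.type_codes):
            # Positions into this child's array, in stream order
            positions = [off for tc, off in zip(row_codes, row_offsets) if tc == code]
            child = union_array.field(child_index).take(pa.array(positions, type=pa.int64()))
            batches[utype.field(child_index).name] = pa.RecordBatch.from_struct_array(child)
        return batches

With the example above, ``split_union(union_array)`` returns the animals and dates
batches keyed by child field name.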