[Python] group_by functionality directly on large dataset, instead of on a table? #13747
There have been some discussions about adding basic analysis capabilities to Datasets; at the moment, work is underway to support ...
Hope this is not off-topic, but you can leverage DuckDB or Polars on top of a pyarrow dataset:

```python
import duckdb
import pyarrow.dataset as ds
import polars as pl

dset = ds.dataset('path/to/data')

# duckdb
con = duckdb.connect()
table = con.query("SELECT sum(col1), count(col1) FROM dset WHERE col1 > 10 GROUP BY col2").arrow()

# polars
table = pl.scan_ds(dset).filter(pl.col("col1") > 10).groupby("col2").agg(
    [pl.col("col1").sum(), pl.col("col1").count()]
).collect().to_arrow()
```
Thank you @legout. DuckDB works really well, but Polars is struggling. Maybe I am doing something wrong, but anyway, here is how it worked for me:

```python
# pyarrow 8.0.0
# duckdb 0.5.1
# polars 0.14.18
import duckdb
import polars as pl
import pyarrow
from pyarrow.dataset import dataset, field

# `fs` is an adlfs filesystem for the Azure storage account (see further down)
ib = dataset("install-base-from-vdw-standard/", filesystem=fs, partitioning="hive")

ib.count_rows()
# 1415259797

ib.schema
"""
bev: bool
market: int16
function_group: int32
part: int32
kdp: bool
kdp_accessory: bool
yearweek: int32
qty_part: int32
vehicle_type: int32
model_year: int32
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 1081
"""
def do_duckdb():
    sql = """
        SELECT i.part,
               i.bev,
               i.market,
               kdp_accessory,
               yearweek,
               SUM(i.qty_part) AS qty_part_sum
        FROM ib i
        WHERE vehicle_type = 536
        GROUP BY
            i.part,
            i.bev,
            i.market,
            i.kdp_accessory,
            yearweek
    """
    conn = duckdb.connect(":memory:")
    result = conn.execute(sql)
    table = result.fetch_arrow_table()
    return table
def do_polar():
    table = (
        pl.scan_ds(ib)
        .filter(pl.col("vehicle_type") == 536)
        .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .agg(pl.col("qty_part").sum())
        .collect()
        .to_arrow()
    )
    return table
%time table = do_duckdb()
# memory consumption increased temporarily by 2 GB, 18.8 s

%time table = do_polar()
# memory consumption increased slowly to fill almost all memory (32 GB)
# before normalizing, 4 min 54 s
```

Note: DuckDB was 50% faster than my pyarrow implementation for doing it on a table. DuckDB used a little more RAM, but not much. Doing the pyarrow aggregation in batches uses less RAM, but slows it down a little.

```python
def do_pyarrow_batches():
    table = []
    columns = ["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"]
    filters = field("vehicle_type") == 536
    agg = [("qty_part", "sum")]
    group_by = ["part", "bev", "market", "kdp_accessory", "yearweek"]
    for batch in ib.to_batches(columns=columns, filter=filters, batch_size=1_000_000):
        t = pyarrow.Table.from_batches([batch])
        table.append(t.group_by(group_by).aggregate(agg))
    table = pyarrow.concat_tables(table)
    # need to aggregate again across the per-batch partial results
    new_agg = []
    for a in agg:
        how = a[1].replace("hash_", "")
        new_agg.append((a[0] + "_" + how, "sum"))
    table = table.group_by(group_by).aggregate(new_agg)
    return table
def do_pyarrow_table():
    table = (
        ib.to_table(
            columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
            filter=field("vehicle_type") == 536,
        )
        .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .aggregate([("qty_part", "sum")])
    )
    return table
```
Polars is not able to push down the filter into a pyarrow dataset, only in readers directly implemented by Polars (scan_parquet, scan_csv, etc.). So that means the dataset is first completely read into memory. We could see if we can translate our predicate to filters pyarrow understands.
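In the meantime, one workaround is to apply the filter (and column selection) with pyarrow itself and hand the already-filtered table to Polars. A minimal sketch, assuming the `ib` dataset, `field`, and the columns from the comment above:

```python
import polars as pl

# pyarrow applies the filter and column pruning at scan time,
# so only the matching rows ever reach Polars
filtered = ib.to_table(
    columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
    filter=field("vehicle_type") == 536,
)

result = (
    pl.from_arrow(filtered)
    .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
    .agg([pl.col("qty_part").sum()])
)
```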
What is the size of the dataset, and where is it stored? In an S3 bucket? If so, this could be interesting for you:
For me, a pyarrow dataset is the entry point for running queries/analytics on remote (S3 bucket) parquet datasets without loading all data into memory. Is this possible with scan_parquet, too?
I use adlfs against an Azure storage account. The size of the entire (gzip parquet) dataset is maybe 5 GB.
@ritchie46, FYI, I changed the definition of the dataset to help Polars with filtering the partitions, like so:

```python
ibpolar = dataset("install-base-from-vdw-standard/vehicle_type=536/", filesystem=fs, partitioning="hive")
```

Polars now used half of the memory compared to DuckDB, and spent 7 seconds vs 12 on DuckDB. So being able to push down filters on partition columns would help a lot.
Hi, all -- have there been any developments on this front? @amol-
@MatthewRGonzalez I think this can now be achieved using the new Acero Declaration API. |
Thanks, @amol! I'm still experimenting, but the following worked:
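A rough sketch of what such an Acero Declaration pipeline can look like (assuming pyarrow 12+, where pyarrow.acero exposes scan/filter/project/aggregate node options, and reusing the `ib` dataset and column names from earlier comments; not necessarily the exact code that was posted):

```python
import pyarrow.acero as acero
import pyarrow.compute as pc

# `ib` is the hive-partitioned dataset defined earlier in this thread
cols = ["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"]

decl = acero.Declaration.from_sequence([
    # stream record batches from the dataset instead of materializing a table
    acero.Declaration("scan", acero.ScanNodeOptions(ib)),
    # the scan node does not filter rows by itself, so add an explicit filter node
    acero.Declaration("filter", acero.FilterNodeOptions(pc.field("vehicle_type") == 536)),
    # keep only the columns needed downstream
    acero.Declaration("project", acero.ProjectNodeOptions([pc.field(c) for c in cols], names=cols)),
    # grouped ("hash_") aggregation: (input column, function, options, output name)
    acero.Declaration("aggregate", acero.AggregateNodeOptions(
        [("qty_part", "hash_sum", None, "qty_part_sum")],
        keys=["part", "bev", "market", "kdp_accessory", "yearweek"],
    )),
])

table = decl.to_table(use_threads=True)
```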
Maybe a dumb question from me, @MatthewRGonzalez, but how do you install pyarrow acero? I currently have pyarrow version 10 because of a limitation in the snowflake API; has acero been added afterwards?
@nosterlu I believe it was added around version 12. It's quite fast (maybe faster than similar ...
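For the installation question: Acero ships inside the pyarrow wheel rather than as a separate package, so upgrading pyarrow should be all that is needed (assuming the version-12 estimate above is right). A quick check:

```python
# Acero is part of pyarrow itself; e.g.  pip install --upgrade "pyarrow>=12"
import pyarrow as pa

print(pa.__version__)

# On older pyarrow versions this import fails; on newer ones it succeeds.
import pyarrow.acero as acero
```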
Polars scan_parquet can now read hive partitions from AWS S3, GCS, and HTTP without pyarrow.
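A minimal sketch of that approach for the aggregation in this thread (the bucket path is hypothetical, and credentials would go in `storage_options`):

```python
import polars as pl

result = (
    pl.scan_parquet(
        "s3://my-bucket/install-base/**/*.parquet",  # hypothetical location
        hive_partitioning=True,
    )
    .filter(pl.col("vehicle_type") == 536)
    .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
    .agg(pl.col("qty_part").sum())
    .collect()
)
```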
Thanks for the reminder on this one, @deanm0000.
I have a large dataset that I would like to use `group_by` on without having to read the entire table into memory first. After reading the documentation, it seems `dataset.to_batches` is the best way of doing this? But it gets really complex when using other aggregation methods than, for example, `count` and `sum`. I implemented it like below for `count` and `sum`, but for other, more complex aggregations I am still forced to read the entire table. Thankful for any pointers or comments!
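To illustrate the point about more complex aggregations: something like a per-group mean cannot be re-aggregated directly across batches, but it can be decomposed into per-batch sums and counts that are combined afterwards. A rough sketch, reusing the `ib` dataset, `field`, and columns from the comments above:

```python
import pyarrow as pa
import pyarrow.compute as pc

group_by = ["part", "bev", "market", "kdp_accessory", "yearweek"]

# first pass: partial sums and counts per batch
partials = []
for batch in ib.to_batches(
    columns=group_by + ["qty_part"],
    filter=field("vehicle_type") == 536,
    batch_size=1_000_000,
):
    t = pa.Table.from_batches([batch])
    partials.append(
        t.group_by(group_by).aggregate([("qty_part", "sum"), ("qty_part", "count")])
    )

# second pass: combine the partials per group
merged = (
    pa.concat_tables(partials)
    .group_by(group_by)
    .aggregate([("qty_part_sum", "sum"), ("qty_part_count", "sum")])
)

# mean per group = total sum / total count
mean = pc.divide(
    pc.cast(merged["qty_part_sum_sum"], pa.float64()),
    pc.cast(merged["qty_part_count_sum"], pa.float64()),
)
result = merged.append_column("qty_part_mean", mean)
```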