
[Python] group_by functionality directly on large dataset, instead of on a table? #13747

Open
nosterlu opened this issue Jul 29, 2022 · 15 comments

Comments

@nosterlu

I have a large dataset that I would like to use group_by on without having to read the entire table into memory first.

After reading the documentation it seems dataset.to_batches is the best way to do this? But it gets really complex when using aggregation methods other than, for example, count and sum.

I implemented it like below for count and sum, but for other more complex aggregations I am still forced to read the entire table.

import pyarrow

# ds is a pyarrow.dataset.Dataset; columns, filters, agg and group_by are defined elsewhere
table = []
for batch in ds.to_batches(columns=columns, filter=filters, batch_size=1_000_000):
    t = pyarrow.Table.from_batches([batch])
    table.append(t.group_by(group_by).aggregate(agg))
table = pyarrow.concat_tables(table)

# then after this I use group_by again on the concatenated table with `sum` as aggregation method
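
For reference, a minimal sketch of that second pass, using hypothetical `key`/`value` column names and a placeholder path (not the dataset from this issue). Partial sums and counts can simply be re-summed, which is also how an aggregate such as `mean` can be recovered from per-batch results, whereas something like `median` cannot be combined this way and still needs the full table.

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset("path/to/data")  # placeholder

partials = []
for batch in dataset.to_batches(columns=["key", "value"], batch_size=1_000_000):
    t = pa.Table.from_batches([batch])
    # first pass: partial aggregates per batch (output columns: value_sum, value_count)
    partials.append(t.group_by(["key"]).aggregate([("value", "sum"), ("value", "count")]))

merged = pa.concat_tables(partials)

# second pass: the partial sums and counts are themselves summable
final = merged.group_by(["key"]).aggregate([("value_sum", "sum"), ("value_count", "sum")])

# a mean, for example, is then recoverable from the combined partials,
# which is why it cannot be computed with a per-batch "mean" directly
mean = pc.divide(
    pc.cast(final["value_sum_sum"], pa.float64()),
    pc.cast(final["value_count_sum"], pa.float64()),
)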

Thankful for any pointers or comments!

@nosterlu nosterlu changed the title group_by functionality directly on large dataset, instead of on a table? [pyarrow] group_by functionality directly on large dataset, instead of on a table? Jul 29, 2022
@nosterlu nosterlu changed the title [pyarrow] group_by functionality directly on large dataset, instead of on a table? [Python] group_by functionality directly on large dataset, instead of on a table? Jul 29, 2022
@amol-
Member

amol- commented Aug 9, 2022

There have been some discussions about adding basic analysis capabilities to Datasets. At the moment work is ongoing to support Dataset.filter, and I guess group_by might be added in the future too. Adding group by is a bit more complex because the current Table.group_by implementation doesn't immediately support Datasets, even though it's perfectly possible.

@legout

legout commented Oct 6, 2022

Hope this is not off-topic, but you can leverage duckdb or polars for this.

import duckdb
import pyarrow.dataset as ds
import polars as pl

dset = ds.dataset('path/to/data')

# duckdb
con = duckdb.connect()

table = con.query("SELECT col2, sum(col1), count(col1) FROM dset WHERE col1 > 10 GROUP BY col2").arrow()

#polars
table = pl.scan_ds(dset).filter(pl.col("col1")>10).groupby("col2").agg(
        [pl.col("col1").sum(), pl.col("col1").count()]
    ).collect().to_arrow()

@nosterlu
Author

nosterlu commented Oct 7, 2022

Thank you @legout. Duckdb works really well, but polars is struggling. Maybe I am doing something wrong.

But anyway, here is how it worked for me:

# pyarrow 8.0.0
# duckdb 0.5.1
# polars 0.14.18
ib = dataset("install-base-from-vdw-standard/", filesystem=fs, partitioning="hive")

ib.count_rows()
# 1415259797
ib.schema
"""
bev: bool
market: int16
function_group: int32
part: int32
kdp: bool
kdp_accessory: bool
yearweek: int32
qty_part: int32
vehicle_type: int32
model_year: int32
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 1081
"""

def do_duckdb():
    sql = """
        SELECT i.part,
               i.bev,
               i.market,
               kdp_accessory,                   
               yearweek,
               SUM(i.qty_part) AS qty_part_sum
        FROM ib i
        WHERE vehicle_type=536
        GROUP BY
           i.part,
           i.bev,
           i.market,
           i.kdp_accessory,
           yearweek
    """
    conn = duckdb.connect(":memory:")
    result = conn.execute(sql)
    table = result.fetch_arrow_table()
    return table


def do_polar():
    table = (
        pl.scan_ds(ib)
        .filter("vehicle_type" == 536)
        .groupby(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .agg(pl.col("qty_part").sum())
        .collect()
        .to_arrow()
    )
    return table

%time table = do_duckdb()
# memory consumption increased temporarily by 2GB, 18.8s

%time table = do_polar()
# memory consumption increased slowly to fill almost all memory (32GB) before
# normalizing, 4min 54s

Note, duckdb was 50% faster than my pyarrow implementation when doing it on a table. duckdb used a little more RAM, but not much. The pyarrow batches approach uses less RAM, but is a little slower.

def do_pyarrow_batches():
    table = []
    columns = ["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"]
    filters = field("vehicle_type") == 536
    agg = [("qty_part", "sum")]
    group_by = ["part", "bev", "market", "kdp_accessory", "yearweek"]
    for batch in ib.to_batches(columns=columns, filter=filters, batch_size=1_000_000):
        t = pyarrow.Table.from_batches([batch])
        table.append(t.group_by(group_by).aggregate(agg))
    table = pyarrow.concat_tables(table)
    # need to aggregate again
    new_agg = []
    for a in agg:
        how = a[1].replace("hash_", "")
        new_agg.append((a[0] + "_" + how, "sum"))
    table = table.group_by(group_by).aggregate(new_agg)
    return table

def do_pyarrow_table():
    table = (
        ib.to_table(
            columns=["part", "bev", "market", "kdp_accessory", "yearweek", "qty_part"],
            filter=field("vehicle_type") == 536,
        )
        .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
        .aggregate([("qty_part", "sum")])
    )
    return table


@ritchie46
Contributor

> Duckdb works really well, but polars is struggling. Maybe I am doing something wrong. […]

Polars is not able to push down the filter into a pyarrow dataset, only into readers implemented directly by polars (scan_parquet, scan_csv, etc.). That means the dataset is first read completely into memory. We could see if we can translate our predicate to filters pyarrow understands.
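
A possible workaround in the meantime (a rough sketch, reusing the placeholder col1/col2 names and path from the earlier example) is to apply the projection and predicate on the pyarrow side first, so only the filtered subset is materialized before the Arrow table is handed to polars:

import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("path/to/data")  # placeholder path

# projection and predicate are evaluated by pyarrow while scanning,
# so only the filtered columns/rows end up in memory
filtered = dset.to_table(columns=["col1", "col2"], filter=ds.field("col1") > 10)

result = (
    pl.from_arrow(filtered)
    .groupby("col2")
    .agg([pl.col("col1").sum(), pl.col("col1").count()])
)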

@legout

legout commented Oct 7, 2022

What is the size of the dataset and where is it stored? In an S3 bucket? If so, this could be interesting for you:

#14336

@legout

legout commented Oct 7, 2022

> Polars is not able to push down the filter into a pyarrow dataset, only into readers implemented directly by polars (scan_parquet, scan_csv, etc.). That means the dataset is first read completely into memory. We could see if we can translate our predicate to filters pyarrow understands.

For me, a pyarrow dataset is the entry point for running queries/analytics on remote (S3 bucket) parquet datasets, without loading all data into memory. Is this possible with scan_parquet, too?

@nosterlu
Author

nosterlu commented Oct 7, 2022

What is the size of the dataset and where is it stored? In a s3 bucket? If so, this could be interesting for you:

#14336

I use adlfs towards an Azure storage account. The size of the entire (gzip parquet) dataset is maybe 5 GB.

@nosterlu
Author

nosterlu commented Nov 4, 2022

@ritchie46, FYI, I changed the definition of the dataset to help polars with filtering the partitions like so

ibpolar = dataset("install-base-from-vdw-standard/vehicle_type=536/", filesystem=fs, partitioning="hive")

Polars now used half the memory compared to duckdb, and took 7 seconds vs 12 for duckdb.

So being able to push down filters on partition columns would help a lot.

@MatthewRGonzalez

Hi, all-- Have there been any developments on this front? @amol-

> There have been some discussions about adding basic analysis capabilities to Datasets. At the moment work is ongoing to support Dataset.filter, and I guess group_by might be added in the future too. Adding group by is a bit more complex because the current Table.group_by implementation doesn't immediately support Datasets, even though it's perfectly possible.

@amol-
Member

amol- commented Aug 16, 2023

@MatthewRGonzalez I think this can now be achieved using the new Acero Declaration API.
See https://github.com/apache/arrow/blob/main/python/pyarrow/acero.py#L302-L308 and probably replace Declaration("table_source", TableSourceNodeOptions(table)) with a _dataset_to_decl call like sorting does ( https://github.com/apache/arrow/blob/main/python/pyarrow/acero.py#L280-L287 )

@MatthewRGonzalez

Thanks, @amol! I'm still experimenting but the following worked:

import pyarrow as pa
import pyarrow.acero as ac
import pyarrow.dataset as ds

def _group_by(table_or_dataset, aggregates, keys):
    # datasets get a scan node, in-memory tables a table_source node
    if isinstance(table_or_dataset, ds.Dataset):
        data_source = ac._dataset_to_decl(table_or_dataset, use_threads=True)
    else:
        data_source = ac.Declaration(
            "table_source", ac.TableSourceNodeOptions(table_or_dataset)
        )
    decl = ac.Declaration.from_sequence(
        [
            data_source,
            ac.Declaration("aggregate", ac.AggregateNodeOptions(aggregates, keys=keys)),
        ]
    )
    return decl.to_table(use_threads=True)


## dataset written from the table below
# table = pa.table({'a': [1, 1, 2, 2, 3], 'b': [4, 5, 6, 7, 8]})
dataset = ds.dataset("my_data")

_group_by(dataset, [("b", "hash_sum", None, "b_sum")], keys=["a"])
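
For what it's worth, the same helper also covers the in-memory Table branch; for example, with the commented-out table above (this reuses the imports and _group_by definition from the snippet):

# exercising the table_source branch of _group_by with the small example table
table = pa.table({"a": [1, 1, 2, 2, 3], "b": [4, 5, 6, 7, 8]})
_group_by(table, [("b", "hash_sum", None, "b_sum")], keys=["a"])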

@nosterlu
Author

Maybe a dummy question from me, @MatthewRGonzalez, but how do you install pyarrow acero? I currently have pyarrow version 10 because of a limitation in the snowflake API; has acero been added in a later version?

@MatthewRGonzalez

@nosterlu I believe it was added around version 12.

It's quite fast (maybe faster than similar polars aggregations). I hope this functionality continues to be developed!
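
A quick way to check whether the installed pyarrow exposes the Acero bindings (a minimal sketch; the exact release that introduced pyarrow.acero should be confirmed against the release notes):

import pyarrow as pa

print(pa.__version__)

try:
    import pyarrow.acero as ac  # available in recent pyarrow releases
except ImportError:
    ac = None
    print("pyarrow.acero is not available in this pyarrow version/build")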

@deanm0000

Polars scan_parquet can now read hive partitions from AWS S3, GCS, and HTTP without pyarrow.
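
A rough sketch of what that can look like (the bucket path is hypothetical, and the hive_partitioning parameter is an assumption to be checked against the current polars scan_parquet documentation):

import polars as pl

# hypothetical S3 path; credentials can also be passed via storage_options
lazy = pl.scan_parquet(
    "s3://my-bucket/install-base/**/*.parquet",
    hive_partitioning=True,  # assumption: parameter name per recent polars docs
)

table = (
    lazy.filter(pl.col("vehicle_type") == 536)
    .group_by(["part", "bev", "market", "kdp_accessory", "yearweek"])
    .agg(pl.col("qty_part").sum())
    .collect()
    .to_arrow()
)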

@ritchie46
Contributor

Thanks for the reminder on this one @deanm0000
