init: m5 forecasting FE benchmark #136

Open · wants to merge 3 commits into main
23 changes: 23 additions & 0 deletions m5-forecasting-feature-engineering/README.md
@@ -0,0 +1,23 @@
# M5 Forecasting Feature Engineering

The [M5 Forecasting Competition](https://www.sciencedirect.com/science/article/pii/S0169207021001874) was held on Kaggle in 2020,
and the top solutions generally relied on heavy feature engineering.

Participants typically used pandas (Polars was only just getting started at the time), so here we benchmark how long the same
feature engineering would have taken with Polars and DuckDB.

The original code can be found at https://github.com/Mcompetitions/M5-methods. We run some of the preprocessing
functions from the top solution, "A1". The code is generally kept as-is, with minor modifications to account
for pandas syntax changes over the last two years.
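
For illustration, here is a minimal sketch of the kind of feature being benchmarked: a per-series lag of the
sales column (assuming the data has been downloaded as described below).

```python
import pandas as pd

df = pd.read_parquet("data/grid_part_1.parquet")
# One of the benchmarked features: sales lagged by 28 days within each series
df["sales_lag_28"] = df.groupby("id", observed=True)["sales"].transform(
    lambda s: s.shift(28)
)
```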

## Running the benchmark

**Data**: download the output files from https://www.kaggle.com/code/marcogorelli/winning-solution-preprocessing.
Place them in a `data` folder here.

**Run**:

- `python pandas_queries.py`
- `python polars_queries.py`
- `python duckdb_queries.py`
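
Each script prints the versions of the libraries it uses, followed by the time taken by each of the three queries.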

162 changes: 162 additions & 0 deletions m5-forecasting-feature-engineering/duckdb_queries.py
@@ -0,0 +1,162 @@
import time
from pathlib import Path

import duckdb

print("duckdb version", duckdb.__version__)

PROCESSED_DATA_DIR = "data"

TARGET = "sales"
SHIFT_DAY = 28


# Set this to True if you just want to test that everything runs
SMALL = False
if SMALL:
PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1_small.parquet"
else:
PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1.parquet"

# Lags 28..42; these are written out explicitly in the SQL queries below
LAG_DAYS = list(range(SHIFT_DAY, SHIFT_DAY + 15))


def q1_duckdb(df):
return duckdb.sql("""
SELECT
*,
LAG(sales, 28) OVER w AS sales_lag_28,
LAG(sales, 29) OVER w AS sales_lag_29,
LAG(sales, 30) OVER w AS sales_lag_30,
LAG(sales, 31) OVER w AS sales_lag_31,
LAG(sales, 32) OVER w AS sales_lag_32,
LAG(sales, 33) OVER w AS sales_lag_33,
LAG(sales, 34) OVER w AS sales_lag_34,
LAG(sales, 35) OVER w AS sales_lag_35,
LAG(sales, 36) OVER w AS sales_lag_36,
LAG(sales, 37) OVER w AS sales_lag_37,
LAG(sales, 38) OVER w AS sales_lag_38,
LAG(sales, 39) OVER w AS sales_lag_39,
LAG(sales, 40) OVER w AS sales_lag_40,
LAG(sales, 41) OVER w AS sales_lag_41,
LAG(sales, 42) OVER w AS sales_lag_42
FROM
df
WINDOW w AS (PARTITION BY id ORDER BY d);""")


def q2_duckdb(df):
return duckdb.sql("""
WITH lagged AS (
SELECT
*,
LAG(sales, 28) OVER id_w AS sales_lagged
FROM df
WINDOW id_w AS (PARTITION BY id ORDER BY d)
)

SELECT
* EXCLUDE sales_lagged,

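-- The IF(SUM(IF(x IS NULL, 0, 1)) OVER w = n, AGG(x) OVER w, NULL) pattern below
-- mirrors pandas' rolling(n) default of min_periods=n: the aggregate is NULL
-- until the window holds n non-NULL values. STDDEV_SAMP matches pandas'
-- default sample standard deviation (ddof=1).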
-- Rolling means
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_7 = 7,
AVG(sales_lagged) OVER roll_7, NULL) AS rolling_mean_7,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_14 = 14,
AVG(sales_lagged) OVER roll_14, NULL) AS rolling_mean_14,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_30 = 30,
AVG(sales_lagged) OVER roll_30, NULL) AS rolling_mean_30,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_60 = 60,
AVG(sales_lagged) OVER roll_60, NULL) AS rolling_mean_60,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_180 = 180,
AVG(sales_lagged) OVER roll_180, NULL) AS rolling_mean_180,

-- Rolling standard deviations
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_7 = 7,
STDDEV_SAMP(sales_lagged) OVER roll_7, NULL) AS rolling_std_7,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_14 = 14,
STDDEV_SAMP(sales_lagged) OVER roll_14, NULL) AS rolling_std_14,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_30 = 30,
STDDEV_SAMP(sales_lagged) OVER roll_30, NULL) AS rolling_std_30,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_60 = 60,
STDDEV_SAMP(sales_lagged) OVER roll_60, NULL) AS rolling_std_60,
IF(SUM(IF(sales_lagged IS NULL, 0, 1)) OVER roll_180 = 180,
STDDEV_SAMP(sales_lagged) OVER roll_180, NULL) AS rolling_std_180

FROM lagged

WINDOW
roll_7 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 6 PRECEDING AND CURRENT ROW),
roll_14 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 13 PRECEDING AND CURRENT ROW),
roll_30 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 29 PRECEDING AND CURRENT ROW),
roll_60 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 59 PRECEDING AND CURRENT ROW),
roll_180 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 179 PRECEDING AND CURRENT ROW);
""")


def q3_duckdb(df):
return duckdb.sql("""
WITH lagged AS (
-- Create lagged columns for 'sales' with lags of 1, 7, and 14 days
SELECT
*,
LAG(sales, 1) OVER id_w AS sales_1,
LAG(sales, 7) OVER id_w AS sales_7,
LAG(sales, 14) OVER id_w AS sales_14
FROM df
WINDOW id_w AS (PARTITION BY id ORDER BY d)
)

SELECT
* EXCLUDE (sales_1, sales_7, sales_14),

IF(SUM(IF(sales_1 IS NULL, 0, 1)) OVER roll_7 = 7,
AVG(sales_1) OVER roll_7, NULL) AS rolling_mean_1_7,
IF(SUM(IF(sales_1 IS NULL, 0, 1)) OVER roll_14 = 14,
AVG(sales_1) OVER roll_14, NULL) AS rolling_mean_1_14,
IF(SUM(IF(sales_1 IS NULL, 0, 1)) OVER roll_30 = 30,
AVG(sales_1) OVER roll_30, NULL) AS rolling_mean_1_30,
IF(SUM(IF(sales_1 IS NULL, 0, 1)) OVER roll_60 = 60,
AVG(sales_1) OVER roll_60, NULL) AS rolling_mean_1_60,
IF(SUM(IF(sales_7 IS NULL, 0, 1)) OVER roll_7 = 7,
AVG(sales_7) OVER roll_7, NULL) AS rolling_mean_7_7,
IF(SUM(IF(sales_7 IS NULL, 0, 1)) OVER roll_14 = 14,
AVG(sales_7) OVER roll_14, NULL) AS rolling_mean_7_14,
IF(SUM(IF(sales_7 IS NULL, 0, 1)) OVER roll_30 = 30,
AVG(sales_7) OVER roll_30, NULL) AS rolling_mean_7_30,
IF(SUM(IF(sales_7 IS NULL, 0, 1)) OVER roll_60 = 60,
AVG(sales_7) OVER roll_60, NULL) AS rolling_mean_7_60,
IF(SUM(IF(sales_14 IS NULL, 0, 1)) OVER roll_7 = 7,
AVG(sales_14) OVER roll_7, NULL) AS rolling_mean_14_7,
IF(SUM(IF(sales_14 IS NULL, 0, 1)) OVER roll_14 = 14,
AVG(sales_14) OVER roll_14, NULL) AS rolling_mean_14_14,
IF(SUM(IF(sales_14 IS NULL, 0, 1)) OVER roll_30 = 30,
AVG(sales_14) OVER roll_30, NULL) AS rolling_mean_14_30,
IF(SUM(IF(sales_14 IS NULL, 0, 1)) OVER roll_60 = 60,
AVG(sales_14) OVER roll_60, NULL) AS rolling_mean_14_60

FROM lagged

-- Define the rolling windows used for the rolling means
WINDOW
roll_7 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 6 PRECEDING AND CURRENT ROW),
roll_14 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 13 PRECEDING AND CURRENT ROW),
roll_30 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 29 PRECEDING AND CURRENT ROW),
roll_60 AS (PARTITION BY id ORDER BY d ROWS BETWEEN 59 PRECEDING AND CURRENT ROW)
""")


print("*** duckdb ***")

start_time = time.perf_counter()
q1_duckdb(duckdb.read_parquet(str(PATH))).to_arrow_table()
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q2_duckdb(duckdb.read_parquet(str(PATH))).to_arrow_table()
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q3_duckdb(duckdb.read_parquet(str(PATH))).to_arrow_table()
print(f"q1 took: {time.perf_counter() - start_time}")
73 changes: 73 additions & 0 deletions m5-forecasting-feature-engineering/pandas_queries.py
@@ -0,0 +1,73 @@
import time
from pathlib import Path

import numpy as np
import pandas as pd
import pyarrow

print("pandas version", pd.__version__)
print("numpy version", np.__version__)
print("pyarrow version", pyarrow.__version__)

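# Opt in to the forthcoming pandas defaults: copy-on-write semantics and
# PyArrow-backed string inference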
pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True

PROCESSED_DATA_DIR = "data"

TARGET = "sales"
SHIFT_DAY = 28

# Set this to True if you just want to test that everything runs
SMALL = False
if SMALL:
PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1_small.parquet"
else:
PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1.parquet"

LAG_DAYS = list(range(SHIFT_DAY, SHIFT_DAY + 15))


def q1_pandas(df):
return df.assign(
**{
f"{TARGET}_lag_{lag}": df.groupby(["id"], observed=True)[TARGET].transform(
lambda x: x.shift(lag) # noqa: B023
)
for lag in LAG_DAYS
}
)


def q2_pandas(df):
for i in [7, 14, 30, 60, 180]:
df["rolling_mean_" + str(i)] = df.groupby(["id"], observed=True)[
TARGET
].transform(lambda x: x.shift(SHIFT_DAY).rolling(i).mean()) # noqa: B023
for i in [7, 14, 30, 60, 180]:
df["rolling_std_" + str(i)] = df.groupby(["id"], observed=True)[
TARGET
].transform(lambda x: x.shift(SHIFT_DAY).rolling(i).std()) # noqa: B023
return df


def q3_pandas(df):
for d_shift in [1, 7, 14]:
for d_window in [7, 14, 30, 60]:
col_name = "rolling_mean_" + str(d_shift) + "_" + str(d_window)
df[col_name] = df.groupby(["id"], observed=True)[TARGET].transform(
lambda x: x.shift(d_shift).rolling(d_window).mean() # noqa: B023
)
return df


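print("*** pandas ***")
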
start_time = time.perf_counter()
q1_pandas(pd.read_parquet(PATH, engine="pyarrow"))
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q2_pandas(pd.read_parquet(PATH, engine="pyarrow"))
print(f"q2 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q3_pandas(pd.read_parquet(PATH, engine="pyarrow"))
print(f"q3 took: {time.perf_counter() - start_time}")
85 changes: 85 additions & 0 deletions m5-forecasting-feature-engineering/polars_queries.py
@@ -0,0 +1,85 @@
import time
from pathlib import Path

import polars as pl

print("polars version", pl.__version__)

PROCESSED_DATA_DIR = "data"

TARGET = "sales"
SHIFT_DAY = 28


# Set this to True if you just want to test that everything runs
SMALL = False
if SMALL:
PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1_small.parquet"
else:
PATH = Path(PROCESSED_DATA_DIR) / "grid_part_1.parquet"

LAG_DAYS = list(range(SHIFT_DAY, SHIFT_DAY + 15))

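# 'explode' returns each group's results in group order instead of mapping them
# back to the original row order, which is cheaper than the default 'group_to_rows'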
WINDOW = {"partition_by": "id", "order_by": "d", "mapping_strategy": "explode"}


def q1_polars(df):
return df.select(
pl.all().over(**WINDOW),
*[
pl.col(TARGET).shift(lag).over(**WINDOW).alias(f"{TARGET}_lag_{lag}")
for lag in LAG_DAYS
],
)


def q2_polars(df):
return df.select(
pl.all().over(**WINDOW),
*[
pl.col(TARGET)
.shift(SHIFT_DAY)
.rolling_mean(window_size=i)
.over(**WINDOW)
.alias(f"rolling_mean_{i}")
for i in [7, 14, 30, 60, 180]
],
*[
pl.col(TARGET)
.shift(SHIFT_DAY)
.rolling_std(window_size=i)
.over(**WINDOW)
.alias(f"rolling_std_{i}")
for i in [7, 14, 30, 60, 180]
],
)


def q3_polars(df):
return df.select(
pl.all().over(**WINDOW),
*[
pl.col(TARGET)
.shift(d_shift)
.rolling_mean(window_size=d_window)
.over(**WINDOW)
.alias(f"rolling_mean_{d_shift}_{d_window}")
for d_shift in [1, 7, 14]
for d_window in [7, 14, 30, 60]
],
)


print("*** polars lazy ***")

start_time = time.perf_counter()
q1_polars(pl.scan_parquet(PATH)).collect()
print(f"q1 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q2_polars(pl.scan_parquet(PATH)).collect()
print(f"q2 took: {time.perf_counter() - start_time}")

start_time = time.perf_counter()
q3_polars(pl.scan_parquet(PATH)).collect()
print(f"q3 took: {time.perf_counter() - start_time}")