From 75e1d47d37399c2261e27a3dc9dec39a4f928dbc Mon Sep 17 00:00:00 2001 From: "Mats E. Mollestad" Date: Wed, 22 Nov 2023 08:06:10 +0100 Subject: [PATCH] fix: aggregations with offset --- aligned/local/job.py | 21 +++++++++------------ pyproject.toml | 2 +- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/aligned/local/job.py b/aligned/local/job.py index e550508c..a96f1384 100644 --- a/aligned/local/job.py +++ b/aligned/local/job.py @@ -78,25 +78,22 @@ async def aggregate(request: RetrivalRequest, core_data: pl.LazyFrame) -> pl.Laz raise ValueError('No time window spesificed.') if over.window.every_interval: - sub = ( - sorted_data.groupby_dynamic( - time_name, - every=over.window.every_interval, - period=over.window.time_window, - offset=over.window.offset_interval, - by=over.group_by_names, - ) - .agg(exprs) - .with_columns(pl.col(time_name) + over.window.time_window) - ) + sub = sorted_data.groupby_dynamic( + time_name, + every=over.window.every_interval, + period=over.window.time_window, + by=over.group_by_names, + ).agg(exprs) else: sub = sorted_data.groupby_rolling( time_name, period=over.window.time_window, - offset=over.window.offset_interval, by=over.group_by_names, ).agg(exprs) + if over.window.offset_interval: + sub = sub.with_columns(pl.col(time_name) - over.window.offset_interval) + if results is not None: existing_result = results.collect() new_aggregations = sub.collect() diff --git a/pyproject.toml b/pyproject.toml index 180ee498..ab1c8050 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aligned" -version = "0.0.45" +version = "0.0.46" description = "A scalable feature store that makes it easy to align offline and online ML systems" authors = ["Mats E. Mollestad "] license = "Apache-2.0"