Skip to content

Commit

Permalink
Add support for static and time-varying covariates for time series tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
shchur committed Aug 10, 2023
1 parent 54916d6 commit 99524a0
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 11 deletions.
23 changes: 22 additions & 1 deletion amlb/datasets/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ def __init__(self, path, fold, target, features, cache_dir, config):
self.id_column = config['id_column']
self.timestamp_column = config['timestamp_column']

# Ensure that id_column is parsed as string to avoid incorrect sorting
full_data[self.id_column] = full_data[self.id_column].astype(str)
full_data[self.timestamp_column] = pd.to_datetime(full_data[self.timestamp_column])
if config['name'] is not None:
file_name = config['name']
Expand All @@ -353,7 +355,9 @@ def __init__(self, path, fold, target, features, cache_dir, config):

self._train = CsvDatasplit(self, train_path, timestamp_column=self.timestamp_column)
self._test = CsvDatasplit(self, test_path, timestamp_column=self.timestamp_column)
self._dtypes = None
self._dtypes = full_data.dtypes

self.static_covariates_path = self.save_static_covariates(config['static_covariates_path'], save_dir=save_dir)

# Store repeated item_id & in-sample seasonal error for each time step in the forecast horizon - needed later for metrics like MASE.
# We need to store this information here because Result object has no access to past time series values.
Expand Down Expand Up @@ -385,6 +389,23 @@ def save_train_and_test_splits(self, full_data, fold, save_dir):
test_data.to_csv(test_path, index=False)
return train_path, test_path

def save_static_covariates(self, static_covariates_path, save_dir):
    """Validate the static covariates file and save a local copy of it.

    :param static_covariates_path: path (possibly remote, e.g. S3) to the static
        covariates CSV file, or None if the task defines no static covariates.
    :param save_dir: local directory where the validated copy is written.
    :return: path to the local copy, or None when no static covariates are defined.
    :raises ValueError: if the id_column is missing from the static covariates,
        or if the covariates describe a different set of item ids than the
        training data.
    """
    if static_covariates_path is None:
        return None

    static_covariates = read_csv(static_covariates_path)
    if self.id_column not in static_covariates:
        raise ValueError(f'The id_column with name {self.id_column} is missing from the static covariates')
    ids_in_train_data = self.train.data[self.id_column].unique()
    # id_column is compared as str because the train data id_column was cast to str on load
    ids_in_static_covariates = static_covariates[self.id_column].astype(str)
    if set(ids_in_train_data) != set(ids_in_static_covariates):
        raise ValueError('Time series dataset and static covariates contain different item ids')

    # save static_covariates locally because some framework venvs don't have dependencies to load files from S3
    save_path = os.path.join(save_dir, "static_covariates.csv")
    static_covariates.to_csv(save_path, index=False)
    return save_path

def compute_seasonal_error(self):
train_data_with_index = self.train.data.set_index(self.id_column)
seasonal_diffs = train_data_with_index[self.target.name].groupby(level=self.id_column).diff(self.seasonality).abs()
Expand Down
1 change: 1 addition & 0 deletions frameworks/AutoGluon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def run_autogluon_timeseries(dataset: Dataset, config: TaskConfig):
data = dict(
train_path=dataset.train.path,
test_path=dataset.test.path,
static_covariates_path=dataset.static_covariates_path,
target=dataset.target.name,
id_column=dataset.id_column,
timestamp_column=dataset.timestamp_column,
Expand Down
38 changes: 29 additions & 9 deletions frameworks/AutoGluon/exec_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from joblib.externals.loky import get_reusable_executor

from frameworks.shared.callee import call_run, result, output_subdir
from frameworks.shared.utils import Timer, zip_path
from frameworks.shared.utils import Timer, zip_path, load_timeseries_dataset

log = logging.getLogger(__name__)

Expand All @@ -26,11 +26,29 @@ def run(dataset, config):
log.info(f"\n**** AutoGluon TimeSeries [v{__version__}] ****\n")
prediction_length = dataset.forecast_horizon_in_steps

train_data = TimeSeriesDataFrame.from_path(
dataset.train_path,
train_df, test_df, static_covariates = load_timeseries_dataset(dataset)
if static_covariates is not None:
static_covariates = static_covariates.set_index(dataset.id_column)

train_data = TimeSeriesDataFrame.from_data_frame(
train_df,
id_column=dataset.id_column,
timestamp_column=dataset.timestamp_column,
)
train_data.static_features = static_covariates


test_data = TimeSeriesDataFrame.from_data_frame(
test_df,
id_column=dataset.id_column,
timestamp_column=dataset.timestamp_column,
)
if len(train_data.columns) > 1:
future_known_covariates = test_data.drop(dataset.target, axis=1)
known_covariates_names = future_known_covariates.columns
else:
future_known_covariates = None
known_covariates_names = None

predictor_path = tempfile.mkdtemp() + os.sep
with Timer() as training:
Expand All @@ -41,18 +59,19 @@ def run(dataset, config):
eval_metric=get_eval_metric(config),
eval_metric_seasonal_period=dataset.seasonality,
quantile_levels=config.quantile_levels,
known_covariates_names=known_covariates_names,
)
predictor.fit(
train_data=train_data,
time_limit=config.max_runtime_seconds,
random_seed=config.seed,
**{k: v for k, v in config.framework_params.items() if not k.startswith('_')},
)

with Timer() as predict:
predictions = pd.DataFrame(predictor.predict(train_data))
predictions = pd.DataFrame(predictor.predict(train_data, known_covariates=future_known_covariates))

# Add columns necessary for the metric computation + quantile forecast to `optional_columns`
test_data_future = pd.read_csv(dataset.test_path, parse_dates=[dataset.timestamp_column])
optional_columns = dict(
repeated_item_id=np.load(dataset.repeated_item_id),
repeated_abs_seasonal_error=np.load(dataset.repeated_abs_seasonal_error),
Expand All @@ -61,13 +80,13 @@ def run(dataset, config):
optional_columns[str(q)] = predictions[str(q)].values

predictions_only = get_point_forecast(predictions, config.metric)
truth_only = test_data_future[dataset.target].values
truth_only = test_df[dataset.target].values

# Sanity check - make sure predictions are ordered correctly
future_index = pd.MultiIndex.from_frame(test_data_future[[dataset.id_column, dataset.timestamp_column]])
assert predictions.index.equals(future_index), "Predictions and test data index do not match"
assert predictions.index.equals(test_data.index), "Predictions and test data index do not match"

test_data_full = pd.concat([train_data, test_data_future.set_index([dataset.id_column, dataset.timestamp_column])])
test_data_full = pd.concat([train_data, test_data])
test_data_full.static_features = static_covariates
leaderboard = predictor.leaderboard(test_data_full, silent=True)

with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000):
Expand Down Expand Up @@ -97,6 +116,7 @@ def get_eval_metric(config):
mase="MASE",
mse="MSE",
rmse="RMSE",
wql="mean_wQuantileLoss",
)

eval_metric = metrics_mapping[config.metric] if config.metric in metrics_mapping else None
Expand Down
12 changes: 12 additions & 0 deletions frameworks/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import importlib.util
import logging
import os
import pandas as pd
import sys


Expand Down Expand Up @@ -42,6 +43,17 @@ def load_amlb_module(mod, amlb_path=None):
return import_module(mod)


def load_timeseries_dataset(dataset):
    """Load the train/test splits (and optional static covariates) of a time series task.

    The id_column is always read as string to avoid incorrect sorting of item ids;
    the timestamp column of the train/test splits is parsed as datetime.
    Returns a (train, test, static_covariates) tuple, where static_covariates is
    None if the task defines none.
    """
    def _read(path, parse_timestamps):
        kwargs = {"dtype": {dataset.id_column: str}}
        if parse_timestamps:
            kwargs["parse_dates"] = [dataset.timestamp_column]
        return pd.read_csv(path, **kwargs)

    train_data = _read(dataset.train_path, parse_timestamps=True)
    test_data = _read(dataset.test_path, parse_timestamps=True)
    static_covariates = None
    if dataset.static_covariates_path is not None:
        static_covariates = _read(dataset.static_covariates_path, parse_timestamps=False)
    return train_data, test_data, static_covariates


utils = load_amlb_module("amlb.utils")
# unorthodox: only at this point can these functions be safely imported
from amlb.utils import *
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
item_id,feat_1,feat_2,feat_3
T1,A,2,3.5
T2,A,3,2.5
T3,B,3,1.5
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CustomId,feat_1,feat_2,feat_3
T1,A,2,3.5
T2,A,3,2.5
T3,B,3,1.5
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
item_id,feat_1,feat_2,feat_3
T5,A,2,3.5
T1,A,3,2.5
T3,B,3,1.5
24 changes: 23 additions & 1 deletion tests/unit/amlb/datasets/file/test_file_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,16 @@ def _assert_data_consistency(dataset, check_encoded=True):
assert np.issubdtype(dataset.train.y_enc.dtype, np.floating) # not ideal given that it's also for classification targets, but well…


def _assert_static_covariates_consistency(dataset):
    """Assert that the saved static covariates cover exactly the item ids present in the training data."""
    covariates = pd.read_csv(dataset.static_covariates_path)
    train_ids = set(dataset.train.data[dataset.id_column].unique())
    covariate_ids = set(covariates[dataset.id_column].unique())
    assert train_ids == covariate_ids


@pytest.mark.use_disk
def test_load_timeseries_task_csv(file_loader):
ds_def = ns(
path=os.path.join(res, "m4_hourly_subset.csv"),
static_covariates_path=os.path.join(res, "m4_hourly_subset_static_covariates.csv"),
forecast_horizon_in_steps=24,
seasonality=24,
freq="H",
Expand All @@ -286,13 +291,14 @@ def test_load_timeseries_task_csv(file_loader):
_assert_data_consistency(ds, check_encoded=False)
_assert_X_y_types(ds.train, check_encoded=False)
_assert_X_y_types(ds.test, check_encoded=False)
_assert_static_covariates_consistency(ds)

assert isinstance(ds.train.data, pd.DataFrame)
assert isinstance(ds.test.data, pd.DataFrame)
assert len(ds.repeated_abs_seasonal_error) == len(ds.test.data)
assert len(ds.repeated_item_id) == len(ds.test.data)

assert pat.is_categorical_dtype(ds._dtypes[ds.id_column])
assert pat.is_string_dtype(ds._dtypes[ds.id_column])
assert pat.is_datetime64_dtype(ds._dtypes[ds.timestamp_column])
assert pat.is_float_dtype(ds._dtypes[ds.target.name])

Expand Down Expand Up @@ -336,9 +342,24 @@ def test_given_nondefault_column_names_when_key_is_missing_then_exception_is_rai
file_loader.load(ds_def)


def test_if_static_covariates_contain_wrong_item_ids_then_exception_is_raised(file_loader):
    """Loading must fail when the static covariates file refers to item ids absent from the dataset."""
    ds_def = ns(
        path=os.path.join(res, "m4_hourly_subset.csv"),
        static_covariates_path=os.path.join(res, "m4_hourly_subset_static_covariates_wrong_items.csv"),
        forecast_horizon_in_steps=24,
        seasonality=24,
        freq="H",
        target="target",
        type="timeseries",
    )
    with pytest.raises(ValueError, match="contain different item ids"):
        # the loaded dataset is irrelevant here; only the raised exception is checked
        file_loader.load(ds_def)


def test_given_nondefault_column_names_then_timeseries_dataset_can_be_loaded(file_loader):
task_kwargs = dict(
path=os.path.join(res, "m4_hourly_subset_nondefault_cols.csv"),
static_covariates_path=os.path.join(res, "m4_hourly_subset_static_covariates_nondefault_cols.csv"),
forecast_horizon_in_steps=24,
seasonality=24,
freq="H",
Expand All @@ -350,6 +371,7 @@ def test_given_nondefault_column_names_then_timeseries_dataset_can_be_loaded(fil
ds_def = ns.from_dict(task_kwargs)
ds = file_loader.load(ds_def)
_assert_data_consistency(ds, check_encoded=False)
_assert_static_covariates_consistency(ds)


@pytest.mark.parametrize("forecast_horizon, fold", [(50, 2), (100, 0), (10, 9)])
Expand Down

0 comments on commit 99524a0

Please sign in to comment.