Add beta pyspark API. (#107)
* Simplify data I/O utils.

Previously, the data I/O utilities were responsible for aggregating
hierarchical time series data from a single source CSV. This PR delegates
that aggregation to pyspark instead.

* Allow tree models to have max_forecast_steps=None.

* Robust target_seq_index for ForecasterEnsemble.

This commit converts target_seq_index into a property of the
ForecasterEnsemble, which lets us enforce a single value of
target_seq_index across all models in the ensemble.

* Fix bug when creating a TimeSeries from a dataframe with a string index.

* Create merlion.spark API with pyspark Pandas UDFs.

We now have pyspark pandas UDFs for forecasting (parallelized over time
series ID), anomaly detection (parallelized over time series ID), and
hierarchical time series reconciliation (parallelized over time).

* Fix minor edge cases.

* Update Java install instructions.

* Switch from yaml to json.

* Allow datasets to not have data_cols specified.

* Allow null index_cols values in spark datasets.

* Simplify pyspark forecasting app.

Move visualization features to a different file.

* Fix spark data processing bug when >1 index_cols.

* Create pyspark app for anomaly detection.

* Add Dockerfile.

* Add docs & clean up code.

* Remove wheel & pytest dependencies.

* Add backward compatibility with Spark 3.1.1.

* Remove strict pyspark version requirement.

* Remove pyspark session helper.

* Update Dockerfile to extend pyspark image.

* Minor rearrangement of Dockerfile.

* Update version to 1.2.2.

* Add .dockerignore.

* Streamline Dockerfile.
aadyotb authored Jun 17, 2022
1 parent 3e49d2e commit 5a322ca
Showing 21 changed files with 687 additions and 130 deletions.
22 changes: 22 additions & 0 deletions .dockerignore
@@ -0,0 +1,22 @@
# package
__pycache__
*.egg-info
data
docs
tmp
ts_datasets
# pytest
.pytest_cache
.coverage*
htmlcov
# IDE/system
.idea
*.swp
.DS_Store
sandbox
.vscode
Icon?
# build files
docs/build/*
.ipynb_checkpoints
venv/
34 changes: 34 additions & 0 deletions Dockerfile
@@ -0,0 +1,34 @@
ARG spark_uid=185
FROM apache/spark-py:v3.2.1

# Change to root user for installation steps
USER 0

# Uninstall existing python and replace it with miniconda.
# This is to get the right version of Python in Debian, since Prophet doesn't play nice with Python 3.9+.
# FIXME: maybe optimize the size? this image is currently 3.2GB.
RUN apt-get update && \
apt-get remove -y python3 python3-pip && \
apt-get install -y --no-install-recommends curl && \
apt-get autoremove -yqq --purge && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
RUN curl -fsSL -o ~/miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
chmod +x ~/miniconda.sh && \
~/miniconda.sh -b -p /opt/conda && \
rm ~/miniconda.sh && \
# Install prophet while we're at it, since this is easier to conda install than pip install
/opt/conda/bin/conda install -y prophet && \
/opt/conda/bin/conda clean -ya
ENV PATH="/opt/conda/bin:${SPARK_HOME}/bin:${PATH}"

# Install pyarrow (for spark-sql) and Merlion; get pyspark & py4j from the PYTHONPATH
ENV PYTHONPATH="${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
COPY *.md ./
COPY setup.py ./
COPY merlion merlion
RUN pip install pyarrow "./[prophet]" && pip uninstall -y py4j

# Copy Merlion pyspark apps
COPY spark /opt/spark/apps
USER ${spark_uid}
2 changes: 1 addition & 1 deletion README.md
@@ -95,7 +95,7 @@ Note the following external dependencies:
2. Some of our anomaly detection models depend on the Java Development Kit (JDK). For Ubuntu, call
``sudo apt-get install openjdk-11-jdk``. For Mac OS, install [Homebrew](<https://brew.sh/>) and call
``brew tap adoptopenjdk/openjdk && brew install --cask adoptopenjdk11``. Also ensure that ``java`` can be found
on your ``$PATH``, and that the ``$JAVA_HOME`` environment variable is set.
on your ``PATH``, and that the ``JAVA_HOME`` environment variable is set.

## Documentation

3 changes: 2 additions & 1 deletion docs/source/index.rst
@@ -52,7 +52,8 @@ Note the following external dependencies:

2. Some of our anomaly detection models depend on having the Java Development Kit (JDK) installed. For Ubuntu, call
``sudo apt-get install openjdk-11-jdk``. For Mac OS, install `Homebrew <https://brew.sh/>`_ and call
``brew tap adoptopenjdk/openjdk && brew install --cask adoptopenjdk11``.
``brew tap adoptopenjdk/openjdk && brew install --cask adoptopenjdk11``. Also ensure that ``java`` can be found
on your ``PATH``, and that the ``JAVA_HOME`` environment variable is set.
This is relevant for the :py:class:`RandomCutForest <merlion.models.anomaly.random_cut_forest.RandomCutForest>`
which is also used as a part of the :py:class:`DefaultDetector <merlion.models.defaults.DefaultDetector>`.

20 changes: 10 additions & 10 deletions docs/source/merlion.models.rst
@@ -58,10 +58,10 @@ Finally, we support ensembles of models in :py:mod:`merlion.models.ensemble`.
:show-inheritance:

.. autosummary::
defaults
factory
base
layers
defaults
anomaly
anomaly.change_point
anomaly.forecast_based
@@ -87,6 +87,15 @@ Subpackages
Submodules
----------

merlion.models.defaults module
------------------------------

.. automodule:: merlion.models.defaults
:members:
:undoc-members:
:show-inheritance:


merlion.models.factory module
-----------------------------

@@ -111,12 +120,3 @@ merlion.models.layers module
:members:
:undoc-members:
:show-inheritance:

merlion.models.defaults module
------------------------------

.. automodule:: merlion.models.defaults
:members:
:undoc-members:
:show-inheritance:

5 changes: 5 additions & 0 deletions docs/source/merlion.rst
@@ -6,6 +6,8 @@ each associated with its own sub-package:
- :py:mod:`merlion.models`: A library of models unified under a single shared interface, with specializations
for anomaly detection and forecasting. More specifically, we have

- :py:mod:`merlion.models.defaults`: Default models for anomaly detection and forecasting. These are good models
for getting started.
- :py:mod:`merlion.models.anomaly`: Anomaly detection models
- :py:mod:`merlion.models.anomaly.change_point`: Change point detection models
- :py:mod:`merlion.models.forecast`: Forecasting models
@@ -15,6 +17,8 @@ each associated with its own sub-package:
detection and forecasting.
- :py:mod:`merlion.models.automl`: AutoML layers for various models

- :py:mod:`merlion.spark`: APIs to integrate Merlion with PySpark for using distributed computing to run training
and inference on multiple time series in parallel.
- :py:mod:`merlion.transform`: Data pre-processing layer which implements many standard data transformations used in
time series analysis. Transforms are callable objects, and each model has its own configurable ``model.transform``
which it uses to pre-process all input time series for both training and inference.
@@ -51,6 +55,7 @@ Subpackages
:maxdepth: 4

merlion.models
merlion.spark
merlion.transform
merlion.post_process
merlion.evaluate
33 changes: 33 additions & 0 deletions docs/source/merlion.spark.rst
@@ -0,0 +1,33 @@
merlion.spark package
=====================
This module implements APIs to integrate Merlion with PySpark. The expected use case is to
use distributed computing to train and run inference on multiple time series in parallel.

.. automodule:: merlion.spark
:members:
:undoc-members:
:show-inheritance:

.. autosummary::
dataset
pandas_udf

Submodules
----------

merlion.spark.dataset module
----------------------------

.. automodule:: merlion.spark.dataset
:members:
:undoc-members:
:show-inheritance:

merlion.spark.pandas\_udf module
--------------------------------

.. automodule:: merlion.spark.pandas_udf
:members:
:undoc-members:
:show-inheritance:
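
The expected usage pattern for this module is a grouped pandas UDF: Spark partitions the data by a grouping key and applies an ordinary pandas function to each group on the executors. Below is a minimal sketch of that pattern, not the module's actual API; forecast_one_series and the column names are hypothetical stand-ins for the real UDFs in merlion.spark.pandas_udf.

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

    spark = SparkSession.builder.getOrCreate()
    # Long-format data: one row per (series id, timestamp) pair.
    df = spark.read.csv("data.csv", header=True, inferSchema=True)  # columns: id, time, value
    df = df.withColumn("time", df["time"].cast("timestamp"))  # ensure a true timestamp column

    out_schema = StructType(
        [
            StructField("id", StringType()),
            StructField("time", TimestampType()),
            StructField("value", DoubleType()),
        ]
    )

    def forecast_one_series(pdf: pd.DataFrame) -> pd.DataFrame:
        # Stand-in model: repeat the last observed value over a 3-step daily horizon.
        # In the real UDFs, this is where a Merlion forecaster is trained and applied.
        pdf = pdf.sort_values("time")
        horizon = pd.date_range(pdf["time"].iloc[-1], periods=4, freq="D")[1:]
        return pd.DataFrame({"id": pdf["id"].iloc[0], "time": horizon, "value": pdf["value"].iloc[-1]})

    # Forecasting and anomaly detection parallelize over series ID; hierarchical
    # reconciliation uses the same pattern, grouped by timestamp instead.
    forecasts = df.groupBy("id").applyInPandas(forecast_one_series, schema=out_schema)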

2 changes: 1 addition & 1 deletion merlion/models/automl/autoprophet.py
@@ -21,7 +21,7 @@ class AutoProphetConfig(SeasonalityConfig):
def __init__(
self,
model: Union[Prophet, dict] = None,
periodicity_strategy: PeriodicityStrategy = PeriodicityStrategy.All,
periodicity_strategy: Union[PeriodicityStrategy, str] = PeriodicityStrategy.All,
**kwargs,
):
model = dict(name="Prophet") if model is None else model
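
With the widened annotation, periodicity_strategy can now be passed either as an enum member or as its string name. A short sketch, assuming PeriodicityStrategy is importable from merlion.models.automl.seasonality and that the config resolves strings to enum members; other kwargs omitted:

    from merlion.models.automl.autoprophet import AutoProphetConfig
    from merlion.models.automl.seasonality import PeriodicityStrategy

    # These two configs should be equivalent after this change:
    config_a = AutoProphetConfig(periodicity_strategy=PeriodicityStrategy.All)
    config_b = AutoProphetConfig(periodicity_strategy="All")
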
32 changes: 30 additions & 2 deletions merlion/models/ensemble/forecast.py
@@ -27,9 +27,37 @@ class ForecasterEnsembleConfig(ForecasterConfig, EnsembleConfig):

_default_combiner = Mean(abs_score=False)

def __init__(self, max_forecast_steps=None, verbose=False, **kwargs):
def __init__(self, max_forecast_steps=None, target_seq_index=None, verbose=False, **kwargs):
self.verbose = verbose
super().__init__(max_forecast_steps=max_forecast_steps, **kwargs)
super().__init__(max_forecast_steps=max_forecast_steps, target_seq_index=None, **kwargs)
# Override the target_seq_index of all individual models after everything has been initialized
# FIXME: doesn't work if models have heterogeneous transforms which change the dim of the input time series
self.target_seq_index = target_seq_index
if self.models is not None:
assert all(model.target_seq_index == self.target_seq_index for model in self.models)

@property
def target_seq_index(self):
return self._target_seq_index

@target_seq_index.setter
def target_seq_index(self, target_seq_index):
if self.models is not None:
# Get the target_seq_index from the models if None is given
if target_seq_index is None:
non_none_idxs = [m.target_seq_index for m in self.models if m.target_seq_index is not None]
if len(non_none_idxs) > 0:
target_seq_index = non_none_idxs[0]
assert all(m.target_seq_index in [None, target_seq_index] for m in self.models), (
f"Attempted to infer target_seq_index from the individual models in the ensemble, but "
f"not all models have the same target_seq_index. Got {[m.target_seq_index for m in self.models]}"
)
# Only override the target_seq_index from the models if there is one
if target_seq_index is not None:
for model in self.models:
model.config.target_seq_index = target_seq_index
# Save the ensemble-level target_seq_index as a private variable
self._target_seq_index = target_seq_index


class ForecasterEnsemble(EnsembleBase, ForecasterBase):
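
A sketch of the invariant the new property enforces, assuming the factory-registered model names below and that EnsembleConfig accepts a models list, as the setter above implies:

    from merlion.models.ensemble.forecast import ForecasterEnsembleConfig
    from merlion.models.factory import ModelFactory

    models = [ModelFactory.create(name="Arima"), ModelFactory.create(name="Prophet")]
    config = ForecasterEnsembleConfig(models=models, target_seq_index=0)
    # The setter pushes the single ensemble-level value down to every model:
    assert all(m.target_seq_index == 0 for m in config.models)
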
13 changes: 13 additions & 0 deletions merlion/models/factory.py
@@ -7,6 +7,7 @@
"""
Contains the `ModelFactory`.
"""
import copy
import inspect
from typing import Dict, Tuple, Type, Union

@@ -101,3 +102,15 @@ def load_bytes(cls, obj, **kwargs) -> ModelBase:
name = dill.loads(obj)[0]
model_class = cls.get_model_class(name)
return model_class.from_bytes(obj, **kwargs)


def instantiate_or_copy_model(model: Union[dict, ModelBase]):
if isinstance(model, ModelBase):
return copy.deepcopy(model)
if isinstance(model, dict):
try:
return ModelFactory.create(**model)
except Exception as e:
raise ValueError(f"Invalid `dict` specifying a model config.\n\nGot {model}\n\nException: {e}")
else:
raise TypeError(f"Expected model to be a `dict` or `ModelBase`. Got {model}.")
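
For example, both call forms below yield an independent model instance; the model name and kwargs are illustrative:

    from merlion.models.factory import instantiate_or_copy_model

    m1 = instantiate_or_copy_model({"name": "Arima", "max_forecast_steps": 10})  # routed through ModelFactory.create
    m2 = instantiate_or_copy_model(m1)  # deep copy, so the original is untouched
    assert m2 is not m1
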
6 changes: 3 additions & 3 deletions merlion/models/forecast/trees.py
@@ -125,7 +125,7 @@ def _train(self, train_data: pd.DataFrame, train_config=None):
)
self.model.fit(inputs_train, labels_train)
inputs_train = np.atleast_2d(inputs_train)
pred = self._hybrid_forecast(inputs_train)
pred = self._hybrid_forecast(inputs_train, self.max_forecast_steps or len(inputs_train) - self.maxlags)
# since the model may predict multiple steps, we concatenate all the first steps together
pred = pred[:, 0].reshape(-1)

@@ -134,7 +134,7 @@ def _train(self, train_data: pd.DataFrame, train_config=None):
# sequence mode, set prediction_stride = max_forecast_steps
if self.prediction_stride > 1:
max_forecast_steps = seq_ar_common.max_feasible_forecast_steps(train_data, self.maxlags)
if self.max_forecast_steps > max_forecast_steps:
if self.max_forecast_steps is not None and self.max_forecast_steps > max_forecast_steps:
logger.warning(
f"With train data of length {len(train_data)} and "
f"maxlags={self.maxlags}, the maximum supported forecast "
@@ -144,7 +144,7 @@
f"'training_mode = autogression'."
)
self.config.max_forecast_steps = max_forecast_steps
if self.prediction_stride != self.max_forecast_steps:
if self.max_forecast_steps is not None and self.prediction_stride != self.max_forecast_steps:
logger.warning(
f"For multivariate dataset, reset prediction_stride = max_forecast_steps = {self.max_forecast_steps} "
)
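
With these guards in place, a tree-based forecaster can be configured without a fixed horizon. A sketch, assuming the RandomForestForecaster class defined in merlion.models.forecast.trees, with other kwargs abbreviated:

    from merlion.models.forecast.trees import RandomForestForecaster, RandomForestForecasterConfig

    # max_forecast_steps=None no longer errors out during training; the training-time
    # horizon falls back to len(inputs_train) - maxlags, as in _hybrid_forecast above.
    model = RandomForestForecaster(RandomForestForecasterConfig(maxlags=21, max_forecast_steps=None))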