
Commit

FORMAT: align with river
MarekWadinger committed Mar 6, 2024
1 parent 510b6be commit bb7137f
Showing 6 changed files with 138 additions and 189 deletions.
89 changes: 89 additions & 0 deletions functions/hankel.py
@@ -0,0 +1,89 @@
from __future__ import annotations

from collections import deque
from typing import Literal

from river.base import Transformer

__all__ = ["Hankelizer"]


class Hankelizer(Transformer):
"""Time Delay Embedding using Hankelization.
Convert a time series into time-delay embedded Hankel vectors.
Args:
w: The number of data snapshots to preserve in the embedding.
return_partial: Whether to return partial Hankel vectors while the
window is not yet full. The default "copy" fills the missing
entries with copies of the oldest available snapshot.
Examples:
>>> h = Hankelizer(w=3)
>>> h.transform_one({"a": 1, "b": 2})
{'a_0': 1, 'b_0': 2, 'a_1': 1, 'b_1': 2, 'a_2': 1, 'b_2': 2}
>>> h = Hankelizer(w=3, return_partial=False)
>>> h.transform_one({"a": 1, "b": 2})
Traceback (most recent call last):
...
ValueError: The window is not full yet. Set `return_partial` to True ...
>>> h = Hankelizer(w=3, return_partial=True)
>>> h.transform_one({"a": 1, "b": 2})
{'a_0': nan, 'b_0': nan, 'a_1': nan, 'b_1': nan, 'a_2': 1, 'b_2': 2}
Transformation alone is stateless, so the previous sample was not stored.
>>> h.transform_one({"a": 3, "b": 4})
{'a_0': nan, 'b_0': nan, 'a_1': nan, 'b_1': nan, 'a_2': 3, 'b_2': 4}
>>> h._window
deque([], maxlen=2)
>>> h.learn_one({"a": 1, "b": 2})
Use `learn_transform_one` to transform and learn in one go.
>>> h.learn_transform_one({"a": 3, "b": 4})
{'a_0': nan, 'b_0': nan, 'a_1': 1, 'b_1': 2, 'a_2': 3, 'b_2': 4}
>>> h.transform_one({"a": 5, "b": 6})
{'a_0': 1, 'b_0': 2, 'a_1': 3, 'b_1': 4, 'a_2': 5, 'b_2': 6}
"""

def __init__(
self, w: int, return_partial: bool | Literal["copy"] = "copy"
):
self.w = w
self.return_partial = return_partial

self._window = deque(maxlen=self.w - 1)
self.feature_names_in_: list[str]
self.n_features_in_: int

def learn_one(self, x: dict):
if not hasattr(self, "feature_names_in_"):
self.feature_names_in_ = list(x.keys())
self.n_features_in_ = len(x)

self._window.append(x)

def transform_one(self, x: dict):
_window = list(self._window) + [x]
w_past_current = len(_window)
if not self.return_partial and w_past_current < self.w:
raise ValueError(
"The window is not full yet. Set `return_partial` to True to return partial Hankel matrices."
)
else:
n_missing = self.w - w_past_current
# Pad the front of the window: with "copy", repeat the oldest available
# snapshot; otherwise fill the missing slots with NaNs.
_window = [_window[0]] * n_missing + _window
if self.return_partial != "copy":
for i in range(n_missing):
_window[i] = {k: float("nan") for k in _window[0]}
return {
f"{k}_{i}": v
for i, d in enumerate(_window)
for k, v in d.items()
}

def learn_transform_one(self, x: dict):
y = self.transform_one(x)
self.learn_one(x)
return y
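
A minimal usage sketch (not part of the commit) showing how the new Hankelizer can be driven over a stream; the import path and the example values are assumptions based on the file layout above.

# Sketch only: assumes the repository layout functions/hankel.py shown above.
from functions.hankel import Hankelizer

stream = [{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}]

h = Hankelizer(w=3)  # embed the current snapshot plus the two preceding ones
for x in stream:
    # learn_transform_one embeds x together with the buffered snapshots,
    # then stores x in the internal window for the next call.
    print(h.learn_transform_one(x))
# With the default return_partial="copy", early outputs repeat the oldest
# snapshot; the last printed dict is the full Hankel vector
# {'a_0': 1, 'b_0': 2, 'a_1': 3, 'b_1': 4, 'a_2': 5, 'b_2': 6}.
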
77 changes: 37 additions & 40 deletions functions/odmd.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Online Dynamic Mode Decomposition (DMD) in [River API](riverml.xyz).
This module contains the implementation of the Online DMD, Weighted Online DMD,
@@ -24,14 +23,14 @@
from __future__ import annotations

import warnings
from typing import Union

import numpy as np
import pandas as pd
import scipy as sp
from river.base import MiniBatchRegressor
from scipy.sparse.linalg._eigen.arpack.arpack import ArpackNoConvergence

from river.base import MiniBatchRegressor

__all__ = [
"OnlineDMD",
"OnlineDMDwC",
@@ -152,7 +151,7 @@ def __init__(
w: float = 1.0,
initialize: int = 1,
exponential_weighting: bool = False,
seed: Union[int, None] = None,
seed: int | None = None,
) -> None:
self.r = int(r)
self.w = float(w)
@@ -214,8 +213,8 @@ def objective_function(x):

def update(
self,
x: Union[dict, np.ndarray],
y: Union[dict, np.ndarray, None] = None,
x: dict | np.ndarray,
y: dict | np.ndarray | None = None,
) -> None:
"""Update the DMD computation with a new pair of snapshots (x, y)
@@ -279,16 +278,16 @@ def update(

def learn_one(
self,
x: Union[dict, np.ndarray],
y: Union[dict, np.ndarray, None] = None,
x: dict | np.ndarray,
y: dict | np.ndarray | None = None,
) -> None:
"""Allias for update method."""
self.update(x, y)

def revert(
self,
x: Union[dict, np.ndarray],
y: Union[dict, np.ndarray, None] = None,
x: dict | np.ndarray,
y: dict | np.ndarray | None = None,
) -> None:
"""Gradually forget the older snapshots and revert the DMD computation.
@@ -338,8 +337,8 @@ def revert(

def _update_many(
self,
X: Union[np.ndarray, pd.DataFrame],
Y: Union[np.ndarray, pd.DataFrame],
X: np.ndarray | pd.DataFrame,
Y: np.ndarray | pd.DataFrame,
) -> None:
"""Update the DMD computation with a new batch of snapshots (X,Y).
@@ -375,8 +374,8 @@ def _update_many(

def learn_many(
self,
X: Union[np.ndarray, pd.DataFrame],
Y: Union[np.ndarray, pd.DataFrame, None] = None,
X: np.ndarray | pd.DataFrame,
Y: np.ndarray | pd.DataFrame | None = None,
) -> None:
"""Learn the OnlineDMD model using multiple snapshot pairs.
@@ -425,7 +424,7 @@ def learn_many(
else:
self._update_many(X, Y)

def predict_one(self, x: Union[dict, np.ndarray]) -> np.ndarray:
def predict_one(self, x: dict | np.ndarray) -> np.ndarray:
"""
Predicts the next state given the current state.
@@ -441,9 +440,7 @@ def predict_one(self, x: Union[dict, np.ndarray]) -> np.ndarray:
mat[s, :] = (self.A @ mat[s - 1, :]).real
return mat[-1, :]

def predict_many(
self, x: Union[dict, np.ndarray], forecast: int
) -> np.ndarray:
def predict_many(self, x: dict | np.ndarray, forecast: int) -> np.ndarray:
"""
Predicts multiple future values based on the given initial value.
@@ -478,7 +475,7 @@ def truncation_error(self, X: np.ndarray, Y: np.ndarray) -> float:
Y_hat = self.A @ X.T
return float(np.linalg.norm(Y - Y_hat.T) / np.linalg.norm(Y))

def transform_one(self, x: Union[dict, np.ndarray]) -> np.ndarray:
def transform_one(self, x: dict | np.ndarray) -> np.ndarray:
"""
Transforms the given input sample.
@@ -494,7 +491,7 @@ def transform_one(self, x: Union[dict, np.ndarray]) -> np.ndarray:
_, Phi = self.eig
return Phi.T @ x

def transform_many(self, X: Union[np.ndarray, pd.DataFrame]) -> np.ndarray:
def transform_many(self, X: np.ndarray | pd.DataFrame) -> np.ndarray:
"""
Transforms the given input sequence.
@@ -636,12 +633,12 @@ class OnlineDMDwC(OnlineDMD):

def __init__(
self,
B: Union[np.ndarray, None] = None,
B: np.ndarray | None = None,
r: int = 0,
w: float = 1.0,
initialize: int = 1,
exponential_weighting: bool = False,
seed: Union[int, None] = None,
seed: int | None = None,
) -> None:
super().__init__(
r,
@@ -656,9 +653,9 @@ def __init__(

def _update_many(
self,
X: Union[np.ndarray, pd.DataFrame],
Y: Union[np.ndarray, pd.DataFrame],
U: Union[np.ndarray, pd.DataFrame, None] = None,
X: np.ndarray | pd.DataFrame,
Y: np.ndarray | pd.DataFrame,
U: np.ndarray | pd.DataFrame | None = None,
) -> None:
"""Update the DMD computation with a new batch of snapshots (X,Y).
@@ -735,9 +732,9 @@ def _init_update(self):

def update(
self,
x: Union[dict, np.ndarray],
y: Union[dict, np.ndarray],
u: Union[dict, np.ndarray, None] = None,
x: dict | np.ndarray,
y: dict | np.ndarray,
u: dict | np.ndarray | None = None,
) -> None:
"""Update the DMD computation with a new pair of snapshots (x, y)
@@ -791,18 +788,18 @@ def update(

def learn_one(
self,
x: Union[dict, np.ndarray],
y: Union[dict, np.ndarray],
u: Union[dict, np.ndarray],
x: dict | np.ndarray,
y: dict | np.ndarray,
u: dict | np.ndarray,
) -> None:
"""Allias for OnlineDMDwC.update method."""
return self.update(x, y, u)

def revert(
self,
x: Union[dict, np.ndarray],
y: Union[dict, np.ndarray],
u: Union[dict, np.ndarray],
x: dict | np.ndarray,
y: dict | np.ndarray,
u: dict | np.ndarray,
) -> None:
"""Gradually forget the older snapshots and revert the DMD computation.
@@ -834,7 +831,7 @@ def revert(
self.A = self.A[:, : -self.l]

def predict_one(
self, x: Union[dict, np.ndarray], u: Union[dict, np.ndarray]
self, x: dict | np.ndarray, u: dict | np.ndarray
) -> np.ndarray:
"""
Predicts the next state given the current state.
@@ -858,8 +855,8 @@ def predict_one(

def predict_many(
self,
x: Union[dict, np.ndarray],
U: Union[np.ndarray, pd.DataFrame],
x: dict | np.ndarray,
U: np.ndarray | pd.DataFrame,
forecast: int,
) -> np.ndarray:
"""
@@ -888,9 +885,9 @@ def predict_many(

def truncation_error(
self,
X: Union[np.ndarray, pd.DataFrame],
Y: Union[np.ndarray, pd.DataFrame],
U: Union[np.ndarray, pd.DataFrame],
X: np.ndarray | pd.DataFrame,
Y: np.ndarray | pd.DataFrame,
U: np.ndarray | pd.DataFrame,
) -> float:
"""Compute the truncation error of the DMD model on the given data.
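
A minimal driver sketch (not part of the commit) exercising only the OnlineDMD signatures visible in the hunks above; the import path, the toy system matrix and the constructor arguments are illustrative assumptions.

# Sketch only: OnlineDMD usage inferred from the signatures in this diff.
import numpy as np

from functions.odmd import OnlineDMD  # assumes the functions/ layout above

A_true = np.array([[0.9, -0.1], [0.05, 0.95]])  # hypothetical linear system
rng = np.random.default_rng(0)

dmd = OnlineDMD(r=2, w=1.0, initialize=10)  # illustrative arguments
x = rng.normal(size=2)
for _ in range(100):
    y = A_true @ x
    dmd.learn_one(x, y)  # update the decomposition with the snapshot pair
    x = y

x_next = dmd.predict_one(x)     # one-step-ahead prediction
path = dmd.predict_many(x, 10)  # ten-step forecast starting from x
# OnlineDMDwC follows the same pattern with a control input:
# learn_one(x, y, u) and predict_one(x, u).
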
2 changes: 1 addition & 1 deletion functions/opca.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Online Principal Component Analysis (PCA) in [River API](riverml.xyz).
This module contains the implementation of the Online PCA algorithm.
@@ -13,6 +12,7 @@
from typing import Union

import numpy as np

from river.base import Transformer

__all__ = [
20 changes: 8 additions & 12 deletions functions/osvd.py
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Online Singular Value Decomposition (SVD) in [River API](riverml.xyz).
This module contains the implementation of the Online SVD algorithm.
@@ -9,11 +8,10 @@
"""
from __future__ import annotations

from typing import Union

import numpy as np
import pandas as pd
import scipy as sp

from river.base import MiniBatchTransformer

__all__ = [
@@ -130,7 +128,7 @@ def _orthogonalize(self, U_, Sigma_, V_):
)
return UQ @ tU_, tSigma_, VQ @ tV_

def update(self, x: Union[dict, np.ndarray]):
def update(self, x: dict | np.ndarray):
if isinstance(x, dict):
self.feature_names_in_ = list(x.keys())
x = np.array(list(x.values()))
@@ -147,7 +145,7 @@ def update(self, x: Union[dict, np.ndarray]):
U_, Sigma_, V_ = self._orthogonalize(U_, Sigma_, V_)
self._U, self._S, self._V = U_, Sigma_, V_

def revert(self, _: Union[dict, np.ndarray]):
def revert(self, _: dict | np.ndarray):
# TODO: verify proper implementation of revert method
b = np.concatenate([np.zeros(self._V.shape[1] - 1), [1]]).reshape(
-1, 1
@@ -170,11 +168,11 @@ def revert(self, _: Union[dict, np.ndarray]):
U_, Sigma_, V_ = self._orthogonalize(U_, Sigma_, V_)
self._U, self._S, self._V = U_, Sigma_, V_

def learn_one(self, x: Union[dict, np.ndarray]):
def learn_one(self, x: dict | np.ndarray):
"""Allias for update method."""
self.update(x)

def learn_many(self, X: Union[np.ndarray, pd.DataFrame]):
def learn_many(self, X: np.ndarray | pd.DataFrame):
if isinstance(X, pd.DataFrame):
self.feature_names_in_ = list(X.columns)
X = X.values
@@ -188,9 +186,7 @@ def learn_many(self, X: Union[np.ndarray, pd.DataFrame]):
X.T, k=self.n_components_
)

def transform_one(
self, x: Union[dict, np.ndarray]
) -> Union[dict, np.ndarray]:
def transform_one(self, x: dict | np.ndarray) -> dict | np.ndarray:
is_dict = isinstance(x, dict)
if is_dict:
self.feature_names_in_ = list(x.keys())
Expand All @@ -200,8 +196,8 @@ def transform_one(
return x_ if not is_dict else dict(zip(self.feature_names_in_, x_))

def transform_many(
self, X: Union[np.ndarray, pd.DataFrame]
) -> Union[np.ndarray, pd.DataFrame]:
self, X: np.ndarray | pd.DataFrame
) -> np.ndarray | pd.DataFrame:
is_df = isinstance(X, pd.DataFrame)
if is_df:
self.feature_names_in_ = list(X.columns)
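
A sketch (not part of the commit) of the streaming-update pattern these osvd.py methods expose; the exported class name (`OnlineSVD` below) and the `n_components` constructor argument are not visible in these hunks and are assumptions.

# Sketch only: class name and constructor signature are hypothetical.
import numpy as np
import pandas as pd

from functions.osvd import OnlineSVD  # hypothetical export name

X = pd.DataFrame(
    np.random.default_rng(1).normal(size=(20, 3)), columns=["a", "b", "c"]
)

svd = OnlineSVD(n_components=2)  # hypothetical constructor argument
svd.learn_many(X.iloc[:10])      # batch (re)initialisation via sparse SVD
for row in X.iloc[10:].to_dict(orient="records"):
    svd.learn_one(row)               # rank-one update of U, S and V
    scores = svd.transform_one(row)  # dict in -> dict of component scores

Z = svd.transform_many(X)  # DataFrame in, projected data out
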
