diff --git a/.github/workflows/code-quality.yaml b/.github/workflows/code-quality.yaml index 11bc0b6..efcfc9c 100644 --- a/.github/workflows/code-quality.yaml +++ b/.github/workflows/code-quality.yaml @@ -8,7 +8,7 @@ on: - "fix/**/**" - "release/v*.*.*" workflow_run: - workflows: ["Build"] + workflows: ["CI Build"] types: - completed diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 438b5ad..f7b6c70 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -8,7 +8,7 @@ on: - "fix/**/**" - "release/v*.*.*" workflow_run: - workflows: ["Code Quality"] + workflows: ["CI Code Quality"] types: - completed diff --git a/README.md b/README.md index 1c88f14..d8b542c 100644 --- a/README.md +++ b/README.md @@ -21,13 +21,13 @@ CI - Build - - CI - Code Style + + CI - Code Quality - CI - Test + CI - Automated Testing - + License: MIT diff --git a/src/anomalytics/__init__.py b/src/anomalytics/__init__.py index 16f2a3e..933e27b 100644 --- a/src/anomalytics/__init__.py +++ b/src/anomalytics/__init__.py @@ -1,7 +1,15 @@ __version__ = "0.1.0" -__all__ = ["get_anomaly", "get_anomaly_score", "get_exceedance_peaks_over_threshold", "read_ts", "set_time_window"] +__all__ = [ + "get_anomaly", + "get_anomaly_score", + "get_detector", + "get_exceedance_peaks_over_threshold", + "read_ts", + "set_time_window", +] +from anomalytics.models import get_detector from anomalytics.stats import get_anomaly, get_anomaly_score, get_exceedance_peaks_over_threshold from anomalytics.time_series import read_ts from anomalytics.time_windows import set_time_window diff --git a/src/anomalytics/evals/kolmogorv_smirnov.py b/src/anomalytics/evals/kolmogorv_smirnov.py index 852e8bf..69662ac 100644 --- a/src/anomalytics/evals/kolmogorv_smirnov.py +++ b/src/anomalytics/evals/kolmogorv_smirnov.py @@ -90,14 +90,12 @@ def ks_1sample( ) return dict( - total_nonzero_exceedances=len(ts), - start_datetime=ts.index[0], - end_datetime=fit_params[-1]["datetime"], - 
stats_distance=ks_result.statistic, - p_value=ks_result.pvalue, - c=c, - loc=loc, - scale=scale, + total_nonzero_exceedances=[ts.shape[0]], + stats_distance=[ks_result.statistic], + p_value=[ks_result.pvalue], + c=[c], + loc=[loc], + scale=[scale], ) if stats_method == "ZS": raise NotImplementedError() diff --git a/src/anomalytics/models/__init__.py b/src/anomalytics/models/__init__.py new file mode 100644 index 0000000..6856aee --- /dev/null +++ b/src/anomalytics/models/__init__.py @@ -0,0 +1,3 @@ +__all__ = ["get_detector"] + +from anomalytics.models.detector import get_detector diff --git a/src/anomalytics/models/abstract.py b/src/anomalytics/models/abstract.py new file mode 100644 index 0000000..c880c62 --- /dev/null +++ b/src/anomalytics/models/abstract.py @@ -0,0 +1,65 @@ +import abc +import typing + +import pandas as pd + + +class Detector(metaclass=abc.ABCMeta): + @abc.abstractmethod + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize the anomaly detection model with a specific statistical method. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFrame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. + """ + ... + + @abc.abstractmethod + def fit(self) -> None: + """ + Train the anomaly detection model using the provided data. + """ + ... + + @abc.abstractmethod + def detect(self) -> None: + """ + Detect anomalies in the dataset. + """ + ... + + @abc.abstractmethod + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + """ + Evaluate the performance of the anomaly detection model based on true and predicted labels. 
+ + ## Parameters + ------------- + method : method: typing.Literal["ks", "qq"], default "ks" + A parameter that decide what statistical method to use for testing the analysis result. + * "ks" for Kolmogorov Smirnov + * "qq" for QQ Plot + """ + ... + + @property + @abc.abstractmethod + def params(self) -> typing.Dict: + """ + Retrieve the parameters of the anomaly detection model. + + ## Returns + ---------- + parameters : typing.Dict + The fitting result from the model. + """ + ... diff --git a/src/anomalytics/models/autoencoder.py b/src/anomalytics/models/autoencoder.py new file mode 100644 index 0000000..f17a6c2 --- /dev/null +++ b/src/anomalytics/models/autoencoder.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class AutoencoderDetector(Detector): + """ + Anomaly detector class that implements the "Autoencoder" method. + ! TODO: Implement anomaly detection with autoencoder method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize Autoencoder model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. 
+ """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "AE" diff --git a/src/anomalytics/models/block_maxima.py b/src/anomalytics/models/block_maxima.py new file mode 100644 index 0000000..c771592 --- /dev/null +++ b/src/anomalytics/models/block_maxima.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class BlockMaximaDetector(Detector): + """ + Anomaly detector class that implements the "Block Maxima" method. + ! TODO: Implement anomaly detection with block-maxima method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize Block-Maxima model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. 
+ """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "BM" diff --git a/src/anomalytics/models/dbscan.py b/src/anomalytics/models/dbscan.py new file mode 100644 index 0000000..b05453b --- /dev/null +++ b/src/anomalytics/models/dbscan.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class DBSCANDetector(Detector): + """ + Anomaly detector class that implements the "Density-Based Spatial Clustering of Applications with Noise" (D. B. S. C. A. N.) method. + ! TODO: Implement anomaly detection with "DBSCAN" method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize DBSCAN model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. 
+ """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "DBSCAN" diff --git a/src/anomalytics/models/detector.py b/src/anomalytics/models/detector.py new file mode 100644 index 0000000..a5dabed --- /dev/null +++ b/src/anomalytics/models/detector.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import logging +import typing + +import pandas as pd + +logger = logging.getLogger(__name__) + + +class FactoryDetector: + def __init__( + self, + method: typing.Literal["AE", "BM", "DBSCAN", "ISOF", "MAD", "POT", "ZS", "1CSVM"], + dataset: typing.Union[pd.DataFrame, pd.Series], + anomaly_type: typing.Literal["high", "low"] = "high", + ): + self.method = method + self.dataset = dataset + self.anomaly_type = anomaly_type + + def __call__(self): + if self.method == "AE": + from anomalytics.models.autoencoder import AutoencoderDetector + + return AutoencoderDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "BM": + from anomalytics.models.block_maxima import BlockMaximaDetector + + return BlockMaximaDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "DBSCAN": + from anomalytics.models.dbscan import DBSCANDetector + + return DBSCANDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "ISOF": + from anomalytics.models.isoforest import IsoForestDetector + + return IsoForestDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "MAD": + from anomalytics.models.mad import MADDetector + + return 
MADDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "1CSVM": + from anomalytics.models.one_class_svm import OneClassSVMDetector + + return OneClassSVMDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "POT": + from anomalytics.models.peaks_over_threshold import POTDetector + + return POTDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + elif self.method == "ZS": + from anomalytics.models.zscore import ZScoreDetector + + return ZScoreDetector(dataset=self.dataset, anomaly_type=self.anomaly_type) + + raise ValueError( + "Invalid value! Available `method` arguments: 'AE', 'BM', 'DBSCAN', 'ISOF', 'MAD', 'POT', 'ZS', '1CSVM'" + ) + + +def get_detector( + method: typing.Literal["AE", "BM", "DBSCAN", "ISOF", "MAD", "POT", "ZS", "1CSVM"], + dataset: typing.Union[pd.DataFrame, pd.Series], + anomaly_type: typing.Literal["high", "low"] = "high", +): + return FactoryDetector(method=method, dataset=dataset, anomaly_type=anomaly_type)() diff --git a/src/anomalytics/models/isoforest.py b/src/anomalytics/models/isoforest.py new file mode 100644 index 0000000..a2a06e8 --- /dev/null +++ b/src/anomalytics/models/isoforest.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class IsoForestDetector(Detector): + """ + Anomaly detector class that implements the "Isolation Forest" method. + ! TODO: Implement anomaly detection with isolation forest method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize Isolation Forest model for anomaly detection. 
+ + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. + """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "ISOF" diff --git a/src/anomalytics/models/mad.py b/src/anomalytics/models/mad.py new file mode 100644 index 0000000..9e20768 --- /dev/null +++ b/src/anomalytics/models/mad.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class MADDetector(Detector): + """ + Anomaly detector class that implements the "Median Absolute Deviation" (M. A. D.) method. + ! TODO: Implement anomaly detection with MAD method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize MAD model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. 
+ """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "MAD" diff --git a/src/anomalytics/models/one_class_svm.py b/src/anomalytics/models/one_class_svm.py new file mode 100644 index 0000000..cf3793e --- /dev/null +++ b/src/anomalytics/models/one_class_svm.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class OneClassSVMDetector(Detector): + """ + Anomaly detector class that implements the "Once Class Support Vector Machine" (S. V. M.) method. + ! TODO: Implement anomaly detection with SVM method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize 1 Class SVM model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. 
+ """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "1CSVM" diff --git a/src/anomalytics/models/peaks_over_threshold.py b/src/anomalytics/models/peaks_over_threshold.py new file mode 100644 index 0000000..9aed648 --- /dev/null +++ b/src/anomalytics/models/peaks_over_threshold.py @@ -0,0 +1,300 @@ +import datetime +import logging +import typing +import warnings + +import numpy as np +import pandas as pd + +from anomalytics.evals.kolmogorv_smirnov import ks_1sample +from anomalytics.evals.qq_plot import visualize_qq_plot +from anomalytics.models.abstract import Detector +from anomalytics.plots.plot import plot_gen_pareto, plot_hist, plot_line +from anomalytics.stats.peaks_over_threshold import ( + get_anomaly, + get_anomaly_score, + get_anomaly_threshold, + get_exceedance_peaks_over_threshold, + get_threshold_peaks_over_threshold, +) +from anomalytics.time_windows.time_window import set_time_window + +logger = logging.getLogger(__name__) + + +class POTDetector(Detector): + __slots__ = [ + "__dataset", + "__time_window", + "__anomaly_type", + "__exceedance_threshold", + "__exceedance", + "__anomaly_score", + "__anomaly_threshold", + "__anomaly", + "__eval", + "__params", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + __time_window: typing.Tuple[int, int, int] + __exceedance_threshold: typing.Union[pd.DataFrame, pd.Series] + __exceedance: typing.Union[pd.DataFrame, pd.Series] + __anomaly_score: typing.Union[pd.DataFrame, pd.Series] + __anomaly_threshold: 
typing.Union[pd.DataFrame, pd.Series] + __anomaly: typing.Union[pd.DataFrame, pd.Series] + __eval: pd.DataFrame + __params: typing.Dict + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize POT model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. + """ + logger.info("start initialization of POT detection model") + + dataset = dataset.copy(deep=True) + + if not isinstance(dataset.index, pd.DatetimeIndex): + try: + msg = "Invalid data type! The dataset index is not pandas.DatetimeIndex - start converting to `pandas.DatetimeIndex`" + logger.debug(msg) + warnings.warn(msg, category=RuntimeWarning) + dataset.index = pd.to_datetime(dataset.index) + except TypeError as _error: + raise ValueError( + f"Invalid data type! The dataset index is not and can not be converted to `pandas.DatetimeIndex`" + ) from _error + + if not np.issubdtype(dataset.dtype, np.number): + try: + msg = "Invalid data type! The dataset value is not `numpy.numeric` - start converting to `numpyp.float64`" + logger.debug(msg) + warnings.warn(msg, category=RuntimeWarning) + dataset = dataset.astype(np.float64) + except ValueError as _error: + raise TypeError( + f"Invalid data type! 
The dataset value is and can not be converted to `numpyp.float64`" + ) from _error + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + self.__time_window = set_time_window( + total_rows=self.__dataset.shape[0], + method="POT", + analysis_type="real-time", + t0_pct=0.70, + t1_pct=0.3, + t2_pct=0.0, + ) + self.__exceedance_threshold = None + self.__exceedance = None + self.__anomaly_score = None + self.__anomaly_threshold = None + self.__anomaly = None + self.__eval = None + self.__params = {} + + logger.info("successfully initialized POT detection model") + + def get_extremes(self, q: float = 0.90) -> None: + if isinstance(self.__dataset, pd.DataFrame): + pass + + self.__exceedance_threshold = get_threshold_peaks_over_threshold( + ts=self.__dataset, t0=self.__time_window[0], anomaly_type=self.__anomaly_type, q=q + ) + self.__exceedance = get_exceedance_peaks_over_threshold( + ts=self.__dataset, t0=self.__time_window[0], anomaly_type=self.__anomaly_type, q=q + ) + + def fit(self) -> None: + if isinstance(self.__dataset, pd.DataFrame): + pass + + self.__anomaly_score = get_anomaly_score( + ts=self.__exceedance, t0=self.__time_window[0], gpd_params=self.__params + ) + + def detect(self, q: float = 0.90) -> None: + if isinstance(self.__dataset, pd.DataFrame): + pass + + self.__anomaly_threshold = get_anomaly_threshold(ts=self.__anomaly_score, t1=self.__time_window[1], q=q) + self.__anomaly = get_anomaly(ts=self.__anomaly_score, t1=self.__time_window[1], q=q) + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + params = self.__get_nonzero_params + if method == "ks": + self.__eval = pd.DataFrame(data=ks_1sample(ts=self.__exceedance, stats_method="POT", fit_params=params)) + assert isinstance(self.__eval, pd.DataFrame) + else: + visualize_qq_plot(ts=self.__exceedance, stats_method="POT", fit_params=params, is_random_param=True) + + @property + def __get_nonzero_params(self) -> typing.List[typing.Dict[str, 
typing.Union[datetime.datetime, float]]]: + """ + Filter and return only GPD params where there are at least 1 parameter that is greater than 0. + + ## Returns + ---------- + parameters : typing.List[typing.Dict[str, typing.Union[datetime.datetime, float]]] + A list of all parameters stored in __params that are greater than 0. + """ + if self.__time_window[0] is None: + raise ValueError("Invalid value! `t1` is not set?") + + if len(self.params) == 0: + raise ValueError("`__params` is still empty. Need to call `fit()` first!") + + nonzero_params = [] + for row in range(0, self.__time_window[1] + self.__time_window[2]): # type: ignore + if ( + self.params[row]["c"] != 0 # type: ignore + or self.params[row]["loc"] != 0 # type: ignore + or self.params[row]["scale"] != 0 # type: ignore + ): + nonzero_params.append(self.params[row]) + return nonzero_params + + @property + def params(self) -> dict: # type: ignore + return self.__params + + def return_dataset( + self, + set_type: typing.Literal[ + "exceedance_threshold", "exceedance", "anomaly", "anomaly_threshold", "anomaly_score" + ], + ) -> typing.Union[pd.DataFrame, pd.Series]: + if set_type == "exceedance_threshold": + dataset = self.__exceedance_threshold + elif set_type == "exceedance": + dataset = self.__exceedance + elif set_type == "anomaly_score": + dataset = self.__anomaly_score + elif set_type == "anomaly_threshold": + dataset = self.__anomaly_threshold + elif set_type == "anomaly": + dataset = self.__anomaly + else: + raise ValueError( + "Invalid value! 
Available `set_type` values: 'exceedance_threshold', 'exceedance', 'anomaly', 'anomaly_threshold', 'anomaly_score'" + ) + return dataset + + def plot( + self, + plot_type: typing.Literal["l", "l+eth", "l+ath", "hist", "gpd", "gpd+ov"], + title: str, + xlabel: str, + ylabel: str, + bins: typing.Optional[int] = 50, + plot_width: int = 13, + plot_height: int = 8, + plot_color: str = "black", + th_color: str = "red", + th_type: str = "dashed", + th_line_width: int = 2, + alpha: float = 0.8, + ): + if plot_type == "l": + plot_line( + dataset=self.__dataset, + threshold=None, + title=title, + xlabel=xlabel, + ylabel=ylabel, + is_threshold=False, + plot_width=plot_width, + plot_height=plot_height, + plot_color=plot_color, + th_color=th_color, + th_type=th_type, + th_line_width=th_line_width, + alpha=alpha, + ) + elif plot_type == "l+ath": + plot_line( + dataset=self.__exceedance, + threshold=self.__anomaly_threshold, + title=title, + xlabel=xlabel, + ylabel=ylabel, + is_threshold=True, + plot_width=plot_width, + plot_height=plot_height, + plot_color=plot_color, + th_color=th_color, + th_type=th_type, + th_line_width=th_line_width, + alpha=alpha, + ) + elif plot_type == "l+eth": + plot_line( + dataset=self.__dataset, + threshold=self.__exceedance_threshold, + title=title, + xlabel=xlabel, + ylabel=ylabel, + is_threshold=True, + plot_width=plot_width, + plot_height=plot_height, + plot_color=plot_color, + th_color=th_color, + th_type=th_type, + th_line_width=th_line_width, + alpha=alpha, + ) + elif plot_type == "hist": + plot_hist( + dataset=self.__dataset, + title=title, + xlabel=xlabel, + ylabel=ylabel, + bins=bins, + plot_width=plot_width, + plot_height=plot_height, + plot_color=plot_color, + alpha=alpha, + ) + elif plot_type == "gpd": + plot_gen_pareto( + dataset=self.__exceedance, + title=title, + xlabel=xlabel, + ylabel=ylabel, + bins=bins, + plot_width=plot_width, + plot_height=plot_height, + plot_color=plot_color, + alpha=alpha, + params=None, + ) + elif plot_type == 
"gpd+ov": + plot_gen_pareto( + dataset=self.__exceedance, + title=title, + xlabel=xlabel, + ylabel=ylabel, + bins=bins, + plot_width=plot_width, + plot_height=plot_height, + plot_color=plot_color, + alpha=alpha, + params=self.__params, + ) + + def __str__(self) -> str: + return "POT" diff --git a/src/anomalytics/models/zscore.py b/src/anomalytics/models/zscore.py new file mode 100644 index 0000000..50ff810 --- /dev/null +++ b/src/anomalytics/models/zscore.py @@ -0,0 +1,55 @@ +import typing + +import pandas as pd + +from anomalytics.models.abstract import Detector + + +class ZScoreDetector(Detector): + """ + Anomaly detector class that implements the "Z-Score" method. + ! TODO: Implement anomaly detection with z-score method! + """ + + __slots__ = [ + "__anomaly_type", + "__dataset__", + ] + + __anomaly_type: typing.Literal["high", "low"] + __dataset: typing.Union[pd.DataFrame, pd.Series] + + def __init__( + self, dataset: typing.Union[pd.DataFrame, pd.Series], anomaly_type: typing.Literal["high", "low"] = "high" + ): + """ + Initialize Z-Score model for anomaly detection. + + ## Parameters + ---------- + dataset : typing.Union[pandas.DataFrame, pandas.Series] + DataFame or Series objects to be analyzed. + Index must be date-time and values must be numeric. + + anomaly_type : typing.Literal["high", "low"] + Defining which kind of anomaly are we expecting. 
+ """ + + self.__anomaly_type = anomaly_type + self.__dataset = dataset + + def fit(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def detect(self) -> None: + raise NotImplementedError("Not yet implemented!") + + def evaluate(self, method: typing.Literal["ks", "qq"] = "ks") -> None: + raise NotImplementedError("Not yet implemented!") + + @property + def params(self) -> dict: # type: ignore + raise NotImplementedError("Not yet implemented!") + + def __str__(self) -> str: + return "ZS" diff --git a/src/anomalytics/plots/__init__.py b/src/anomalytics/plots/__init__.py index e69de29..ea06c8b 100644 --- a/src/anomalytics/plots/__init__.py +++ b/src/anomalytics/plots/__init__.py @@ -0,0 +1,3 @@ +__all__ = ["plot_gen_pareto", "plot_hist", "plot_line"] + +from anomalytics.plots.plot import plot_gen_pareto, plot_hist, plot_line diff --git a/src/anomalytics/plots/plot.py b/src/anomalytics/plots/plot.py new file mode 100644 index 0000000..7b0120d --- /dev/null +++ b/src/anomalytics/plots/plot.py @@ -0,0 +1,102 @@ +import typing + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import scipy.stats as stats + + +def plot_line( + dataset: typing.Union[pd.DataFrame, pd.Series], + threshold: typing.Union[pd.Series, float, None], + title: str, + xlabel: str, + ylabel: str, + is_threshold: bool = True, + plot_width: int = 13, + plot_height: int = 8, + plot_color: str = "black", + th_color: str = "red", + th_type: str = "dashed", + th_line_width: int = 2, + alpha: float = 0.8, +): + fig = plt.figure(figsize=(plot_width, plot_height)) + plt.plot(dataset.index, dataset.values, color=plot_color, alpha=alpha) + + if is_threshold: + if type(threshold) == float: + plt.axhline(threshold, c=th_color, ls=th_type, lw=th_line_width) + elif isinstance(threshold, pd.Series): + plt.plot(dataset.index, threshold.values, c=th_color, ls=th_type, lw=th_line_width) + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + + fig.legend(loc="upper 
left", shadow=True, fancybox=True) + plt.show() + + +def plot_hist( + dataset: typing.Union[pd.DataFrame, pd.Series], + title: str, + xlabel: str, + ylabel: str, + bins: typing.Optional[int] = 50, + plot_width: int = 13, + plot_height: int = 8, + plot_color: str = "black", + alpha: float = 0.8, +): + fig = plt.figure(figsize=(plot_width, plot_height)) + plt.hist(dataset.values, bins=bins, color=plot_color, alpha=alpha) + plt.title(title) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + + fig.legend(loc="upper left", shadow=True, fancybox=True) + plt.show() + + +def plot_gen_pareto( + dataset: typing.Union[pd.DataFrame, pd.Series], + title: str, + xlabel: str, + ylabel: str, + bins: typing.Optional[int] = 50, + plot_width: int = 13, + plot_height: int = 8, + plot_color: str = "black", + alpha: float = 0.8, + params: typing.Union[typing.Dict, None] = None, +): + fig = plt.figure(figsize=(plot_width, plot_height)) + + nonzero_exceedences = [exceedence for exceedence in dataset if exceedence > 0] + if params: + param_label = f"\n{round(params['c'], 3)}\n{round(params['loc'], 3)}\n{round(params['scale'], 3)}\n" + overlay = np.linspace( + stats.genpareto.ppf(0.1, c=params["c"], loc=params["loc"], scale=params["scale"]), + stats.genpareto.ppf(0.999, c=params["c"], loc=params["loc"], scale=params["scale"]), + len(nonzero_exceedences), + ) + plt.plot( + overlay, + stats.genpareto.pdf(overlay, c=params["c"], loc=params["loc"], scale=params["scale"]), + c="lime", + lw=2, + label=f"\nFitted Params:{param_label}", + ) + plt.hist( + nonzero_exceedences, + bins=bins, + density=True, + alpha=alpha, + color=plot_color, + label=f"{len(nonzero_exceedences)}", + ) + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.title(title) + fig.legend(loc="upper right", shadow=True, fancybox=True) + plt.show() diff --git a/tests/test_detectors.py b/tests/test_detectors.py new file mode 100644 index 0000000..1bad175 --- /dev/null +++ b/tests/test_detectors.py @@ -0,0 +1,184 @@ +from unittest import 
class TestDetector(TestCase):
    """
    Tests for the detectors returned by `atics.get_detector`.

    `setUp` builds one detector per method code on a small ten-point series;
    the POT threshold/evaluation tests build their own POT detector on a
    fifty-point series that ends with a single very large value.
    """

    def setUp(self) -> None:
        super().setUp()
        self.sample_1_ts = pd.Series(
            data=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], index=pd.date_range(start="2023-01-01", periods=10)
        )
        # fmt: off
        self.sample_2_ts = pd.Series(
            index=pd.date_range(start="2023-01-01", periods=50),
            data=[
                263, 275, 56, 308, 488, 211, 70, 42, 67, 472,
                304, 297, 480, 227, 453, 342, 115, 115, 67, 295,
                9, 228, 89, 225, 360, 367, 418, 124, 229, 12,
                111, 341, 209, 374, 254, 322, 99, 166, 435, 481,
                106, 438, 180, 33, 30, 330, 139, 17, 268, 204000,
            ],
        )
        # fmt: on
        self.ae_detector = atics.get_detector(method="AE", dataset=self.sample_1_ts)
        self.bm_detector = atics.get_detector(method="BM", dataset=self.sample_1_ts)
        self.dbscan_detector = atics.get_detector(method="DBSCAN", dataset=self.sample_1_ts)
        self.isof_detector = atics.get_detector(method="ISOF", dataset=self.sample_1_ts)
        self.mad_detector = atics.get_detector(method="MAD", dataset=self.sample_1_ts)
        self.svm_detector = atics.get_detector(method="1CSVM", dataset=self.sample_1_ts)
        self.pot_detector = atics.get_detector(method="POT", dataset=self.sample_1_ts)
        self.zs_detector = atics.get_detector(method="ZS", dataset=self.sample_1_ts)

    def _detectors_by_method(self):
        """Map each method code used in `setUp` to the detector built for it."""
        return {
            "AE": self.ae_detector,
            "BM": self.bm_detector,
            "DBSCAN": self.dbscan_detector,
            "ISOF": self.isof_detector,
            "MAD": self.mad_detector,
            "1CSVM": self.svm_detector,
            "POT": self.pot_detector,
            "ZS": self.zs_detector,
        }

    def test_detector_instance_is_abstract_class(self):
        # subTest reports every failing method instead of stopping at the first.
        for method, detector in self._detectors_by_method().items():
            with self.subTest(method=method):
                self.assertIsInstance(obj=detector, cls=Detector)

    def test_detector_string_method(self):
        for method, detector in self._detectors_by_method().items():
            with self.subTest(method=method):
                self.assertEqual(first=str(detector), second=method)

    def test_pot_detector_get_extremes(self):
        self.pot_detector.get_extremes(q=0.9)

        expected_exceedance_threshold = pd.Series(
            [5.5, 5.5, 5.5, 5.5, 5.5, 5.5, 6.4, 7.3, 8.2, 9.1], index=self.sample_1_ts.index
        )
        expected_exceedance = pd.Series(
            [
                0.0,
                0.0,
                0.0,
                0.0,
                0.0,
                0.5,
                0.5999999999999996,
                0.7000000000000002,
                0.8000000000000007,
                0.9000000000000004,
            ],
            index=self.sample_1_ts.index,
            name="exceedances",
        )

        pd.testing.assert_series_equal(
            self.pot_detector._POTDetector__exceedance_threshold, expected_exceedance_threshold
        )
        pd.testing.assert_series_equal(self.pot_detector._POTDetector__exceedance, expected_exceedance)

    def test_pot_detector_genpareto_fit(self):
        self.pot_detector.get_extremes(q=0.90)
        self.pot_detector.fit()

        expected_anomaly_scores = pd.Series(
            data=[1.922777880970598, 2.445890926224859, 3.6935717350888506, 3121651314.625431],
            index=self.sample_1_ts.index[6:],
            name="anomaly scores",
        )
        expected_first_params = {
            "index": pd.Timestamp("2023-01-07 00:00:00"),
            "c": -1.6804238287454643,
            "loc": 0,
            "scale": 1.5123814458709186,
            "p_value": 0.5200808735615424,
            "anomaly_score": 1.922777880970598,
        }

        pd.testing.assert_series_equal(self.pot_detector._POTDetector__anomaly_score, expected_anomaly_scores)

        actual_first_params = self.pot_detector._POTDetector__params[0]
        self.assertEqual(set(actual_first_params), set(expected_first_params))
        self.assertEqual(actual_first_params["index"], expected_first_params["index"])
        # The numeric entries are fitted values; compare them with a tolerance
        # rather than bit-for-bit so the test does not hinge on the last ULP.
        for key in ("c", "loc", "scale", "p_value", "anomaly_score"):
            with self.subTest(param=key):
                self.assertAlmostEqual(actual_first_params[key], expected_first_params[key])

    def test_pot_detector_compute_anomaly_threshold_method(self):
        expected_anomaly_threshold = 1.2394417670604552
        pot_detector = atics.get_detector(method="POT", dataset=self.sample_2_ts, anomaly_type="high")

        pot_detector.get_extremes(q=0.90)
        pot_detector.fit()
        pot_detector.detect(q=0.90)

        self.assertAlmostEqual(pot_detector._POTDetector__anomaly_threshold, expected_anomaly_threshold)
        # The first entry is a single flag; assert its truth directly instead
        # of comparing a scalar against the list `[True]`.
        self.assertTrue(pot_detector._POTDetector__anomaly.iloc[0])

    def test_pot_detector_evaluation_with_ks_1sample(self):
        pot_detector = atics.get_detector(method="POT", dataset=self.sample_2_ts, anomaly_type="high")

        pot_detector.get_extremes(q=0.90)
        pot_detector.fit()
        pot_detector.detect(q=0.90)
        pot_detector.evaluate(method="ks")

        expected_kstest_result = pd.DataFrame(
            data={
                "total_nonzero_exceedances": [50],
                "stats_distance": [0.9798328261695748],
                "p_value": [3.414145934563587e-85],
                "c": [-1.3371948412738648],
                "loc": [0],
                "scale": [272179.457686573],
            }
        )

        pd.testing.assert_frame_equal(pot_detector._POTDetector__eval, expected_kstest_result)