Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pot): create functions that utilise the Peaks Over Threshold method #6

Merged
merged 13 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
- name: Install linting dependencies
run: |
python3 -m pip install --upgrade pip setuptools wheel
pip3 install -e .[codequality]
pip3 install -e .[codequality,extra]
- name: Lint with Black
run: black --config=pyproject.toml .
- name: Lint with Isort
Expand All @@ -68,7 +68,7 @@ jobs:
- name: Install package and dependencies
run: |
python3 -m pip install --upgrade pip setuptools wheel
pip3 install -e .[security,testcov]
pip3 install -e .[security,testcov,extra]
- name: Test application's vulnerability with bandit
run: bandit -c pyproject.toml -r .
- name: Test with Pytest-Cov
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ codequality = [
]
security = ["bandit>=1.7.5"]
testcov = ["pytest-cov>=4.1.0"]
extra = ["openpyxl"]

[tool.bandit]
exclude_dirs = ["tests"]
Expand Down
2 changes: 2 additions & 0 deletions src/anomalytics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

# Public names re-exported at package level; kept in sync with the imports below.
__all__ = ["get_exceedance_peaks_over_threshold", "read_ts", "set_time_window"]

from anomalytics.stats import get_exceedance_peaks_over_threshold
from anomalytics.time_series import read_ts
from anomalytics.time_windows import set_time_window
6 changes: 6 additions & 0 deletions src/anomalytics/stats/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
__all__ = ["get_threshold_peaks_over_threshold", "get_exceedance_peaks_over_threshold"]

from anomalytics.stats.peaks_over_threshold import (
get_exceedance_peaks_over_threshold,
get_threshold_peaks_over_threshold,
)
93 changes: 93 additions & 0 deletions src/anomalytics/stats/peaks_over_threshold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import logging
import typing

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


def get_threshold_peaks_over_threshold(
    ts: pd.Series,
    t0: int,
    anomaly_type: typing.Literal["high", "low"] = "high",
    q: float = 0.90,
) -> pd.Series:
    """
    Calculate the POT threshold values that will be used to extract the exceedances from `ts` dataset.

    The threshold at each position is the quantile of an expanding window over `ts`. Positions that
    have fewer than `t0` observations are back-filled with the first available threshold.

    ## Parameters
    -------------
    ts : pandas.Series
        The dataset with 1 feature and datetime index to calculate the quantiles.

    t0 : int
        Time window to find dynamic expanding period for calculating quantile score. It is used as
        the minimum number of observations of the expanding window.

    anomaly_type : typing.Literal["high", "low"]
        Direction of the anomalies, default "high". For "low" the quantile is mirrored, i.e. the
        threshold is computed with `1.0 - q`.

    q : float
        The quantile to use for thresholding, default 0.90.

    ## Returns
    ----------
    pot_thresholds : pandas.Series
        A Pandas Series where each value is a threshold to extract the exceedances from the original dataset.

    ## Raises
    ---------
    ValueError
        If `anomaly_type` is neither "high" nor "low", or if `t0` is None.

    TypeError
        If `ts` is not a Pandas Series.
    """
    logger.debug(
        f"calculating dynamic threshold for exceedance extraction using anomaly_type={anomaly_type}, t0={t0}, q={q}"
    )

    if anomaly_type not in ["high", "low"]:
        raise ValueError("Invalid value! The `anomaly_type` argument must be 'high' or 'low'")
    if not isinstance(ts, pd.Series):
        raise TypeError("Invalid value! The `ts` argument must be a Pandas Series")
    if t0 is None:
        raise ValueError("Invalid value! The `t0` argument must be an integer")

    # Low anomalies live in the lower tail, so the quantile is mirrored around 0.5.
    quantile = 1.0 - q if anomaly_type == "low" else q

    # The quantile is passed positionally: the keyword was renamed from `quantile` to `q`
    # between pandas releases, while the positional form works with both spellings.
    pot_thresholds = ts.expanding(min_periods=t0).quantile(quantile).bfill()

    # Only report success once the thresholds have actually been computed.
    logger.debug(f"successfully calculating threshold for {anomaly_type} anomaly type")

    return pot_thresholds


def get_exceedance_peaks_over_threshold(
    ts: pd.Series,
    t0: int,
    anomaly_type: typing.Literal["high", "low"] = "high",
    q: float = 0.90,
) -> pd.Series:
    """
    Measure how far each value of `ts` lies beyond its POT threshold.

    ## Parameters
    -------------
    ts : pandas.Series
        The dataset with 1 feature and datetime index to calculate the quantiles.

    t0 : int
        Time window to find dynamic expanding period for calculating quantile score.

    anomaly_type : typing.Literal["high", "low"]
        Direction of the anomalies, default "high". "high" measures values above the threshold,
        "low" measures values that are not above it.

    q : float
        The quantile to use for thresholding, default 0.90.

    ## Returns
    ----------
    exceedances : pandas.Series
        A Pandas Series named "exceedances", indexed like `ts`, holding the non-negative distance
        to the threshold for exceeding values and 0.0 otherwise.

    ## Raises
    ---------
    ValueError
        If `anomaly_type` is neither "high" nor "low", or if `t0` is None.

    TypeError
        If `ts` is not a Pandas Series.
    """
    logger.debug(f"extracting exceedances from dynamic threshold using anomaly_type={anomaly_type}, t0={t0}, q={q}")

    # Guard clauses: reject unusable arguments before any computation happens.
    if anomaly_type not in ("high", "low"):
        raise ValueError(f"Invalid value! The `anomaly_type` argument must be 'high' or 'low'")
    if not isinstance(ts, pd.Series):
        raise TypeError("Invalid value! The `ts` argument must be a Pandas Series")
    if t0 is None:
        raise ValueError("Invalid value! The `t0` argument must be an integer")

    pot_thresholds = get_threshold_peaks_over_threshold(ts=ts, t0=t0, anomaly_type=anomaly_type, q=q)

    # Signed distance of every observation to its threshold.
    deviations = ts - pot_thresholds

    if anomaly_type == "high":
        # Keep only the part above the threshold.
        magnitudes = np.maximum(deviations, 0.0)
    else:
        # Values above the threshold are no low anomaly; the rest keep their absolute distance.
        magnitudes = np.where(ts > pot_thresholds, 0.0, np.abs(deviations))

    logger.debug(f"successfully extracting exceedances from dynamic threshold for {anomaly_type} anomaly type")
    return pd.Series(data=magnitudes, index=ts.index, name="exceedances")
3 changes: 3 additions & 0 deletions src/anomalytics/time_series/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__all__ = ["create_ts_from_csv", "create_ts_from_xlsx", "read_ts"]

from anomalytics.time_series.upload import create_ts_from_csv, create_ts_from_xlsx, read_ts
85 changes: 85 additions & 0 deletions src/anomalytics/time_series/upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import typing

import pandas as pd


def create_ts_from_csv(
path_to_file: str,
index_col: int = 0,
names: typing.List[str] | None = None,
header: int | typing.Sequence[int] | typing.Literal["infer"] | None = None,
sep: str | None = ",",
) -> pd.Series:
return pd.read_csv(
filepath_or_buffer=path_to_file, header=header, index_col=index_col, sep=sep, parse_dates=True, names=names
).squeeze()


def create_ts_from_xlsx(
    path_to_file: str,
    index_col: int = 0,
    names: typing.List[str] | None = None,
    header: int | typing.Sequence[int] | typing.Literal["infer"] | None = None,
    sheet_name: str | int | None = 0,
) -> pd.Series:
    """
    Read one sheet of an Excel file and return its content as a Pandas Series.

    ## Parameters
    -------------
    path_to_file : str
        Location of the Excel file, forwarded to `pandas.read_excel`.

    index_col : int
        Position of the column used as index, default 0. The index is parsed as dates.

    names : typing.List[str] | None
        Column names forwarded to `pandas.read_excel`, default None.

    header : int | typing.Sequence[int] | typing.Literal["infer"] | None
        Row number(s) holding the column labels, default None (the sheet has no header row).

    sheet_name : str | int | None
        Name or position of the sheet to read, default 0. None is treated as the first sheet.

    ## Returns
    ----------
    ts : pandas.Series
        The single data column of the sheet. A sheet with more than one data column is returned
        by pandas as a DataFrame, unchanged.
    """
    # pandas answers `sheet_name=None` with a dict of every sheet, which cannot be turned into
    # one series; fall back to the first sheet so the declared return type holds.
    target_sheet = 0 if sheet_name is None else sheet_name
    frame = pd.read_excel(
        io=path_to_file, header=header, index_col=index_col, sheet_name=target_sheet, parse_dates=True, names=names
    )
    # Squeeze the column axis only: a bare `squeeze()` would also collapse the row axis and
    # hand back a scalar instead of a Series when the sheet holds a single observation.
    return frame.squeeze("columns")


@typing.overload
def read_ts(
path_to_file: str,
file_type: typing.Literal["csv"],
index_col: int = 0,
names: typing.List[str] | None = None,
header: int | typing.Sequence[int] | typing.Literal["infer"] | None = None,
*,
sep: str | None = ",",
) -> pd.Series:
...


@typing.overload
def read_ts(
path_to_file: str,
file_type: typing.Literal["xlsx"],
index_col: int = 0,
names: typing.List[str] | None = None,
header: int | typing.Sequence[int] | typing.Literal["infer"] | None = None,
*,
sheet_name: str | int | None = 0,
) -> pd.Series:
...


def read_ts(
path_to_file: str,
file_type: typing.Literal["csv", "xlsx"],
index_col: int = 0,
names: typing.List[str] | None = None,
header: int | typing.Sequence[int] | typing.Literal["infer"] | None = None,
**kwargs,
) -> pd.Series:
if not path_to_file:
raise ValueError("The argument for `path_to_file` can't be None.")

if file_type == "csv":
return create_ts_from_csv(
path_to_file=path_to_file,
index_col=index_col,
names=names,
header=header,
**kwargs,
)

if file_type == "xlsx":
return create_ts_from_xlsx(
path_to_file=path_to_file,
index_col=index_col,
names=names,
header=header,
**kwargs,
)

raise ValueError("Invalid value for `file_type` argument, available: 'csv', 'xlsx'")
2 changes: 1 addition & 1 deletion src/anomalytics/time_windows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__all__ = ["compute_pot_windows", "set_time_window"]

from anomalytics.time_windows.pot_windows import compute_pot_windows
from anomalytics.time_windows.pot_window import compute_pot_windows
from anomalytics.time_windows.time_window import set_time_window
2 changes: 1 addition & 1 deletion src/anomalytics/time_windows/time_window.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import typing

from anomalytics.time_windows.pot_windows import compute_pot_windows
from anomalytics.time_windows.pot_window import compute_pot_windows

logger = logging.getLogger(__name__)

Expand Down
65 changes: 65 additions & 0 deletions tests/test_peaks_over_threshold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import unittest

import numpy as np
import pandas as pd

from anomalytics import get_exceedance_peaks_over_threshold
from anomalytics.stats import get_threshold_peaks_over_threshold


class TestPeaksOverThreshold(unittest.TestCase):
    """Tests for the Peaks Over Threshold threshold and exceedance functions."""

    def setUp(self):
        # Small deterministic ramp: its expanding quantiles are easy to verify by hand.
        self.sample_1_ts = pd.Series(
            data=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], index=pd.date_range(start="2023-01-01", periods=10)
        )
        # Seeded generator keeps the random sample identical between runs, so failures are reproducible.
        rng = np.random.default_rng(seed=42)
        self.sample_2_ts = pd.Series(rng.random(100), index=pd.date_range(start="2023-01-01", periods=100))

    def test_calculate_threshold_for_high_anomaly_type(self):
        pot_threshold = get_threshold_peaks_over_threshold(ts=self.sample_1_ts, t0=3, anomaly_type="high", q=0.90)
        self.assertIsInstance(pot_threshold, pd.Series)
        self.assertEqual(len(pot_threshold), len(self.sample_1_ts))
        # The first `t0 - 1` positions are back-filled, so no gaps may remain.
        self.assertFalse(pot_threshold.isna().any())
        # 0.90 quantile of 1..10 with linear interpolation.
        self.assertAlmostEqual(pot_threshold.iloc[-1], 9.1)

    def test_calculate_threshold_for_low_anomaly_type(self):
        # The function mirrors `q` for low anomalies, so q=0.90 evaluates the 0.10 quantile.
        pot_threshold = get_threshold_peaks_over_threshold(ts=self.sample_1_ts, t0=3, anomaly_type="low", q=0.90)
        self.assertIsInstance(pot_threshold, pd.Series)
        self.assertEqual(len(pot_threshold), len(self.sample_1_ts))
        self.assertFalse(pot_threshold.isna().any())
        # 0.10 quantile of 1..10 with linear interpolation.
        self.assertAlmostEqual(pot_threshold.iloc[-1], 1.9)

    def test_invalid_anomaly_type_in_threshold_calculation_function(self):
        with self.assertRaises(ValueError):
            get_threshold_peaks_over_threshold(ts=self.sample_1_ts, t0=3, anomaly_type="invalid", q=0.90)  # type: ignore

    def test_invalid_ts_type_in_threshold_calculation_function(self):
        with self.assertRaises(TypeError):
            get_threshold_peaks_over_threshold(ts=[1, 2, 3, 4], t0=3, anomaly_type="high", q=0.90)  # type: ignore

    def test_invalid_t0_value_in_threshold_calculation_function(self):
        with self.assertRaises(ValueError):
            get_threshold_peaks_over_threshold(ts=self.sample_1_ts, t0=None, anomaly_type="high", q=0.90)  # type: ignore

    def test_extract_exceedance_for_high_anomaly_type(self):
        pot_exceedance = get_exceedance_peaks_over_threshold(ts=self.sample_2_ts, t0=5, anomaly_type="high", q=0.90)
        self.assertIsInstance(pot_exceedance, pd.Series)
        self.assertEqual(len(pot_exceedance), len(self.sample_2_ts))
        # Exceedances are distances to the threshold and therefore never negative.
        self.assertTrue((pot_exceedance >= 0).all())

    def test_extract_exceedance_for_low_anomaly_type(self):
        # q=0.90 is mirrored internally, so the lower 10% tail is examined.
        pot_exceedance = get_exceedance_peaks_over_threshold(ts=self.sample_2_ts, t0=5, anomaly_type="low", q=0.90)
        self.assertIsInstance(pot_exceedance, pd.Series)
        self.assertEqual(len(pot_exceedance), len(self.sample_2_ts))
        # Exceedances are distances to the threshold and therefore never negative.
        self.assertTrue((pot_exceedance >= 0).all())

    def test_invalid_anomaly_type_in_exceedance_extraction_function(self):
        with self.assertRaises(ValueError):
            get_exceedance_peaks_over_threshold(ts=self.sample_2_ts, t0=5, anomaly_type="invalid", q=0.90)  # type: ignore

    def test_invalid_ts_type_in_exceedance_extraction_function(self):
        with self.assertRaises(TypeError):
            get_exceedance_peaks_over_threshold(ts="Not a series", t0=5, anomaly_type="high", q=0.90)  # type: ignore

    def test_invalid_t0_value_in_exceedance_extraction_function(self):
        with self.assertRaises(ValueError):
            get_exceedance_peaks_over_threshold(ts=self.sample_2_ts, t0=None, anomaly_type="high", q=0.90)  # type: ignore
53 changes: 53 additions & 0 deletions tests/test_time_series.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os
import unittest

import pandas as pd

from anomalytics import read_ts
from anomalytics.time_series.upload import create_ts_from_csv, create_ts_from_xlsx


class TestTimeSeriesReaders(unittest.TestCase):
    """Tests for the CSV/XLSX time series readers and the `read_ts` dispatcher."""

    # Paths of the fixture files created once for the whole class.
    csv_file: str
    xlsx_file: str

    @classmethod
    def setUpClass(cls):
        # Local import keeps this class self-contained without touching the module imports.
        import tempfile

        # Fixtures live in a private temporary directory instead of the working directory.
        # The cleanup is registered before anything is written, so it also runs when
        # writing a fixture fails and the class teardown would never be reached.
        tmp_dir = tempfile.TemporaryDirectory()
        cls.addClassCleanup(tmp_dir.cleanup)

        cls.csv_file = os.path.join(tmp_dir.name, "test_data.csv")
        cls.xlsx_file = os.path.join(tmp_dir.name, "test_data.xlsx")
        test_data = pd.Series(range(10), index=pd.date_range("2023-01-01", periods=10))
        test_data.to_csv(cls.csv_file, header=False)
        test_data.to_excel(cls.xlsx_file, header=False)

    def test_read_csv(self):
        ts = create_ts_from_csv(path_to_file=self.csv_file, header=None)
        self.assertIsInstance(ts, pd.Series)
        self.assertEqual(len(ts), 10)

    def test_read_xlsx(self):
        ts = create_ts_from_xlsx(path_to_file=self.xlsx_file, index_col=0)
        self.assertIsInstance(ts, pd.Series)
        self.assertEqual(len(ts), 10)

    def test_read_ts_csv(self):
        ts = read_ts(path_to_file=self.csv_file, file_type="csv", header=None)
        self.assertIsInstance(ts, pd.Series)
        self.assertEqual(len(ts), 10)

    def test_read_ts_xlsx(self):
        ts = read_ts(path_to_file=self.xlsx_file, file_type="xlsx", index_col=0)
        self.assertIsInstance(ts, pd.Series)
        self.assertEqual(len(ts), 10)

    def test_invalid_file_type(self):
        # The file type is rejected before the path is ever opened.
        with self.assertRaises(ValueError):
            read_ts(path_to_file="test_data.txt", file_type="txt")  # type: ignore

    def test_missing_file(self):
        with self.assertRaises(FileNotFoundError):
            read_ts(path_to_file="non_existent_file.csv", file_type="csv")