Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pot): create t0 t1 t2 computation logic #4

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12.0"]
python-version: ["3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4.1.0
- name: Set up Python ${{ matrix.python-version }}
@@ -32,7 +32,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12.0"]
python-version: ["3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4.1.0
- name: Set up Python ${{ matrix.python-version }}
@@ -57,7 +57,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.10", "3.11", "3.12.0"]
python-version: ["3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4.1.0
- name: Set up Python ${{ matrix.python-version }}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -42,7 +42,8 @@ htmlcov/
.coverage.*
.cache
nosetests.xml
coverage.xml
coverage.*
cov.*
*.cover
*.py,cover
.hypothesis/
4 changes: 4 additions & 0 deletions src/anomalytics/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
__version__ = "0.1.0"

__all__ = ["set_time_window"]

from anomalytics.time_windows import set_time_window
Empty file.
Empty file.
Empty file.
4 changes: 4 additions & 0 deletions src/anomalytics/time_windows/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
__all__ = ["compute_pot_windows", "set_time_window"]

from anomalytics.time_windows.pot_windows import compute_pot_windows
from anomalytics.time_windows.time_window import set_time_window
32 changes: 32 additions & 0 deletions src/anomalytics/time_windows/pot_windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import typing


def compute_pot_windows(
total_rows: int,
analysis_type: typing.Literal["historical", "real-time"],
t0_pct: float = 0.65,
t1_pct: float = 0.25,
t2_pct: float = 0.10,
) -> typing.Tuple[int, int, int]:
if t0_pct - t1_pct < 0.0:
raise ValueError("T0 time window needs to be bigger than T1 and T2, as a rule of thumb: t0 >= t1 > t2")
if analysis_type == "real-time":
if t0_pct + t1_pct != 1.0:
raise ValueError(
"In real-time analysis, the t2 time window will be the last row of the Time Series, hence `t0_pct` + `t1_pct` must equal to 1.0 (100%)"
)
t2 = 1
total_rows = total_rows - t2
t0 = int(t0_pct * total_rows)
t1 = int(t1_pct * total_rows)
uncounted_days = t0 + t1 - total_rows
else:
t0 = int(t0_pct * total_rows)
t1 = int(t1_pct * total_rows)
t2 = int(t2_pct * total_rows)
uncounted_days = t0 + t1 + t2 - total_rows
if uncounted_days < 0:
t1 += abs(uncounted_days)
elif uncounted_days > 0:
t1 -= uncounted_days
return (t0, t1, t2)
134 changes: 134 additions & 0 deletions src/anomalytics/time_windows/time_window.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging
import typing

from anomalytics.time_windows.pot_windows import compute_pot_windows

logger = logging.getLogger(__name__)


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["AE"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
encoded: typing.List[float],
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["BM"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
block_size: float = 365.2425,
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["DBSCAN"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
total_cluster: int,
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["ISOF"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
isolated: typing.List[float],
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["MAD"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
medians: typing.List[float],
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["1CSVM"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
vectors: typing.List[float],
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["POT"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
t0_pct: float = 0.65,
t1_pct: float = 0.25,
t2_pct: float = 0.10,
) -> typing.Tuple:
...


@typing.overload
def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["ZS"],
analysis_type: typing.Literal["historical", "real-time"] = "historical",
*,
upper: float,
lower: float,
) -> typing.Tuple:
...


def set_time_window( # type: ignore
total_rows: int,
method: typing.Literal["AE", "BM", "DBSCAN", "ISOF", "MAD", "1CSVM", "POT", "ZS"],
analysis_type: typing.Literal["historical", "real-time"],
**kwargs,
) -> typing.Tuple:
if method == "AE":
raise NotImplementedError("The method 'AE' for Autoencoder specific time windows hasn't been implemented yet")
if method == "BM":
raise NotImplementedError("The method 'BM' for Block Maxima specific time windows hasn't been implemented yet")
if method == "DBSCAN":
raise NotImplementedError(
"The method 'DBSCAN' for Density-Based Spatial Clustering Application with Noise specific time windows hasn't been implemented yet"
)
if method == "ISOF":
raise NotImplementedError(
"The method 'ISOF' for Isolation Forest specific time windows hasn't been implemented yet"
)
if method == "MAD":
raise NotImplementedError(
"The method 'MAD' for Median Absolute Deviation specific time windows hasn't been implemented yet"
)
if method == "1CSVM":
raise NotImplementedError(
"The method '1CSVM' for One Class Support Vector Method specific time windows hasn't been implemented yet"
)
if method == "POT":
return compute_pot_windows(total_rows=total_rows, analysis_type=analysis_type, **kwargs)
if method == "ZS":
raise NotImplementedError("The method 'ZS' for Z-Score specific time windows hasn't been implemented yet")

raise ValueError(
f"Invalid value in '{method}' for the 'method' argument. Availabe methods are: "
"'AUTO', 'BM', 'DBSCAN', 'ISOF', 'MAD', '1CSVM', 'POT', 'ZS'"
)
42 changes: 42 additions & 0 deletions tests/test_time_windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from unittest import TestCase

from anomalytics import set_time_window
from anomalytics.time_windows import compute_pot_windows


class TestTimeWindow(TestCase):
def setUp(self) -> None:
return super().setUp()

def test_compute_pot_windows_executed_correctly_via_set_time_window(self):
t0, t1, t2 = set_time_window(
total_rows=1000, method="POT", analysis_type="historical", t0_pct=0.65, t1_pct=0.25, t2_pct=0.10
)
expected_t0, expected_t1, expected_t2 = compute_pot_windows(
total_rows=1000, analysis_type="historical", t0_pct=0.65, t1_pct=0.25, t2_pct=0.10
)
self.assertEqual(first=(t0 + t1 + t2), second=(expected_t0 + expected_t1 + expected_t2))
self.assertEqual(first=t0, second=expected_t0)
self.assertEqual(first=t1, second=expected_t1)
self.assertEqual(first=t2, second=expected_t2)

def test_pot_windows_for_historical_analysis_via_set_time_window(self):
t0, t1, t2 = set_time_window(
total_rows=1000, method="POT", analysis_type="historical", t0_pct=0.65, t1_pct=0.25, t2_pct=0.10
)
self.assertEqual(first=(t0 + t1 + t2), second=1000)
self.assertEqual(first=t0, second=650)
self.assertEqual(first=t1, second=250)
self.assertEqual(first=t2, second=100)

def test_pot_windows_for_realtime_analysis_via_set_time_window(self):
t0, t1, t2 = set_time_window(
total_rows=1000, method="POT", analysis_type="real-time", t0_pct=0.7, t1_pct=0.3, t2_pct=0.0
)
self.assertEqual(first=(t0 + t1 + t2), second=1000)
self.assertEqual(first=t0, second=699)
self.assertEqual(first=t1, second=300)
self.assertEqual(first=t2, second=1)

def tearDown(self) -> None:
return super().tearDown()