Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Abnormal Detection module and Isolation Forest algorithm. #319

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions geochemistrypi/data_mining/cli_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from rich.prompt import Confirm, Prompt

from .constants import (
ABNORMALDETECTION_MODELS,
CLASSIFICATION_MODELS,
CLASSIFICATION_MODELS_WITH_MISSING_VALUES,
CLUSTERING_MODELS,
Expand Down Expand Up @@ -40,6 +41,7 @@
from .process.classify import ClassificationModelSelection
from .process.cluster import ClusteringModelSelection
from .process.decompose import DecompositionModelSelection
from .process.detect import AbnormalDetectionModelSelection
from .process.regress import RegressionModelSelection
from .utils.base import check_package, clear_output, create_geopi_output_dir, get_os, install_package, log, save_data, show_warning
from .utils.mlflow_utils import retrieve_previous_experiment_id
Expand Down Expand Up @@ -137,8 +139,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# Create a new experiment or use the previous experiment
is_used_previous_experiment = Confirm.ask("✨ Use Previous Experiment", default=False)
# Set the tracking URI to a local directory; in the future it can be pointed at a remote server.
artifact_localtion = f"file:{WORKING_PATH}/geopi_tracking"
mlflow.set_tracking_uri(artifact_localtion)
experiments_localtion = f"file:{WORKING_PATH}/geopi_tracking"
mlflow.set_tracking_uri(experiments_localtion)
# Print the tracking uri for debugging.
# print("tracking uri:", mlflow.get_tracking_uri())
if is_used_previous_experiment:
Expand All @@ -159,7 +161,7 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
# new_experiment_tag = Prompt.ask("✨ Experiment Tag Version", default="E - v1.0.0")
try:
# new_experiment_id = mlflow.create_experiment(name=new_experiment_name, artifact_location=artifact_localtion, tags={"version": new_experiment_tag})
new_experiment_id = mlflow.create_experiment(name=new_experiment_name, artifact_location=artifact_localtion)
new_experiment_id = mlflow.create_experiment(name=new_experiment_name)
except mlflow.exceptions.MlflowException as e:
if "already exists" in str(e):
console.print(" The experiment name already exists.", style="bold red")
Expand Down Expand Up @@ -193,6 +195,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
training_data_path = "Data_Clustering.xlsx"
elif built_in_training_data_num == 4:
training_data_path = "Data_Decomposition.xlsx"
elif built_in_training_data_num == 5:
training_data_path = "Data_AbnormalDetection.xlsx"
data = read_data(file_path=training_data_path)
print(f"Successfully loading the built-in training data set '{training_data_path}'.")
show_data_columns(data.columns)
Expand All @@ -217,6 +221,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
inference_data = None
elif is_built_in_inference_data and built_in_training_data_num == 4:
inference_data = None
elif is_built_in_inference_data and built_in_training_data_num == 5:
inference_data = None

# <--- World Map Projection --->
logger.debug("World Map Projection")
Expand Down Expand Up @@ -367,6 +373,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
if missing_value_flag and not process_missing_value_flag:
# Delete the decomposition mode because it doesn't support missing values.
MODE_OPTION.remove("Dimensional Reduction")
# Delete the abnormal detection mode because it doesn't support missing values.
MODE_OPTION.remove("Abnormal Detection")
num2option(MODE_OPTION)
mode_num = limit_num_input(MODE_OPTION, SECTION[2], num_input)
else:
Expand Down Expand Up @@ -508,12 +516,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] =
Modes2Models = {1: REGRESSION_MODELS_WITH_MISSING_VALUES, 2: CLASSIFICATION_MODELS_WITH_MISSING_VALUES, 3: CLUSTERING_MODELS_WITH_MISSING_VALUES}
Modes2Initiators = {1: RegressionModelSelection, 2: ClassificationModelSelection, 3: ClusteringModelSelection}
else:
Modes2Models = {1: REGRESSION_MODELS, 2: CLASSIFICATION_MODELS, 3: CLUSTERING_MODELS, 4: DECOMPOSITION_MODELS}
Modes2Models = {1: REGRESSION_MODELS, 2: CLASSIFICATION_MODELS, 3: CLUSTERING_MODELS, 4: DECOMPOSITION_MODELS, 5: ABNORMALDETECTION_MODELS}
Modes2Initiators = {
1: RegressionModelSelection,
2: ClassificationModelSelection,
3: ClusteringModelSelection,
4: DecompositionModelSelection,
5: AbnormalDetectionModelSelection,
}
MODELS = Modes2Models[mode_num]
num2option(MODELS)
Expand Down
5 changes: 3 additions & 2 deletions geochemistrypi/data_mining/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

OPTION = ["Yes", "No"]
DATA_OPTION = ["Own Data", "Testing Data (Built-in)"]
TEST_DATA_OPTION = ["Data For Regression", "Data For Classification", "Data For Clustering", "Data For Dimensional Reduction"]
MODE_OPTION = ["Regression", "Classification", "Clustering", "Dimensional Reduction"]
TEST_DATA_OPTION = ["Data For Regression", "Data For Classification", "Data For Clustering", "Data For Dimensional Reduction", "Data For Abnormal Detection"]
MODE_OPTION = ["Regression", "Classification", "Clustering", "Dimensional Reduction", "Abnormal Detection"]

# The model provided to use
REGRESSION_MODELS = [
Expand Down Expand Up @@ -67,6 +67,7 @@
]
CLUSTERING_MODELS = ["KMeans", "DBSCAN", "Agglomerative", "AffinityPropagation"]
DECOMPOSITION_MODELS = ["PCA", "T-SNE", "MDS"]
ABNORMALDETECTION_MODELS = ["Isolation Forest"]

# The model can deal with missing values
# Reference: https://scikit-learn.org/stable/modules/impute.html#estimators-that-handle-nan-values
Expand Down
225 changes: 225 additions & 0 deletions geochemistrypi/data_mining/model/detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# -*- coding: utf-8 -*-

from typing import Dict, Optional, Union

import numpy as np
import pandas as pd
from rich import print
from sklearn.ensemble import IsolationForest

from ..utils.base import clear_output
from ._base import WorkflowBase
from .func.algo_abnormaldetection._iforest import isolation_forest_manual_hyper_parameters


class AbnormalDetectionWorkflowBase(WorkflowBase):
    """Shared workflow behaviour for every abnormal detection algorithm."""

    # common_function = []

    def __init__(self) -> None:
        super().__init__()
        # Mode label identifying this family of workflows.
        self.mode = "Abnormal Detection"

    def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None:
        """Train the wrapped Scikit-learn estimator on X.

        ``y`` is accepted but not used: only ``X`` is passed to the estimator.
        """
        # Keep a reference to the training data on the workflow instance.
        self.X = X
        self.model.fit(X)

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return the labels the wrapped Scikit-learn estimator predicts for X."""
        return self.model.predict(X)

    @classmethod
    def manual_hyper_parameters(cls) -> Dict:
        """Return the manually specified hyper-parameters (none at base level)."""
        return {}

    @staticmethod
    def _detect_data(X: pd.DataFrame, detect_label: np.ndarray) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Attach the detection labels to the data and split it by label.

        Parameters
        ----------
        X : pd.DataFrame
            The original data. It is copied, not modified.

        detect_label : np.ndarray
            One detection label per row of ``X``.

        Returns
        -------
        X_abnormal_detection : pd.DataFrame
            A copy of ``X`` with an extra ``is_abnormal`` column holding the labels.

        X_normal : pd.DataFrame
            The rows whose label equals 1.

        X_abnormal : pd.DataFrame
            The rows whose label equals -1.
        """
        labelled = X.copy()
        labelled["is_abnormal"] = detect_label
        # Select on the freshly attached label column: 1 -> normal, -1 -> abnormal.
        flags = labelled["is_abnormal"]
        normal_rows = labelled[flags == 1]
        abnormal_rows = labelled[flags == -1]
        return labelled, normal_rows, abnormal_rows

    def common_components(self) -> None:
        """Run the application functions common to all abnormal detection algorithms (none yet)."""
        return None


class IsolationForestAbnormalDetection(AbnormalDetectionWorkflowBase):
    """The automation workflow of using Isolation Forest algorithm to make insightful products."""

    name = "Isolation Forest"
    # special_function = []

    def __init__(
        self,
        n_estimators: int = 100,
        max_samples: Union[str, int, float] = "auto",
        contamination: Union[str, float] = "auto",
        max_features: Union[int, float] = 1.0,
        bootstrap: bool = False,
        n_jobs: Optional[int] = None,
        random_state: Optional[int] = None,
        verbose: int = 0,
        warm_start: bool = False,
    ) -> None:
        """
        Isolation Forest Algorithm.

        Return the anomaly score of each sample using the IsolationForest algorithm

        The IsolationForest 'isolates' observations by randomly selecting a feature
        and then randomly selecting a split value between the maximum and minimum
        values of the selected feature.

        Since recursive partitioning can be represented by a tree structure, the
        number of splittings required to isolate a sample is equivalent to the path
        length from the root node to the terminating node.

        This path length, averaged over a forest of such random trees, is a
        measure of normality and our decision function.

        Random partitioning produces noticeably shorter paths for anomalies.
        Hence, when a forest of random trees collectively produce shorter path
        lengths for particular samples, they are highly likely to be anomalies.

        Read more in the :ref:`User Guide <isolation_forest>`.

        .. versionadded:: 0.18

        Parameters
        ----------
        n_estimators : int, default=100
            The number of base estimators in the ensemble.

        max_samples : "auto", int or float, default="auto"
            The number of samples to draw from X to train each base estimator.
                - If int, then draw `max_samples` samples.
                - If float, then draw `max_samples * X.shape[0]` samples.
                - If "auto", then `max_samples=min(256, n_samples)`.

            If max_samples is larger than the number of samples provided,
            all samples will be used for all trees (no sampling).

        contamination : 'auto' or float, default='auto'
            The amount of contamination of the data set, i.e. the proportion
            of outliers in the data set. Used when fitting to define the threshold
            on the scores of the samples.

                - If 'auto', the threshold is determined as in the
                  original paper.
                - If float, the contamination should be in the range (0, 0.5].

            .. versionchanged:: 0.22
               The default value of ``contamination`` changed from 0.1
               to ``'auto'``.

        max_features : int or float, default=1.0
            The number of features to draw from X to train each base estimator.

                - If int, then draw `max_features` features.
                - If float, then draw `max(1, int(max_features * n_features_in_))` features.

            Note: using a float number less than 1.0 or integer less than number of
            features will enable feature subsampling and leads to a longer runtime.

        bootstrap : bool, default=False
            If True, individual trees are fit on random subsets of the training
            data sampled with replacement. If False, sampling without replacement
            is performed.

        n_jobs : int, default=None
            The number of jobs to run in parallel for both :meth:`fit` and
            :meth:`predict`. ``None`` means 1 unless in a
            :obj:`joblib.parallel_backend` context. ``-1`` means using all
            processors. See :term:`Glossary <n_jobs>` for more details.

        random_state : int, RandomState instance or None, default=None
            Controls the pseudo-randomness of the selection of the feature
            and split values for each branching step and each tree in the forest.

            Pass an int for reproducible results across multiple function calls.
            See :term:`Glossary <random_state>`.

            In this workflow, ``None`` means "keep the ``random_state`` already
            set by the parent workflow class"; any other value, including ``0``,
            overrides it.

        verbose : int, default=0
            Controls the verbosity of the tree building process.

        warm_start : bool, default=False
            When set to ``True``, reuse the solution of the previous call to fit
            and add more estimators to the ensemble, otherwise, just fit a whole
            new forest. See :term:`the Glossary <warm_start>`.

            .. versionadded:: 0.21

        References
        ----------
        Scikit-learn API: sklearn.ensemble.IsolationForest
        https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html#
        """

        super().__init__()
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.contamination = contamination
        self.max_features = max_features
        self.bootstrap = bootstrap
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.warm_start = warm_start

        # Compare against None explicitly: a truthiness test would silently
        # discard the perfectly valid seed 0 and fall back to the inherited one.
        if random_state is not None:
            self.random_state = random_state

        # If 'random_state' is None, 'self.random_state' comes from the parent class 'WorkflowBase'
        self.model = IsolationForest(
            n_estimators=self.n_estimators,
            max_samples=self.max_samples,
            contamination=self.contamination,
            max_features=self.max_features,
            bootstrap=self.bootstrap,
            n_jobs=self.n_jobs,
            random_state=self.random_state,
            verbose=self.verbose,
            warm_start=self.warm_start,
        )

        self.naming = IsolationForestAbnormalDetection.name

    @classmethod
    def manual_hyper_parameters(cls) -> Dict:
        """Manual hyper-parameters specification.

        Prints a header, delegates the prompting to
        ``isolation_forest_manual_hyper_parameters`` and calls ``clear_output``
        before returning the collected values.

        Returns
        -------
        hyper_parameters : dict
            The hyper-parameters returned by ``isolation_forest_manual_hyper_parameters``.
        """
        print(f"-*-*- {cls.name} - Hyper-parameters Specification -*-*-")
        hyper_parameters = isolation_forest_manual_hyper_parameters()
        clear_output()
        return hyper_parameters

    def special_components(self, **kwargs) -> None:
        """Invoke all special application functions for this algorithms by Scikit-learn framework."""
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from typing import Dict

from rich import print

from ....constants import SECTION
from ....data.data_readiness import bool_input, float_input, num_input


def isolation_forest_manual_hyper_parameters() -> Dict:
    """Manually set hyperparameters.

    Prompts for the number of estimators, the contamination, the maximum
    number of features and whether to bootstrap. The maximum number of
    samples is only asked for when bootstrapping is enabled.

    Returns
    -------
    hyper_parameters : dict
        Contains the keys ``n_estimators``, ``contamination``,
        ``max_features``, ``bootstrap`` and ``max_samples``. When no sample
        count was collected, ``max_samples`` is ``"auto"``.
    """
    print("N Estimators: The number of trees in the forest.")
    print("Please specify the number of trees in the forest. A good starting range could be between 50 and 500, such as 100.")
    n_estimators = num_input(SECTION[2], "@N Estimators: ")
    print("Contamination: The amount of contamination of the data set.")
    print("Please specify the contamination of the data set. A good starting range could be between 0.1 and 0.5, such as 0.3.")
    contamination = float_input(0.3, SECTION[2], "@Contamination: ")
    print("Max Features: The number of features to draw from X to train each base estimator.")
    print("Please specify the number of features. A good starting range could be between 1 and the total number of features in the dataset.")
    max_features = num_input(SECTION[2], "@Max Features: ")
    print(
        "Bootstrap: Whether bootstrap samples are used when building trees. Bootstrapping is a technique where a random subset of the data is sampled with replacement"
        " to create a new dataset of the same size as the original. This new dataset is then used to construct a decision tree in the ensemble. If False, the whole dataset is used to build each tree."
    )
    print("Please specify whether bootstrap samples are used when building trees. It is generally recommended to leave it as True.")
    bootstrap = bool_input(SECTION[2])
    max_samples = None
    if bootstrap:
        print("Max Samples: The number of samples to draw from X_train to train each base estimator.")
        print("Please specify the number of samples. A good starting range could be between 256 and the number of dataset.")
        max_samples = num_input(SECTION[2], "@@Max Samples: ")
    hyper_parameters = {
        "n_estimators": n_estimators,
        "contamination": contamination,
        "max_features": max_features,
        "bootstrap": bootstrap,
    }
    # sklearn.ensemble.IsolationForest accepts only "auto", an int or a float
    # for max_samples; None is rejected. Fall back to its default, "auto",
    # whenever no (non-zero) sample count was collected.
    hyper_parameters["max_samples"] = max_samples or "auto"
    return hyper_parameters
Loading
Loading