From 9ae055c89165e94994e7f38c026d7c1a5cf33f30 Mon Sep 17 00:00:00 2001 From: Mengqi <2534671415@qq.com> Date: Mon, 4 Mar 2024 21:50:13 +0800 Subject: [PATCH] feat: add Abnormal Detection module and Isolation Forest algorithm. --- geochemistrypi/data_mining/cli_pipeline.py | 11 +- geochemistrypi/data_mining/constants.py | 5 +- geochemistrypi/data_mining/model/detection.py | 225 ++++++++++++++++++ .../func/algo_abnormaldetection/__init__.py | 1 + .../func/algo_abnormaldetection/_iforest.py | 48 ++++ geochemistrypi/data_mining/process/detect.py | 68 ++++++ 6 files changed, 355 insertions(+), 3 deletions(-) create mode 100644 geochemistrypi/data_mining/model/detection.py create mode 100644 geochemistrypi/data_mining/model/func/algo_abnormaldetection/__init__.py create mode 100644 geochemistrypi/data_mining/model/func/algo_abnormaldetection/_iforest.py create mode 100644 geochemistrypi/data_mining/process/detect.py diff --git a/geochemistrypi/data_mining/cli_pipeline.py b/geochemistrypi/data_mining/cli_pipeline.py index 37783e87..40575e0a 100644 --- a/geochemistrypi/data_mining/cli_pipeline.py +++ b/geochemistrypi/data_mining/cli_pipeline.py @@ -9,6 +9,7 @@ from rich.prompt import Confirm, Prompt from .constants import ( + ABNORMALDETECTION_MODELS, CLASSIFICATION_MODELS, CLASSIFICATION_MODELS_WITH_MISSING_VALUES, CLUSTERING_MODELS, @@ -40,6 +41,7 @@ from .process.classify import ClassificationModelSelection from .process.cluster import ClusteringModelSelection from .process.decompose import DecompositionModelSelection +from .process.detect import AbnormalDetectionModelSelection from .process.regress import RegressionModelSelection from .utils.base import check_package, clear_output, create_geopi_output_dir, get_os, install_package, log, save_data, show_warning from .utils.mlflow_utils import retrieve_previous_experiment_id @@ -193,6 +195,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = training_data_path = "Data_Clustering.xlsx" elif built_in_training_data_num == 4: training_data_path = "Data_Decomposition.xlsx" + elif built_in_training_data_num == 5: + training_data_path = "Data_AbnormalDetection.xlsx" data = read_data(file_path=training_data_path) print(f"Successfully loading the built-in training data set '{training_data_path}'.") show_data_columns(data.columns) @@ -217,6 +221,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = inference_data = None elif is_built_in_inference_data and built_in_training_data_num == 4: inference_data = None + elif is_built_in_inference_data and built_in_training_data_num == 5: + inference_data = None # <--- World Map Projection ---> logger.debug("World Map Projection") @@ -367,6 +373,8 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = if missing_value_flag and not process_missing_value_flag: # Delete the decomposition mode because it doesn't support missing values. MODE_OPTION.remove("Dimensional Reduction") + # Delete the abnormal detection mode because it doesn't support missing values. + MODE_OPTION.remove("Abnormal Detection") num2option(MODE_OPTION) mode_num = limit_num_input(MODE_OPTION, SECTION[2], num_input) else: @@ -508,12 +516,13 @@ def cli_pipeline(training_data_path: str, application_data_path: Optional[str] = Modes2Models = {1: REGRESSION_MODELS_WITH_MISSING_VALUES, 2: CLASSIFICATION_MODELS_WITH_MISSING_VALUES, 3: CLUSTERING_MODELS_WITH_MISSING_VALUES} Modes2Initiators = {1: RegressionModelSelection, 2: ClassificationModelSelection, 3: ClusteringModelSelection} else: - Modes2Models = {1: REGRESSION_MODELS, 2: CLASSIFICATION_MODELS, 3: CLUSTERING_MODELS, 4: DECOMPOSITION_MODELS} + Modes2Models = {1: REGRESSION_MODELS, 2: CLASSIFICATION_MODELS, 3: CLUSTERING_MODELS, 4: DECOMPOSITION_MODELS, 5: ABNORMALDETECTION_MODELS} Modes2Initiators = { 1: RegressionModelSelection, 2: ClassificationModelSelection, 3: ClusteringModelSelection, 4: DecompositionModelSelection, + 5: AbnormalDetectionModelSelection, } MODELS = Modes2Models[mode_num] num2option(MODELS) diff --git a/geochemistrypi/data_mining/constants.py b/geochemistrypi/data_mining/constants.py index 92b6bb9e..01a71d2c 100644 --- a/geochemistrypi/data_mining/constants.py +++ b/geochemistrypi/data_mining/constants.py @@ -26,8 +26,8 @@ OPTION = ["Yes", "No"] DATA_OPTION = ["Own Data", "Testing Data (Built-in)"] -TEST_DATA_OPTION = ["Data For Regression", "Data For Classification", "Data For Clustering", "Data For Dimensional Reduction"] -MODE_OPTION = ["Regression", "Classification", "Clustering", "Dimensional Reduction"] +TEST_DATA_OPTION = ["Data For Regression", "Data For Classification", "Data For Clustering", "Data For Dimensional Reduction", "Data For Abnormal Detection"] +MODE_OPTION = ["Regression", "Classification", "Clustering", "Dimensional Reduction", "Abnormal Detection"] # The model provided to use REGRESSION_MODELS = [ @@ -67,6 +67,7 @@ ] CLUSTERING_MODELS = ["KMeans", "DBSCAN", "Agglomerative", "AffinityPropagation"] DECOMPOSITION_MODELS = ["PCA", "T-SNE", "MDS"] +ABNORMALDETECTION_MODELS = ["Isolation Forest"] # The model can deal with missing values # Reference: https://scikit-learn.org/stable/modules/impute.html#estimators-that-handle-nan-values diff --git a/geochemistrypi/data_mining/model/detection.py b/geochemistrypi/data_mining/model/detection.py new file mode 100644 index 00000000..cce5abcf --- /dev/null +++ b/geochemistrypi/data_mining/model/detection.py @@ -0,0 +1,225 @@ +# -*- coding: utf-8 -*- + +from typing import Dict, Optional, Union + +import numpy as np +import pandas as pd +from rich import print +from sklearn.ensemble import IsolationForest + +from ..utils.base import clear_output +from ._base import WorkflowBase +from .func.algo_abnormaldetection._iforest import isolation_forest_manual_hyper_parameters + + +class AbnormalDetectionWorkflowBase(WorkflowBase): + """The base workflow class of abnormal detection algorithms.""" + + # common_function = [] + + def __init__(self) -> None: + super().__init__() + self.mode = "Abnormal Detection" + + def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None: + """Fit the model by Scikit-learn framework.""" + self.X = X + self.model.fit(X) + + def predict(self, X: pd.DataFrame) -> np.ndarray: + """Perform Abnormal Detection on samples in X by Scikit-learn framework.""" + y_predict = self.model.predict(X) + return y_predict + + @classmethod + def manual_hyper_parameters(cls) -> Dict: + """Manual hyper-parameters specification.""" + return dict() + + @staticmethod + def _detect_data(X: pd.DataFrame, detect_label: np.ndarray) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """Merge the detection results into the source data. + + Parameters + ---------- + X : pd.DataFrame + The original data. + + detect_label : np.ndarray + The detection labels for each data point. + + Returns + ------- + X_abnormal_detection : pd.DataFrame + DataFrame containing the original data with detection results. + + X_normal : pd.DataFrame + DataFrame containing the normal data points. + + X_abnormal : pd.DataFrame + DataFrame containing the abnormal data points. + """ + X_abnormal_detection = X.copy() + # Merge detection results into the source data + X_abnormal_detection["is_abnormal"] = detect_label + X_normal = X_abnormal_detection[X_abnormal_detection["is_abnormal"] == 1] + X_abnormal = X_abnormal_detection[X_abnormal_detection["is_abnormal"] == -1] + + return X_abnormal_detection, X_normal, X_abnormal + + def common_components(self) -> None: + """Invoke all common application functions for abnormal detection algorithms by Scikit-learn framework.""" + pass + + +class IsolationForestAbnormalDetection(AbnormalDetectionWorkflowBase): + """The automation workflow of using Isolation Forest algorithm to make insightful products.""" + + name = "Isolation Forest" + # special_function = [] + + def __init__( + self, + n_estimators: int = 100, + max_samples: Union[str, int, float] = "auto", + contamination: Union[str, float] = "auto", + max_features: Union[int, float] = 1.0, + bootstrap: bool = False, + n_jobs: Optional[int] = None, + random_state: Optional[int] = None, + verbose: int = 0, + warm_start: bool = False, + ) -> None: + """ + Isolation Forest Algorithm. + + Return the anomaly score of each sample using the IsolationForest algorithm + + The IsolationForest 'isolates' observations by randomly selecting a feature + and then randomly selecting a split value between the maximum and minimum + values of the selected feature. + + Since recursive partitioning can be represented by a tree structure, the + number of splittings required to isolate a sample is equivalent to the path + length from the root node to the terminating node. + + This path length, averaged over a forest of such random trees, is a + measure of normality and our decision function. + + Random partitioning produces noticeably shorter paths for anomalies. + Hence, when a forest of random trees collectively produce shorter path + lengths for particular samples, they are highly likely to be anomalies. + + Read more in the :ref:`User Guide `. + + .. versionadded:: 0.18 + + Parameters + ---------- + n_estimators : int, default=100 + The number of base estimators in the ensemble. + + max_samples : "auto", int or float, default="auto" + The number of samples to draw from X to train each base estimator. + - If int, then draw `max_samples` samples. + - If float, then draw `max_samples * X.shape[0]` samples. + - If "auto", then `max_samples=min(256, n_samples)`. + + If max_samples is larger than the number of samples provided, + all samples will be used for all trees (no sampling). + + contamination : 'auto' or float, default='auto' + The amount of contamination of the data set, i.e. the proportion + of outliers in the data set. Used when fitting to define the threshold + on the scores of the samples. + + - If 'auto', the threshold is determined as in the + original paper. + - If float, the contamination should be in the range (0, 0.5]. + + .. versionchanged:: 0.22 + The default value of ``contamination`` changed from 0.1 + to ``'auto'``. + + max_features : int or float, default=1.0 + The number of features to draw from X to train each base estimator. + + - If int, then draw `max_features` features. + - If float, then draw `max(1, int(max_features * n_features_in_))` features. + + Note: using a float number less than 1.0 or integer less than number of + features will enable feature subsampling and leads to a longer runtime. + + bootstrap : bool, default=False + If True, individual trees are fit on random subsets of the training + data sampled with replacement. If False, sampling without replacement + is performed. + + n_jobs : int, default=None + The number of jobs to run in parallel for both :meth:`fit` and + :meth:`predict`. ``None`` means 1 unless in a + :obj:`joblib.parallel_backend` context. ``-1`` means using all + processors. See :term:`Glossary ` for more details. + + random_state : int, RandomState instance or None, default=None + Controls the pseudo-randomness of the selection of the feature + and split values for each branching step and each tree in the forest. + + Pass an int for reproducible results across multiple function calls. + See :term:`Glossary `. + + verbose : int, default=0 + Controls the verbosity of the tree building process. + + warm_start : bool, default=False + When set to ``True``, reuse the solution of the previous call to fit + and add more estimators to the ensemble, otherwise, just fit a whole + new forest. See :term:`the Glossary `. + + .. versionadded:: 0.21 + + References + ---------- + Scikit-learn API: sklearn.ensemble.IsolationForest + https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html# + """ + + super().__init__() + self.n_estimators = n_estimators + self.max_samples = max_samples + self.contamination = contamination + self.max_features = max_features + self.bootstrap = bootstrap + self.n_jobs = n_jobs + self.verbose = verbose + self.warm_start = warm_start + + if random_state: + self.random_state = random_state + + # If 'random_state' is None, 'self.random_state' comes from the parent class 'WorkflowBase' + self.model = IsolationForest( + n_estimators=self.n_estimators, + max_samples=self.max_samples, + contamination=self.contamination, + max_features=self.max_features, + bootstrap=self.bootstrap, + n_jobs=self.n_jobs, + random_state=self.random_state, + verbose=self.verbose, + warm_start=self.warm_start, + ) + + self.naming = IsolationForestAbnormalDetection.name + + @classmethod + def manual_hyper_parameters(cls) -> Dict: + """Manual hyper-parameters specification.""" + print(f"-*-*- {cls.name} - Hyper-parameters Specification -*-*-") + hyper_parameters = isolation_forest_manual_hyper_parameters() + clear_output() + return hyper_parameters + + def special_components(self, **kwargs) -> None: + """Invoke all special application functions for this algorithms by Scikit-learn framework.""" + pass diff --git a/geochemistrypi/data_mining/model/func/algo_abnormaldetection/__init__.py b/geochemistrypi/data_mining/model/func/algo_abnormaldetection/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/geochemistrypi/data_mining/model/func/algo_abnormaldetection/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/geochemistrypi/data_mining/model/func/algo_abnormaldetection/_iforest.py b/geochemistrypi/data_mining/model/func/algo_abnormaldetection/_iforest.py new file mode 100644 index 00000000..41994f9e --- /dev/null +++ b/geochemistrypi/data_mining/model/func/algo_abnormaldetection/_iforest.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +from typing import Dict + +from rich import print + +from ....constants import SECTION +from ....data.data_readiness import bool_input, float_input, num_input + + +def isolation_forest_manual_hyper_parameters() -> Dict: + """Manually set hyperparameters. + + Returns + ------- + hyper_parameters : dict + """ + print("N Estimators: The number of trees in the forest.") + print("Please specify the number of trees in the forest. A good starting range could be between 50 and 500, such as 100.") + n_estimators = num_input(SECTION[2], "@N Estimators: ") + print("Contamination: The amount of contamination of the data set.") + print("Please specify the contamination of the data set. A good starting range could be between 0.1 and 0.5, such as 0.3.") + contamination = float_input(0.3, SECTION[2], "@Contamination: ") + print("Max Features: The number of features to draw from X to train each base estimator.") + print("Please specify the number of features. A good starting range could be between 1 and the total number of features in the dataset.") + max_features = num_input(SECTION[2], "@Max Features: ") + print( + "Bootstrap: Whether bootstrap samples are used when building trees. Bootstrapping is a technique where a random subset of the data is sampled with replacement" + " to create a new dataset of the same size as the original. This new dataset is then used to construct a decision tree in the ensemble. If False, the whole dataset is used to build each tree." + ) + print("Please specify whether bootstrap samples are used when building trees. It is generally recommended to leave it as True.") + bootstrap = bool_input(SECTION[2]) + max_samples = None + if bootstrap: + print("Max Samples: The number of samples to draw from X_train to train each base estimator.") + print("Please specify the number of samples. A good starting range could be between 256 and the number of dataset.") + max_samples = num_input(SECTION[2], "@@Max Samples: ") + hyper_parameters = { + "n_estimators": n_estimators, + "contamination": contamination, + "max_features": max_features, + "bootstrap": bootstrap, + } + if not max_samples: + # Use the default value provided by sklearn.ensemble.RandomForestClassifier. + hyper_parameters["max_samples"] = None + else: + hyper_parameters["max_samples"] = max_samples + return hyper_parameters diff --git a/geochemistrypi/data_mining/process/detect.py b/geochemistrypi/data_mining/process/detect.py new file mode 100644 index 00000000..cf49df5d --- /dev/null +++ b/geochemistrypi/data_mining/process/detect.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +import os + +import pandas as pd + +from ..constants import MLFLOW_ARTIFACT_DATA_PATH +from ..model.detection import AbnormalDetectionWorkflowBase, IsolationForestAbnormalDetection +from ._base import ModelSelectionBase + + +class AbnormalDetectionModelSelection(ModelSelectionBase): + """Simulate the normal way of invoking scikit-learn abnormal detection algorithms.""" + + def __init__(self, model_name: str) -> None: + self.model_name = model_name + self.ad_workflow = AbnormalDetectionWorkflowBase() + self.transformer_config = {} + + # @dispatch(object, object, object, object, object, object) + def activate( + self, + X: pd.DataFrame, + y: pd.DataFrame, + X_train: pd.DataFrame, + X_test: pd.DataFrame, + y_train: pd.DataFrame, + y_test: pd.DataFrame, + ) -> None: + """Train by Scikit-learn framework.""" + + self.ad_workflow.data_upload(X=X, y=y, X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test) + + # Model option + if self.model_name == "Isolation Forest": + hyper_parameters = IsolationForestAbnormalDetection.manual_hyper_parameters() + self.ad_workflow = IsolationForestAbnormalDetection( + n_estimators=hyper_parameters["n_estimators"], + contamination=hyper_parameters["contamination"], + max_features=hyper_parameters["max_features"], + bootstrap=hyper_parameters["bootstrap"], + max_samples=hyper_parameters["max_samples"], + ) + + self.ad_workflow.show_info() + + # Use Scikit-learn style API to process input data + self.ad_workflow.fit(X) + y_predict = self.ad_workflow.predict(X) + X_abnormal_detection, X_normal, X_abnormal = self.ad_workflow._detect_data(X, y_predict) + + self.ad_workflow.data_upload(X=X, y=y, X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test) + + # Save the model hyper-parameters + self.ad_workflow.save_hyper_parameters(hyper_parameters, self.model_name, os.getenv("GEOPI_OUTPUT_PARAMETERS_PATH")) + + # Common components for every abnormal detection algorithm + self.ad_workflow.common_components() + + # special components of different algorithms + self.ad_workflow.special_components() + + # Save abnormal detection result + self.ad_workflow.data_save(X_abnormal_detection, "X Abnormal Detection", os.getenv("GEOPI_OUTPUT_ARTIFACTS_DATA_PATH"), MLFLOW_ARTIFACT_DATA_PATH, "Abnormal Detection Data") + self.ad_workflow.data_save(X_normal, "X Normal", os.getenv("GEOPI_OUTPUT_ARTIFACTS_DATA_PATH"), MLFLOW_ARTIFACT_DATA_PATH, "Normal Data") + self.ad_workflow.data_save(X_abnormal, "X Abnormal", os.getenv("GEOPI_OUTPUT_ARTIFACTS_DATA_PATH"), MLFLOW_ARTIFACT_DATA_PATH, "Abnormal Data") + + # Save the trained model + self.ad_workflow.model_save()