From 12334568e6fbb2811c9f934188cba062479461a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=B4ng-Lan=20Botterman?= Date: Wed, 23 Oct 2024 16:07:40 +0200 Subject: [PATCH] use joblib in comparator over imputers or splits + rearrange index.rst --- docs/index.rst | 10 +- qolmat/benchmark/comparator.py | 235 ++++++++---- tests/benchmark/test_comparator.py | 556 +++++++++++++++++++++++++---- 3 files changed, 659 insertions(+), 142 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 8a17fd8..0905f4e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -22,14 +22,14 @@ .. toctree:: :maxdepth: 2 :hidden: - :caption: API + :caption: ANALYSIS - api + analysis + examples/tutorials/plot_tuto_mcar .. toctree:: :maxdepth: 2 :hidden: - :caption: ANALYSIS + :caption: API - analysis - examples/tutorials/plot_tuto_mcar \ No newline at end of file + api diff --git a/qolmat/benchmark/comparator.py b/qolmat/benchmark/comparator.py index 413e944..2a0f5cf 100644 --- a/qolmat/benchmark/comparator.py +++ b/qolmat/benchmark/comparator.py @@ -1,10 +1,11 @@ """Script for comparator.""" import logging -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd +from joblib import Parallel, cpu_count, delayed from qolmat.benchmark import hyperparameters, metrics from qolmat.benchmark.missing_patterns import _HoleGenerator @@ -93,99 +94,207 @@ def get_errors( df_errors = pd.concat(dict_errors.values(), keys=dict_errors.keys()) return df_errors - def evaluate_errors_sample( - self, - imputer: Any, - df: pd.DataFrame, - dict_config_opti_imputer: Dict[str, Any] = {}, - metric_optim: str = "mse", - ) -> pd.Series: - """Evaluate the errors in the cross-validation. + def process_split( + self, split_data: Tuple[int, pd.DataFrame, pd.DataFrame] + ) -> pd.DataFrame: + """Process a split. Parameters ---------- - imputer : Any - imputation model - df : pd.DataFrame - dataframe to impute - dict_config_opti_imputer : Dict - search space for tested_model's hyperparameters - metric_optim : str - Loss function used when imputers undergo hyperparameter - optimization + split_data : Tuple + contains (split_idx, df_mask, df_origin) Returns ------- - pd.Series - Series with the errors for each metric and each variable + pd.DataFrame + errors results """ - list_errors = [] - df_origin = df[self.selected_columns].copy() - for df_mask in self.generator_holes.split(df_origin): - df_corrupted = df_origin.copy() - df_corrupted[df_mask] = np.nan + _, df_mask, df_origin = split_data + df_with_holes = df_origin.copy() + df_with_holes[df_mask] = np.nan + + split_results = {} + for imputer_name, imputer in self.dict_imputers.items(): + dict_config_opti_imputer = self.dict_config_opti.get( + imputer_name, {} + ) + imputer_opti = hyperparameters.optimize( imputer, - df, + df_origin, self.generator_holes, - metric_optim, + self.metric_optim, dict_config_opti_imputer, max_evals=self.max_evals, verbose=self.verbose, ) - df_imputed = imputer_opti.fit_transform(df_corrupted) - subset = self.generator_holes.subset - if subset is None: - raise ValueError( - "HoleGenerator `subset` should be overwritten in split " - "but it is none!" 
-                )
-            df_errors = self.get_errors(
-                df_origin[subset], df_imputed[subset], df_mask[subset]
-            )
-            list_errors.append(df_errors)
-        df_errors = pd.DataFrame(list_errors)
-        errors_mean = df_errors.mean(axis=0)
-        return errors_mean
+            df_imputed = imputer_opti.fit_transform(df_with_holes)
+            errors = self.get_errors(df_origin, df_imputed, df_mask)
+            split_results[imputer_name] = errors
+
+        return pd.concat(split_results, axis=1)
+
+    def process_imputer(
+        self, imputer_data: Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
+    ) -> Tuple[str, pd.DataFrame]:
+        """Process an imputer.
+
+        Parameters
+        ----------
+        imputer_data : Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
+            contains (imputer_name, imputer, all_masks, df_origin)
+
+        Returns
+        -------
+        Tuple[str, pd.DataFrame]
+            imputer name and its error results averaged over the splits
+
+        """
+        imputer_name, imputer, all_masks, df_origin = imputer_data
+
+        dict_config_opti_imputer = self.dict_config_opti.get(imputer_name, {})
+        imputer_opti = hyperparameters.optimize(
+            imputer,
+            df_origin,
+            self.generator_holes,
+            self.metric_optim,
+            dict_config_opti_imputer,
+            max_evals=self.max_evals,
+            verbose=self.verbose,
+        )
+
+        imputer_results = []
+        for i, df_mask in enumerate(all_masks):
+            df_with_holes = df_origin.copy()
+            df_with_holes[df_mask] = np.nan
+            df_imputed = imputer_opti.fit_transform(df_with_holes)
+            errors = self.get_errors(df_origin, df_imputed, df_mask)
+            imputer_results.append(errors)
+
+        return imputer_name, pd.concat(imputer_results).groupby(
+            level=[0, 1]
+        ).mean()
 
     def compare(
         self,
-        df: pd.DataFrame,
-    ):
-        """Compure different imputation methods on dataframe df.
+        df_origin: pd.DataFrame,
+        use_parallel: bool = True,
+        n_jobs: int = -1,
+        parallel_over: str = "auto",
+    ) -> pd.DataFrame:
+        """Compare imputers in parallel with hyperparameter optimisation.
 
         Parameters
         ----------
-        df : pd.DataFrame
-            input dataframe (for comparison)
+        df_origin : pd.DataFrame
+            input dataframe on which artificial holes are generated
+        use_parallel : bool, optional
+            whether to parallelise the comparison, by default True
+        n_jobs : int, optional
+            number of jobs to use for the parallelisation, by default -1
+        parallel_over : str, optional
+            'auto', 'splits' or 'imputers', by default "auto"
 
         Returns
         -------
         pd.DataFrame
-            Dataframe with the metrics results, imputers are in columns
-            and indices represent metrics and variables.
+            DataFrame with the error metrics, imputers as columns.
+            The first index level contains the metrics,
+            the second the column names.
 
         """
-        dict_errors = {}
+        logging.info(
+            f"Starting comparison for {len(self.dict_imputers)} imputers."
+ ) - for name, imputer in self.dict_imputers.items(): - dict_config_opti_imputer = self.dict_config_opti.get(name, {}) + all_splits = list(self.generator_holes.split(df_origin)) - try: - logging.info(f"Testing model: {name}...") - dict_errors[name] = self.evaluate_errors_sample( - imputer, df, dict_config_opti_imputer, self.metric_optim + if parallel_over == "auto": + parallel_over = ( + "splits" + if len(all_splits) > len(self.dict_imputers) + else "imputers" + ) + + if use_parallel: + logging.info(f"Parallelisation over: {parallel_over}...") + if parallel_over == "splits": + split_data = [ + (i, df_mask, df_origin) + for i, df_mask in enumerate(all_splits) + ] + n_jobs = self.get_optimal_n_jobs(split_data, n_jobs) + results = Parallel(n_jobs=n_jobs)( + delayed(self.process_split)(data) for data in split_data + ) + final_results = pd.concat(results).groupby(level=[0, 1]).mean() + elif parallel_over == "imputers": + imputer_data = [ + (name, imputer, all_splits, df_origin) + for name, imputer in self.dict_imputers.items() + ] + n_jobs = self.get_optimal_n_jobs(imputer_data, n_jobs) + results = Parallel(n_jobs=n_jobs)( + delayed(self.process_imputer)(data) + for data in imputer_data ) - logging.info("done.") - except Exception as excp: - logging.info( - f"Error while testing {name} of type " - f"{type(imputer).__name__}!" + final_results = pd.concat(dict(results), axis=1) + else: + raise ValueError( + "`parallel_over` should be `auto`, `splits` or `imputers`." + ) + + else: + logging.info("Sequential treatment...") + if parallel_over == "splits": + split_data = [ + (i, df_mask, df_origin) + for i, df_mask in enumerate(all_splits) + ] + results = [self.process_split(data) for data in split_data] + final_results = pd.concat(results).groupby(level=[0, 1]).mean() + elif parallel_over == "imputers": + imputer_data = [ + (name, imputer, all_splits, df_origin) + for name, imputer in self.dict_imputers.items() + ] + results = [self.process_imputer(data) for data in imputer_data] + final_results = pd.concat(dict(results), axis=1) + else: + raise ValueError( + "`parallel_over` should be `auto`, `splits` or `imputers`." ) - raise excp - df_errors = pd.DataFrame(dict_errors) + logging.info("Comparison successfully terminated.") + return final_results - return df_errors + @staticmethod + def get_optimal_n_jobs(split_data: List, n_jobs: int = -1) -> int: + """Determine the optimal number of parallel jobs to use. + + If `n_jobs` is specified by the user, that value is used. + Otherwise, the function returns the minimum between the number of + CPU cores and the number of tasks (i.e., the length of `split_data`), + ensuring that no more jobs than tasks are launched. + + Parameters + ---------- + split_data : List + A collection of data to be processed in parallel. + The length of this collection determines the number of tasks. + n_jobs : int + The number of jobs (parallel workers) to use, by default -1 + + Returns + ------- + int + The optimal number of jobs to run in parallel + + """ + return min(cpu_count(), len(split_data)) if n_jobs == -1 else n_jobs diff --git a/tests/benchmark/test_comparator.py b/tests/benchmark/test_comparator.py index 02971bb..1805995 100644 --- a/tests/benchmark/test_comparator.py +++ b/tests/benchmark/test_comparator.py @@ -1,87 +1,495 @@ -from unittest.mock import MagicMock, patch +"""Tests for Comparator class. + +Class: + TestComparator: group tests for Comparator class. 
+ +""" + +import logging +from typing import Any, Dict, List import numpy as np import pandas as pd +import pytest +from pytest_mock import MockerFixture from qolmat.benchmark.comparator import Comparator +from qolmat.benchmark.missing_patterns import _HoleGenerator + + +@pytest.fixture +def generator_holes_mock(mocker: MockerFixture) -> _HoleGenerator: + mock = mocker.create_autospec(_HoleGenerator, instance=True) + mock.split.return_value = [ + pd.DataFrame({"A": [False, False, True], "B": [True, False, False]}) + ] + return mock + -generator_holes_mock = MagicMock() -generator_holes_mock.split.return_value = [ - pd.DataFrame({"A": [False, False, True], "B": [True, False, False]}) -] - -comparator = Comparator( - dict_models={}, - selected_columns=["A", "B"], - generator_holes=generator_holes_mock, - metrics=["mae", "mse"], -) - -imputer_mock = MagicMock() -expected_get_errors = pd.Series( - [1.0, 1.0, 1.0, 1.0], - index=pd.MultiIndex.from_tuples( - [("mae", "A"), ("mae", "B"), ("mse", "A"), ("mse", "B")] - ), -) - - -@patch("qolmat.benchmark.metrics.get_metric") -def test_get_errors(mock_get_metric): - df_origin = pd.DataFrame({"A": [1, np.nan, 3], "B": [np.nan, 5, 6]}) - df_imputed = pd.DataFrame({"A": [1, 2, 4], "B": [4, 5, 7]}) - df_mask = pd.DataFrame( - {"A": [False, False, True], "B": [False, False, True]} +@pytest.fixture +def comparator(generator_holes_mock: _HoleGenerator) -> Comparator: + return Comparator( + dict_models={}, + selected_columns=["A", "B"], + generator_holes=generator_holes_mock, + metrics=["mae", "mse"], ) - mock_get_metric.return_value = ( - lambda df_origin, df_imputed, df_mask: pd.Series( - [1.0, 1.0], index=["A", "B"] - ) + +@pytest.fixture +def expected_get_errors() -> pd.Series: + return pd.Series( + [1.0, 1.0, 1.0, 1.0], + index=pd.MultiIndex.from_tuples( + [("mae", "A"), ("mae", "B"), ("mse", "A"), ("mse", "B")] + ), ) - errors = comparator.get_errors(df_origin, df_imputed, df_mask) - pd.testing.assert_series_equal(errors, expected_get_errors) - - -@patch("qolmat.benchmark.hyperparameters.optimize", return_value=imputer_mock) -@patch( - "qolmat.benchmark.comparator.Comparator.get_errors", - return_value=expected_get_errors, -) -def test_evaluate_errors_sample(mock_get_errors, mock_optimize): - errors_mean = comparator.evaluate_errors_sample( - imputer_mock, pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, np.nan]}) + + +@pytest.fixture +def df_origin() -> pd.DataFrame: + return pd.DataFrame({"A": [1, np.nan, 3], "B": [np.nan, 5, 6]}) + + +@pytest.fixture +def df_imputed() -> pd.DataFrame: + return pd.DataFrame({"A": [1, 2, 4], "B": [4, 5, 7]}) + + +@pytest.fixture +def df_mask() -> pd.DataFrame: + return pd.DataFrame({"A": [False, False, True], "B": [False, False, True]}) + + +@pytest.fixture +def imputers_mock(mocker: MockerFixture) -> Dict[str, Any]: + imputer_mock = mocker.MagicMock() + imputer_mock.fit_transform.return_value = pd.DataFrame( + {"A": [1, 2, 3], "B": [4, 5, 6]} ) - expected_errors_mean = expected_get_errors - pd.testing.assert_series_equal(errors_mean, expected_errors_mean) - mock_optimize.assert_called_once() - mock_get_errors.assert_called() - - -@patch( - "qolmat.benchmark.comparator.Comparator.evaluate_errors_sample", - return_value=expected_get_errors, -) -def test_compare(mock_evaluate_errors_sample): - df_test = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}) - - imputer1 = MagicMock(name="Imputer1") - imputer2 = MagicMock(name="Imputer2") - comparator.dict_imputers = {"imputer1": imputer1, "imputer2": imputer2} - - errors_imputer1 = 
pd.Series([0.1, 0.2], index=["mae", "mse"]) - errors_imputer2 = pd.Series([0.3, 0.4], index=["mae", "mse"]) - mock_evaluate_errors_sample.side_effect = [ - errors_imputer1, - errors_imputer2, + return {"imputer_1": imputer_mock} + + +@pytest.fixture +def config_opti_mock() -> Dict[str, Dict[str, Any]]: + return {"imputer_1": {"param_1": "value"}} + + +@pytest.fixture +def all_masks() -> List[pd.DataFrame]: + return [ + pd.DataFrame({"A": [False, True, False], "B": [False, False, True]}), + pd.DataFrame({"A": [True, False, False], "B": [False, True, False]}), ] - df_errors = comparator.compare(df_test) - assert mock_evaluate_errors_sample.call_count == 2 - mock_evaluate_errors_sample.assert_any_call(imputer1, df_test, {}, "mse") - mock_evaluate_errors_sample.assert_any_call(imputer2, df_test, {}, "mse") - expected_df_errors = pd.DataFrame( - {"imputer1": [0.1, 0.2], "imputer2": [0.3, 0.4]}, index=["mae", "mse"] - ) - pd.testing.assert_frame_equal(df_errors, expected_df_errors) +class TestComparator: + """Group tests for Comparator class.""" + + def test_get_errors( + self, + mocker: MockerFixture, + comparator: Comparator, + expected_get_errors: pd.Series, + df_origin: pd.DataFrame, + df_imputed: pd.DataFrame, + df_mask: pd.DataFrame, + ) -> None: + """Test the get_errors method.""" + mock_get_metric = mocker.patch("qolmat.benchmark.metrics.get_metric") + mock_get_metric.return_value = ( + lambda df_origin, df_imputed, df_mask: pd.Series( + [1.0, 1.0], index=["A", "B"] + ) + ) + + errors = comparator.get_errors(df_origin, df_imputed, df_mask) + + pd.testing.assert_series_equal(errors, expected_get_errors) + + def test_process_split( + self, + mocker, + comparator, + imputers_mock, + config_opti_mock, + df_origin, + df_mask, + ): + """Test the process_split method.""" + comparator.dict_imputers = imputers_mock + comparator.dict_config_opti = config_opti_mock + comparator.metric_optim = "mae" + comparator.max_evals = 100 + comparator.verbose = False + + mock_optimize = mocker.patch( + "qolmat.benchmark.comparator.hyperparameters.optimize" + ) + mock_optimize.return_value = imputers_mock["imputer_1"] + split_data = (0, df_mask, df_origin) + df_with_holes = df_origin.copy() + df_with_holes[df_mask] = np.nan + + result = comparator.process_split(split_data) + + assert isinstance(result, pd.DataFrame) + assert "imputer_1" in result.columns + assert {"mae", "mse"} == set( + result.index.get_level_values(0) + ), "Index level 0 should be 'mae' and 'mse'" + assert {"A", "B"} == set( + result.index.get_level_values(1) + ), "Index level 1 should be 'A' and 'B'" + + mock_optimize.assert_called_once_with( + imputers_mock["imputer_1"], + df_origin, + comparator.generator_holes, + comparator.metric_optim, + config_opti_mock["imputer_1"], + max_evals=comparator.max_evals, + verbose=comparator.verbose, + ) + args, _ = imputers_mock["imputer_1"].fit_transform.call_args + pd.testing.assert_frame_equal(args[0], df_with_holes) + + def test_process_imputer( + self, + mocker: MockerFixture, + comparator: Comparator, + imputers_mock: Dict[str, Any], + config_opti_mock: Dict[str, Dict[str, Any]], + all_masks: List[pd.DataFrame], + df_origin: pd.DataFrame, + ) -> None: + """Test the process_imputer method.""" + comparator.dict_imputers = imputers_mock + comparator.dict_config_opti = config_opti_mock + comparator.metric_optim = "mae" + comparator.max_evals = 100 + comparator.verbose = False + mock_optimize = mocker.patch( + "qolmat.benchmark.comparator.hyperparameters.optimize" + ) + mock_optimize.return_value = 
imputers_mock["imputer_1"] + mock_get_errors = mocker.patch.object(comparator, "get_errors") + mock_get_errors.side_effect = [ + pd.Series( + [1.0, 2.0], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ), + pd.Series( + [1.5, 2.5], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ), + ] + expected_result = pd.Series( + [1.25, 2.25], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ) + + imputer_data = ( + "imputer_1", + imputers_mock["imputer_1"], + all_masks, + df_origin, + ) + imputer_name, result = comparator.process_imputer(imputer_data) + + assert imputer_name == "imputer_1" + pd.testing.assert_series_equal(result, expected_result) + mock_optimize.assert_called_once_with( + imputers_mock["imputer_1"], + df_origin, + comparator.generator_holes, + comparator.metric_optim, + config_opti_mock["imputer_1"], + max_evals=comparator.max_evals, + verbose=comparator.verbose, + ) + assert imputers_mock["imputer_1"].fit_transform.call_count == len( + all_masks + ) + assert mock_get_errors.call_count == len(all_masks) + + def test_compare_parallel_splits( + self, + mocker: MockerFixture, + comparator: Comparator, + df_origin: pd.DataFrame, + caplog: pytest.LogCaptureFixture, + ) -> None: + """Test the compare method with parallel splits.""" + mock_split = mocker.patch.object(comparator.generator_holes, "split") + mock_split.return_value = [ + pd.DataFrame( + {"A": [False, True, False], "B": [True, False, True]} + ), + pd.DataFrame( + {"A": [True, False, True], "B": [False, True, False]} + ), + ] + mock_process_split = mocker.patch.object(comparator, "process_split") + mock_process_split.side_effect = [ + pd.Series( + [1.0, 2.0], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ), + pd.Series( + [1.5, 2.5], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ), + ] + mock_get_optimal_n_jobs = mocker.patch.object( + comparator, "get_optimal_n_jobs" + ) + mock_get_optimal_n_jobs.return_value = 1 + + expected_result = pd.Series( + [1.25, 2.25], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ) + + with caplog.at_level(logging.INFO): + result = comparator.compare(df_origin, parallel_over="splits") + + pd.testing.assert_series_equal(result, expected_result) + assert mock_process_split.call_count == 2 + assert mock_get_optimal_n_jobs.call_count == 1 + assert "Starting comparison for" in caplog.text + assert "Comparison successfully terminated." 
in caplog.text + + def test_compare_sequential_splits( + self, + mocker: MockerFixture, + comparator: Comparator, + df_origin: pd.DataFrame, + caplog: pytest.LogCaptureFixture, + ) -> None: + """Test the compare method with sequential splits.""" + mock_split = mocker.patch.object(comparator.generator_holes, "split") + mock_split.return_value = [ + pd.DataFrame( + {"A": [False, True, False], "B": [True, False, True]} + ), + pd.DataFrame( + {"A": [True, False, True], "B": [False, True, False]} + ), + ] + mock_process_split = mocker.patch.object(comparator, "process_split") + mock_process_split.side_effect = [ + pd.Series( + [1.0, 2.0], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ), + pd.Series( + [1.5, 2.5], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ), + ] + + expected_result = pd.Series( + [1.25, 2.25], + index=pd.MultiIndex.from_tuples([("mae", "A"), ("mae", "B")]), + ) + + with caplog.at_level(logging.INFO): + result = comparator.compare( + df_origin, use_parallel=False, parallel_over="splits" + ) + + pd.testing.assert_series_equal(result, expected_result) + assert mock_process_split.call_count == 2 + assert "Starting comparison for" in caplog.text + assert "Comparison successfully terminated." in caplog.text + + def test_compare_parallel_imputers( + self, + mocker: MockerFixture, + comparator: Comparator, + df_origin: pd.DataFrame, + caplog: pytest.LogCaptureFixture, + ): + """Test the compare method with parallel imputers.""" + mock_split = mocker.patch.object(comparator.generator_holes, "split") + mock_split.return_value = [ + pd.DataFrame( + {"A": [False, True, False], "B": [True, False, True]} + ), + pd.DataFrame( + {"A": [True, False, True], "B": [False, True, False]} + ), + ] + comparator.dict_imputers = { + "imputer_1": mocker.Mock(), + "imputer_2": mocker.Mock(), + } + mock_process_imputer = mocker.patch.object( + comparator, "process_imputer" + ) + mock_process_imputer.side_effect = [ + ( + "imputer_1", + pd.DataFrame( + {"A": [1.0, 2.0], "B": [3.0, 4.0]}, + index=pd.MultiIndex.from_tuples( + [("mae", "A"), ("mae", "B")] + ), + ), + ), + ( + "imputer_2", + pd.DataFrame( + {"A": [1.5, 2.5], "B": [3.5, 4.5]}, + index=pd.MultiIndex.from_tuples( + [("mae", "A"), ("mae", "B")] + ), + ), + ), + ] + mock_get_optimal_n_jobs = mocker.patch.object( + comparator, "get_optimal_n_jobs" + ) + mock_get_optimal_n_jobs.return_value = 1 + + expected_result = pd.concat( + { + "imputer_1": pd.DataFrame( + {"A": [1.0, 2.0], "B": [3.0, 4.0]}, + index=pd.MultiIndex.from_tuples( + [("mae", "A"), ("mae", "B")] + ), + ), + "imputer_2": pd.DataFrame( + {"A": [1.5, 2.5], "B": [3.5, 4.5]}, + index=pd.MultiIndex.from_tuples( + [("mae", "A"), ("mae", "B")] + ), + ), + }, + axis=1, + ) + + with caplog.at_level(logging.INFO): + result = comparator.compare( + df_origin, use_parallel=True, parallel_over="imputers" + ) + + pd.testing.assert_frame_equal(result, expected_result) + assert mock_process_imputer.call_count == 2 + assert mock_get_optimal_n_jobs.call_count == 1 + assert "Starting comparison for" in caplog.text + assert "Comparison successfully terminated." 
in caplog.text
+
+    def test_compare_sequential_imputers(
+        self,
+        mocker: MockerFixture,
+        comparator: Comparator,
+        df_origin: pd.DataFrame,
+        caplog: pytest.LogCaptureFixture,
+    ) -> None:
+        """Test the compare method with sequential imputers."""
+        mock_split = mocker.patch.object(comparator.generator_holes, "split")
+        mock_split.return_value = [
+            pd.DataFrame(
+                {"A": [False, True, False], "B": [True, False, True]}
+            ),
+            pd.DataFrame(
+                {"A": [True, False, True], "B": [False, True, False]}
+            ),
+        ]
+        comparator.dict_imputers = {
+            "imputer_1": mocker.Mock(),
+            "imputer_2": mocker.Mock(),
+        }
+        mock_process_imputer = mocker.patch.object(
+            comparator, "process_imputer"
+        )
+        mock_process_imputer.side_effect = [
+            (
+                "imputer_1",
+                pd.DataFrame(
+                    {"A": [1.0, 2.0], "B": [3.0, 4.0]},
+                    index=pd.MultiIndex.from_tuples(
+                        [("mae", "A"), ("mae", "B")]
+                    ),
+                ),
+            ),
+            (
+                "imputer_2",
+                pd.DataFrame(
+                    {"A": [1.5, 2.5], "B": [3.5, 4.5]},
+                    index=pd.MultiIndex.from_tuples(
+                        [("mae", "A"), ("mae", "B")]
+                    ),
+                ),
+            ),
+        ]
+
+        expected_result = pd.concat(
+            {
+                "imputer_1": pd.DataFrame(
+                    {"A": [1.0, 2.0], "B": [3.0, 4.0]},
+                    index=pd.MultiIndex.from_tuples(
+                        [("mae", "A"), ("mae", "B")]
+                    ),
+                ),
+                "imputer_2": pd.DataFrame(
+                    {"A": [1.5, 2.5], "B": [3.5, 4.5]},
+                    index=pd.MultiIndex.from_tuples(
+                        [("mae", "A"), ("mae", "B")]
+                    ),
+                ),
+            },
+            axis=1,
+        )
+
+        with caplog.at_level(logging.INFO):
+            result = comparator.compare(
+                df_origin, use_parallel=False, parallel_over="imputers"
+            )
+
+        pd.testing.assert_frame_equal(result, expected_result)
+        assert mock_process_imputer.call_count == 2
+        assert "Starting comparison for" in caplog.text
+        assert "Comparison successfully terminated." in caplog.text
+
+    def test_get_optimal_n_jobs_with_specified_n_jobs(self) -> None:
+        """Test when n_jobs is specified."""
+        split_data = [1, 2, 3, 4]
+        n_jobs = 2
+
+        result = Comparator.get_optimal_n_jobs(split_data, n_jobs=n_jobs)
+
+        assert (
+            result == n_jobs
+        ), f"Expected n_jobs to be {n_jobs}, but got {result}"
+
+    def test_get_optimal_n_jobs_with_default_n_jobs(
+        self,
+        mocker: MockerFixture,
+    ) -> None:
+        """Test when n_jobs is left at its default (-1)."""
+        split_data = [1, 2, 3, 4]
+
+        mocker.patch(
+            "qolmat.benchmark.comparator.cpu_count", return_value=8
+        )
+
+        result = Comparator.get_optimal_n_jobs(split_data, n_jobs=-1)
+        assert result == len(
+            split_data
+        ), f"Expected {len(split_data)}, but got {result}"
+
+    def test_get_optimal_n_jobs_with_large_cpu_count(
+        self,
+        mocker: MockerFixture,
+    ) -> None:
+        """Test when the CPU count exceeds the number of tasks."""
+        split_data = [1, 2]  # only 2 tasks
+
+        mocker.patch(
+            "qolmat.benchmark.comparator.cpu_count", return_value=16
+        )
+
+        result = Comparator.get_optimal_n_jobs(split_data, n_jobs=-1)
+        assert result == len(
+            split_data
+        ), f"Expected {len(split_data)}, but got {result}"
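
Usage sketch: the snippet below illustrates how the reworked `Comparator.compare`
from this patch could be driven. `ImputerMean`, `ImputerMedian` and
`UniformHoleGenerator` are assumed names from the qolmat API and are only
illustrative; the `use_parallel`, `n_jobs` and `parallel_over` arguments are the
ones added above.

    # Sketch only, under the assumptions stated above: swap in whichever
    # imputers and hole generator your qolmat version provides.
    import numpy as np
    import pandas as pd

    from qolmat.benchmark.comparator import Comparator
    from qolmat.benchmark.missing_patterns import UniformHoleGenerator
    from qolmat.imputations.imputers import ImputerMean, ImputerMedian

    rng = np.random.default_rng(42)
    df = pd.DataFrame(rng.normal(size=(200, 2)), columns=["A", "B"])

    comparator = Comparator(
        dict_models={"mean": ImputerMean(), "median": ImputerMedian()},
        selected_columns=["A", "B"],
        generator_holes=UniformHoleGenerator(n_splits=4, ratio_masked=0.1),
        metrics=["mae", "mse"],
    )

    # With 4 splits and 2 imputers, parallel_over="auto" resolves to "splits";
    # n_jobs=-1 lets get_optimal_n_jobs cap the workers at the task count.
    df_errors = comparator.compare(df, use_parallel=True, n_jobs=-1)
    print(df_errors)

Note that the two parallelisation paths differ slightly: `process_imputer`
optimises each imputer's hyperparameters once and reuses the result for every
split, whereas `process_split` re-runs the optimisation inside each split.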