use joblib in comparator over imputers or splits + rearrange index.rst
hlbotterman committed Oct 23, 2024
1 parent bc9b4c8 commit 1233456
Showing 3 changed files with 659 additions and 142 deletions.
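The main change replaces the sequential per-imputer loop in `Comparator` with joblib-based parallelisation over either the hole splits or the imputers. A minimal usage sketch of the new `compare` signature follows; the imputer classes, the `UniformHoleGenerator` arguments, and the positional `Comparator` constructor arguments shown here are assumptions based on the qolmat API, not part of this diff — check the class signatures before relying on them.

    import numpy as np
    import pandas as pd

    from qolmat.benchmark.comparator import Comparator
    from qolmat.benchmark.missing_patterns import UniformHoleGenerator
    from qolmat.imputations.imputers import ImputerMean, ImputerMedian

    # Toy data with a few pre-existing holes.
    rng = np.random.default_rng(42)
    df = pd.DataFrame(rng.normal(size=(100, 3)), columns=["a", "b", "c"])
    df.iloc[::17, 0] = np.nan

    dict_imputers = {"mean": ImputerMean(), "median": ImputerMedian()}
    generator = UniformHoleGenerator(n_splits=4, ratio_masked=0.1)

    # Positional arguments mirror the attributes used in the diff
    # (dict_imputers, selected_columns, generator_holes); hypothetical.
    comparison = Comparator(dict_imputers, df.columns.tolist(), generator)

    # New in this commit: joblib parallelisation, either over the hole
    # splits or over the imputers ("auto" picks whichever axis has more
    # tasks).
    results = comparison.compare(
        df, use_parallel=True, n_jobs=-1, parallel_over="auto"
    )
    print(results)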
10 changes: 5 additions & 5 deletions docs/index.rst
@@ -22,14 +22,14 @@
 .. toctree::
    :maxdepth: 2
    :hidden:
-   :caption: API
+   :caption: ANALYSIS
 
-   api
+   analysis
+   examples/tutorials/plot_tuto_mcar
 
 .. toctree::
    :maxdepth: 2
    :hidden:
-   :caption: ANALYSIS
+   :caption: API
 
-   analysis
-   examples/tutorials/plot_tuto_mcar
+   api
235 changes: 172 additions & 63 deletions qolmat/benchmark/comparator.py
@@ -1,10 +1,11 @@
 """Script for comparator."""
 
 import logging
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 import numpy as np
 import pandas as pd
+from joblib import Parallel, cpu_count, delayed
 
 from qolmat.benchmark import hyperparameters, metrics
 from qolmat.benchmark.missing_patterns import _HoleGenerator
@@ -93,99 +94,207 @@ def get_errors(
         df_errors = pd.concat(dict_errors.values(), keys=dict_errors.keys())
         return df_errors
 
-    def evaluate_errors_sample(
-        self,
-        imputer: Any,
-        df: pd.DataFrame,
-        dict_config_opti_imputer: Dict[str, Any] = {},
-        metric_optim: str = "mse",
-    ) -> pd.Series:
-        """Evaluate the errors in the cross-validation.
+    def process_split(
+        self, split_data: Tuple[int, pd.DataFrame, pd.DataFrame]
+    ) -> pd.DataFrame:
+        """Impute one split with every imputer and compute the errors.
 
         Parameters
         ----------
-        imputer : Any
-            imputation model
-        df : pd.DataFrame
-            dataframe to impute
-        dict_config_opti_imputer : Dict
-            search space for tested_model's hyperparameters
-        metric_optim : str
-            Loss function used when imputers undergo hyperparameter
-            optimization
+        split_data : Tuple
+            contains (split_idx, df_mask, df_origin)
 
         Returns
         -------
-        pd.Series
-            Series with the errors for each metric and each variable
+        pd.DataFrame
+            errors of every imputer on this split
         """
-        list_errors = []
-        df_origin = df[self.selected_columns].copy()
-        for df_mask in self.generator_holes.split(df_origin):
-            df_corrupted = df_origin.copy()
-            df_corrupted[df_mask] = np.nan
+        _, df_mask, df_origin = split_data
+        df_with_holes = df_origin.copy()
+        df_with_holes[df_mask] = np.nan
+
+        split_results = {}
+        for imputer_name, imputer in self.dict_imputers.items():
+            dict_config_opti_imputer = self.dict_config_opti.get(
+                imputer_name, {}
+            )
+
             imputer_opti = hyperparameters.optimize(
                 imputer,
-                df,
+                df_origin,
                 self.generator_holes,
-                metric_optim,
+                self.metric_optim,
                 dict_config_opti_imputer,
                 max_evals=self.max_evals,
                 verbose=self.verbose,
             )
-            df_imputed = imputer_opti.fit_transform(df_corrupted)
-            subset = self.generator_holes.subset
-            if subset is None:
-                raise ValueError(
-                    "HoleGenerator `subset` should be overwritten in split "
-                    "but it is none!"
-                )
-            df_errors = self.get_errors(
-                df_origin[subset], df_imputed[subset], df_mask[subset]
-            )
-            list_errors.append(df_errors)
-        df_errors = pd.DataFrame(list_errors)
-        errors_mean = df_errors.mean(axis=0)
-
-        return errors_mean
+
+            df_imputed = imputer_opti.fit_transform(df_with_holes)
+            errors = self.get_errors(df_origin, df_imputed, df_mask)
+            split_results[imputer_name] = errors
+
+        return pd.concat(split_results, axis=1)

+    def process_imputer(
+        self, imputer_data: Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
+    ) -> Tuple[str, pd.DataFrame]:
+        """Evaluate one imputer on every split.
+
+        Parameters
+        ----------
+        imputer_data : Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
+            contains (imputer_name, imputer, all_masks, df_origin)
+
+        Returns
+        -------
+        Tuple[str, pd.DataFrame]
+            imputer name and its errors averaged over the splits
+        """
+        imputer_name, imputer, all_masks, df_origin = imputer_data
+
+        dict_config_opti_imputer = self.dict_config_opti.get(imputer_name, {})
+        imputer_opti = hyperparameters.optimize(
+            imputer,
+            df_origin,
+            self.generator_holes,
+            self.metric_optim,
+            dict_config_opti_imputer,
+            max_evals=self.max_evals,
+            verbose=self.verbose,
+        )
+
+        imputer_results = []
+        for df_mask in all_masks:
+            df_with_holes = df_origin.copy()
+            df_with_holes[df_mask] = np.nan
+            df_imputed = imputer_opti.fit_transform(df_with_holes)
+            errors = self.get_errors(df_origin, df_imputed, df_mask)
+            imputer_results.append(errors)
+
+        return imputer_name, pd.concat(imputer_results).groupby(
+            level=[0, 1]
+        ).mean()

     def compare(
         self,
-        df: pd.DataFrame,
-    ):
-        """Compure different imputation methods on dataframe df.
+        df_origin: pd.DataFrame,
+        use_parallel: bool = True,
+        n_jobs: int = -1,
+        parallel_over: str = "auto",
+    ) -> pd.DataFrame:
+        """Compare the imputers, optionally in parallel, with
+        hyperparameter optimisation.
 
         Parameters
         ----------
-        df : pd.DataFrame
-            input dataframe (for comparison)
+        df_origin : pd.DataFrame
+            dataframe with missing values
+        use_parallel : bool, optional
+            whether to parallelise the comparison, by default True
+        n_jobs : int, optional
+            number of jobs used for the parallelisation, by default -1
+        parallel_over : str, optional
+            "auto", "splits" or "imputers", by default "auto"
 
         Returns
         -------
         pd.DataFrame
-            Dataframe with the metrics results, imputers are in columns
-            and indices represent metrics and variables.
+            DataFrame of results with a 2-level index.
+            Columns are imputers.
+            Index level 0 gives the metrics.
+            Index level 1 gives the column names.
         """
-        dict_errors = {}
+        logging.info(
+            f"Starting comparison for {len(self.dict_imputers)} imputers."
+        )
 
-        for name, imputer in self.dict_imputers.items():
-            dict_config_opti_imputer = self.dict_config_opti.get(name, {})
+        all_splits = list(self.generator_holes.split(df_origin))
 
-            try:
-                logging.info(f"Testing model: {name}...")
-                dict_errors[name] = self.evaluate_errors_sample(
-                    imputer, df, dict_config_opti_imputer, self.metric_optim
-                )
-                logging.info("done.")
-            except Exception as excp:
-                logging.info(
-                    f"Error while testing {name} of type "
-                    f"{type(imputer).__name__}!"
-                )
-                raise excp
+        if parallel_over == "auto":
+            parallel_over = (
+                "splits"
+                if len(all_splits) > len(self.dict_imputers)
+                else "imputers"
+            )
 
-        df_errors = pd.DataFrame(dict_errors)
+        if use_parallel:
+            logging.info(f"Parallelisation over: {parallel_over}...")
+            if parallel_over == "splits":
+                split_data = [
+                    (i, df_mask, df_origin)
+                    for i, df_mask in enumerate(all_splits)
+                ]
+                n_jobs = self.get_optimal_n_jobs(split_data, n_jobs)
+                results = Parallel(n_jobs=n_jobs)(
+                    delayed(self.process_split)(data) for data in split_data
+                )
+                final_results = pd.concat(results).groupby(level=[0, 1]).mean()
+            elif parallel_over == "imputers":
+                imputer_data = [
+                    (name, imputer, all_splits, df_origin)
+                    for name, imputer in self.dict_imputers.items()
+                ]
+                n_jobs = self.get_optimal_n_jobs(imputer_data, n_jobs)
+                results = Parallel(n_jobs=n_jobs)(
+                    delayed(self.process_imputer)(data)
+                    for data in imputer_data
+                )
+                final_results = pd.concat(dict(results), axis=1)
+            else:
+                raise ValueError(
+                    "`parallel_over` should be `auto`, `splits` "
+                    "or `imputers`."
+                )
+        else:
+            logging.info("Sequential treatment...")
+            if parallel_over == "splits":
+                split_data = [
+                    (i, df_mask, df_origin)
+                    for i, df_mask in enumerate(all_splits)
+                ]
+                results = [self.process_split(data) for data in split_data]
+                final_results = pd.concat(results).groupby(level=[0, 1]).mean()
+            elif parallel_over == "imputers":
+                imputer_data = [
+                    (name, imputer, all_splits, df_origin)
+                    for name, imputer in self.dict_imputers.items()
+                ]
+                results = [self.process_imputer(data) for data in imputer_data]
+                final_results = pd.concat(dict(results), axis=1)
+            else:
+                raise ValueError(
+                    "`parallel_over` should be `auto`, `splits` "
+                    "or `imputers`."
+                )
 
-        return df_errors
+        logging.info("Comparison successfully terminated.")
+        return final_results
+
+    @staticmethod
+    def get_optimal_n_jobs(split_data: List, n_jobs: int = -1) -> int:
+        """Determine the optimal number of parallel jobs to use.
+
+        If `n_jobs` is specified by the user, that value is used.
+        Otherwise, the function returns the minimum between the number of
+        CPU cores and the number of tasks (i.e. the length of `split_data`),
+        ensuring that no more jobs than tasks are launched.
+
+        Parameters
+        ----------
+        split_data : List
+            A collection of data to be processed in parallel.
+            The length of this collection determines the number of tasks.
+        n_jobs : int
+            The number of jobs (parallel workers) to use, by default -1
+
+        Returns
+        -------
+        int
+            The optimal number of jobs to run in parallel
+        """
+        return min(cpu_count(), len(split_data)) if n_jobs == -1 else n_jobs
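The job-capping rule and the `Parallel`/`delayed` pattern used above are easy to exercise in isolation. A standalone sketch, with `cap_n_jobs` and `square` as illustrative helpers that are not part of this commit:

    from joblib import Parallel, cpu_count, delayed

    def cap_n_jobs(tasks, n_jobs=-1):
        # Same rule as get_optimal_n_jobs above: never launch more
        # workers than there are tasks.
        return min(cpu_count(), len(tasks)) if n_jobs == -1 else n_jobs

    def square(x):
        # Module-level function so joblib's default backend can pickle it.
        return x * x

    tasks = [1, 2, 3, 4, 5]
    n_jobs = cap_n_jobs(tasks)  # e.g. 5 on an 8-core machine
    results = Parallel(n_jobs=n_jobs)(delayed(square)(t) for t in tasks)
    print(results)  # [1, 4, 9, 16, 25]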