diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py
index 58c92c6..e5f4769 100644
--- a/hlink/linking/model_exploration/link_step_train_test_models.py
+++ b/hlink/linking/model_exploration/link_step_train_test_models.py
@@ -347,7 +347,7 @@ def _evaluate_threshold_combinations(
         dep_var: str,
         id_a: str,
         id_b: str,
-    ) -> tuple[pd.DataFrame, Any]:
+    ) -> tuple[dict[int, pd.DataFrame], Any]:
         training_config_name = str(self.task.training_conf)
         config = self.task.link_run.config
 
@@ -371,8 +371,8 @@ def _evaluate_threshold_combinations(
         info = f"\nTesting the best model + parameters against all {len(threshold_matrix)} threshold combinations.\n"
         logger.debug(info)
 
-        prediction_results = dict[int, ThresholdTestResult] = {}
-        training_results: dict[int, ThresholdTestResult] = {}
+        prediction_results: dict[int, ThresholdTestResult] = {}
+        # training_results: dict[int, ThresholdTestResult] = {}
 
         cached_training_data = thresholding_training_data.cache()
         cached_test_data = thresholding_test_data.cache()
@@ -397,6 +397,7 @@ def _evaluate_threshold_combinations(
                 id_b,
                 dep_var,
             )
+            """
             thresholding_predict_train = _get_probability_and_select_pred_columns(
                 cached_training_data,
                 thresholding_model,
@@ -405,6 +406,7 @@ def _evaluate_threshold_combinations(
                 id_b,
                 dep_var,
             )
+            """
 
             for threshold_index, (
                 this_alpha_threshold,
@@ -418,6 +420,7 @@ def _evaluate_threshold_combinations(
                 logger.debug(diag)
                 decision = training_settings.get("decision")
                 start_predict_time = perf_counter()
+
                 predictions = threshold_core.predict_using_thresholds(
                     thresholding_predictions,
                     this_alpha_threshold,
@@ -425,6 +428,7 @@ def _evaluate_threshold_combinations(
                     id_column,
                     decision,
                 )
+                """
                 predict_train = threshold_core.predict_using_thresholds(
                     thresholding_predict_train,
                     this_alpha_threshold,
@@ -432,6 +436,7 @@ def _evaluate_threshold_combinations(
                     id_column,
                     decision,
                 )
+                """
                 end_predict_time = perf_counter()
 
                 info = f"Predictions for test-train data on threshold took {end_predict_time - start_predict_time:.2f}s"
@@ -446,7 +451,7 @@ def _evaluate_threshold_combinations(
                     this_threshold_ratio,
                     best_model.score,
                 )
-
+                """
                 training_results[threshold_index] = self._capture_training_results(
                     predict_train,
                     dep_var,
@@ -456,11 +461,12 @@ def _evaluate_threshold_combinations(
                     this_threshold_ratio,
                     best_model.score,
                 )
+                """
 
         thresholding_test_data.unpersist()
         thresholding_training_data.unpersist()
 
-        return prediction_results, training_results, suspicious_data
+        return prediction_results, suspicious_data
 
     def _run(self) -> None:
         training_section_name = str(self.task.training_conf)
@@ -482,7 +488,8 @@ def _run(self) -> None:
         )
 
         # Stores suspicious data
-        suspicious_data = self._create_suspicious_data(id_a, id_b)
+        # suspicious_data = self._create_suspicious_data(id_a, id_b)
+        suspicious_data = None
 
         outer_fold_count = training_settings.get("n_training_iterations", 10)
         inner_fold_count = 3
@@ -492,7 +499,7 @@ def _run(self) -> None:
 
         # At the end we combine this information collected from every outer fold
         threshold_test_results: list[ThresholdTestResult] = []
-        threshold_training_results: list[ThresholdTestResult]
+        # threshold_training_results: list[ThresholdTestResult]
         all_suspicious_data: list[Any] = []
         best_models: list[ModelEval] = []
 
@@ -538,7 +545,7 @@ def _run(self) -> None:
                 hyperparam_evaluation_results
             )
 
-            prediction_results, training_results, suspicious_data_for_threshold = (
+            prediction_results, suspicious_data_for_threshold = (
                self._evaluate_threshold_combinations(
                    best_model,
                    suspicious_data,
@@ -551,19 +558,24 @@ def _run(self) -> None:
 
             # Collect the outputs for each fold
             threshold_test_results.append(prediction_results)
-            threshold_training_results.append(training_results)
-            all_suspicious_data.append(suspicious_data_for_threshold)
+            # threshold_training_results.append(training_results)
+            # all_suspicious_data.append(suspicious_data_for_threshold)
             best_models.append(best_model)
 
-        combined_test = (_combine_by_threshold_matrix_entry(prediction_results),)
-        combined_train = (_combine_by_threshold_matrix_entry(training_results),)
+        combined_test = _combine_by_threshold_matrix_entry(threshold_test_results)
+        # combined_train = (_combine_by_threshold_matrix_entry(training_results),)
 
+        # there are 'm' threshold_test_results items matching the number of
+        # inner folds. Each entry has 'n' items matching the number of
+        # threshold matrix entries.
         threshold_matrix_size = len(threshold_test_results[0])
 
         thresholded_metrics_df = _create_thresholded_metrics_df()
         for i in range(threshold_matrix_size):
+            print(type(combined_test[i]))
+            print(combined_test[i])
             thresholded_metrics_df = _aggregate_per_threshold_results(
-                thresholded_metrics_df, combined_test[i], combined_train[i], best_models
+                thresholded_metrics_df, combined_test[i], best_models
             )
 
         print("*** Final thresholded metrics ***")
@@ -577,7 +589,7 @@ def _run(self) -> None:
         )
 
         self._save_training_results(thresholded_metrics_df, self.task.spark)
-        self._save_suspicious_data(suspicious_data, self.task.spark)
+        # self._save_suspicious_data(suspicious_data, self.task.spark)
         self.task.spark.sql("set spark.sql.shuffle.partitions=200")
 
     def _split_into_folds(
@@ -669,38 +681,6 @@ def _get_splits(
         )
         return splits
 
-    def _capture_training_results(
-        self,
-        predict_train: pyspark.sql.DataFrame,
-        dep_var: str,
-        model: Model,
-        suspicious_data: dict[str, Any] | None,
-        alpha_threshold: float,
-        threshold_ratio: float | None,
-        pr_auc: float,
-    ) -> ThresholdTestResult:
-        table_prefix = self.task.table_prefix
-        predict_train.createOrReplaceTempView(f"{table_prefix}predict_train")
-        (
-            train_TP_count,
-            train_FP_count,
-            train_FN_count,
-            train_TN_count,
-        ) = _get_confusion_matrix(predict_train, dep_var, suspicious_data)
-        train_precision, train_recall, train_mcc = _get_aggregate_metrics(
-            train_TP_count, train_FP_count, train_FN_count, train_TN_count
-        )
-        result = ThresholdTestResult(
-            precision=train_precision,
-            recall=train_recall,
-            mcc=train_mcc,
-            pr_auc=pr_auc,
-            model_id=model,
-            alpha_threshold=alpha_threshold,
-            threshold_ratio=threshold_ratio,
-        )
-        return result
-
     def _capture_prediction_results(
         self,
         predictions: pyspark.sql.DataFrame,
@@ -710,7 +690,7 @@ def _capture_prediction_results(
         alpha_threshold: float,
         threshold_ratio: float | None,
         pr_auc: float,
-    ) -> pd.DataFrame:
+    ) -> ThresholdTestResult:
         table_prefix = self.task.table_prefix
         # write to sql tables for testing
         predictions.createOrReplaceTempView(f"{table_prefix}predictions")
@@ -993,16 +973,16 @@ def _get_aggregate_metrics(
 # The outer list entries hold results from each outer fold, the inner list has a ThresholdTestResult per threshold
 # matrix entry. We need to get data for each threshold entry together. Basically we need to invert the data.
 def _combine_by_threshold_matrix_entry(
-    threshold_results: list[list[ThresholdTestResult]],
+    threshold_results: list[dict[int, ThresholdTestResult]],
 ) -> list[ThresholdTestResult]:
     # This list will have a size of the number of threshold matrix entries
     results: list[ThresholdTestResult] = []
 
+    # Check number of folds
     if len(threshold_results) < 2:
-        raise RuntimeError(
-            "Can't combine threshold results from less than two outer folds."
-        )
+        raise RuntimeError("Must have at least two outer folds.")
 
+    # Check if there are more than 0 threshold matrix entries
     if len(threshold_results[0]) == 0:
         raise RuntimeError(
             "No entries in the first set of threshold results; can't determine threshold matrix size."
@@ -1011,36 +991,40 @@
     inferred_threshold_matrix_size = len(threshold_results[0])
 
     for t in range(inferred_threshold_matrix_size):
-        results[t] = None
+        # One list per threshold matrix entry
+        results.append([])
 
     for fold_results in threshold_results:
         for t in range(inferred_threshold_matrix_size):
-            results[t].append(fold_results[t])
-
+            threshold_results_for_this_fold = fold_results[t]
+            results[t].append(threshold_results_for_this_fold)
     return results
 
 
 def _aggregate_per_threshold_results(
     thresholded_metrics_df: pd.DataFrame,
     prediction_results: list[ThresholdTestResult],
-    training_results: list[ThresholdTestResult],
+    # training_results: list[ThresholdTestResult],
     best_models: list[ModelEval],
 ) -> pd.DataFrame:
-    # The threshold is the same for all entries in the lists
     alpha_threshold = prediction_results[0].alpha_threshold
     threshold_ratio = prediction_results[0].threshold_ratio
 
     # Pull out columns to be aggregated
-    precision_test = [r.precision for r in prediction_results]
-    recall_test = [r.recall for r in prediction_results]
+    precision_test = [
+        r.precision for r in prediction_results if r.precision is not np.nan
+    ]
+    recall_test = [r.recall for r in prediction_results if r.recall is not np.NaN]
     pr_auc_test = [r.pr_auc for r in prediction_results]
     mcc_test = [r.mcc for r in prediction_results]
 
+    """
     precision_train = [r.precision for r in training_results]
     recall_train = [r.recall for r in training_results]
    pr_auc_train = [r.pr_auc for r in training_results]
     mcc_train = [r.mcc for r in training_results]
+    """
 
     new_desc = pd.DataFrame(
         {
@@ -1056,12 +1040,6 @@
             "pr_auc_test_sd": [statistics.stdev(pr_auc_test)],
             "mcc_test_mean": [statistics.mean(mcc_test)],
             "mcc_test_sd": [statistics.stdev(mcc_test)],
-            "precision_train_mean": [statistics.mean(precision_train)],
-            "precision_train_sd": [statistics.stdev(precision_train)],
-            "recall_train_mean": [statistics.mean(recall_train)],
-            "recall_train_sd": [statistics.stdev(recall_train)],
-            "mcc_train_mean": [statistics.mean(mcc_train)],
-            "mcc_train_sd": [statistics.stdev(mcc_train)],
         },
     )
 
@@ -1127,7 +1105,8 @@ def _create_thresholded_metrics_df() -> pd.DataFrame:
             "recall_test_mean",
             "recall_test_sd",
             "mcc_test_mean",
-            "mcc_test_sd",
+            "mcc_test_sd"
+            """
             "precision_train_mean",
             "precision_train_sd",
             "recall_train_mean",
@@ -1136,6 +1115,7 @@ def _create_thresholded_metrics_df() -> pd.DataFrame:
             "recall_train_sd",
             "pr_auc_sd",
             "mcc_train_mean",
             "mcc_train_sd",
+            """,
         ]
     )
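Note on the regrouping above: _combine_by_threshold_matrix_entry now takes fold-major results (one dict per outer fold, keyed by threshold-matrix index) and inverts them into one list of results per threshold-matrix entry, which _aggregate_per_threshold_results then averages. The following is a minimal standalone sketch of that inversion, not hlink's code; Result and combine_by_threshold_entry are hypothetical stand-ins for ThresholdTestResult and the patched function.

from typing import NamedTuple


class Result(NamedTuple):
    # Stand-in for ThresholdTestResult: one set of metrics per (fold, threshold entry) pair.
    precision: float
    recall: float


def combine_by_threshold_entry(
    fold_results: list[dict[int, Result]],
) -> list[list[Result]]:
    # fold_results[f][t] holds the result for outer fold f at threshold-matrix entry t.
    # The return value groups all folds together for each entry t.
    if len(fold_results) < 2:
        raise RuntimeError("Must have at least two outer folds.")
    matrix_size = len(fold_results[0])
    combined: list[list[Result]] = [[] for _ in range(matrix_size)]
    for per_fold in fold_results:
        for t in range(matrix_size):
            combined[t].append(per_fold[t])
    return combined


# Two outer folds, two threshold-matrix entries each.
folds = [
    {0: Result(0.90, 0.80), 1: Result(0.70, 0.90)},
    {0: Result(0.88, 0.82), 1: Result(0.72, 0.87)},
]
assert combine_by_threshold_entry(folds)[0] == [Result(0.90, 0.80), Result(0.88, 0.82)]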
["namelast_jw", "regionf"] training_conf["training"]["model_parameters"] = [] - training_conf["training"]["n_training_iterations"] = 2 + training_conf["training"]["n_training_iterations"] = 3 return training_conf