
Commit dd49937
WIP on correct metrics output; some tests break because of not enough threshold matrix entries
ccdavis committed Dec 9, 2024
1 parent 93a5c4e commit dd49937
Showing 2 changed files with 46 additions and 66 deletions.
110 changes: 45 additions & 65 deletions hlink/linking/model_exploration/link_step_train_test_models.py
@@ -347,7 +347,7 @@ def _evaluate_threshold_combinations(
         dep_var: str,
         id_a: str,
         id_b: str,
-    ) -> tuple[pd.DataFrame, Any]:
+    ) -> tuple[dict[int, pd.DataFrame], Any]:
         training_config_name = str(self.task.training_conf)
         config = self.task.link_run.config
@@ -371,8 +371,8 @@ def _evaluate_threshold_combinations(
         info = f"\nTesting the best model + parameters against all {len(threshold_matrix)} threshold combinations.\n"
         logger.debug(info)

-        prediction_results = dict[int, ThresholdTestResult] = {}
-        training_results: dict[int, ThresholdTestResult] = {}
+        prediction_results: dict[int, ThresholdTestResult] = {}
+        # training_results: dict[int, ThresholdTestResult] = {}

         cached_training_data = thresholding_training_data.cache()
         cached_test_data = thresholding_test_data.cache()
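The first changed pair here fixes a real bug, not just style: the old `prediction_results = dict[int, ThresholdTestResult] = {}` is syntactically valid Python, but it parses as a chained assignment whose second target is a subscript of the `dict` builtin itself, so it fails at runtime rather than declaring an annotated variable. A minimal standalone demonstration (names are illustrative, not hlink code):

```python
try:
    # Chained assignment: {} is assigned first to `results`, then to
    # dict[int, str] -- which is item assignment on the `dict` type itself.
    results = dict[int, str] = {}
except TypeError as err:
    print(err)  # 'type' object does not support item assignment

# The corrected form is an annotated assignment: the annotation is a
# type hint only, and `results` is simply bound to an empty dict.
results: dict[int, str] = {}
```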
@@ -397,6 +397,7 @@ def _evaluate_threshold_combinations(
             id_b,
             dep_var,
         )
+        """
         thresholding_predict_train = _get_probability_and_select_pred_columns(
             cached_training_data,
             thresholding_model,
@@ -405,6 +406,7 @@ def _evaluate_threshold_combinations(
             id_b,
             dep_var,
         )
+        """

         for threshold_index, (
             this_alpha_threshold,
@@ -418,20 +420,23 @@ def _evaluate_threshold_combinations(
             logger.debug(diag)
             decision = training_settings.get("decision")
             start_predict_time = perf_counter()
+
             predictions = threshold_core.predict_using_thresholds(
                 thresholding_predictions,
                 this_alpha_threshold,
                 this_threshold_ratio,
                 id_column,
                 decision,
             )
+            """
             predict_train = threshold_core.predict_using_thresholds(
                 thresholding_predict_train,
                 this_alpha_threshold,
                 this_threshold_ratio,
                 id_column,
                 decision,
             )
+            """

             end_predict_time = perf_counter()
             info = f"Predictions for test-train data on threshold took {end_predict_time - start_predict_time:.2f}s"
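For orientation, `predict_using_thresholds` turns the model's match probabilities into 0/1 predictions for one `(alpha_threshold, threshold_ratio)` combination. Its implementation is not part of this diff, so the following is only a hedged sketch of the general technique — assuming a `probability` column, a plain probability cutoff, and an optional ratio test that keeps just the clearly-best candidate per record — written with pandas for brevity rather than hlink's PySpark code:

```python
import pandas as pd


def predict_with_thresholds_sketch(
    scored: pd.DataFrame,
    alpha_threshold: float,
    threshold_ratio: float | None,
    id_column: str,
) -> pd.DataFrame:
    """Illustrative only -- not hlink's actual implementation."""
    out = scored.copy()
    # Base rule: the match probability must clear the alpha threshold.
    out["prediction"] = (out["probability"] >= alpha_threshold).astype(int)
    if threshold_ratio is not None:
        grouped = out.groupby(id_column)["probability"]
        best = grouped.transform("max")
        # Second-best probability per group; NaN for singleton groups.
        second = grouped.transform(
            lambda s: s.nlargest(2).iloc[-1] if len(s) > 1 else float("nan")
        )
        is_best = out["probability"] == best
        clears_ratio = second.isna() | (best >= threshold_ratio * second)
        out["prediction"] = (
            out["prediction"].astype(bool) & is_best & clears_ratio
        ).astype(int)
    return out
```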
@@ -446,7 +451,7 @@ def _evaluate_threshold_combinations(
                 this_threshold_ratio,
                 best_model.score,
             )
-
+            """
             training_results[threshold_index] = self._capture_training_results(
                 predict_train,
                 dep_var,
@@ -456,11 +461,12 @@ def _evaluate_threshold_combinations(
                 this_threshold_ratio,
                 best_model.score,
             )
+            """

         thresholding_test_data.unpersist()
         thresholding_training_data.unpersist()

-        return prediction_results, training_results, suspicious_data
+        return prediction_results, suspicious_data

     def _run(self) -> None:
         training_section_name = str(self.task.training_conf)
@@ -482,7 +488,8 @@ def _run(self) -> None:
         )

         # Stores suspicious data
-        suspicious_data = self._create_suspicious_data(id_a, id_b)
+        # suspicious_data = self._create_suspicious_data(id_a, id_b)
+        suspicious_data = None

         outer_fold_count = training_settings.get("n_training_iterations", 10)
         inner_fold_count = 3
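`outer_fold_count` and `inner_fold_count` describe nested cross-validation: each outer fold is held out for threshold evaluation while an inner three-fold loop on the remaining data picks the best model and hyperparameters. A rough sketch of producing and pairing such folds in PySpark (the helper names are illustrative, not hlink's):

```python
from pyspark.sql import DataFrame


def split_into_folds_sketch(data: DataFrame, fold_count: int) -> list[DataFrame]:
    # randomSplit takes relative weights; equal weights give equal-sized
    # folds in expectation.
    return data.randomSplit([1.0 / fold_count] * fold_count)


def outer_fold_pairs_sketch(folds: list[DataFrame]):
    # Hold each fold out as test data and train on the union of the rest.
    # The inner hyperparameter search would run on `train` before the
    # chosen model is tested against the threshold matrix.
    for i, test in enumerate(folds):
        rest = folds[:i] + folds[i + 1 :]
        train = rest[0]
        for other in rest[1:]:
            train = train.union(other)
        yield train, test
```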
@@ -492,7 +499,7 @@ def _run(self) -> None:

         # At the end we combine this information collected from every outer fold
         threshold_test_results: list[ThresholdTestResult] = []
-        threshold_training_results: list[ThresholdTestResult]
+        # threshold_training_results: list[ThresholdTestResult]
         all_suspicious_data: list[Any] = []
         best_models: list[ModelEval] = []

@@ -538,7 +545,7 @@ def _run(self) -> None:
                 hyperparam_evaluation_results
             )

-            prediction_results, training_results, suspicious_data_for_threshold = (
+            prediction_results, suspicious_data_for_threshold = (
                 self._evaluate_threshold_combinations(
                     best_model,
                     suspicious_data,
@@ -551,19 +558,24 @@ def _run(self) -> None:

             # Collect the outputs for each fold
             threshold_test_results.append(prediction_results)
-            threshold_training_results.append(training_results)
-            all_suspicious_data.append(suspicious_data_for_threshold)
+            # threshold_training_results.append(training_results)
+            # all_suspicious_data.append(suspicious_data_for_threshold)
             best_models.append(best_model)

-        combined_test = (_combine_by_threshold_matrix_entry(prediction_results),)
-        combined_train = (_combine_by_threshold_matrix_entry(training_results),)
+        combined_test = _combine_by_threshold_matrix_entry(threshold_test_results)
+        # combined_train = (_combine_by_threshold_matrix_entry(training_results),)

+        # there are 'm' threshold_test_results items matching the number of
+        # inner folds. Each entry has 'n' items matching the number of
+        # threshold matrix entries.
         threshold_matrix_size = len(threshold_test_results[0])

         thresholded_metrics_df = _create_thresholded_metrics_df()
         for i in range(threshold_matrix_size):
+            print(type(combined_test[i]))
+            print(combined_test[i])
             thresholded_metrics_df = _aggregate_per_threshold_results(
-                thresholded_metrics_df, combined_test[i], combined_train[i], best_models
+                thresholded_metrics_df, combined_test[i], best_models
             )

         print("*** Final thresholded metrics ***")
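The new comment describes an m × n structure: one entry per fold, each holding one `ThresholdTestResult` per threshold matrix entry; `_combine_by_threshold_matrix_entry` (diffed below) inverts that so all folds' results for the same threshold land together. A self-contained illustration of the inversion, with a stand-in result type:

```python
from dataclasses import dataclass


@dataclass
class Result:  # stand-in for ThresholdTestResult, which has more fields
    fold: int
    threshold_index: int


# m = 2 folds, n = 3 threshold matrix entries, keyed by threshold index.
per_fold: list[dict[int, Result]] = [
    {t: Result(fold=f, threshold_index=t) for t in range(3)} for f in range(2)
]

# Inverted: one list per threshold entry, gathering it across all folds.
per_threshold: list[list[Result]] = [
    [fold_results[t] for fold_results in per_fold] for t in range(3)
]

assert len(per_threshold) == 3
assert [r.fold for r in per_threshold[0]] == [0, 1]
```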
@@ -577,7 +589,7 @@ def _run(self) -> None:
         )

         self._save_training_results(thresholded_metrics_df, self.task.spark)
-        self._save_suspicious_data(suspicious_data, self.task.spark)
+        # self._save_suspicious_data(suspicious_data, self.task.spark)
         self.task.spark.sql("set spark.sql.shuffle.partitions=200")

     def _split_into_folds(
@@ -669,38 +681,6 @@ def _get_splits(
         )
         return splits

-    def _capture_training_results(
-        self,
-        predict_train: pyspark.sql.DataFrame,
-        dep_var: str,
-        model: Model,
-        suspicious_data: dict[str, Any] | None,
-        alpha_threshold: float,
-        threshold_ratio: float | None,
-        pr_auc: float,
-    ) -> ThresholdTestResult:
-        table_prefix = self.task.table_prefix
-        predict_train.createOrReplaceTempView(f"{table_prefix}predict_train")
-        (
-            train_TP_count,
-            train_FP_count,
-            train_FN_count,
-            train_TN_count,
-        ) = _get_confusion_matrix(predict_train, dep_var, suspicious_data)
-        train_precision, train_recall, train_mcc = _get_aggregate_metrics(
-            train_TP_count, train_FP_count, train_FN_count, train_TN_count
-        )
-        result = ThresholdTestResult(
-            precision=train_precision,
-            recall=train_recall,
-            mcc=train_mcc,
-            pr_auc=pr_auc,
-            model_id=model,
-            alpha_threshold=alpha_threshold,
-            threshold_ratio=threshold_ratio,
-        )
-        return result
-
     def _capture_prediction_results(
         self,
         predictions: pyspark.sql.DataFrame,
@@ -710,7 +690,7 @@ def _capture_prediction_results(
         alpha_threshold: float,
         threshold_ratio: float | None,
         pr_auc: float,
-    ) -> pd.DataFrame:
+    ) -> ThresholdTestResult:
         table_prefix = self.task.table_prefix
         # write to sql tables for testing
         predictions.createOrReplaceTempView(f"{table_prefix}predictions")
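Both capture helpers reduce a confusion matrix to precision, recall, and Matthews correlation coefficient via `_get_aggregate_metrics`, whose body is not shown in this diff. The standard formulas, as an illustrative sketch:

```python
import math


def aggregate_metrics_sketch(tp: int, fp: int, fn: int, tn: int):
    """Illustrative formulas; not hlink's _get_aggregate_metrics."""
    # Precision: of the pairs predicted to match, how many truly match.
    precision = tp / (tp + fp) if tp + fp else float("nan")
    # Recall: of the true matches, how many were predicted to match.
    recall = tp / (tp + fn) if tp + fn else float("nan")
    # MCC balances all four confusion-matrix cells in one score.
    denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    mcc = (tp * tn - fp * fn) / denom if denom else float("nan")
    return precision, recall, mcc


print(aggregate_metrics_sketch(tp=80, fp=20, fn=10, tn=890))
# roughly (0.8, 0.889, 0.827)
```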
@@ -993,16 +973,16 @@ def _get_aggregate_metrics(
 # The outer list entries hold results from each outer fold, the inner list has a ThresholdTestResult per threshold
 # matrix entry. We need to get data for each threshold entry together. Basically we need to invert the data.
 def _combine_by_threshold_matrix_entry(
-    threshold_results: list[list[ThresholdTestResult]],
+    threshold_results: list[dict[int, ThresholdTestResult]],
 ) -> list[ThresholdTestResult]:
     # This list will have a size of the number of threshold matrix entries
     results: list[ThresholdTestResult] = []

+    # Check number of folds
     if len(threshold_results) < 2:
-        raise RuntimeError(
-            "Can't combine threshold results from less than two outer folds."
-        )
+        raise RuntimeError("Must have at least two outer folds.")

+    # Check if there are more than 0 threshold matrix entries
     if len(threshold_results[0]) == 0:
         raise RuntimeError(
             "No entries in the first set of threshold results; can't determine threshold matrix size."
@@ -1011,36 +991,40 @@ def _combine_by_threshold_matrix_entry(
     inferred_threshold_matrix_size = len(threshold_results[0])

     for t in range(inferred_threshold_matrix_size):
-        results[t] = None
+        # One list per threshold matrix entry
+        results.append([])

     for fold_results in threshold_results:
         for t in range(inferred_threshold_matrix_size):
-            results[t].append(fold_results[t])

+            threshold_results_for_this_fold = fold_results[t]
+            results[t].append(threshold_results_for_this_fold)
     return results


 def _aggregate_per_threshold_results(
     thresholded_metrics_df: pd.DataFrame,
     prediction_results: list[ThresholdTestResult],
-    training_results: list[ThresholdTestResult],
+    # training_results: list[ThresholdTestResult],
     best_models: list[ModelEval],
 ) -> pd.DataFrame:

     # The threshold is the same for all entries in the lists
     alpha_threshold = prediction_results[0].alpha_threshold
     threshold_ratio = prediction_results[0].threshold_ratio

     # Pull out columns to be aggregated
-    precision_test = [r.precision for r in prediction_results]
-    recall_test = [r.recall for r in prediction_results]
+    precision_test = [
+        r.precision for r in prediction_results if r.precision is not np.nan
+    ]
+    recall_test = [r.recall for r in prediction_results if r.recall is not np.NaN]
     pr_auc_test = [r.pr_auc for r in prediction_results]
     mcc_test = [r.mcc for r in prediction_results]

+    """
     precision_train = [r.precision for r in training_results]
     recall_train = [r.recall for r in training_results]
     pr_auc_train = [r.pr_auc for r in training_results]
     mcc_train = [r.mcc for r in training_results]
+    """

     new_desc = pd.DataFrame(
         {
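One caveat on the new NaN filtering: `r.precision is not np.nan` is an identity test against the `np.nan` singleton, and NaN values produced by arithmetic are generally distinct float objects, so they pass an `is not` check; a value test such as `math.isnan` is the reliable filter. (Also, the `np.NaN` alias was removed in NumPy 2.0; `np.nan` is the portable spelling.) A small demonstration, not part of the commit:

```python
import math

import numpy as np

computed = float("nan")      # a NaN produced at runtime
print(computed is np.nan)    # False: not the same object as np.nan
print(computed != computed)  # True: NaN compares unequal to itself
print(math.isnan(computed))  # True: value-based test
```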
@@ -1056,12 +1040,6 @@ def _aggregate_per_threshold_results(
             "pr_auc_test_sd": [statistics.stdev(pr_auc_test)],
             "mcc_test_mean": [statistics.mean(mcc_test)],
             "mcc_test_sd": [statistics.stdev(mcc_test)],
-            "precision_train_mean": [statistics.mean(precision_train)],
-            "precision_train_sd": [statistics.stdev(precision_train)],
-            "recall_train_mean": [statistics.mean(recall_train)],
-            "recall_train_sd": [statistics.stdev(recall_train)],
-            "mcc_train_mean": [statistics.mean(mcc_train)],
-            "mcc_train_sd": [statistics.stdev(mcc_train)],
         },
     )

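These `statistics.stdev` calls are a plausible source of the breakage named in the commit message: `stdev` raises `StatisticsError` with fewer than two data points, and the NaN filtering added above can shrink a metric list below that minimum. That reading would also fit the new guard requiring at least two outer folds and the test fixture change below from 2 to 3 training iterations. A quick demonstration:

```python
import statistics

print(statistics.stdev([0.91, 0.88]))  # fine: two data points

try:
    statistics.stdev([0.91])  # one point has no sample standard deviation
except statistics.StatisticsError as err:
    print(err)  # stdev requires at least two data points
```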
@@ -1127,7 +1105,8 @@ def _create_thresholded_metrics_df() -> pd.DataFrame:
             "recall_test_mean",
             "recall_test_sd",
             "mcc_test_mean",
-            "mcc_test_sd",
+            "mcc_test_sd"
+            """
             "precision_train_mean",
             "precision_train_sd",
             "recall_train_mean",
@@ -1136,6 +1115,7 @@ def _create_thresholded_metrics_df() -> pd.DataFrame:
             "pr_auc_sd",
             "mcc_train_mean",
             "mcc_train_sd",
+            """,
         ]
     )

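A hazard of commenting out list elements with a triple-quoted string, visible in this hunk: Python concatenates adjacent string literals at compile time, so `"mcc_test_sd"` (note the removed trailing comma) and the `"""..."""` block fuse into one long, garbled column name instead of leaving the list shorter. A small demonstration:

```python
columns = [
    "mcc_test_sd"
    """
    "mcc_train_mean",
    """,
]
print(len(columns))      # 1 -- the adjacent literals merged into one element
print(repr(columns[0]))  # 'mcc_test_sd\n    "mcc_train_mean",\n    '
```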
2 changes: 1 addition & 1 deletion hlink/tests/model_exploration_test.py
@@ -584,7 +584,7 @@ def feature_conf(training_conf):
     training_conf["training"]["independent_vars"] = ["namelast_jw", "regionf"]

     training_conf["training"]["model_parameters"] = []
-    training_conf["training"]["n_training_iterations"] = 2
+    training_conf["training"]["n_training_iterations"] = 3
     return training_conf

