From e57dad67f62e78447d5068ec82d6cb8a5279e852 Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 09:58:54 -0600 Subject: [PATCH 01/11] [#172] Add type hints and docs to linking.core.classifier The output type of choose_classifier() is really hard to write down precisely because of the way PySpark types are set up. It's something like tuple["Classifier", "Transformer"], but for some reason SQLTransformer is not a subtype of Transformer. --- hlink/linking/core/classifier.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/hlink/linking/core/classifier.py b/hlink/linking/core/classifier.py index d9543ed..2acd2c4 100644 --- a/hlink/linking/core/classifier.py +++ b/hlink/linking/core/classifier.py @@ -3,6 +3,8 @@ # in this project's top-level directory, and also on-line at: # https://github.com/ipums/hlink +from typing import Any + from pyspark.ml.feature import SQLTransformer from pyspark.ml.regression import GeneralizedLinearRegression from pyspark.ml.classification import ( @@ -28,22 +30,32 @@ _xgboost_available = True -def choose_classifier(model_type, params, dep_var): - """Returns a classifier and a post_classification transformer given model type and params. +def choose_classifier(model_type: str, params: dict[str, Any], dep_var: str): + """Given a model type and hyper-parameters for the model, return a + classifier of that type with those hyper-parameters, along with a + post-classification transformer to run after classification. + + The post-classification transformer standardizes the output of the + classifier for further processing. For example, some classifiers create + models that output a probability array of [P(dep_var=0), P(dep_var=1)], and + the post-classification transformer extracts the single float P(dep_var=1) + as the probability for these models. Parameters ---------- - model_type: string - name of model - params: dictionary - dictionary of parameters for model - dep_var: string - the dependent variable for the model + model_type + the type of model, which may be random_forest, probit, + logistic_regression, decision_tree, gradient_boosted_trees, lightgbm + (requires the 'lightgbm' extra), or xgboost (requires the 'xgboost' + extra) + params + a dictionary of hyper-parameters for the model + dep_var + the dependent variable for the model, sometimes also called the "label" Returns ------- - The classifer and a transformer to be used after classification. - + The classifier and a transformer to be used after classification, as a tuple. """ post_transformer = SQLTransformer(statement="SELECT * FROM __THIS__") features_vector = "features_vector" From a736dd070d74654cfc8ffdd6e8c81994b16c28fe Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 10:22:37 -0600 Subject: [PATCH 02/11] [#172] Don't handle threshold and threshold_ratio in choose_classifier() The caller is responsible for passing a dictionary of hyper-parameters to choose_classifier(), and this dictionary should not include hlink's threshold or threshold_ratio. Both of the places where we call choose_classifier() (training and model exploration) already handle this. --- hlink/linking/core/classifier.py | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/hlink/linking/core/classifier.py b/hlink/linking/core/classifier.py index 2acd2c4..bb27123 100644 --- a/hlink/linking/core/classifier.py +++ b/hlink/linking/core/classifier.py @@ -61,11 +61,7 @@ def choose_classifier(model_type: str, params: dict[str, Any], dep_var: str): features_vector = "features_vector" if model_type == "random_forest": classifier = RandomForestClassifier( - **{ - key: val - for key, val in params.items() - if key not in ["threshold", "threshold_ratio"] - }, + **params, labelCol=dep_var, featuresCol=features_vector, seed=2133, @@ -110,11 +106,7 @@ def choose_classifier(model_type: str, params: dict[str, Any], dep_var: str): elif model_type == "gradient_boosted_trees": classifier = GBTClassifier( - **{ - key: val - for key, val in params.items() - if key not in ["threshold", "threshold_ratio"] - }, + **params, featuresCol=features_vector, labelCol=dep_var, seed=2133, @@ -130,13 +122,8 @@ def choose_classifier(model_type: str, params: dict[str, Any], dep_var: str): "its dependencies. Try installing hlink with the lightgbm extra: " "\n\n pip install hlink[lightgbm]" ) - params_without_threshold = { - key: val - for key, val in params.items() - if key not in {"threshold", "threshold_ratio"} - } classifier = synapse.ml.lightgbm.LightGBMClassifier( - **params_without_threshold, + **params, featuresCol=features_vector, labelCol=dep_var, probabilityCol="probability_array", @@ -151,13 +138,8 @@ def choose_classifier(model_type: str, params: dict[str, Any], dep_var: str): "the xgboost library and its dependencies. Try installing hlink with " "the xgboost extra:\n\n pip install hlink[xgboost]" ) - params_without_threshold = { - key: val - for key, val in params.items() - if key not in {"threshold", "threshold_ratio"} - } classifier = xgboost.spark.SparkXGBClassifier( - **params_without_threshold, + **params, features_col=features_vector, label_col=dep_var, probability_col="probability_array", From 49bda13344d3e25e49b8173f9524a6ff91fea9cf Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 16:54:51 +0000 Subject: [PATCH 03/11] [#174] Add type hints to linking.core.threshold --- hlink/linking/core/threshold.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index 36dfd03..720b559 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -3,11 +3,16 @@ # in this project's top-level directory, and also on-line at: # https://github.com/ipums/hlink +from typing import Any + +from pyspark.sql import DataFrame from pyspark.sql.window import Window from pyspark.sql.functions import rank, lead -def get_threshold_ratio(training_conf, model_conf, default=1.3): +def get_threshold_ratio( + training_conf: dict[str, Any], model_conf: dict[str, Any], default: float = 1.3 +) -> float | Any: """Gets the threshold ratio or default from the config using the correct precedence. Parameters @@ -32,8 +37,12 @@ def get_threshold_ratio(training_conf, model_conf, default=1.3): def predict_using_thresholds( - pred_df, alpha_threshold, threshold_ratio, training_conf, id_col -): + pred_df: DataFrame, + alpha_threshold: float, + threshold_ratio: float, + training_conf: dict[str, Any], + id_col: str, +) -> DataFrame: """Adds a prediction column to the given pred_df by applying thresholds. Parameters @@ -69,14 +78,16 @@ def predict_using_thresholds( return _apply_alpha_threshold(pred_df.drop("prediction"), alpha_threshold) -def _apply_alpha_threshold(pred_df, alpha_threshold): +def _apply_alpha_threshold(pred_df: DataFrame, alpha_threshold: float) -> DataFrame: return pred_df.selectExpr( "*", f"case when probability >= {alpha_threshold} then 1 else 0 end as prediction", ) -def _apply_threshold_ratio(df, alpha_threshold, threshold_ratio, id_col): +def _apply_threshold_ratio( + df: DataFrame, alpha_threshold: float, threshold_ratio: float, id_col: str +) -> DataFrame: """Apply a decision threshold using the ration of a match's probability to the next closest match's probability.""" id_a = id_col + "_a" id_b = id_col + "_b" From 28bcd03218348ab6d6aa37e561bfcb4b24dd1cda Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 17:38:10 +0000 Subject: [PATCH 04/11] [#174] Add a couple of unit tests for linking.core.threshold --- hlink/tests/core/threshold_test.py | 88 ++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 hlink/tests/core/threshold_test.py diff --git a/hlink/tests/core/threshold_test.py b/hlink/tests/core/threshold_test.py new file mode 100644 index 0000000..3bb0272 --- /dev/null +++ b/hlink/tests/core/threshold_test.py @@ -0,0 +1,88 @@ +# This file is part of the ISRDI's hlink. +# For copyright and licensing information, see the NOTICE and LICENSE files +# in this project's top-level directory, and also on-line at: +# https://github.com/ipums/hlink + +from pyspark.sql import Row, SparkSession + +from hlink.linking.core.threshold import predict_using_thresholds + + +def test_predict_using_thresholds_default_decision(spark: SparkSession) -> None: + """ + The default decision tells predict_using_thresholds() not to do + de-duplication on the id. Instead, it just applies alpha_threshold to the + probabilities to determine predictions. + """ + input_rows = [ + (0, "A", 0.1), + (0, "B", 0.7), + (1, "C", 0.2), + (2, "D", 0.4), + (3, "E", 1.0), + (4, "F", 0.0), + ] + df = spark.createDataFrame(input_rows, schema=["id_a", "id_b", "probability"]) + + # We are using the default decision, so threshold_ratio will be ignored + predictions = predict_using_thresholds( + df, alpha_threshold=0.6, threshold_ratio=0.0, training_conf={}, id_col="id" + ) + + output_rows = ( + predictions.sort("id_a", "id_b").select("id_a", "id_b", "prediction").collect() + ) + + OutputRow = Row("id_a", "id_b", "prediction") + assert output_rows == [ + OutputRow(0, "A", 0), + OutputRow(0, "B", 1), + OutputRow(1, "C", 0), + OutputRow(2, "D", 0), + OutputRow(3, "E", 1), + OutputRow(4, "F", 0), + ] + + +def test_predict_using_thresholds_drop_duplicates_decision(spark: SparkSession) -> None: + """ + The "drop_duplicates_with_threshold_ratio" decision tells + predict_using_thresholds() to look at the ratio between the first- and + second-best probabilities for each id, and to only set prediction = 1 when + the ratio between those probabilities is at least threshold_ratio. + """ + # id_a 0: two probable matches that will be de-duplicated so that both have prediction = 0 + # id_a 1: one probable match that will have prediction = 1 + # id_a 2: one improbable match that will have prediction = 0 + # id_a 3: one probable match that will have prediction = 1, and one improbable match that will have prediction = 0 + input_rows = [ + (0, "A", 0.8), + (0, "B", 0.9), + (1, "C", 0.75), + (2, "C", 0.3), + (3, "D", 0.1), + (3, "E", 0.8), + ] + df = spark.createDataFrame(input_rows, schema=["id_a", "id_b", "probability"]) + training_conf = {"decision": "drop_duplicate_with_threshold_ratio"} + predictions = predict_using_thresholds( + df, + alpha_threshold=0.5, + threshold_ratio=2.0, + training_conf=training_conf, + id_col="id", + ) + + output_rows = ( + predictions.sort("id_a", "id_b").select("id_a", "id_b", "prediction").collect() + ) + OutputRow = Row("id_a", "id_b", "prediction") + + assert output_rows == [ + OutputRow(0, "A", 0), + OutputRow(0, "B", 0), + OutputRow(1, "C", 1), + OutputRow(2, "C", 0), + OutputRow(3, "D", 0), + OutputRow(3, "E", 1), + ] From ad6ce10ecef2fc9bca587bf37fe37f805d2ad139 Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 19:05:14 +0000 Subject: [PATCH 05/11] [#174] Pass just decision into predict_with_thresholds() instead of the whole training config This makes it clear which part of the config predict_with_thresholds() is using and makes it easier to call. It also means that predict_with_thresholds() does not need to know about the structure of the config. --- hlink/linking/core/threshold.py | 8 ++++---- hlink/linking/matching/link_step_score.py | 3 ++- .../model_exploration/link_step_train_test_models.py | 5 +++-- hlink/tests/core/threshold_test.py | 5 ++--- hlink/tests/matching_scoring_test.py | 2 +- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index 720b559..789afd3 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -40,8 +40,8 @@ def predict_using_thresholds( pred_df: DataFrame, alpha_threshold: float, threshold_ratio: float, - training_conf: dict[str, Any], id_col: str, + decision: str | None, ) -> DataFrame: """Adds a prediction column to the given pred_df by applying thresholds. @@ -57,17 +57,17 @@ def predict_using_thresholds( to the "a" record's next best probability value. Only used with the "drop_duplicate_with_threshold_ratio" configuration value. - training_conf: dictionary - the training config section id_col: string the id column + decision: str | None + how to apply the thresholds Returns ------- A Spark DataFrame containing the "prediction" column as well as other intermediate columns generated to create the prediction. """ use_threshold_ratio = ( - training_conf.get("decision", "") == "drop_duplicate_with_threshold_ratio" + decision is not None and decision == "drop_duplicate_with_threshold_ratio" ) if use_threshold_ratio: diff --git a/hlink/linking/matching/link_step_score.py b/hlink/linking/matching/link_step_score.py index b4d192e..12b5da3 100644 --- a/hlink/linking/matching/link_step_score.py +++ b/hlink/linking/matching/link_step_score.py @@ -96,12 +96,13 @@ def _run(self): threshold_ratio = threshold_core.get_threshold_ratio( config[training_conf], chosen_model_params, default=1.3 ) + decision = config[training_conf].get("decision") predictions = threshold_core.predict_using_thresholds( score_tmp, alpha_threshold, threshold_ratio, - config[training_conf], config["id_column"], + decision, ) predictions.write.mode("overwrite").saveAsTable(f"{table_prefix}predictions") pmp = self.task.spark.table(f"{table_prefix}potential_matches_pipeline") diff --git a/hlink/linking/model_exploration/link_step_train_test_models.py b/hlink/linking/model_exploration/link_step_train_test_models.py index 1486c53..a05c3ed 100644 --- a/hlink/linking/model_exploration/link_step_train_test_models.py +++ b/hlink/linking/model_exploration/link_step_train_test_models.py @@ -411,20 +411,21 @@ def _evaluate_threshold_combinations( f"{this_alpha_threshold=} and {this_threshold_ratio=}" ) logger.debug(diag) + decision = training_settings.get("decision") start_predict_time = perf_counter() predictions = threshold_core.predict_using_thresholds( thresholding_predictions, this_alpha_threshold, this_threshold_ratio, - training_settings, id_column, + decision, ) predict_train = threshold_core.predict_using_thresholds( thresholding_predict_train, this_alpha_threshold, this_threshold_ratio, - training_settings, id_column, + decision, ) end_predict_time = perf_counter() diff --git a/hlink/tests/core/threshold_test.py b/hlink/tests/core/threshold_test.py index 3bb0272..b477b09 100644 --- a/hlink/tests/core/threshold_test.py +++ b/hlink/tests/core/threshold_test.py @@ -26,7 +26,7 @@ def test_predict_using_thresholds_default_decision(spark: SparkSession) -> None: # We are using the default decision, so threshold_ratio will be ignored predictions = predict_using_thresholds( - df, alpha_threshold=0.6, threshold_ratio=0.0, training_conf={}, id_col="id" + df, alpha_threshold=0.6, threshold_ratio=0.0, id_col="id", decision=None ) output_rows = ( @@ -64,13 +64,12 @@ def test_predict_using_thresholds_drop_duplicates_decision(spark: SparkSession) (3, "E", 0.8), ] df = spark.createDataFrame(input_rows, schema=["id_a", "id_b", "probability"]) - training_conf = {"decision": "drop_duplicate_with_threshold_ratio"} predictions = predict_using_thresholds( df, alpha_threshold=0.5, threshold_ratio=2.0, - training_conf=training_conf, id_col="id", + decision="drop_duplicate_with_threshold_ratio", ) output_rows = ( diff --git a/hlink/tests/matching_scoring_test.py b/hlink/tests/matching_scoring_test.py index 613e1f6..191663c 100755 --- a/hlink/tests/matching_scoring_test.py +++ b/hlink/tests/matching_scoring_test.py @@ -51,8 +51,8 @@ def test_step_2_alpha_beta_thresholds( score_tmp, alpha_threshold, threshold_ratio, - matching_conf["training"], matching_conf["id_column"], + matching_conf["training"].get("decision"), ) predictions.write.mode("overwrite").saveAsTable("predictions") From 54245132dc1ecb7ed1e5720ba222fe0d17aaf775 Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 19:35:10 +0000 Subject: [PATCH 06/11] [#174] Do some minor refactoring and cleanup of linking.core.threshold --- hlink/linking/core/threshold.py | 70 ++++++++++++++++----------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index 789afd3..b0f57a0 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -81,7 +81,7 @@ def predict_using_thresholds( def _apply_alpha_threshold(pred_df: DataFrame, alpha_threshold: float) -> DataFrame: return pred_df.selectExpr( "*", - f"case when probability >= {alpha_threshold} then 1 else 0 end as prediction", + f"CASE WHEN probability >= {alpha_threshold} THEN 1 ELSE 0 END AS prediction", ) @@ -95,39 +95,39 @@ def _apply_threshold_ratio( raise NameError( 'In order to calculate the threshold ratio based on probabilities, you need to have a "probability" column in your data.' ) - else: - windowSpec = Window.partitionBy(df[f"{id_a}"]).orderBy( - df["probability"].desc(), df[f"{id_b}"] + + windowSpec = Window.partitionBy(df[id_a]).orderBy( + df["probability"].desc(), df[id_b] + ) + prob_rank = rank().over(windowSpec) + prob_lead = lead(df["probability"], 1).over(windowSpec) + return ( + df.select( + df["*"], + prob_rank.alias("prob_rank"), + prob_lead.alias("second_best_prob"), ) - prob_rank = rank().over(windowSpec) - prob_lead = lead(df["probability"], 1).over(windowSpec) - return ( - df.select( - df["*"], - prob_rank.alias("prob_rank"), - prob_lead.alias("second_best_prob"), - ) - .selectExpr( - "*", - f""" - IF( - second_best_prob IS NOT NULL - AND second_best_prob >= {alpha_threshold} - AND prob_rank == 1, - probability / second_best_prob, - NULL) - as ratio - """, - ) - .selectExpr( - "*", - f""" - CAST( - probability >= {alpha_threshold} - AND prob_rank == 1 - AND (ratio > {threshold_ratio} OR ratio is NULL) - as INT) as prediction - """, - ) - .drop("prob_rank") + .selectExpr( + "*", + f""" + IF( + second_best_prob IS NOT NULL + AND second_best_prob >= {alpha_threshold} + AND prob_rank == 1, + probability / second_best_prob, + NULL) + AS ratio + """, ) + .selectExpr( + "*", + f""" + CAST( + probability >= {alpha_threshold} + AND prob_rank == 1 + AND (ratio > {threshold_ratio} OR ratio IS NULL) + AS INT) AS prediction + """, + ) + .drop("prob_rank") + ) From dd1636012d3b6c7b7474c5ca90fe3674df52abdf Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 21:59:03 +0000 Subject: [PATCH 07/11] [#174] Replace a SQL query with the equivalent spark expression This prevents a possible SQL injection error by setting alpha_threshold to something weird. It's also a bit easier to read and work with in my experience. It's more composable since you can build up the expression instead of having to write all of the SQL at once. --- hlink/linking/core/threshold.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index b0f57a0..b0523d3 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -7,7 +7,7 @@ from pyspark.sql import DataFrame from pyspark.sql.window import Window -from pyspark.sql.functions import rank, lead +from pyspark.sql.functions import col, lead, rank, when def get_threshold_ratio( @@ -79,10 +79,8 @@ def predict_using_thresholds( def _apply_alpha_threshold(pred_df: DataFrame, alpha_threshold: float) -> DataFrame: - return pred_df.selectExpr( - "*", - f"CASE WHEN probability >= {alpha_threshold} THEN 1 ELSE 0 END AS prediction", - ) + prediction = when(col("probability") >= alpha_threshold, 1).otherwise(0) + return pred_df.withColumn("prediction", prediction) def _apply_threshold_ratio( From 647a7517b0db01efe76f71520ef0cc8c00277d33 Mon Sep 17 00:00:00 2001 From: rileyh Date: Thu, 5 Dec 2024 22:47:31 +0000 Subject: [PATCH 08/11] [#174] Rewrite some thresholding code to use PySpark exprs instead of SQL --- hlink/linking/core/threshold.py | 52 ++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index b0523d3..d5cd5ba 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -94,38 +94,44 @@ def _apply_threshold_ratio( 'In order to calculate the threshold ratio based on probabilities, you need to have a "probability" column in your data.' ) - windowSpec = Window.partitionBy(df[id_a]).orderBy( - df["probability"].desc(), df[id_b] - ) + windowSpec = Window.partitionBy(id_a).orderBy(col("probability").desc(), id_b) prob_rank = rank().over(windowSpec) - prob_lead = lead(df["probability"], 1).over(windowSpec) + prob_lead = lead("probability", 1).over(windowSpec) + + should_compute_probability_ratio = ( + col("second_best_prob").isNotNull() + & (col("second_best_prob") >= alpha_threshold) + & (col("prob_rank") == 1) + ) + # To be a match, the row must... + # 1. Have prob_rank 1, so that it's the most likely match, + # 2. Have a probability of at least alpha_threshold, + # and + # 3. Either have no ratio (since there's no second best probability of at + # least alpha_threshold), or have a ratio of more than threshold_ratio. + is_match = ( + (col("probability") >= alpha_threshold) + & (col("prob_rank") == 1) + & ((col("ratio") > threshold_ratio) | col("ratio").isNull()) + ) return ( df.select( - df["*"], + "*", prob_rank.alias("prob_rank"), prob_lead.alias("second_best_prob"), ) - .selectExpr( + .select( "*", - f""" - IF( - second_best_prob IS NOT NULL - AND second_best_prob >= {alpha_threshold} - AND prob_rank == 1, - probability / second_best_prob, - NULL) - AS ratio - """, + when( + should_compute_probability_ratio, + col("probability") / col("second_best_prob"), + ) + .otherwise(None) + .alias("ratio"), ) - .selectExpr( + .select( "*", - f""" - CAST( - probability >= {alpha_threshold} - AND prob_rank == 1 - AND (ratio > {threshold_ratio} OR ratio IS NULL) - AS INT) AS prediction - """, + is_match.cast("integer").alias("prediction"), ) .drop("prob_rank") ) From b5c8ae98cc617f7d75e8a55ca87af8bd35f6f99d Mon Sep 17 00:00:00 2001 From: rileyh Date: Fri, 6 Dec 2024 15:15:33 +0000 Subject: [PATCH 09/11] [#174] Use withColumn() instead of select("*", ...) This is just a bit cleaner to read, and makes clear the names of the columns that we're adding. We can't select ratio and prediction at once because prediction depends on ratio. --- hlink/linking/core/threshold.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index d5cd5ba..e7ab09f 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -120,18 +120,13 @@ def _apply_threshold_ratio( prob_rank.alias("prob_rank"), prob_lead.alias("second_best_prob"), ) - .select( - "*", + .withColumn( + "ratio", when( should_compute_probability_ratio, col("probability") / col("second_best_prob"), - ) - .otherwise(None) - .alias("ratio"), - ) - .select( - "*", - is_match.cast("integer").alias("prediction"), + ).otherwise(None), ) + .withColumn("prediction", is_match.cast("integer")) .drop("prob_rank") ) From 1ffb6d118b75465561bd20fc3e9e84dd4c13e00f Mon Sep 17 00:00:00 2001 From: rileyh Date: Fri, 6 Dec 2024 16:00:15 +0000 Subject: [PATCH 10/11] [#174] Improve the error message when there's no probability column --- hlink/linking/core/threshold.py | 10 +++++----- hlink/tests/core/threshold_test.py | 20 +++++++++++++++++++- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index e7ab09f..49c8418 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -66,6 +66,11 @@ def predict_using_thresholds( ------- A Spark DataFrame containing the "prediction" column as well as other intermediate columns generated to create the prediction. """ + if "probability" not in pred_df.columns: + raise ValueError( + "the input data frame must have a 'probability' column to make predictions using thresholds" + ) + use_threshold_ratio = ( decision is not None and decision == "drop_duplicate_with_threshold_ratio" ) @@ -89,11 +94,6 @@ def _apply_threshold_ratio( """Apply a decision threshold using the ration of a match's probability to the next closest match's probability.""" id_a = id_col + "_a" id_b = id_col + "_b" - if "probability" not in df.columns: - raise NameError( - 'In order to calculate the threshold ratio based on probabilities, you need to have a "probability" column in your data.' - ) - windowSpec = Window.partitionBy(id_a).orderBy(col("probability").desc(), id_b) prob_rank = rank().over(windowSpec) prob_lead = lead("probability", 1).over(windowSpec) diff --git a/hlink/tests/core/threshold_test.py b/hlink/tests/core/threshold_test.py index b477b09..0882ca3 100644 --- a/hlink/tests/core/threshold_test.py +++ b/hlink/tests/core/threshold_test.py @@ -4,6 +4,7 @@ # https://github.com/ipums/hlink from pyspark.sql import Row, SparkSession +import pytest from hlink.linking.core.threshold import predict_using_thresholds @@ -46,7 +47,7 @@ def test_predict_using_thresholds_default_decision(spark: SparkSession) -> None: def test_predict_using_thresholds_drop_duplicates_decision(spark: SparkSession) -> None: """ - The "drop_duplicates_with_threshold_ratio" decision tells + The "drop_duplicate_with_threshold_ratio" decision tells predict_using_thresholds() to look at the ratio between the first- and second-best probabilities for each id, and to only set prediction = 1 when the ratio between those probabilities is at least threshold_ratio. @@ -85,3 +86,20 @@ def test_predict_using_thresholds_drop_duplicates_decision(spark: SparkSession) OutputRow(3, "D", 0), OutputRow(3, "E", 1), ] + + +@pytest.mark.parametrize("decision", [None, "drop_duplicate_with_threshold_ratio"]) +def test_predict_using_thresholds_missing_probability_column_error( + spark: SparkSession, decision: str | None +) -> None: + """ + When the input DataFrame is missing the "probability" column, + predict_using_thresholds() raises a friendly error. + """ + df = spark.createDataFrame([(0, "A"), (1, "B")], schema=["id_a", "id_b"]) + with pytest.raises( + ValueError, match="the input data frame must have a 'probability' column" + ): + predict_using_thresholds( + df, alpha_threshold=0.5, threshold_ratio=1.5, id_col="id", decision=decision + ) From d32c2bfbf93925029b356f2d2c63492aa7f184a5 Mon Sep 17 00:00:00 2001 From: rileyh Date: Fri, 6 Dec 2024 17:56:17 +0000 Subject: [PATCH 11/11] [#174] Update documentation and add a few logging debug statements --- hlink/linking/core/threshold.py | 77 +++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/hlink/linking/core/threshold.py b/hlink/linking/core/threshold.py index 49c8418..6498022 100644 --- a/hlink/linking/core/threshold.py +++ b/hlink/linking/core/threshold.py @@ -3,12 +3,15 @@ # in this project's top-level directory, and also on-line at: # https://github.com/ipums/hlink +import logging from typing import Any from pyspark.sql import DataFrame from pyspark.sql.window import Window from pyspark.sql.functions import col, lead, rank, when +logger = logging.getLogger(__name__) + def get_threshold_ratio( training_conf: dict[str, Any], model_conf: dict[str, Any], default: float = 1.3 @@ -43,28 +46,58 @@ def predict_using_thresholds( id_col: str, decision: str | None, ) -> DataFrame: - """Adds a prediction column to the given pred_df by applying thresholds. + """Adds a "prediction" column to the given data frame by applying + thresholds to the "probability" column. The prediction column has either + the value 0, indicating that the potential match does not meet the + requirements for a match, or 1, indicating that the potential match does + meet the requirements for a match. The requirements for a match depend on + the decision argument, which switches between two different options. + + 1. If decision is "drop_duplicate_with_threshold_ratio", then + predict_using_thresholds() uses both the alpha_threshold and + threshold_ratio. + + predict_using_thresholds() groups the matches by their id in data set A, and + selects from each group the potential match with the highest probability. + Then, if there is a second-highest probability in the group and it is at + least alpha_threshold, predict_using_thresholds() computes the ratio of the + highest probability to the second highest probability and stores it as the + ratio column. Finally, predict_using_thresholds() picks out of each group + the potential match with the highest probability and marks it with + prediction = 1 if + + A. its probability is at least alpha_threshold and + B. either there is no second-highest probability over alpha_threshold, or + the ratio of the highest probability to the second-highest is greater + than threshold_ratio. + + 2. If decision is any other string or is None, then + predict_using_thresholds() does not use threshold_ratio and instead just + applies alpha_threshold. Each potential match with a probability of at + least alpha_threshold gets prediction = 1, and each potential match with a + probability less than alpha_threshold gets prediction = 0. Parameters ---------- - pred_df: DataFrame - a Spark DataFrame of potential matches a probability column - alpha_threshold: float - the alpha threshold cutoff value. No record with a probability lower than this - value will be considered for prediction = 1. - threshold_ratio: float - the threshold ratio cutoff value. Ratio's refer - to the "a" record's next best probability value. - Only used with the "drop_duplicate_with_threshold_ratio" - configuration value. - id_col: string - the id column - decision: str | None - how to apply the thresholds + pred_df: + a Spark DataFrame of potential matches with a probability column + alpha_threshold: + The alpha threshold cutoff value. No record with a probability lower + than this value will be considered for prediction = 1. + threshold_ratio: + The threshold ratio cutoff value, only used with the + "drop_duplicate_with_threshold_ratio" decision. The ratio is between + the best probability and second-best probability for potential matches + with the same id in data set A. + id_col: + the name of the id column + decision: + how to apply the alpha_threshold and threshold_ratio Returns ------- - A Spark DataFrame containing the "prediction" column as well as other intermediate columns generated to create the prediction. + a Spark DataFrame containing the "prediction" column, and possibly some + additional intermediate columns generated to create the prediction """ if "probability" not in pred_df.columns: raise ValueError( @@ -76,10 +109,16 @@ def predict_using_thresholds( ) if use_threshold_ratio: + logger.debug( + f"Making predictions with alpha threshold and threshold ratio: {alpha_threshold=}, {threshold_ratio=}" + ) return _apply_threshold_ratio( pred_df.drop("prediction"), alpha_threshold, threshold_ratio, id_col ) else: + logger.debug( + f"Making predictions with alpha threshold but without threshold ratio: {alpha_threshold=}" + ) return _apply_alpha_threshold(pred_df.drop("prediction"), alpha_threshold) @@ -91,7 +130,11 @@ def _apply_alpha_threshold(pred_df: DataFrame, alpha_threshold: float) -> DataFr def _apply_threshold_ratio( df: DataFrame, alpha_threshold: float, threshold_ratio: float, id_col: str ) -> DataFrame: - """Apply a decision threshold using the ration of a match's probability to the next closest match's probability.""" + """Apply an alpha_threshold and threshold_ratio. + + After thresholding on alpha_threshold, compute the ratio of each id_a's + highest potential match probability to its second-highest potential match + probability and compare the ratio to threshold_ratio.""" id_a = id_col + "_a" id_b = id_col + "_b" windowSpec = Window.partitionBy(id_a).orderBy(col("probability").desc(), id_b)