Skip to content

Commit

Permalink
Merge pull request #175 from ipums/core-arguments
Browse files Browse the repository at this point in the history
Update linking.core.classifier and linking.core.threshold
  • Loading branch information
riley-harper authored Dec 6, 2024
2 parents 9542800 + d32c2bf commit 3c9043c
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 102 deletions.
58 changes: 26 additions & 32 deletions hlink/linking/core/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# in this project's top-level directory, and also on-line at:
# https://github.com/ipums/hlink

from typing import Any

from pyspark.ml.feature import SQLTransformer
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import (
Expand All @@ -28,32 +30,38 @@
_xgboost_available = True


def choose_classifier(model_type, params, dep_var):
"""Returns a classifier and a post_classification transformer given model type and params.
def choose_classifier(model_type: str, params: dict[str, Any], dep_var: str):
"""Given a model type and hyper-parameters for the model, return a
classifier of that type with those hyper-parameters, along with a
post-classification transformer to run after classification.
The post-classification transformer standardizes the output of the
classifier for further processing. For example, some classifiers create
models that output a probability array of [P(dep_var=0), P(dep_var=1)], and
the post-classification transformer extracts the single float P(dep_var=1)
as the probability for these models.
Parameters
----------
model_type: string
name of model
params: dictionary
dictionary of parameters for model
dep_var: string
the dependent variable for the model
model_type
the type of model, which may be random_forest, probit,
logistic_regression, decision_tree, gradient_boosted_trees, lightgbm
(requires the 'lightgbm' extra), or xgboost (requires the 'xgboost'
extra)
params
a dictionary of hyper-parameters for the model
dep_var
the dependent variable for the model, sometimes also called the "label"
Returns
-------
The classifer and a transformer to be used after classification.
The classifier and a transformer to be used after classification, as a tuple.
"""
post_transformer = SQLTransformer(statement="SELECT * FROM __THIS__")
features_vector = "features_vector"
if model_type == "random_forest":
classifier = RandomForestClassifier(
**{
key: val
for key, val in params.items()
if key not in ["threshold", "threshold_ratio"]
},
**params,
labelCol=dep_var,
featuresCol=features_vector,
seed=2133,
Expand Down Expand Up @@ -98,11 +106,7 @@ def choose_classifier(model_type, params, dep_var):

elif model_type == "gradient_boosted_trees":
classifier = GBTClassifier(
**{
key: val
for key, val in params.items()
if key not in ["threshold", "threshold_ratio"]
},
**params,
featuresCol=features_vector,
labelCol=dep_var,
seed=2133,
Expand All @@ -118,13 +122,8 @@ def choose_classifier(model_type, params, dep_var):
"its dependencies. Try installing hlink with the lightgbm extra: "
"\n\n pip install hlink[lightgbm]"
)
params_without_threshold = {
key: val
for key, val in params.items()
if key not in {"threshold", "threshold_ratio"}
}
classifier = synapse.ml.lightgbm.LightGBMClassifier(
**params_without_threshold,
**params,
featuresCol=features_vector,
labelCol=dep_var,
probabilityCol="probability_array",
Expand All @@ -139,13 +138,8 @@ def choose_classifier(model_type, params, dep_var):
"the xgboost library and its dependencies. Try installing hlink with "
"the xgboost extra:\n\n pip install hlink[xgboost]"
)
params_without_threshold = {
key: val
for key, val in params.items()
if key not in {"threshold", "threshold_ratio"}
}
classifier = xgboost.spark.SparkXGBClassifier(
**params_without_threshold,
**params,
features_col=features_vector,
label_col=dep_var,
probability_col="probability_array",
Expand Down
185 changes: 119 additions & 66 deletions hlink/linking/core/threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@
# in this project's top-level directory, and also on-line at:
# https://github.com/ipums/hlink

import logging
from typing import Any

from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, lead
from pyspark.sql.functions import col, lead, rank, when

logger = logging.getLogger(__name__)


def get_threshold_ratio(training_conf, model_conf, default=1.3):
def get_threshold_ratio(
training_conf: dict[str, Any], model_conf: dict[str, Any], default: float = 1.3
) -> float | Any:
"""Gets the threshold ratio or default from the config using the correct precedence.
Parameters
Expand All @@ -32,91 +40,136 @@ def get_threshold_ratio(training_conf, model_conf, default=1.3):


def predict_using_thresholds(
pred_df, alpha_threshold, threshold_ratio, training_conf, id_col
):
"""Adds a prediction column to the given pred_df by applying thresholds.
pred_df: DataFrame,
alpha_threshold: float,
threshold_ratio: float,
id_col: str,
decision: str | None,
) -> DataFrame:
"""Adds a "prediction" column to the given data frame by applying
thresholds to the "probability" column. The prediction column has either
the value 0, indicating that the potential match does not meet the
requirements for a match, or 1, indicating that the potential match does
meet the requirements for a match. The requirements for a match depend on
the decision argument, which switches between two different options.
1. If decision is "drop_duplicate_with_threshold_ratio", then
predict_using_thresholds() uses both the alpha_threshold and
threshold_ratio.
predict_using_thresholds() groups the matches by their id in data set A, and
selects from each group the potential match with the highest probability.
Then, if there is a second-highest probability in the group and it is at
least alpha_threshold, predict_using_thresholds() computes the ratio of the
highest probability to the second highest probability and stores it as the
ratio column. Finally, predict_using_thresholds() picks out of each group
the potential match with the highest probability and marks it with
prediction = 1 if
A. its probability is at least alpha_threshold and
B. either there is no second-highest probability over alpha_threshold, or
the ratio of the highest probability to the second-highest is greater
than threshold_ratio.
2. If decision is any other string or is None, then
predict_using_thresholds() does not use threshold_ratio and instead just
applies alpha_threshold. Each potential match with a probability of at
least alpha_threshold gets prediction = 1, and each potential match with a
probability less than alpha_threshold gets prediction = 0.
Parameters
----------
pred_df: DataFrame
a Spark DataFrame of potential matches a probability column
alpha_threshold: float
the alpha threshold cutoff value. No record with a probability lower than this
value will be considered for prediction = 1.
threshold_ratio: float
the threshold ratio cutoff value. Ratio's refer
to the "a" record's next best probability value.
Only used with the "drop_duplicate_with_threshold_ratio"
configuration value.
training_conf: dictionary
the training config section
id_col: string
the id column
pred_df:
a Spark DataFrame of potential matches with a probability column
alpha_threshold:
The alpha threshold cutoff value. No record with a probability lower
than this value will be considered for prediction = 1.
threshold_ratio:
The threshold ratio cutoff value, only used with the
"drop_duplicate_with_threshold_ratio" decision. The ratio is between
the best probability and second-best probability for potential matches
with the same id in data set A.
id_col:
the name of the id column
decision:
how to apply the alpha_threshold and threshold_ratio
Returns
-------
A Spark DataFrame containing the "prediction" column as well as other intermediate columns generated to create the prediction.
a Spark DataFrame containing the "prediction" column, and possibly some
additional intermediate columns generated to create the prediction
"""
if "probability" not in pred_df.columns:
raise ValueError(
"the input data frame must have a 'probability' column to make predictions using thresholds"
)

use_threshold_ratio = (
training_conf.get("decision", "") == "drop_duplicate_with_threshold_ratio"
decision is not None and decision == "drop_duplicate_with_threshold_ratio"
)

if use_threshold_ratio:
logger.debug(
f"Making predictions with alpha threshold and threshold ratio: {alpha_threshold=}, {threshold_ratio=}"
)
return _apply_threshold_ratio(
pred_df.drop("prediction"), alpha_threshold, threshold_ratio, id_col
)
else:
logger.debug(
f"Making predictions with alpha threshold but without threshold ratio: {alpha_threshold=}"
)
return _apply_alpha_threshold(pred_df.drop("prediction"), alpha_threshold)


def _apply_alpha_threshold(pred_df, alpha_threshold):
return pred_df.selectExpr(
"*",
f"case when probability >= {alpha_threshold} then 1 else 0 end as prediction",
)
def _apply_alpha_threshold(pred_df: DataFrame, alpha_threshold: float) -> DataFrame:
prediction = when(col("probability") >= alpha_threshold, 1).otherwise(0)
return pred_df.withColumn("prediction", prediction)


def _apply_threshold_ratio(
df: DataFrame, alpha_threshold: float, threshold_ratio: float, id_col: str
) -> DataFrame:
"""Apply an alpha_threshold and threshold_ratio.
def _apply_threshold_ratio(df, alpha_threshold, threshold_ratio, id_col):
"""Apply a decision threshold using the ration of a match's probability to the next closest match's probability."""
After thresholding on alpha_threshold, compute the ratio of each id_a's
highest potential match probability to its second-highest potential match
probability and compare the ratio to threshold_ratio."""
id_a = id_col + "_a"
id_b = id_col + "_b"
if "probability" not in df.columns:
raise NameError(
'In order to calculate the threshold ratio based on probabilities, you need to have a "probability" column in your data.'
)
else:
windowSpec = Window.partitionBy(df[f"{id_a}"]).orderBy(
df["probability"].desc(), df[f"{id_b}"]
windowSpec = Window.partitionBy(id_a).orderBy(col("probability").desc(), id_b)
prob_rank = rank().over(windowSpec)
prob_lead = lead("probability", 1).over(windowSpec)

should_compute_probability_ratio = (
col("second_best_prob").isNotNull()
& (col("second_best_prob") >= alpha_threshold)
& (col("prob_rank") == 1)
)
# To be a match, the row must...
# 1. Have prob_rank 1, so that it's the most likely match,
# 2. Have a probability of at least alpha_threshold,
# and
# 3. Either have no ratio (since there's no second best probability of at
# least alpha_threshold), or have a ratio of more than threshold_ratio.
is_match = (
(col("probability") >= alpha_threshold)
& (col("prob_rank") == 1)
& ((col("ratio") > threshold_ratio) | col("ratio").isNull())
)
return (
df.select(
"*",
prob_rank.alias("prob_rank"),
prob_lead.alias("second_best_prob"),
)
prob_rank = rank().over(windowSpec)
prob_lead = lead(df["probability"], 1).over(windowSpec)
return (
df.select(
df["*"],
prob_rank.alias("prob_rank"),
prob_lead.alias("second_best_prob"),
)
.selectExpr(
"*",
f"""
IF(
second_best_prob IS NOT NULL
AND second_best_prob >= {alpha_threshold}
AND prob_rank == 1,
probability / second_best_prob,
NULL)
as ratio
""",
)
.selectExpr(
"*",
f"""
CAST(
probability >= {alpha_threshold}
AND prob_rank == 1
AND (ratio > {threshold_ratio} OR ratio is NULL)
as INT) as prediction
""",
)
.drop("prob_rank")
.withColumn(
"ratio",
when(
should_compute_probability_ratio,
col("probability") / col("second_best_prob"),
).otherwise(None),
)
.withColumn("prediction", is_match.cast("integer"))
.drop("prob_rank")
)
3 changes: 2 additions & 1 deletion hlink/linking/matching/link_step_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ def _run(self):
threshold_ratio = threshold_core.get_threshold_ratio(
config[training_conf], chosen_model_params, default=1.3
)
decision = config[training_conf].get("decision")
predictions = threshold_core.predict_using_thresholds(
score_tmp,
alpha_threshold,
threshold_ratio,
config[training_conf],
config["id_column"],
decision,
)
predictions.write.mode("overwrite").saveAsTable(f"{table_prefix}predictions")
pmp = self.task.spark.table(f"{table_prefix}potential_matches_pipeline")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,20 +411,21 @@ def _evaluate_threshold_combinations(
f"{this_alpha_threshold=} and {this_threshold_ratio=}"
)
logger.debug(diag)
decision = training_settings.get("decision")
start_predict_time = perf_counter()
predictions = threshold_core.predict_using_thresholds(
thresholding_predictions,
this_alpha_threshold,
this_threshold_ratio,
training_settings,
id_column,
decision,
)
predict_train = threshold_core.predict_using_thresholds(
thresholding_predict_train,
this_alpha_threshold,
this_threshold_ratio,
training_settings,
id_column,
decision,
)

end_predict_time = perf_counter()
Expand Down
Loading

0 comments on commit 3c9043c

Please sign in to comment.