From 6acce1dc5faa980b24e5bf558e2e8244d6b4af5b Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 16:49:34 +0000 Subject: [PATCH 1/8] [#130] Update black and run new version --- hlink/linking/matching/link_step_explode.py | 26 +++++++++++++-------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/hlink/linking/matching/link_step_explode.py b/hlink/linking/matching/link_step_explode.py index 9b7b744..2017246 100644 --- a/hlink/linking/matching/link_step_explode.py +++ b/hlink/linking/matching/link_step_explode.py @@ -115,26 +115,32 @@ def _explode(self, df, comparisons, comparison_features, blocking, id_column, is expand_length = exploding_column["expand_length"] derived_from_column = exploding_column["derived_from"] explode_selects = [ - explode(self._expand(derived_from_column, expand_length)).alias( - exploding_column_name + ( + explode(self._expand(derived_from_column, expand_length)).alias( + exploding_column_name + ) + if exploding_column_name == column + else column ) - if exploding_column_name == column - else column for column in all_column_names ] else: explode_selects = [ - explode(col(exploding_column_name)).alias(exploding_column_name) - if exploding_column_name == c - else c + ( + explode(col(exploding_column_name)).alias(exploding_column_name) + if exploding_column_name == c + else c + ) for c in all_column_names ] if "dataset" in exploding_column: derived_from_column = exploding_column["derived_from"] explode_selects_with_derived_column = [ - col(derived_from_column).alias(exploding_column_name) - if exploding_column_name == column - else column + ( + col(derived_from_column).alias(exploding_column_name) + if exploding_column_name == column + else column + ) for column in all_column_names ] if exploding_column["dataset"] == "a": From db6708509992c09d9734999db98a5d4ce018eafc Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 17:27:16 +0000 Subject: [PATCH 2/8] [#130] Add a couple of unit tests for check_column_mappings --- hlink/tests/conf_validations_test.py | 38 +++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/hlink/tests/conf_validations_test.py b/hlink/tests/conf_validations_test.py index ffa85ff..df6c99b 100644 --- a/hlink/tests/conf_validations_test.py +++ b/hlink/tests/conf_validations_test.py @@ -1,8 +1,10 @@ import os import pytest +from pyspark.sql import SparkSession + from hlink.configs.load_config import load_conf_file -from hlink.scripts.lib.conf_validations import analyze_conf +from hlink.scripts.lib.conf_validations import analyze_conf, check_column_mappings from hlink.linking.link_run import LinkRun @@ -25,3 +27,37 @@ def test_invalid_conf(conf_dir_path, spark, conf_name, error_msg): with pytest.raises(ValueError, match=error_msg): analyze_conf(link_run) + + +def test_check_column_mappings_mappings_missing(spark: SparkSession) -> None: + """ + The config must have a column_mappings section. + """ + config = {} + df_a = spark.createDataFrame([[1], [2], [3]], ["a"]) + df_b = spark.createDataFrame([[4], [5], [6]], ["b"]) + + with pytest.raises( + ValueError, match=r"No \[\[column_mappings\]\] exist in the conf file" + ): + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_no_column_name(spark: SparkSession) -> None: + """ + Each column mapping in the config must have a column_name attribute. + """ + config = { + "column_mappings": [{"column_name": "AGE", "alias": "age"}, {"alias": "height"}] + } + df_a = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) + + df_a.show() + df_b.show() + + expected_err = ( + r"The following \[\[column_mappings\]\] has no 'column_name' attribute:" + ) + with pytest.raises(ValueError, match=expected_err): + check_column_mappings(config, df_a, df_b) From 9e725713ddeb3ec4eee33651bf043ea0ff18cbfc Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 19:16:07 +0000 Subject: [PATCH 3/8] [#130] Add several more unit tests for check_column_mappings --- hlink/tests/conf_validations_test.py | 96 +++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/hlink/tests/conf_validations_test.py b/hlink/tests/conf_validations_test.py index df6c99b..e9d6d6a 100644 --- a/hlink/tests/conf_validations_test.py +++ b/hlink/tests/conf_validations_test.py @@ -53,11 +53,101 @@ def test_check_column_mappings_no_column_name(spark: SparkSession) -> None: df_a = spark.createDataFrame([[20], [40], [60]], ["AGE"]) df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) - df_a.show() - df_b.show() - expected_err = ( r"The following \[\[column_mappings\]\] has no 'column_name' attribute:" ) with pytest.raises(ValueError, match=expected_err): check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_column_name_not_available_datasource_a( + spark: SparkSession, +) -> None: + """ + Column mappings may only use column_names that appear in datasource A or a + previous column mapping. + """ + config = {"column_mappings": [{"column_name": "HEIGHT"}]} + + df_a = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + df_b = spark.createDataFrame([[70, 123], [50, 123], [30, 123]], ["AGE", "HEIGHT"]) + + expected_err = ( + r"Within a \[\[column_mappings\]\] the column_name: 'HEIGHT' " + r"does not exist in datasource_a and no previous \[\[column_mapping\]\] " + "alias exists for it" + ) + + with pytest.raises(ValueError, match=expected_err): + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_set_value_column_a_does_not_need_column( + spark: SparkSession, +) -> None: + """ + When set_value_column_a is present for a column mapping, that column does not + need to be present in datasource A. + """ + config = {"column_mappings": [{"column_name": "HEIGHT", "set_value_column_a": 125}]} + + df_a = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + df_b = spark.createDataFrame([[70, 123], [50, 123], [30, 123]], ["AGE", "HEIGHT"]) + + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_column_name_not_available_datasource_b( + spark: SparkSession, +) -> None: + """ + Column mappings may only use column_names that appear in datasource B or a + previous column mapping. + """ + config = {"column_mappings": [{"column_name": "HEIGHT"}]} + + df_a = spark.createDataFrame([[70, 123], [50, 123], [30, 123]], ["AGE", "HEIGHT"]) + df_b = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + + expected_err = ( + r"Within a \[\[column_mappings\]\] the column_name: 'HEIGHT' " + r"does not exist in datasource_b and no previous \[\[column_mapping\]\] " + "alias exists for it" + ) + + with pytest.raises(ValueError, match=expected_err): + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_set_value_column_b_does_not_need_column( + spark: SparkSession, +) -> None: + """ + When set_value_column_b is present for a column mapping, that column does not + need to be present in datasource B. + """ + config = {"column_mappings": [{"column_name": "HEIGHT", "set_value_column_b": 125}]} + + df_a = spark.createDataFrame([[70, 123], [50, 123], [30, 123]], ["AGE", "HEIGHT"]) + df_b = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_previous_mappings_are_available( + spark: SparkSession, +) -> None: + """ + Columns created in a previous column mapping can be used in other column + mappings. + """ + config = { + "column_mappings": [ + {"column_name": "AGE", "alias": "AGE_HLINK"}, + {"column_name": "AGE_HLINK", "alias": "AGE_HLINK2"}, + ] + } + df_a = spark.createDataFrame([[70], [50], [30]], ["AGE"]) + df_b = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + + check_column_mappings(config, df_a, df_b) From 8212c076ab751c8a239ff6be7d854d122c1aa0d2 Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 19:31:13 +0000 Subject: [PATCH 4/8] [#130] Add a couple of failing tests for the bug What's happening is that conf_validations is not taking the override_column_X attributes into account. It needs to look at these attributes and change which column name it looks for in the input datasources if the attributes are present. --- hlink/tests/conf_validations_test.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/hlink/tests/conf_validations_test.py b/hlink/tests/conf_validations_test.py index e9d6d6a..bca6992 100644 --- a/hlink/tests/conf_validations_test.py +++ b/hlink/tests/conf_validations_test.py @@ -151,3 +151,31 @@ def test_check_column_mappings_previous_mappings_are_available( df_b = spark.createDataFrame([[20], [40], [60]], ["AGE"]) check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_override_column_a(spark: SparkSession) -> None: + """ + The override_column_a attribute lets you control which column you read from + in datasource A. + """ + config = { + "column_mappings": [{"column_name": "AGE", "override_column_a": "ageColumn"}] + } + df_a = spark.createDataFrame([[20], [40], [60]], ["ageColumn"]) + df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) + + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_override_column_b(spark: SparkSession) -> None: + """ + The override_column_b attribute lets you control which column you read from + in datasource B. + """ + config = { + "column_mappings": [{"column_name": "ageColumn", "override_column_b": "AGE"}] + } + df_a = spark.createDataFrame([[20], [40], [60]], ["ageColumn"]) + df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) + + check_column_mappings(config, df_a, df_b) From e5d318dbd9ee20e91330721e450262514bedef0b Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 20:05:46 +0000 Subject: [PATCH 5/8] [#130] Handle override_column_a and override_column_b in check_column_mappings() --- hlink/scripts/lib/conf_validations.py | 31 +++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/hlink/scripts/lib/conf_validations.py b/hlink/scripts/lib/conf_validations.py index 59f63ad..d20e079 100644 --- a/hlink/scripts/lib/conf_validations.py +++ b/hlink/scripts/lib/conf_validations.py @@ -276,22 +276,41 @@ def check_column_mappings(config, df_a, df_b): column_name = c.get("column_name") set_value_column_a = c.get("set_value_column_a") set_value_column_b = c.get("set_value_column_b") + override_column_a = c.get("override_column_a") + override_column_b = c.get("override_column_b") + if not column_name: raise ValueError( f"The following [[column_mappings]] has no 'column_name' attribute: {c}" ) if set_value_column_a is None: - if column_name.lower() not in [c.lower() for c in df_a.columns]: - if column_name not in columns_available: + datasource_a_columns = [column.lower() for column in df_a.columns] + + if override_column_a is not None: + if override_column_a.lower() not in datasource_a_columns: raise ValueError( - f"Within a [[column_mappings]] the column_name: '{column_name}' does not exist in datasource_a and no previous [[column_mapping]] alias exists for it. \nColumn mapping: {c}. \nAvailable columns: \n {df_a.columns}" + f"Within a [[column_mappings]] the override_column_a column '{override_column_a}' does not exist in datasource_a.\nColumn mapping: {c}.\nAvailable columns: {df_a.columns}" ) + else: + if column_name.lower() not in datasource_a_columns: + if column_name not in columns_available: + raise ValueError( + f"Within a [[column_mappings]] the column_name: '{column_name}' does not exist in datasource_a and no previous [[column_mapping]] alias exists for it. \nColumn mapping: {c}. \nAvailable columns: \n {df_a.columns}" + ) if set_value_column_b is None: - if column_name.lower() not in [c.lower() for c in df_b.columns]: - if column_name not in columns_available: + datasource_b_columns = [column.lower() for column in df_b.columns] + + if override_column_b is not None: + if override_column_b.lower() not in datasource_b_columns: raise ValueError( - f"Within a [[column_mappings]] the column_name: '{column_name}' does not exist in datasource_b and no previous [[column_mapping]] alias exists for it. Column mapping: {c}. Available columns: \n {df_b.columns}" + f"Within a [[column_mappings]] the override_column_b column '{override_column_b}' does not exist in datasource_b.\nColumn mapping: {c}.\nAvailable columns: {df_b.columns}" ) + else: + if column_name.lower() not in datasource_b_columns: + if column_name not in columns_available: + raise ValueError( + f"Within a [[column_mappings]] the column_name: '{column_name}' does not exist in datasource_b and no previous [[column_mapping]] alias exists for it. Column mapping: {c}. Available columns: \n {df_b.columns}" + ) if alias in columns_available: duplicates.append(alias) elif not alias and column_name in columns_available: From 913093660b156b2455b6e6f4e15c98b9f8c51b57 Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 20:33:57 +0000 Subject: [PATCH 6/8] [#130] Factor out some logic from check_column_mappings() --- hlink/scripts/lib/conf_validations.py | 73 ++++++++++++++++----------- hlink/tests/conf_validations_test.py | 4 +- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/hlink/scripts/lib/conf_validations.py b/hlink/scripts/lib/conf_validations.py index d20e079..2a38b2a 100644 --- a/hlink/scripts/lib/conf_validations.py +++ b/hlink/scripts/lib/conf_validations.py @@ -3,9 +3,12 @@ # in this project's top-level directory, and also on-line at: # https://github.com/ipums/hlink -from pyspark.sql.utils import AnalysisException from os import path +from typing import Any, Literal + import colorama +from pyspark.sql.utils import AnalysisException +from pyspark.sql import DataFrame def print_checking(section: str): @@ -265,6 +268,44 @@ def check_substitution_columns(config, columns_available): ) +def check_column_mappings_column_available( + column_mapping: dict[str, Any], + df: DataFrame, + previous_mappings: list[str], + a_or_b: Literal["a", "b"], +) -> None: + """ + Check whether a column in a column mapping is available or not. Raise a + ValueError if it is not available. + + previous_mappings is a list of columns mapped by previous column mappings. + """ + column_name = column_mapping["column_name"] + override_column = column_mapping.get(f"override_column_{a_or_b}") + df_columns_lower = [column.lower() for column in df.columns] + + if override_column is not None: + if override_column.lower() not in df_columns_lower: + raise ValueError( + f"Within a [[column_mappings]] the override_column_{a_or_b} column " + f"'{override_column}' does not exist in datasource_{a_or_b}.\n" + f"Column mapping: {column_mapping}\n" + f"Available columns: {df.columns}" + ) + else: + if ( + column_name.lower() not in df_columns_lower + and column_name not in previous_mappings + ): + raise ValueError( + f"Within a [[column_mappings]] the column_name '{column_name}' " + f"does not exist in datasource_{a_or_b} and no previous " + "[[column_mapping]] alias exists for it.\n" + f"Column mapping: {column_mapping}.\n" + f"Available columns:\n {df.columns}" + ) + + def check_column_mappings(config, df_a, df_b): column_mappings = config.get("column_mappings") if not column_mappings: @@ -276,41 +317,15 @@ def check_column_mappings(config, df_a, df_b): column_name = c.get("column_name") set_value_column_a = c.get("set_value_column_a") set_value_column_b = c.get("set_value_column_b") - override_column_a = c.get("override_column_a") - override_column_b = c.get("override_column_b") if not column_name: raise ValueError( f"The following [[column_mappings]] has no 'column_name' attribute: {c}" ) if set_value_column_a is None: - datasource_a_columns = [column.lower() for column in df_a.columns] - - if override_column_a is not None: - if override_column_a.lower() not in datasource_a_columns: - raise ValueError( - f"Within a [[column_mappings]] the override_column_a column '{override_column_a}' does not exist in datasource_a.\nColumn mapping: {c}.\nAvailable columns: {df_a.columns}" - ) - else: - if column_name.lower() not in datasource_a_columns: - if column_name not in columns_available: - raise ValueError( - f"Within a [[column_mappings]] the column_name: '{column_name}' does not exist in datasource_a and no previous [[column_mapping]] alias exists for it. \nColumn mapping: {c}. \nAvailable columns: \n {df_a.columns}" - ) + check_column_mappings_column_available(c, df_a, columns_available, "a") if set_value_column_b is None: - datasource_b_columns = [column.lower() for column in df_b.columns] - - if override_column_b is not None: - if override_column_b.lower() not in datasource_b_columns: - raise ValueError( - f"Within a [[column_mappings]] the override_column_b column '{override_column_b}' does not exist in datasource_b.\nColumn mapping: {c}.\nAvailable columns: {df_b.columns}" - ) - else: - if column_name.lower() not in datasource_b_columns: - if column_name not in columns_available: - raise ValueError( - f"Within a [[column_mappings]] the column_name: '{column_name}' does not exist in datasource_b and no previous [[column_mapping]] alias exists for it. Column mapping: {c}. Available columns: \n {df_b.columns}" - ) + check_column_mappings_column_available(c, df_b, columns_available, "b") if alias in columns_available: duplicates.append(alias) elif not alias and column_name in columns_available: diff --git a/hlink/tests/conf_validations_test.py b/hlink/tests/conf_validations_test.py index bca6992..5507989 100644 --- a/hlink/tests/conf_validations_test.py +++ b/hlink/tests/conf_validations_test.py @@ -73,7 +73,7 @@ def test_check_column_mappings_column_name_not_available_datasource_a( df_b = spark.createDataFrame([[70, 123], [50, 123], [30, 123]], ["AGE", "HEIGHT"]) expected_err = ( - r"Within a \[\[column_mappings\]\] the column_name: 'HEIGHT' " + r"Within a \[\[column_mappings\]\] the column_name 'HEIGHT' " r"does not exist in datasource_a and no previous \[\[column_mapping\]\] " "alias exists for it" ) @@ -110,7 +110,7 @@ def test_check_column_mappings_column_name_not_available_datasource_b( df_b = spark.createDataFrame([[20], [40], [60]], ["AGE"]) expected_err = ( - r"Within a \[\[column_mappings\]\] the column_name: 'HEIGHT' " + r"Within a \[\[column_mappings\]\] the column_name 'HEIGHT' " r"does not exist in datasource_b and no previous \[\[column_mapping\]\] " "alias exists for it" ) From fb2de50fa739e3b07cc5632fb63ce5c1ad4198fc Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 20:47:11 +0000 Subject: [PATCH 7/8] [#130] Add two tests for error conditions with override_column_X --- hlink/tests/conf_validations_test.py | 44 ++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/hlink/tests/conf_validations_test.py b/hlink/tests/conf_validations_test.py index 5507989..9cf896c 100644 --- a/hlink/tests/conf_validations_test.py +++ b/hlink/tests/conf_validations_test.py @@ -179,3 +179,47 @@ def test_check_column_mappings_override_column_b(spark: SparkSession) -> None: df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_override_column_a_not_present( + spark: SparkSession, +) -> None: + """ + The override_column_a column must be present in datasource A. + """ + config = { + "column_mappings": [ + {"column_name": "AGE", "override_column_a": "oops_not_there"} + ] + } + df_a = spark.createDataFrame([[20], [40], [60]], ["ageColumn"]) + df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) + + expected_err = ( + r"Within a \[\[column_mappings\]\] the override_column_a column " + "'oops_not_there' does not exist in datasource_a" + ) + with pytest.raises(ValueError, match=expected_err): + check_column_mappings(config, df_a, df_b) + + +def test_check_column_mappings_override_column_b_not_present( + spark: SparkSession, +) -> None: + """ + The override_column_b column must be present in datasource B. + """ + config = { + "column_mappings": [ + {"column_name": "AGE", "override_column_b": "oops_not_there"} + ] + } + df_a = spark.createDataFrame([[20], [40], [60]], ["AGE"]) + df_b = spark.createDataFrame([[70], [50], [30]], ["AGE"]) + + expected_err = ( + r"Within a \[\[column_mappings\]\] the override_column_b column " + "'oops_not_there' does not exist in datasource_b" + ) + with pytest.raises(ValueError, match=expected_err): + check_column_mappings(config, df_a, df_b) From 96d8c0a5f9826dc669c8eefb6d3f36ab4782d27a Mon Sep 17 00:00:00 2001 From: rileyh Date: Mon, 19 Feb 2024 21:05:49 +0000 Subject: [PATCH 8/8] [#130] Add type hints to check_column_mappings --- hlink/scripts/lib/conf_validations.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hlink/scripts/lib/conf_validations.py b/hlink/scripts/lib/conf_validations.py index 2a38b2a..399b7a5 100644 --- a/hlink/scripts/lib/conf_validations.py +++ b/hlink/scripts/lib/conf_validations.py @@ -306,7 +306,9 @@ def check_column_mappings_column_available( ) -def check_column_mappings(config, df_a, df_b): +def check_column_mappings( + config: dict[str, Any], df_a: DataFrame, df_b: DataFrame +) -> list[str]: column_mappings = config.get("column_mappings") if not column_mappings: raise ValueError("No [[column_mappings]] exist in the conf file.")