From a09b0fe4bc69eca3ece3a63ddd7b635a8d85ed6e Mon Sep 17 00:00:00 2001 From: hrfmartins Date: Thu, 23 Jan 2025 14:31:09 +0100 Subject: [PATCH 01/35] feat: customizable column names for DQEngine along with placeholer for other future configurations --- src/databricks/labs/dqx/engine.py | 72 ++++++++++++++++---------- src/databricks/labs/dqx/rule.py | 26 +++++++--- tests/integration/test_apply_checks.py | 32 +++++++++++- 3 files changed, 94 insertions(+), 36 deletions(-) diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py index c18552f..2a87f9a 100644 --- a/src/databricks/labs/dqx/engine.py +++ b/src/databricks/labs/dqx/engine.py @@ -5,14 +5,17 @@ import itertools from pathlib import Path from collections.abc import Callable -from typing import Any +from typing import Any, Optional +from dataclasses import dataclass, field import yaml import pyspark.sql.functions as F from pyspark.sql import DataFrame -from databricks.labs.dqx.rule import DQRule, Criticality, Columns, DQRuleColSet, ChecksValidationStatus +from databricks.labs.dqx.rule import DQRule, Criticality, DQRuleColSet, ChecksValidationStatus, ColumnArguments, \ + ExtraParams, DefaultColumnNames from databricks.labs.dqx.utils import deserialize_dicts from databricks.labs.dqx import col_functions from databricks.labs.blueprint.installation import Installation + from databricks.labs.dqx.base import DQEngineBase, DQEngineCoreBase from databricks.labs.dqx.config import WorkspaceConfig, RunConfig from databricks.sdk.errors import NotFound @@ -24,7 +27,26 @@ class DQEngineCore(DQEngineCoreBase): - """Data Quality Engine Core class to apply data quality checks to a given dataframe.""" + """Data Quality Engine Core class to apply data quality checks to a given dataframe. + Args: + workspace_client (WorkspaceClient): WorkspaceClient instance to use for accessing the workspace. + extra_params (ExtraParams): Extra parameters for the DQEngine. 
+ """ + + def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams | None = None): + super().__init__(workspace_client) + + extra_params = extra_params or ExtraParams() + + self._column_names = { + ColumnArguments.ERRORS: extra_params.column_names.get( + ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value + ), + ColumnArguments.WARNINGS: extra_params.column_names.get( + ColumnArguments.WARNINGS.value, DefaultColumnNames.WARNINGS.value + ), + } + def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: if not checks: @@ -32,8 +54,8 @@ def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: warning_checks = self._get_check_columns(checks, Criticality.WARN.value) error_checks = self._get_check_columns(checks, Criticality.ERROR.value) - ndf = self._create_results_map(df, error_checks, Columns.ERRORS.value) - ndf = self._create_results_map(ndf, warning_checks, Columns.WARNINGS.value) + ndf = self._create_results_map(df, error_checks, self._column_names[ColumnArguments.ERRORS]) + ndf = self._create_results_map(ndf, warning_checks, self._column_names[ColumnArguments.WARNINGS]) return ndf @@ -57,12 +79,13 @@ def apply_checks_by_metadata_and_split( return good_df, bad_df + def apply_checks_by_metadata( - self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None - ) -> DataFrame: - dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None + ) -> DataFrame: + dq_rule_checks = self.build_checks_by_metadata(checks, glbs) - return self.apply_checks(df, dq_rule_checks) + return self.apply_checks(df, dq_rule_checks) @staticmethod def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: @@ -77,13 +100,11 @@ def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> C return status - @staticmethod - def get_invalid(df: DataFrame) -> DataFrame: - return df.where(F.col(Columns.ERRORS.value).isNotNull() | F.col(Columns.WARNINGS.value).isNotNull()) + def get_invalid(self, df: DataFrame) -> DataFrame: + return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull()) - @staticmethod - def get_valid(df: DataFrame) -> DataFrame: - return df.where(F.col(Columns.ERRORS.value).isNull()).drop(Columns.ERRORS.value, Columns.WARNINGS.value) + def get_valid(self, df: DataFrame) -> DataFrame: + return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop(self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS]) @staticmethod def load_checks_from_local_file(path: str) -> list[dict]: @@ -177,8 +198,7 @@ def _get_check_columns(checks: list[DQRule], criticality: str) -> list[DQRule]: """ return [check for check in checks if check.rule_criticality == criticality] - @staticmethod - def _append_empty_checks(df: DataFrame) -> DataFrame: + def _append_empty_checks(self, df: DataFrame) -> DataFrame: """Append empty checks at the end of dataframe. 
:param df: dataframe without checks @@ -186,8 +206,8 @@ def _append_empty_checks(df: DataFrame) -> DataFrame: """ return df.select( "*", - F.lit(None).cast("map").alias(Columns.ERRORS.value), - F.lit(None).cast("map").alias(Columns.WARNINGS.value), + F.lit(None).cast("map").alias(self._column_names[ColumnArguments.ERRORS]), + F.lit(None).cast("map").alias(self._column_names[ColumnArguments.WARNINGS]), ) @staticmethod @@ -350,9 +370,9 @@ def _resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_o class DQEngine(DQEngineBase): """Data Quality Engine class to apply data quality checks to a given dataframe.""" - def __init__(self, workspace_client: WorkspaceClient, engine: DQEngineCoreBase | None = None): + def __init__(self, workspace_client: WorkspaceClient, engine: DQEngineCoreBase | None = None, extra_params: ExtraParams | None = None): super().__init__(workspace_client) - self._engine = engine or DQEngineCore(workspace_client) + self._engine = engine or DQEngineCore(workspace_client, extra_params) def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: return self._engine.apply_checks(df, checks) @@ -374,13 +394,11 @@ def apply_checks_by_metadata( def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: return DQEngineCore.validate_checks(checks, glbs) - @staticmethod - def get_invalid(df: DataFrame) -> DataFrame: - return DQEngineCore.get_invalid(df) + def get_invalid(self, df: DataFrame) -> DataFrame: + return self._engine.get_invalid(df) - @staticmethod - def get_valid(df: DataFrame) -> DataFrame: - return DQEngineCore.get_valid(df) + def get_valid(self, df: DataFrame) -> DataFrame: + return self._engine.get_valid(df) @staticmethod def load_checks_from_local_file(path: str) -> list[dict]: diff --git a/src/databricks/labs/dqx/rule.py b/src/databricks/labs/dqx/rule.py index b04ed4b..ee6b20e 100644 --- a/src/databricks/labs/dqx/rule.py +++ b/src/databricks/labs/dqx/rule.py @@ -1,27 +1,39 @@ from enum import Enum from dataclasses import dataclass, field import functools as ft -from typing import Any +from typing import Any, Optional from collections.abc import Callable from pyspark.sql import Column import pyspark.sql.functions as F from databricks.labs.dqx.utils import get_column_name -# TODO: make this configurable -class Columns(Enum): +class Criticality(Enum): + """Enum class to represent criticality of the check.""" + + WARN = "warn" + ERROR = "error" + + +class DefaultColumnNames(Enum): """Enum class to represent columns in the dataframe that will be used for error and warning reporting.""" ERRORS = "_errors" WARNINGS = "_warnings" -class Criticality(Enum): - """Enum class to represent criticality of the check.""" +class ColumnArguments(Enum): + """Enum class that is used as input parsing for custom column naming.""" - WARN = "warn" - ERROR = "error" + ERRORS = "errors" + WARNINGS = "warnings" + + +@dataclass(frozen=True) +class ExtraParams: + """Class to represent extra parameters for DQEngine.""" + column_names: Optional[dict[str, str]] = field(default_factory=dict) @dataclass(frozen=True) class DQRule: diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 63b9684..397569a 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -4,12 +4,16 @@ from pyspark.sql import Column from chispa.dataframe_comparer import assert_df_equality # type: ignore from databricks.labs.dqx.col_functions import 
is_not_null_and_not_empty, make_condition -from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.engine import ( + DQRule, + DQEngine, + ExtraParams, +) from databricks.labs.dqx.rule import DQRule - SCHEMA = "a: int, b: int, c: int" EXPECTED_SCHEMA = SCHEMA + ", _errors: map, _warnings: map" +EXPECTED_SCHEMA_WITH_CUSTOM_NAMES = SCHEMA + ", ERROR: map, WARN: map" def test_apply_checks_on_empty_checks(ws, spark): @@ -491,3 +495,27 @@ def test_get_invalid_records(ws, spark): ) assert_df_equality(invalid_df, expected_invalid_df) + +def test_apply_checks_with_custom_column_naming(ws, spark): + dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names = {'errors': 'ERROR', 'warnings': 'WARN'})) + test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) + + checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}] + checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) + + assert 'ERROR' in checked.columns + assert 'WARN' in checked.columns + + expected = spark.createDataFrame( + [ + [1, 3, 3, None, None], + [2, None, 4, None, None], + [None, 4, None, None, {"col_a_is_null_or_empty": "new check failed"}], + [None, None, None, None, {"col_a_is_null_or_empty": "new check failed"}], + ], + EXPECTED_SCHEMA_WITH_CUSTOM_NAMES, + ) + + assert_df_equality(checked, expected, ignore_nullable=True) + + From 8cb6ab02d1edba1bd3e137729dce443fb26a44c1 Mon Sep 17 00:00:00 2001 From: hrfmartins Date: Sun, 2 Feb 2025 09:33:39 +0100 Subject: [PATCH 02/35] fix: linting issues fix --- src/databricks/labs/dqx/base.py | 6 ++-- src/databricks/labs/dqx/engine.py | 40 +++++++++++++++++--------- src/databricks/labs/dqx/rule.py | 5 ++-- tests/integration/test_apply_checks.py | 6 ++-- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/databricks/labs/dqx/base.py b/src/databricks/labs/dqx/base.py index b1bbffa..651e696 100644 --- a/src/databricks/labs/dqx/base.py +++ b/src/databricks/labs/dqx/base.py @@ -113,18 +113,16 @@ def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> C :return ValidationStatus: The validation status. """ - @staticmethod @abc.abstractmethod - def get_invalid(df: DataFrame) -> DataFrame: + def get_invalid(self, df: DataFrame) -> DataFrame: """ Get records that violate data quality checks (records with warnings and errors). @param df: input DataFrame. @return: dataframe with error and warning rows and corresponding reporting columns. """ - @staticmethod @abc.abstractmethod - def get_valid(df: DataFrame) -> DataFrame: + def get_valid(self, df: DataFrame) -> DataFrame: """ Get records that don't violate data quality checks (records with warnings but no errors). @param df: input DataFrame. 
diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py index 2a87f9a..d98f0fa 100644 --- a/src/databricks/labs/dqx/engine.py +++ b/src/databricks/labs/dqx/engine.py @@ -5,13 +5,19 @@ import itertools from pathlib import Path from collections.abc import Callable -from typing import Any, Optional -from dataclasses import dataclass, field +from typing import Any import yaml import pyspark.sql.functions as F from pyspark.sql import DataFrame -from databricks.labs.dqx.rule import DQRule, Criticality, DQRuleColSet, ChecksValidationStatus, ColumnArguments, \ - ExtraParams, DefaultColumnNames +from databricks.labs.dqx.rule import ( + DQRule, + Criticality, + DQRuleColSet, + ChecksValidationStatus, + ColumnArguments, + ExtraParams, + DefaultColumnNames, +) from databricks.labs.dqx.utils import deserialize_dicts from databricks.labs.dqx import col_functions from databricks.labs.blueprint.installation import Installation @@ -47,7 +53,6 @@ def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams ), } - def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: if not checks: return self._append_empty_checks(df) @@ -79,13 +84,12 @@ def apply_checks_by_metadata_and_split( return good_df, bad_df - def apply_checks_by_metadata( - self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None - ) -> DataFrame: - dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None + ) -> DataFrame: + dq_rule_checks = self.build_checks_by_metadata(checks, glbs) - return self.apply_checks(df, dq_rule_checks) + return self.apply_checks(df, dq_rule_checks) @staticmethod def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: @@ -101,10 +105,15 @@ def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> C return status def get_invalid(self, df: DataFrame) -> DataFrame: - return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull()) + return df.where( + F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() + | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull() + ) def get_valid(self, df: DataFrame) -> DataFrame: - return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop(self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS]) + return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop( + self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS] + ) @staticmethod def load_checks_from_local_file(path: str) -> list[dict]: @@ -370,7 +379,12 @@ def _resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_o class DQEngine(DQEngineBase): """Data Quality Engine class to apply data quality checks to a given dataframe.""" - def __init__(self, workspace_client: WorkspaceClient, engine: DQEngineCoreBase | None = None, extra_params: ExtraParams | None = None): + def __init__( + self, + workspace_client: WorkspaceClient, + engine: DQEngineCoreBase | None = None, + extra_params: ExtraParams | None = None, + ): super().__init__(workspace_client) self._engine = engine or DQEngineCore(workspace_client, extra_params) diff --git a/src/databricks/labs/dqx/rule.py b/src/databricks/labs/dqx/rule.py index ee6b20e..598a608 100644 --- a/src/databricks/labs/dqx/rule.py +++ 
b/src/databricks/labs/dqx/rule.py @@ -1,7 +1,7 @@ from enum import Enum from dataclasses import dataclass, field import functools as ft -from typing import Any, Optional +from typing import Any from collections.abc import Callable from pyspark.sql import Column import pyspark.sql.functions as F @@ -33,7 +33,8 @@ class ColumnArguments(Enum): class ExtraParams: """Class to represent extra parameters for DQEngine.""" - column_names: Optional[dict[str, str]] = field(default_factory=dict) + column_names: dict[str, str] = field(default_factory=dict) + @dataclass(frozen=True) class DQRule: diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 397569a..3ca17fc 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5,7 +5,6 @@ from chispa.dataframe_comparer import assert_df_equality # type: ignore from databricks.labs.dqx.col_functions import is_not_null_and_not_empty, make_condition from databricks.labs.dqx.engine import ( - DQRule, DQEngine, ExtraParams, ) @@ -496,8 +495,9 @@ def test_get_invalid_records(ws, spark): assert_df_equality(invalid_df, expected_invalid_df) + def test_apply_checks_with_custom_column_naming(ws, spark): - dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names = {'errors': 'ERROR', 'warnings': 'WARN'})) + dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}] @@ -517,5 +517,3 @@ def test_apply_checks_with_custom_column_naming(ws, spark): ) assert_df_equality(checked, expected, ignore_nullable=True) - - From 1fdfab473599d424e52f1dc9b6a30b0eaea31743 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Tue, 4 Feb 2025 11:24:11 +0000 Subject: [PATCH 03/35] Update tests/integration/test_apply_checks.py Co-authored-by: Marcin Wojtyczka --- tests/integration/test_apply_checks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index d264700..e80f246 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -504,7 +504,6 @@ def test_apply_checks_with_custom_column_naming(ws, spark): checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) assert 'ERROR' in checked.columns - assert 'WARN' in checked.columns expected = spark.createDataFrame( [ From 5c40db9004de70006f114766e9f6a73da0b3b56f Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Tue, 4 Feb 2025 11:24:16 +0000 Subject: [PATCH 04/35] Update tests/integration/test_apply_checks.py Co-authored-by: Marcin Wojtyczka --- tests/integration/test_apply_checks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index e80f246..88ef463 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -503,7 +503,6 @@ def test_apply_checks_with_custom_column_naming(ws, spark): checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}] checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) - assert 'ERROR' in checked.columns expected = spark.createDataFrame( [ From 
c63bafeee2a65b8892ac09d0810ad743cfb7c66b Mon Sep 17 00:00:00 2001 From: hrfmartins Date: Tue, 4 Feb 2025 21:22:58 +0100 Subject: [PATCH 05/35] tests: added test_custom_column_name_mappings_split_metadata to test custom column mappings against apply_checks_by_metadata_and_split --- tests/integration/test_apply_checks.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 3ca17fc..02be791 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -517,3 +517,25 @@ def test_apply_checks_with_custom_column_naming(ws, spark): ) assert_df_equality(checked, expected, ignore_nullable=True) + +def test_custom_column_name_mappings_split_metadata(ws, spark): + dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) + test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) + + + checks = [ + {"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}, + {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}} + ] + good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks, globals()) + + assert_df_equality(good, spark.createDataFrame([ + [1, 3, 3], [None, 4, None] + ], SCHEMA), ignore_nullable=True) + + assert_df_equality(bad, spark.createDataFrame([ + [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], + [None, 4, None, None, {"col_a_is_null_or_empty": "new check failed"}], + [None, None, None, {"col_b_is_null_or_empty": "Column b is null or empty"}, + {"col_a_is_null_or_empty": "new check failed"}], + ], EXPECTED_SCHEMA_WITH_CUSTOM_NAMES)) From f931b2ff6434582629f9cec7c048d500fddb3a56 Mon Sep 17 00:00:00 2001 From: hrfmartins Date: Tue, 4 Feb 2025 21:40:59 +0100 Subject: [PATCH 06/35] docs: added documentation on custom column namings --- demos/dqx_demo_library.py | 34 +++++++++++++++++++++++++- docs/dqx/docs/guide.mdx | 50 +++++++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 13 deletions(-) diff --git a/demos/dqx_demo_library.py b/demos/dqx_demo_library.py index e773929..7831c2a 100644 --- a/demos/dqx_demo_library.py +++ b/demos/dqx_demo_library.py @@ -344,4 +344,36 @@ def ends_with_foo(col_name: str) -> Column: dq_engine = DQEngine(WorkspaceClient()) valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals()) -display(valid_and_quarantined_df) \ No newline at end of file +display(valid_and_quarantined_df) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Applying custom column names + +# COMMAND ---------- + +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.engine import ( + DQEngine, + ExtraParams, + DQRule +) + +# using ExtraParams class to configure the engine with custom column names +extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WARNINGS'}) + +ws = WorkspaceClient() +dq_engine = DQEngine(ws, extra_params=extra_parameters) + +schema = "col1: string" +input_df = spark.createDataFrame([["str1"], ["foo"], ["str3"]], schema) + +checks = [ DQRule( + name='col_1_is_null_or_empty', + criticality='error', + check=is_not_null_and_not_empty('col1')), + ] + +valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals()) +display(valid_and_quarantined_df) diff --git a/docs/dqx/docs/guide.mdx 
b/docs/dqx/docs/guide.mdx index 838b3d3..b01c88c 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -55,7 +55,7 @@ dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_D print(dlt_expectations) ``` -### Using CLI +### Using CLI You can optionally install DQX in the workspace, see the [Installation Guide](/docs/installation#dqx-installation-in-a-databricks-workspace). As part of the installation, a config, dashboards and profiler workflow is installed. The workflow can be run manually in the workspace UI or using the CLI as below. @@ -116,7 +116,7 @@ print(status) ``` Note that checks are validated automatically when applied as part of the -`apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods +`apply_checks_by_metadata_and_split` and `apply_checks_by_metadata` methods (see [Quality rules defined as config](#quality-rules-defined-as-config)). ### Using CLI @@ -178,7 +178,7 @@ checks = dq_engine.load_checks_from_installation(assume_user=True, run_config_na input_df = spark.read.table("catalog1.schema1.table1") -# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes +# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks) # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`) @@ -198,7 +198,7 @@ checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/ input_df = spark.read.table("catalog1.schema1.table1") -# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes +# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks) # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`) @@ -220,7 +220,7 @@ dq_engine = DQEngine(WorkspaceClient()) input_df = spark.read.table("catalog1.schema1.table1") -# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes +# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks) # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`) @@ -241,21 +241,21 @@ from databricks.sdk import WorkspaceClient dq_engine = DQEngine(WorkspaceClient()) checks = DQRuleColSet( # define rule for multiple columns at once - columns=["col1", "col2"], - criticality="error", + columns=["col1", "col2"], + criticality="error", check_func=is_not_null).get_rules() + [ DQRule( # define rule for a single column name='col3_is_null_or_empty', - criticality='error', + criticality='error', check=is_not_null_and_not_empty('col3')), - DQRule( # name auto-generated if not provided - criticality='warn', + DQRule( # name auto-generated if not provided + criticality='warn', check=value_is_in_list('col4', ['1', '2'])) ] input_df = spark.read.table("catalog1.schema1.table1") -# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes +# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes valid_df, 
quarantined_df = dq_engine.apply_checks_and_split(input_df, checks) # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`) @@ -300,7 +300,7 @@ checks = yaml.safe_load(""" input_df = spark.read.table("catalog1.schema1.table1") -# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes +# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks) # Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`) @@ -411,3 +411,29 @@ dq_engine = DQEngine(ws) For details on the specific methods available in the engine, visit to the [reference](/docs/reference#dq-engine-methods) section. Information on testing applications that use `DQEngine` can be found [here](/docs/reference#testing-applications-using-dqx). + +## Additional Configuration + +### Custom Error and Warning Columns + +By default, DQX adds `_error` and `_warning` columns to the output dataframe to indicate the quality issues. + +It is possible to customize the names of these columns by sending extra configurations to the engine. + +```python +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.engine import ( + DQEngine, + ExtraParams, +) + +# using ExtraParams class to configure the engine with custom column names +extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WARNINGS'}) + +ws = WorkspaceClient() +dq_engine = DQEngine(ws, extra_params=extra_parameters) + +... +``` + + From fa0a9a5adebf1eb1e93f44f63a66d66ae3d11df5 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:12:34 +0000 Subject: [PATCH 07/35] Update docs/dqx/docs/guide.mdx Co-authored-by: Marcin Wojtyczka --- docs/dqx/docs/guide.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index b01c88c..314e0c5 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -432,7 +432,6 @@ extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WA ws = WorkspaceClient() dq_engine = DQEngine(ws, extra_params=extra_parameters) - ... 
``` From 0c3da1edada1ff1618b1a98a83b68947f7b17cc3 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:12:44 +0000 Subject: [PATCH 08/35] Update tests/integration/test_apply_checks.py Co-authored-by: Marcin Wojtyczka --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index c54a2a5..a31092b 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -516,7 +516,7 @@ def test_apply_checks_with_custom_column_naming(ws, spark): assert_df_equality(checked, expected, ignore_nullable=True) -def test_custom_column_name_mappings_split_metadata(ws, spark): +def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) From 3f1eab96aba4abe4f0171fe48c6a210b0fef10d4 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:12:50 +0000 Subject: [PATCH 09/35] Update tests/integration/test_apply_checks.py Co-authored-by: Marcin Wojtyczka --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index a31092b..f94d7cb 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -501,7 +501,7 @@ def test_apply_checks_with_custom_column_naming(ws, spark): test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}] - checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) + checked = dq_engine.apply_checks_by_metadata(test_df, checks) expected = spark.createDataFrame( From 8e04b110f0f2f55c8ad886a1374e50e0046ad31e Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:13:00 +0000 Subject: [PATCH 10/35] Update tests/integration/test_apply_checks.py Co-authored-by: Marcin Wojtyczka --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index f94d7cb..01e9515 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -525,7 +525,7 @@ def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): {"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}, {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}} ] - good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks, globals()) + good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks) assert_df_equality(good, spark.createDataFrame([ [1, 3, 3], [None, 4, None] From a37a5758cef09d3c8ef897a58699003c4e49cfd9 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:13:27 +0000 Subject: [PATCH 11/35] Update docs/dqx/docs/guide.mdx Co-authored-by: Marcin Wojtyczka --- docs/dqx/docs/guide.mdx | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 314e0c5..4cd0bc2 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -416,7 +416,7 @@ Information on testing applications that use `DQEngine` can be found [here](/doc ### Custom Error and Warning Columns -By default, DQX adds `_error` and `_warning` columns to the output dataframe to indicate the quality issues. +By default, DQX appends `_error` and `_warning` reporting columns to the output DataFrame to flag quality issues. It is possible to customize the names of these columns by sending extra configurations to the engine. From 8628fe39b71026a181366570292ca1528b76360f Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:13:38 +0000 Subject: [PATCH 12/35] Update docs/dqx/docs/guide.mdx Co-authored-by: Marcin Wojtyczka --- docs/dqx/docs/guide.mdx | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 4cd0bc2..b8d115e 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -432,7 +432,6 @@ extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WA ws = WorkspaceClient() dq_engine = DQEngine(ws, extra_params=extra_parameters) -... ``` From 2101023d7ad3a48128a54c33f69c1485e6bac5f8 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:13:50 +0000 Subject: [PATCH 13/35] Update docs/dqx/docs/guide.mdx Co-authored-by: Marcin Wojtyczka --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index b8d115e..f62bfbf 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -418,7 +418,7 @@ Information on testing applications that use `DQEngine` can be found [here](/doc By default, DQX appends `_error` and `_warning` reporting columns to the output DataFrame to flag quality issues. -It is possible to customize the names of these columns by sending extra configurations to the engine. +You can customize the names of these reporting columns by specifying additional configurations in the engine. 
```python from databricks.sdk import WorkspaceClient From 297ab7cd718a33b3729f8d6d178eba915555d742 Mon Sep 17 00:00:00 2001 From: hrfmartins <32126871+hrfmartins@users.noreply.github.com> Date: Wed, 5 Feb 2025 09:14:14 +0000 Subject: [PATCH 14/35] Update docs/dqx/docs/guide.mdx Co-authored-by: Marcin Wojtyczka --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index f62bfbf..215fc09 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -427,7 +427,7 @@ from databricks.labs.dqx.engine import ( ExtraParams, ) -# using ExtraParams class to configure the engine with custom column names +# customize reporting column names extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WARNINGS'}) ws = WorkspaceClient() From 98ceca43481660f3192cd261f9eba6ee97e8ecaa Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:03:19 +0100 Subject: [PATCH 15/35] Update demos/dqx_demo_library.py --- demos/dqx_demo_library.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/demos/dqx_demo_library.py b/demos/dqx_demo_library.py index 4e254ad..d0d17f2 100644 --- a/demos/dqx_demo_library.py +++ b/demos/dqx_demo_library.py @@ -372,6 +372,8 @@ def ends_with_foo(col_name: str) -> Column: DQRule ) +from databricks.labs.dqx.col_functions import is_not_null_and_not_empty + # using ExtraParams class to configure the engine with custom column names extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WARNINGS'}) From d78bbfd52be77fbc4e9dcd7bbeb68a85eecf906e Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:03:37 +0100 Subject: [PATCH 16/35] Update demos/dqx_demo_library.py --- demos/dqx_demo_library.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demos/dqx_demo_library.py b/demos/dqx_demo_library.py index d0d17f2..4bab10b 100644 --- a/demos/dqx_demo_library.py +++ b/demos/dqx_demo_library.py @@ -389,5 +389,5 @@ def ends_with_foo(col_name: str) -> Column: check=is_not_null_and_not_empty('col1')), ] -valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals()) +valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks) display(valid_and_quarantined_df) From fbcb0f97c4e79eed3222629d698f488cbbe61eb0 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:03:47 +0100 Subject: [PATCH 17/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 26749f8..e030979 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -426,7 +426,7 @@ Information on testing applications that use `DQEngine` can be found [here](/doc ## Additional Configuration -### Custom Error and Warning Columns +### Customizing Reporting Error and Warning Columns By default, DQX appends `_error` and `_warning` reporting columns to the output DataFrame to flag quality issues. 
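Taken together, the changes above can be exercised end to end roughly as follows. This is a minimal sketch rather than code from the series: it assumes a live `spark` session and workspace credentials for `WorkspaceClient`, and the reporting column names, input schema, and sample rows are illustrative only.

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine, ExtraParams

# Rename the reporting columns from the defaults via ExtraParams
# (the "dq_errors"/"dq_warnings" names here are just examples).
extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"})
dq_engine = DQEngine(WorkspaceClient(), extra_params=extra_parameters)

# Illustrative input and metadata-defined checks; is_not_null_and_not_empty
# is one of the built-in check functions, so no globals() mapping is needed.
input_df = spark.createDataFrame([["x", None], [None, "y"]], "col1: string, col2: string")
checks = [
    {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "col1"}}},
    {"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "col2"}}},
]

# The configured column names are appended instead of the defaults ...
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)

# ... and get_valid / get_invalid resolve the same configured names.
valid_df = dq_engine.get_valid(checked_df)
invalid_df = dq_engine.get_invalid(checked_df)
```

When `extra_params` is omitted, the engine falls back to the default `_errors` and `_warnings` reporting columns.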
From 9be3f7d422781d99ca3d246f47a03accc07fbe50 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:11:44 +0100 Subject: [PATCH 18/35] Update tests/integration/test_apply_checks.py --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index b9b72dc..b941d8e 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -604,5 +604,5 @@ def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], [None, 4, None, None, {"col_a_is_null_or_empty": "new check failed"}], [None, None, None, {"col_b_is_null_or_empty": "Column b is null or empty"}, - {"col_a_is_null_or_empty": "new check failed"}], + {"col_a_is_null_or_empty": "Column a is null or empty"}], ], EXPECTED_SCHEMA_WITH_CUSTOM_NAMES)) From a60490037989cdd6f55994143905d02f0f6b8919 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:11:50 +0100 Subject: [PATCH 19/35] Update tests/integration/test_apply_checks.py --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index b941d8e..c533dc5 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -602,7 +602,7 @@ def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): assert_df_equality(bad, spark.createDataFrame([ [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], - [None, 4, None, None, {"col_a_is_null_or_empty": "new check failed"}], + [None, 4, None, None, {"col_a_is_null_or_empty": "Column a is null or empty"}], [None, None, None, {"col_b_is_null_or_empty": "Column b is null or empty"}, {"col_a_is_null_or_empty": "Column a is null or empty"}], ], EXPECTED_SCHEMA_WITH_CUSTOM_NAMES)) From a96ea965324d1fad66af9ca49340318972519399 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:11:56 +0100 Subject: [PATCH 20/35] Update tests/integration/test_apply_checks.py --- tests/integration/test_apply_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index c533dc5..cd80840 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -591,7 +591,7 @@ def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): checks = [ - {"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}, + {"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "a"}}}, {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}} ] good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks) From 938b3b4f9cf0aed079fae49d600ce915cf2f7407 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:23:06 +0100 Subject: [PATCH 21/35] Update test_apply_checks.py --- tests/integration/test_apply_checks.py | 151 +------------------------ 1 file changed, 6 insertions(+), 145 deletions(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index cd80840..2254b69 100644 --- a/tests/integration/test_apply_checks.py +++ 
b/tests/integration/test_apply_checks.py @@ -5,11 +5,10 @@ from chispa.dataframe_comparer import assert_df_equality # type: ignore from databricks.labs.dqx.col_functions import is_not_null_and_not_empty, make_condition from databricks.labs.dqx.engine import ( + DQRule, DQEngine, ExtraParams, ) -from databricks.labs.dqx.rule import DQRule, DQRuleColSet - SCHEMA = "a: int, b: int, c: int" EXPECTED_SCHEMA = SCHEMA + ", _errors: map, _warnings: map" @@ -50,6 +49,7 @@ def test_apply_checks_passed(ws, spark): checked = dq_engine.apply_checks(test_df, checks) expected = spark.createDataFrame([[1, 3, 3, None, None]], EXPECTED_SCHEMA) + assert_df_equality(checked, expected, ignore_nullable=True) @@ -383,74 +383,6 @@ def test_apply_checks_by_metadata(ws, spark): assert_df_equality(checked, expected, ignore_nullable=True) -def test_apply_checks_with_filter(ws, spark): - dq_engine = DQEngine(ws) - test_df = spark.createDataFrame( - [[1, 3, 3], [2, None, 4], [3, 4, None], [4, None, None], [None, None, None]], SCHEMA - ) - - checks = DQRuleColSet( - check_func=is_not_null_and_not_empty, criticality="warn", filter="b>3", columns=["a", "c"] - ).get_rules() + [ - DQRule( - name="col_b_is_null_or_empty", - criticality="error", - check=is_not_null_and_not_empty("b"), - filter="a<3", - ) - ] - - checked = dq_engine.apply_checks(test_df, checks) - - expected = spark.createDataFrame( - [ - [1, 3, 3, None, None], - [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], - [3, 4, None, None, {"col_c_is_null_or_empty": "Column c is null or empty"}], - [4, None, None, None, None], - [None, None, None, None, None], - ], - EXPECTED_SCHEMA, - ) - - assert_df_equality(checked, expected, ignore_nullable=True) - - -def test_apply_checks_by_metadata_with_filter(ws, spark): - dq_engine = DQEngine(ws) - test_df = spark.createDataFrame( - [[1, 3, 3], [2, None, 4], [3, 4, None], [4, None, None], [None, None, None]], SCHEMA - ) - - checks = [ - { - "criticality": "warn", - "filter": "b>3", - "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_names": ["b", "c"]}}, - }, - { - "criticality": "error", - "filter": "a<3", - "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}, - }, - ] - - checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) - - expected = spark.createDataFrame( - [ - [1, 3, 3, None, None], - [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], - [3, 4, None, None, {"col_c_is_null_or_empty": "Column c is null or empty"}], - [4, None, None, None, None], - [None, None, None, None, None], - ], - EXPECTED_SCHEMA, - ) - - assert_df_equality(checked, expected, ignore_nullable=True) - - def test_apply_checks_from_json_file_by_metadata(ws, spark): dq_engine = DQEngine(ws) schema = "col1: int, col2: int, col3: int, col4 int" @@ -510,68 +442,19 @@ def test_apply_checks_by_metadata_with_func_defined_outside_framework(ws, spark) def col_test_check_func(col_name: str) -> Column: check_col = F.col(col_name) - check_col = check_col.try_cast("string") condition = check_col.isNull() | (check_col == "") | (check_col == "null") return make_condition(condition, "new check failed", f"{col_name}_is_null_or_empty") -def test_get_valid_records(ws, spark): - dq_engine = DQEngine(ws) - - test_df = spark.createDataFrame( - [ - [1, 1, 1, None, None], - [None, 2, 2, None, {"col_a_is_null_or_empty": "check failed"}], - [None, 2, 2, {"col_b_is_null_or_empty": "check failed"}, None], - ], - EXPECTED_SCHEMA, - ) - - valid_df = 
dq_engine.get_valid(test_df) - - expected_valid_df = spark.createDataFrame( - [ - [1, 1, 1], - [None, 2, 2], - ], - SCHEMA, - ) - - assert_df_equality(valid_df, expected_valid_df) - - -def test_get_invalid_records(ws, spark): - dq_engine = DQEngine(ws) - - test_df = spark.createDataFrame( - [ - [1, 1, 1, None, None], - [None, 2, 2, None, {"col_a_is_null_or_empty": "check failed"}], - [None, 2, 2, {"col_b_is_null_or_empty": "check failed"}, None], - ], - EXPECTED_SCHEMA, - ) - - invalid_df = dq_engine.get_invalid(test_df) - - expected_invalid_df = spark.createDataFrame( - [ - [None, 2, 2, None, {"col_a_is_null_or_empty": "check failed"}], - [None, 2, 2, {"col_b_is_null_or_empty": "check failed"}, None], - ], - EXPECTED_SCHEMA, - ) - - assert_df_equality(invalid_df, expected_invalid_df) - - def test_apply_checks_with_custom_column_naming(ws, spark): - dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) + dq_engine = DQEngine(ws, ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}] - checked = dq_engine.apply_checks_by_metadata(test_df, checks) + checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) + assert 'ERROR' in checked.columns + assert 'WARN' in checked.columns expected = spark.createDataFrame( [ @@ -584,25 +467,3 @@ def test_apply_checks_with_custom_column_naming(ws, spark): ) assert_df_equality(checked, expected, ignore_nullable=True) - -def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): - dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) - test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) - - - checks = [ - {"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "a"}}}, - {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}} - ] - good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks) - - assert_df_equality(good, spark.createDataFrame([ - [1, 3, 3], [None, 4, None] - ], SCHEMA), ignore_nullable=True) - - assert_df_equality(bad, spark.createDataFrame([ - [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], - [None, 4, None, None, {"col_a_is_null_or_empty": "Column a is null or empty"}], - [None, None, None, {"col_b_is_null_or_empty": "Column b is null or empty"}, - {"col_a_is_null_or_empty": "Column a is null or empty"}], - ], EXPECTED_SCHEMA_WITH_CUSTOM_NAMES)) From f7782098de719cc207dba433c8d603cb16af8a5a Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:25:04 +0100 Subject: [PATCH 22/35] Update test_apply_checks.py --- tests/integration/test_apply_checks.py | 161 +++++++++++++++++++++++-- 1 file changed, 154 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 2254b69..5f8c160 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5,10 +5,11 @@ from chispa.dataframe_comparer import assert_df_equality # type: ignore from databricks.labs.dqx.col_functions import is_not_null_and_not_empty, make_condition from databricks.labs.dqx.engine import ( - DQRule, DQEngine, 
ExtraParams, ) +from databricks.labs.dqx.rule import DQRule, DQRuleColSet + SCHEMA = "a: int, b: int, c: int" EXPECTED_SCHEMA = SCHEMA + ", _errors: map, _warnings: map" @@ -49,7 +50,6 @@ def test_apply_checks_passed(ws, spark): checked = dq_engine.apply_checks(test_df, checks) expected = spark.createDataFrame([[1, 3, 3, None, None]], EXPECTED_SCHEMA) - assert_df_equality(checked, expected, ignore_nullable=True) @@ -383,6 +383,74 @@ def test_apply_checks_by_metadata(ws, spark): assert_df_equality(checked, expected, ignore_nullable=True) +def test_apply_checks_with_filter(ws, spark): + dq_engine = DQEngine(ws) + test_df = spark.createDataFrame( + [[1, 3, 3], [2, None, 4], [3, 4, None], [4, None, None], [None, None, None]], SCHEMA + ) + + checks = DQRuleColSet( + check_func=is_not_null_and_not_empty, criticality="warn", filter="b>3", columns=["a", "c"] + ).get_rules() + [ + DQRule( + name="col_b_is_null_or_empty", + criticality="error", + check=is_not_null_and_not_empty("b"), + filter="a<3", + ) + ] + + checked = dq_engine.apply_checks(test_df, checks) + + expected = spark.createDataFrame( + [ + [1, 3, 3, None, None], + [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], + [3, 4, None, None, {"col_c_is_null_or_empty": "Column c is null or empty"}], + [4, None, None, None, None], + [None, None, None, None, None], + ], + EXPECTED_SCHEMA, + ) + + assert_df_equality(checked, expected, ignore_nullable=True) + + +def test_apply_checks_by_metadata_with_filter(ws, spark): + dq_engine = DQEngine(ws) + test_df = spark.createDataFrame( + [[1, 3, 3], [2, None, 4], [3, 4, None], [4, None, None], [None, None, None]], SCHEMA + ) + + checks = [ + { + "criticality": "warn", + "filter": "b>3", + "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_names": ["b", "c"]}}, + }, + { + "criticality": "error", + "filter": "a<3", + "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}, + }, + ] + + checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) + + expected = spark.createDataFrame( + [ + [1, 3, 3, None, None], + [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], + [3, 4, None, None, {"col_c_is_null_or_empty": "Column c is null or empty"}], + [4, None, None, None, None], + [None, None, None, None, None], + ], + EXPECTED_SCHEMA, + ) + + assert_df_equality(checked, expected, ignore_nullable=True) + + def test_apply_checks_from_json_file_by_metadata(ws, spark): dq_engine = DQEngine(ws) schema = "col1: int, col2: int, col3: int, col4 int" @@ -442,19 +510,67 @@ def test_apply_checks_by_metadata_with_func_defined_outside_framework(ws, spark) def col_test_check_func(col_name: str) -> Column: check_col = F.col(col_name) + check_col = check_col.try_cast("string") condition = check_col.isNull() | (check_col == "") | (check_col == "null") return make_condition(condition, "new check failed", f"{col_name}_is_null_or_empty") +def test_get_valid_records(ws, spark): + dq_engine = DQEngine(ws) + + test_df = spark.createDataFrame( + [ + [1, 1, 1, None, None], + [None, 2, 2, None, {"col_a_is_null_or_empty": "check failed"}], + [None, 2, 2, {"col_b_is_null_or_empty": "check failed"}, None], + ], + EXPECTED_SCHEMA, + ) + + valid_df = dq_engine.get_valid(test_df) + + expected_valid_df = spark.createDataFrame( + [ + [1, 1, 1], + [None, 2, 2], + ], + SCHEMA, + ) + + assert_df_equality(valid_df, expected_valid_df) + + +def test_get_invalid_records(ws, spark): + dq_engine = DQEngine(ws) + + test_df = spark.createDataFrame( 
+ [ + [1, 1, 1, None, None], + [None, 2, 2, None, {"col_a_is_null_or_empty": "check failed"}], + [None, 2, 2, {"col_b_is_null_or_empty": "check failed"}, None], + ], + EXPECTED_SCHEMA, + ) + + invalid_df = dq_engine.get_invalid(test_df) + + expected_invalid_df = spark.createDataFrame( + [ + [None, 2, 2, None, {"col_a_is_null_or_empty": "check failed"}], + [None, 2, 2, {"col_b_is_null_or_empty": "check failed"}, None], + ], + EXPECTED_SCHEMA, + ) + + assert_df_equality(invalid_df, expected_invalid_df) + + def test_apply_checks_with_custom_column_naming(ws, spark): - dq_engine = DQEngine(ws, ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) + dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}] - checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals()) - - assert 'ERROR' in checked.columns - assert 'WARN' in checked.columns + checked = dq_engine.apply_checks_by_metadata(test_df, checks) expected = spark.createDataFrame( [ @@ -467,3 +583,34 @@ def test_apply_checks_with_custom_column_naming(ws, spark): ) assert_df_equality(checked, expected, ignore_nullable=True) + + +def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): + dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) + test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) + + checks = [ + {"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "a"}}}, + {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}}, + ] + good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks) + + assert_df_equality(good, spark.createDataFrame([[1, 3, 3], [None, 4, None]], SCHEMA), ignore_nullable=True) + + assert_df_equality( + bad, + spark.createDataFrame( + [ + [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], + [None, 4, None, None, {"col_a_is_null_or_empty": "Column a is null or empty"}], + [ + None, + None, + None, + {"col_b_is_null_or_empty": "Column b is null or empty"}, + {"col_a_is_null_or_empty": "Column a is null or empty"}, + ], + ], + EXPECTED_SCHEMA_WITH_CUSTOM_NAMES, + ), + ) From 0561034ce1a42e07f6709580ca32b7583b348c34 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:25:32 +0100 Subject: [PATCH 23/35] Update engine.py --- src/databricks/labs/dqx/engine.py | 602 +++++++++++++++++------------- 1 file changed, 346 insertions(+), 256 deletions(-) diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py index ce2a665..d08a0bd 100644 --- a/src/databricks/labs/dqx/engine.py +++ b/src/databricks/labs/dqx/engine.py @@ -5,199 +5,177 @@ import itertools from pathlib import Path from collections.abc import Callable +from dataclasses import dataclass, field +from enum import Enum from typing import Any import yaml + import pyspark.sql.functions as F -from pyspark.sql import DataFrame -from databricks.labs.dqx.rule import ( - DQRule, - Criticality, - DQRuleColSet, - ChecksValidationStatus, - ColumnArguments, - ExtraParams, - DefaultColumnNames, -) -from databricks.labs.dqx.utils import deserialize_dicts +from pyspark.sql import Column, DataFrame from 
databricks.labs.dqx import col_functions from databricks.labs.blueprint.installation import Installation -from databricks.labs.dqx.base import DQEngineBase, DQEngineCoreBase -from databricks.labs.dqx.config import WorkspaceConfig, RunConfig -from databricks.sdk.errors import NotFound -from databricks.sdk.service.workspace import ImportFormat +from databricks.labs.dqx.base import DQEngineBase +from databricks.labs.dqx.config import WorkspaceConfig +from databricks.labs.dqx.utils import get_column_name from databricks.sdk import WorkspaceClient - +from databricks.sdk.errors import NotFound logger = logging.getLogger(__name__) -class DQEngineCore(DQEngineCoreBase): - """Data Quality Engine Core class to apply data quality checks to a given dataframe. - Args: - workspace_client (WorkspaceClient): WorkspaceClient instance to use for accessing the workspace. - extra_params (ExtraParams): Extra parameters for the DQEngine. - """ +class DefaultColumnNames(Enum): + """Enum class to represent columns in the dataframe that will be used for error and warning reporting.""" - def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams | None = None): - super().__init__(workspace_client) + ERRORS = "_errors" + WARNINGS = "_warnings" - extra_params = extra_params or ExtraParams() - self._column_names = { - ColumnArguments.ERRORS: extra_params.column_names.get( - ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value - ), - ColumnArguments.WARNINGS: extra_params.column_names.get( - ColumnArguments.WARNINGS.value, DefaultColumnNames.WARNINGS.value - ), - } +class ColumnArguments(Enum): + """Enum class that is used as input parsing for custom column naming.""" - def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: - if not checks: - return self._append_empty_checks(df) + ERRORS = "errors" + WARNINGS = "warnings" - warning_checks = self._get_check_columns(checks, Criticality.WARN.value) - error_checks = self._get_check_columns(checks, Criticality.ERROR.value) - ndf = self._create_results_map(df, error_checks, self._column_names[ColumnArguments.ERRORS]) - ndf = self._create_results_map(ndf, warning_checks, self._column_names[ColumnArguments.WARNINGS]) - return ndf +class Criticality(Enum): + """Enum class to represent criticality of the check.""" - def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: - if not checks: - return df, self._append_empty_checks(df).limit(0) + WARN = "warn" + ERROR = "error" - checked_df = self.apply_checks(df, checks) - good_df = self.get_valid(checked_df) - bad_df = self.get_invalid(checked_df) +@dataclass(frozen=True) +class ChecksValidationStatus: + """Class to represent the validation status.""" - return good_df, bad_df + _errors: list[str] = field(default_factory=list) - def apply_checks_by_metadata_and_split( - self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None - ) -> tuple[DataFrame, DataFrame]: - dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + def add_error(self, error: str): + """Add an error to the validation status.""" + self._errors.append(error) - good_df, bad_df = self.apply_checks_and_split(df, dq_rule_checks) + def add_errors(self, errors: list[str]): + """Add an error to the validation status.""" + self._errors.extend(errors) - return good_df, bad_df + @property + def has_errors(self) -> bool: + """Check if there are any errors in the validation status.""" + return bool(self._errors) - def apply_checks_by_metadata( - self, df: 
DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None - ) -> DataFrame: - dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + @property + def errors(self) -> list[str]: + """Get the list of errors in the validation status.""" + return self._errors - return self.apply_checks(df, dq_rule_checks) + def to_string(self) -> str: + """Convert the validation status to a string.""" + if self.has_errors: + return "\n".join(self._errors) + return "No errors found" - @staticmethod - def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: - status = ChecksValidationStatus() + def __str__(self) -> str: + """String representation of the ValidationStatus class.""" + return self.to_string() - for check in checks: - logger.debug(f"Processing check definition: {check}") - if isinstance(check, dict): - status.add_errors(DQEngineCore._validate_checks_dict(check, glbs)) - else: - status.add_error(f"Unsupported check type: {type(check)}") - return status +@dataclass(frozen=True) +class DQRule: + """Class to represent a data quality rule consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - optional name that will be given to a resulting column. Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (critical problems), and `warn` (potential problems) + """ - def get_invalid(self, df: DataFrame) -> DataFrame: - return df.where( - F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() - | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull() - ) + check: Column + name: str = "" + criticality: str = Criticality.ERROR.value - def get_valid(self, df: DataFrame) -> DataFrame: - return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop( - self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS] - ) + def __post_init__(self): + # take the name from the alias of the column expression if not provided + object.__setattr__(self, "name", self.name if self.name else "col_" + get_column_name(self.check)) - @staticmethod - def load_checks_from_local_file(path: str) -> list[dict]: - if not path: - raise ValueError("filename must be provided") + @ft.cached_property + def rule_criticality(self) -> str: + """Returns criticality of the check. - try: - checks = Installation.load_local(list[dict[str, str]], Path(path)) - return deserialize_dicts(checks) - except FileNotFoundError: - msg = f"Checks file {path} missing" - raise FileNotFoundError(msg) from None + :return: string describing criticality - `warn` or `error`. + :raises ValueError: if criticality is invalid. + """ + criticality = self.criticality + if criticality not in {Criticality.WARN.value, Criticality.ERROR.value}: + raise ValueError(f"Invalid criticality value: {criticality}") - @staticmethod - def save_checks_in_local_file(checks: list[dict], path: str): - if not path: - raise ValueError("filename must be provided") + return criticality - try: - with open(path, 'w', encoding="utf-8") as file: - yaml.safe_dump(checks, file) - except FileNotFoundError: - msg = f"Checks file {path} missing" - raise FileNotFoundError(msg) from None + def check_column(self) -> Column: + """Creates a Column object from the given check. 
- @staticmethod - def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]: - """Build checks based on check specification, i.e. function name plus arguments. + :return: Column object + """ + return F.when(self.check.isNull(), F.lit(None).cast("string")).otherwise(self.check) + + +@dataclass(frozen=True) +class DQRuleColSet: + """Class to represent a data quality col rule set which defines quality check function for a set of columns. + The class consists of the following fields: + * `columns` - list of column names to which the given check function should be applied + * `criticality` - criticality level ('warn' or 'error') + * `check_func` - check function to be applied + * `check_func_args` - non-keyword / positional arguments for the check function after the col_name + * `check_func_kwargs` - keyword /named arguments for the check function after the col_name + """ - :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: - * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - - it will be used as an error/warning message, or `null` if it's evaluated to `false` - * `name` - name that will be given to a resulting column. Autogenerated if not provided - * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), - and `warn` (data is going into both dataframes) - :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). - If not specified, then only built-in functions are used for the checks. - :return: list of data quality check rules + columns: list[str] + check_func: Callable + criticality: str = Criticality.ERROR.value + check_func_args: list[Any] = field(default_factory=list) + check_func_kwargs: dict[str, Any] = field(default_factory=dict) + + def get_rules(self) -> list[DQRule]: + """Build a list of rules for a set of columns. 
+ + :return: list of dq rules """ - status = DQEngineCore.validate_checks(checks, glbs) - if status.has_errors: - raise ValueError(str(status)) + rules = [] + for col_name in self.columns: + rule = DQRule( + criticality=self.criticality, + check=self.check_func(col_name, *self.check_func_args, **self.check_func_kwargs), + ) + rules.append(rule) + return rules - dq_rule_checks = [] - for check_def in checks: - logger.debug(f"Processing check definition: {check_def}") - check = check_def.get("check", {}) - func_name = check.get("function", None) - func = DQEngineCore._resolve_function(func_name, glbs, fail_on_missing=True) - assert func # should already be validated - func_args = check.get("arguments", {}) - criticality = check_def.get("criticality", "error") - filter_expr = check_def.get("filter") - if "col_names" in func_args: - logger.debug(f"Adding DQRuleColSet with columns: {func_args['col_names']}") - dq_rule_checks += DQRuleColSet( - columns=func_args["col_names"], - check_func=func, - criticality=criticality, - filter=filter_expr, - # provide arguments without "col_names" - check_func_kwargs={k: func_args[k] for k in func_args.keys() - {"col_names"}}, - ).get_rules() - else: - name = check_def.get("name", None) - check_func = func(**func_args) - dq_rule_checks.append(DQRule(check=check_func, name=name, criticality=criticality, filter=filter_expr)) +@dataclass(frozen=True) +class ExtraParams: + """Class to represent extra parameters for DQEngine.""" - logger.debug("Exiting build_checks_by_metadata function with dq_rule_checks") - return dq_rule_checks + column_names: dict[str, str] | None = field(default_factory=dict) - @staticmethod - def build_checks(*rules_col_set: DQRuleColSet) -> list[DQRule]: - """ - Build rules from dq rules and rule sets. - :param rules_col_set: list of dq rules which define multiple columns for the same check function - :return: list of dq rules - """ - rules_nested = [rule_set.get_rules() for rule_set in rules_col_set] - flat_rules = list(itertools.chain(*rules_nested)) +class DQEngine(DQEngineBase): + """Data Quality Engine class to apply data quality checks to a given dataframe. - return list(filter(None, flat_rules)) + Args: + workspace_client (WorkspaceClient): WorkspaceClient instance to use for accessing the workspace. + extra_params (ExtraParams): Extra parameters for the DQEngine. + """ + + def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams | None = None): + super().__init__(workspace_client) + + extra_params = extra_params or ExtraParams() + column_names = extra_params.column_names or {} + self._column_names = { + ColumnArguments.ERRORS: column_names.get(ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value), + ColumnArguments.WARNINGS: column_names.get( + ColumnArguments.WARNINGS.value, DefaultColumnNames.WARNINGS.value + ), + } @staticmethod def _get_check_columns(checks: list[DQRule], criticality: str) -> list[DQRule]: @@ -244,6 +222,88 @@ def _create_results_map(df: DataFrame, checks: list[DQRule], dest_col: str) -> D m_col = F.map_filter(m_col, lambda _, v: v.isNotNull()) return df.withColumn(dest_col, F.when(F.size(m_col) > 0, m_col).otherwise(empty_type)) + def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: + """Applies data quality checks to a given dataframe. + + :param df: dataframe to check + :param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class. 
+ :return: dataframe with errors and warning reporting columns + """ + if not checks: + return self._append_empty_checks(df) + + warning_checks = self._get_check_columns(checks, Criticality.WARN.value) + error_checks = self._get_check_columns(checks, Criticality.ERROR.value) + ndf = self._create_results_map(df, error_checks, self._column_names[ColumnArguments.ERRORS]) + ndf = self._create_results_map(ndf, warning_checks, self._column_names[ColumnArguments.WARNINGS]) + + return ndf + + def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: + """Applies data quality checks to a given dataframe and split it into two ("good" and "bad"), + according to the data quality checks. + + :param df: dataframe to check + :param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class. + :return: two dataframes - "good" which includes warning rows but no reporting columns, and "data" having + error and warning rows and corresponding reporting columns + """ + if not checks: + return df, self._append_empty_checks(df).limit(0) + + checked_df = self.apply_checks(df, checks) + + good_df = self.get_valid(checked_df) + bad_df = self.get_invalid(checked_df) + + return good_df, bad_df + + def get_invalid(self, df: DataFrame) -> DataFrame: + """ + Get records that violate data quality checks (records with warnings and errors). + @param df: input DataFrame. + @return: dataframe with error and warning rows and corresponding reporting columns. + """ + return df.where( + F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() + | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull() + ) + + def get_valid(self, df: DataFrame) -> DataFrame: + """ + Get records that don't violate data quality checks (records with warnings but no errors). + @param df: input DataFrame. + @return: dataframe with warning rows but no reporting columns. + """ + return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop( + self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS] + ) + + @staticmethod + def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: + """ + Validate the input dict to ensure they conform to expected structure and types. + + Each check can be a dictionary. The function validates + the presence of required keys, the existence and callability of functions, and the types + of arguments passed to these functions. + + :param checks: List of checks to apply to the dataframe. Each check should be a dictionary. + :param glbs: Optional dictionary of global functions that can be used in checks. + + :return ValidationStatus: The validation status. 
+ """ + status = ChecksValidationStatus() + + for check in checks: + logger.debug(f"Processing check definition: {check}") + if isinstance(check, dict): + status.add_errors(DQEngine._validate_checks_dict(check, glbs)) + else: + status.add_error(f"Unsupported check type: {type(check)}") + + return status + @staticmethod def _validate_checks_dict(check: dict, glbs: dict[str, Any] | None) -> list[str]: """ @@ -266,7 +326,7 @@ def _validate_checks_dict(check: dict, glbs: dict[str, Any] | None) -> list[str] elif not isinstance(check["check"], dict): errors.append(f"'check' field should be a dictionary: {check}") else: - errors.extend(DQEngineCore._validate_check_block(check, glbs)) + errors.extend(DQEngine._validate_check_block(check, glbs)) return errors @@ -288,12 +348,12 @@ def _validate_check_block(check: dict, glbs: dict[str, Any] | None) -> list[str] return [f"'function' field is missing in the 'check' block: {check}"] func_name = check_block["function"] - func = DQEngineCore._resolve_function(func_name, glbs, fail_on_missing=False) + func = DQEngine.resolve_function(func_name, glbs, fail_on_missing=False) if not callable(func): return [f"function '{func_name}' is not defined: {check}"] arguments = check_block.get("arguments", {}) - return DQEngineCore._validate_check_function_arguments(arguments, func, check) + return DQEngine._validate_check_function_arguments(arguments, func, check) @staticmethod def _validate_check_function_arguments(arguments: dict, func: Callable, check: dict) -> list[str]: @@ -322,9 +382,9 @@ def _validate_check_function_arguments(arguments: dict, func: Callable, check: d 'col_name' if k == 'col_names' else k: arguments['col_names'][0] if k == 'col_names' else v for k, v in arguments.items() } - return DQEngineCore._validate_func_args(arguments, func, check) + return DQEngine._validate_func_args(arguments, func, check) - return DQEngineCore._validate_func_args(arguments, func, check) + return DQEngine._validate_func_args(arguments, func, check) @staticmethod def _validate_func_args(arguments: dict, func: Callable, check: dict) -> list[str]: @@ -366,7 +426,52 @@ def cached_signature(check_func): return errors @staticmethod - def _resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_on_missing=True) -> Callable | None: + def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]: + """Build checks based on check specification, i.e. function name plus arguments. + + :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - name that will be given to a resulting column. Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), + and `warn` (data is going into both dataframes) + :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). + If not specified, then only built-in functions are used for the checks. 
+ :return: list of data quality check rules + """ + status = DQEngine.validate_checks(checks, glbs) + if status.has_errors: + raise ValueError(str(status)) + + dq_rule_checks = [] + for check_def in checks: + logger.debug(f"Processing check definition: {check_def}") + check = check_def.get("check", {}) + func_name = check.get("function", None) + func = DQEngine.resolve_function(func_name, glbs, fail_on_missing=True) + assert func # should already be validated + func_args = check.get("arguments", {}) + criticality = check_def.get("criticality", "error") + + if "col_names" in func_args: + logger.debug(f"Adding DQRuleColSet with columns: {func_args['col_names']}") + dq_rule_checks += DQRuleColSet( + columns=func_args["col_names"], + check_func=func, + criticality=criticality, + # provide arguments without "col_names" + check_func_kwargs={k: func_args[k] for k in func_args.keys() - {"col_names"}}, + ).get_rules() + else: + name = check_def.get("name", None) + check_func = func(**func_args) + dq_rule_checks.append(DQRule(check=check_func, name=name, criticality=criticality)) + + logger.debug("Exiting build_checks_by_metadata function with dq_rule_checks") + return dq_rule_checks + + @staticmethod + def resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_on_missing=True) -> Callable | None: logger.debug(f"Resolving function: {func_name}") if glbs: func = glbs.get(func_name) @@ -377,48 +482,85 @@ def _resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_o logger.debug(f"Function {func_name} resolved successfully") return func - -class DQEngine(DQEngineBase): - """Data Quality Engine class to apply data quality checks to a given dataframe.""" - - def __init__( - self, - workspace_client: WorkspaceClient, - engine: DQEngineCoreBase | None = None, - extra_params: ExtraParams | None = None, - ): - super().__init__(workspace_client) - self._engine = engine or DQEngineCore(workspace_client, extra_params) - - def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: - return self._engine.apply_checks(df, checks) - - def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: - return self._engine.apply_checks_and_split(df, checks) - def apply_checks_by_metadata_and_split( self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None ) -> tuple[DataFrame, DataFrame]: - return self._engine.apply_checks_by_metadata_and_split(df, checks, glbs) + """Wrapper around `apply_checks_and_split` for use in the metadata-driven pipelines. The main difference + is how the checks are specified - instead of using functions directly, they are described as function name plus + arguments. + + :param df: dataframe to check + :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - name that will be given to a resulting column. Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), + and `warn` (data is going into both dataframes) + :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). + If not specified, then only built-in functions are used for the checks. 
+ :return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having + error and warning rows and corresponding reporting columns + """ + dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + + good_df, bad_df = self.apply_checks_and_split(df, dq_rule_checks) + + return good_df, bad_df def apply_checks_by_metadata( self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None ) -> DataFrame: - return self._engine.apply_checks_by_metadata(df, checks, glbs) + """Wrapper around `apply_checks` for use in the metadata-driven pipelines. The main difference + is how the checks are specified - instead of using functions directly, they are described as function name plus + arguments. + + :param df: dataframe to check + :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - name that will be given to a resulting column. Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), + and `warn` (data is going into both dataframes) + :param glbs: dictionary with functions mapping (eg. ``globals()`` of calling module). + If not specified, then only built-in functions are used for the checks. + :return: dataframe with errors and warning reporting columns + """ + dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + + return self.apply_checks(df, dq_rule_checks) @staticmethod - def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: - return DQEngineCore.validate_checks(checks, glbs) + def build_checks(*rules_col_set: DQRuleColSet) -> list[DQRule]: + """ + Build rules from dq rules and rule sets. - def get_invalid(self, df: DataFrame) -> DataFrame: - return self._engine.get_invalid(df) + :param rules_col_set: list of dq rules which define multiple columns for the same check function + :return: list of dq rules + """ + rules_nested = [rule_set.get_rules() for rule_set in rules_col_set] + flat_rules = list(itertools.chain(*rules_nested)) - def get_valid(self, df: DataFrame) -> DataFrame: - return self._engine.get_valid(df) + return list(filter(None, flat_rules)) @staticmethod - def load_checks_from_local_file(path: str) -> list[dict]: - return DQEngineCore.load_checks_from_local_file(path) + def load_checks_from_local_file(filename: str) -> list[dict]: + """ + Load checks (dq rules) from a file (json or yml) in the local file system. + This does not require installation of DQX in the workspace. + The returning checks can be used as input for `apply_checks_by_metadata` function. + + :param filename: file name / path containing the checks. + :return: list of dq rules + """ + if not filename: + raise ValueError("filename must be provided") + + try: + checks = Installation.load_local(list[dict[str, str]], Path(filename)) + return DQEngine._deserialize_dicts(checks) + except FileNotFoundError: + msg = f"Checks file {filename} missing" + raise FileNotFoundError(msg) from None def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]: """Load checks (dq rules) from a file (json or yml) in the workspace. 
@@ -447,70 +589,6 @@ def load_checks_from_installation( :param assume_user: if True, assume user installation :return: list of dq rules """ - installation = self._get_installation(assume_user, product_name) - run_config = self._load_run_config(installation, run_config_name) - filename = run_config.checks_file or "checks.yml" - - logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.") - return self._load_checks_from_file(installation, filename) - - @staticmethod - def save_checks_in_local_file(checks: list[dict], path: str): - return DQEngineCore.save_checks_in_local_file(checks, path) - - def save_checks_in_installation( - self, - checks: list[dict], - run_config_name: str | None = "default", - product_name: str = "dqx", - assume_user: bool = True, - ): - """ - Save checks (dq rules) to yml file in the installation folder. - - :param checks: list of dq rules to save - :param run_config_name: name of the run (config) to use - :param product_name: name of the product/installation directory - :param assume_user: if True, assume user installation - """ - installation = self._get_installation(assume_user, product_name) - run_config = self._load_run_config(installation, run_config_name) - - logger.info( - f"Saving quality rules (checks) to {installation.install_folder()}/{run_config.checks_file} " - f"in the workspace." - ) - installation.upload(run_config.checks_file, yaml.safe_dump(checks).encode('utf-8')) - - def save_checks_in_workspace_file(self, checks: list[dict], workspace_path: str): - """Save checks (dq rules) to yml file in the workspace. - This does not require installation of DQX in the workspace. - - :param checks: list of dq rules to save - :param workspace_path: destination path to the file in the workspace. - """ - workspace_dir = os.path.dirname(workspace_path) - - logger.info(f"Saving quality rules (checks) to {workspace_path} in the workspace.") - self.ws.workspace.mkdirs(workspace_dir) - self.ws.workspace.upload( - workspace_path, yaml.safe_dump(checks).encode('utf-8'), format=ImportFormat.AUTO, overwrite=True - ) - - def load_run_config( - self, run_config_name: str | None = "default", assume_user: bool = True, product_name: str = "dqx" - ) -> RunConfig: - """ - Load run configuration from the installation. 
- - :param run_config_name: name of the run configuration to use - :param assume_user: if True, assume user installation - :param product_name: name of the product - """ - installation = self._get_installation(assume_user, product_name) - return self._load_run_config(installation, run_config_name) - - def _get_installation(self, assume_user, product_name): if assume_user: installation = Installation.assume_user_home(self.ws, product_name) else: @@ -518,19 +596,31 @@ def _get_installation(self, assume_user, product_name): # verify the installation installation.current(self.ws, product_name, assume_user=assume_user) - return installation - @staticmethod - def _load_run_config(installation, run_config_name): - """Load run configuration from the installation.""" config = installation.load(WorkspaceConfig) - return config.get_run_config(run_config_name) + run_config = config.get_run_config(run_config_name) + filename = run_config.checks_file # use check file from the config - @staticmethod - def _load_checks_from_file(installation: Installation, filename: str) -> list[dict]: + logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.") + return self._load_checks_from_file(installation, filename) + + def _load_checks_from_file(self, installation: Installation, filename: str) -> list[dict]: try: checks = installation.load(list[dict[str, str]], filename=filename) - return deserialize_dicts(checks) + return self._deserialize_dicts(checks) except NotFound: msg = f"Checks file {filename} missing" raise NotFound(msg) from None + + @classmethod + def _deserialize_dicts(cls, checks: list[dict[str, str]]) -> list[dict]: + """ + deserialize string fields instances containing dictionaries + @param checks: list of checks + @return: + """ + for item in checks: + for key, value in item.items(): + if value.startswith("{") and value.endswith("}"): + item[key] = yaml.safe_load(value.replace("'", '"')) + return checks From 0dcea46f7fcfa529be1507fe21eb462cd893f94a Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 17:32:17 +0100 Subject: [PATCH 24/35] Update engine.py --- src/databricks/labs/dqx/engine.py | 602 +++++++++++++----------------- 1 file changed, 256 insertions(+), 346 deletions(-) diff --git a/src/databricks/labs/dqx/engine.py b/src/databricks/labs/dqx/engine.py index d08a0bd..ce2a665 100644 --- a/src/databricks/labs/dqx/engine.py +++ b/src/databricks/labs/dqx/engine.py @@ -5,177 +5,199 @@ import itertools from pathlib import Path from collections.abc import Callable -from dataclasses import dataclass, field -from enum import Enum from typing import Any import yaml - import pyspark.sql.functions as F -from pyspark.sql import Column, DataFrame +from pyspark.sql import DataFrame +from databricks.labs.dqx.rule import ( + DQRule, + Criticality, + DQRuleColSet, + ChecksValidationStatus, + ColumnArguments, + ExtraParams, + DefaultColumnNames, +) +from databricks.labs.dqx.utils import deserialize_dicts from databricks.labs.dqx import col_functions from databricks.labs.blueprint.installation import Installation -from databricks.labs.dqx.base import DQEngineBase -from databricks.labs.dqx.config import WorkspaceConfig -from databricks.labs.dqx.utils import get_column_name -from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.base import DQEngineBase, DQEngineCoreBase +from databricks.labs.dqx.config import WorkspaceConfig, RunConfig from databricks.sdk.errors import NotFound - -logger = logging.getLogger(__name__) +from 
databricks.sdk.service.workspace import ImportFormat +from databricks.sdk import WorkspaceClient -class DefaultColumnNames(Enum): - """Enum class to represent columns in the dataframe that will be used for error and warning reporting.""" +logger = logging.getLogger(__name__) - ERRORS = "_errors" - WARNINGS = "_warnings" +class DQEngineCore(DQEngineCoreBase): + """Data Quality Engine Core class to apply data quality checks to a given dataframe. + Args: + workspace_client (WorkspaceClient): WorkspaceClient instance to use for accessing the workspace. + extra_params (ExtraParams): Extra parameters for the DQEngine. + """ -class ColumnArguments(Enum): - """Enum class that is used as input parsing for custom column naming.""" + def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams | None = None): + super().__init__(workspace_client) - ERRORS = "errors" - WARNINGS = "warnings" + extra_params = extra_params or ExtraParams() + self._column_names = { + ColumnArguments.ERRORS: extra_params.column_names.get( + ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value + ), + ColumnArguments.WARNINGS: extra_params.column_names.get( + ColumnArguments.WARNINGS.value, DefaultColumnNames.WARNINGS.value + ), + } -class Criticality(Enum): - """Enum class to represent criticality of the check.""" + def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: + if not checks: + return self._append_empty_checks(df) - WARN = "warn" - ERROR = "error" + warning_checks = self._get_check_columns(checks, Criticality.WARN.value) + error_checks = self._get_check_columns(checks, Criticality.ERROR.value) + ndf = self._create_results_map(df, error_checks, self._column_names[ColumnArguments.ERRORS]) + ndf = self._create_results_map(ndf, warning_checks, self._column_names[ColumnArguments.WARNINGS]) + return ndf -@dataclass(frozen=True) -class ChecksValidationStatus: - """Class to represent the validation status.""" + def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: + if not checks: + return df, self._append_empty_checks(df).limit(0) - _errors: list[str] = field(default_factory=list) + checked_df = self.apply_checks(df, checks) - def add_error(self, error: str): - """Add an error to the validation status.""" - self._errors.append(error) + good_df = self.get_valid(checked_df) + bad_df = self.get_invalid(checked_df) - def add_errors(self, errors: list[str]): - """Add an error to the validation status.""" - self._errors.extend(errors) + return good_df, bad_df - @property - def has_errors(self) -> bool: - """Check if there are any errors in the validation status.""" - return bool(self._errors) + def apply_checks_by_metadata_and_split( + self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None + ) -> tuple[DataFrame, DataFrame]: + dq_rule_checks = self.build_checks_by_metadata(checks, glbs) - @property - def errors(self) -> list[str]: - """Get the list of errors in the validation status.""" - return self._errors + good_df, bad_df = self.apply_checks_and_split(df, dq_rule_checks) - def to_string(self) -> str: - """Convert the validation status to a string.""" - if self.has_errors: - return "\n".join(self._errors) - return "No errors found" + return good_df, bad_df - def __str__(self) -> str: - """String representation of the ValidationStatus class.""" - return self.to_string() + def apply_checks_by_metadata( + self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None + ) -> DataFrame: + 
dq_rule_checks = self.build_checks_by_metadata(checks, glbs) + return self.apply_checks(df, dq_rule_checks) -@dataclass(frozen=True) -class DQRule: - """Class to represent a data quality rule consisting of following fields: - * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - - it will be used as an error/warning message, or `null` if it's evaluated to `false` - * `name` - optional name that will be given to a resulting column. Autogenerated if not provided - * `criticality` (optional) - possible values are `error` (critical problems), and `warn` (potential problems) - """ + @staticmethod + def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: + status = ChecksValidationStatus() - check: Column - name: str = "" - criticality: str = Criticality.ERROR.value + for check in checks: + logger.debug(f"Processing check definition: {check}") + if isinstance(check, dict): + status.add_errors(DQEngineCore._validate_checks_dict(check, glbs)) + else: + status.add_error(f"Unsupported check type: {type(check)}") - def __post_init__(self): - # take the name from the alias of the column expression if not provided - object.__setattr__(self, "name", self.name if self.name else "col_" + get_column_name(self.check)) + return status - @ft.cached_property - def rule_criticality(self) -> str: - """Returns criticality of the check. + def get_invalid(self, df: DataFrame) -> DataFrame: + return df.where( + F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() + | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull() + ) - :return: string describing criticality - `warn` or `error`. - :raises ValueError: if criticality is invalid. - """ - criticality = self.criticality - if criticality not in {Criticality.WARN.value, Criticality.ERROR.value}: - raise ValueError(f"Invalid criticality value: {criticality}") + def get_valid(self, df: DataFrame) -> DataFrame: + return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop( + self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS] + ) - return criticality + @staticmethod + def load_checks_from_local_file(path: str) -> list[dict]: + if not path: + raise ValueError("filename must be provided") - def check_column(self) -> Column: - """Creates a Column object from the given check. + try: + checks = Installation.load_local(list[dict[str, str]], Path(path)) + return deserialize_dicts(checks) + except FileNotFoundError: + msg = f"Checks file {path} missing" + raise FileNotFoundError(msg) from None - :return: Column object - """ - return F.when(self.check.isNull(), F.lit(None).cast("string")).otherwise(self.check) - - -@dataclass(frozen=True) -class DQRuleColSet: - """Class to represent a data quality col rule set which defines quality check function for a set of columns. 
- The class consists of the following fields: - * `columns` - list of column names to which the given check function should be applied - * `criticality` - criticality level ('warn' or 'error') - * `check_func` - check function to be applied - * `check_func_args` - non-keyword / positional arguments for the check function after the col_name - * `check_func_kwargs` - keyword /named arguments for the check function after the col_name - """ + @staticmethod + def save_checks_in_local_file(checks: list[dict], path: str): + if not path: + raise ValueError("filename must be provided") - columns: list[str] - check_func: Callable - criticality: str = Criticality.ERROR.value - check_func_args: list[Any] = field(default_factory=list) - check_func_kwargs: dict[str, Any] = field(default_factory=dict) + try: + with open(path, 'w', encoding="utf-8") as file: + yaml.safe_dump(checks, file) + except FileNotFoundError: + msg = f"Checks file {path} missing" + raise FileNotFoundError(msg) from None - def get_rules(self) -> list[DQRule]: - """Build a list of rules for a set of columns. + @staticmethod + def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]: + """Build checks based on check specification, i.e. function name plus arguments. - :return: list of dq rules + :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: + * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - + it will be used as an error/warning message, or `null` if it's evaluated to `false` + * `name` - name that will be given to a resulting column. Autogenerated if not provided + * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), + and `warn` (data is going into both dataframes) + :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). + If not specified, then only built-in functions are used for the checks. 
+ :return: list of data quality check rules """ - rules = [] - for col_name in self.columns: - rule = DQRule( - criticality=self.criticality, - check=self.check_func(col_name, *self.check_func_args, **self.check_func_kwargs), - ) - rules.append(rule) - return rules - - -@dataclass(frozen=True) -class ExtraParams: - """Class to represent extra parameters for DQEngine.""" + status = DQEngineCore.validate_checks(checks, glbs) + if status.has_errors: + raise ValueError(str(status)) - column_names: dict[str, str] | None = field(default_factory=dict) + dq_rule_checks = [] + for check_def in checks: + logger.debug(f"Processing check definition: {check_def}") + check = check_def.get("check", {}) + func_name = check.get("function", None) + func = DQEngineCore._resolve_function(func_name, glbs, fail_on_missing=True) + assert func # should already be validated + func_args = check.get("arguments", {}) + criticality = check_def.get("criticality", "error") + filter_expr = check_def.get("filter") + if "col_names" in func_args: + logger.debug(f"Adding DQRuleColSet with columns: {func_args['col_names']}") + dq_rule_checks += DQRuleColSet( + columns=func_args["col_names"], + check_func=func, + criticality=criticality, + filter=filter_expr, + # provide arguments without "col_names" + check_func_kwargs={k: func_args[k] for k in func_args.keys() - {"col_names"}}, + ).get_rules() + else: + name = check_def.get("name", None) + check_func = func(**func_args) + dq_rule_checks.append(DQRule(check=check_func, name=name, criticality=criticality, filter=filter_expr)) -class DQEngine(DQEngineBase): - """Data Quality Engine class to apply data quality checks to a given dataframe. + logger.debug("Exiting build_checks_by_metadata function with dq_rule_checks") + return dq_rule_checks - Args: - workspace_client (WorkspaceClient): WorkspaceClient instance to use for accessing the workspace. - extra_params (ExtraParams): Extra parameters for the DQEngine. - """ + @staticmethod + def build_checks(*rules_col_set: DQRuleColSet) -> list[DQRule]: + """ + Build rules from dq rules and rule sets. - def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams | None = None): - super().__init__(workspace_client) + :param rules_col_set: list of dq rules which define multiple columns for the same check function + :return: list of dq rules + """ + rules_nested = [rule_set.get_rules() for rule_set in rules_col_set] + flat_rules = list(itertools.chain(*rules_nested)) - extra_params = extra_params or ExtraParams() - column_names = extra_params.column_names or {} - self._column_names = { - ColumnArguments.ERRORS: column_names.get(ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value), - ColumnArguments.WARNINGS: column_names.get( - ColumnArguments.WARNINGS.value, DefaultColumnNames.WARNINGS.value - ), - } + return list(filter(None, flat_rules)) @staticmethod def _get_check_columns(checks: list[DQRule], criticality: str) -> list[DQRule]: @@ -222,88 +244,6 @@ def _create_results_map(df: DataFrame, checks: list[DQRule], dest_col: str) -> D m_col = F.map_filter(m_col, lambda _, v: v.isNotNull()) return df.withColumn(dest_col, F.when(F.size(m_col) > 0, m_col).otherwise(empty_type)) - def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: - """Applies data quality checks to a given dataframe. - - :param df: dataframe to check - :param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class. 
- :return: dataframe with errors and warning reporting columns - """ - if not checks: - return self._append_empty_checks(df) - - warning_checks = self._get_check_columns(checks, Criticality.WARN.value) - error_checks = self._get_check_columns(checks, Criticality.ERROR.value) - ndf = self._create_results_map(df, error_checks, self._column_names[ColumnArguments.ERRORS]) - ndf = self._create_results_map(ndf, warning_checks, self._column_names[ColumnArguments.WARNINGS]) - - return ndf - - def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: - """Applies data quality checks to a given dataframe and split it into two ("good" and "bad"), - according to the data quality checks. - - :param df: dataframe to check - :param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class. - :return: two dataframes - "good" which includes warning rows but no reporting columns, and "data" having - error and warning rows and corresponding reporting columns - """ - if not checks: - return df, self._append_empty_checks(df).limit(0) - - checked_df = self.apply_checks(df, checks) - - good_df = self.get_valid(checked_df) - bad_df = self.get_invalid(checked_df) - - return good_df, bad_df - - def get_invalid(self, df: DataFrame) -> DataFrame: - """ - Get records that violate data quality checks (records with warnings and errors). - @param df: input DataFrame. - @return: dataframe with error and warning rows and corresponding reporting columns. - """ - return df.where( - F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull() - | F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull() - ) - - def get_valid(self, df: DataFrame) -> DataFrame: - """ - Get records that don't violate data quality checks (records with warnings but no errors). - @param df: input DataFrame. - @return: dataframe with warning rows but no reporting columns. - """ - return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop( - self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS] - ) - - @staticmethod - def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: - """ - Validate the input dict to ensure they conform to expected structure and types. - - Each check can be a dictionary. The function validates - the presence of required keys, the existence and callability of functions, and the types - of arguments passed to these functions. - - :param checks: List of checks to apply to the dataframe. Each check should be a dictionary. - :param glbs: Optional dictionary of global functions that can be used in checks. - - :return ValidationStatus: The validation status. 
- """ - status = ChecksValidationStatus() - - for check in checks: - logger.debug(f"Processing check definition: {check}") - if isinstance(check, dict): - status.add_errors(DQEngine._validate_checks_dict(check, glbs)) - else: - status.add_error(f"Unsupported check type: {type(check)}") - - return status - @staticmethod def _validate_checks_dict(check: dict, glbs: dict[str, Any] | None) -> list[str]: """ @@ -326,7 +266,7 @@ def _validate_checks_dict(check: dict, glbs: dict[str, Any] | None) -> list[str] elif not isinstance(check["check"], dict): errors.append(f"'check' field should be a dictionary: {check}") else: - errors.extend(DQEngine._validate_check_block(check, glbs)) + errors.extend(DQEngineCore._validate_check_block(check, glbs)) return errors @@ -348,12 +288,12 @@ def _validate_check_block(check: dict, glbs: dict[str, Any] | None) -> list[str] return [f"'function' field is missing in the 'check' block: {check}"] func_name = check_block["function"] - func = DQEngine.resolve_function(func_name, glbs, fail_on_missing=False) + func = DQEngineCore._resolve_function(func_name, glbs, fail_on_missing=False) if not callable(func): return [f"function '{func_name}' is not defined: {check}"] arguments = check_block.get("arguments", {}) - return DQEngine._validate_check_function_arguments(arguments, func, check) + return DQEngineCore._validate_check_function_arguments(arguments, func, check) @staticmethod def _validate_check_function_arguments(arguments: dict, func: Callable, check: dict) -> list[str]: @@ -382,9 +322,9 @@ def _validate_check_function_arguments(arguments: dict, func: Callable, check: d 'col_name' if k == 'col_names' else k: arguments['col_names'][0] if k == 'col_names' else v for k, v in arguments.items() } - return DQEngine._validate_func_args(arguments, func, check) + return DQEngineCore._validate_func_args(arguments, func, check) - return DQEngine._validate_func_args(arguments, func, check) + return DQEngineCore._validate_func_args(arguments, func, check) @staticmethod def _validate_func_args(arguments: dict, func: Callable, check: dict) -> list[str]: @@ -426,52 +366,7 @@ def cached_signature(check_func): return errors @staticmethod - def build_checks_by_metadata(checks: list[dict], glbs: dict[str, Any] | None = None) -> list[DQRule]: - """Build checks based on check specification, i.e. function name plus arguments. - - :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: - * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - - it will be used as an error/warning message, or `null` if it's evaluated to `false` - * `name` - name that will be given to a resulting column. Autogenerated if not provided - * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), - and `warn` (data is going into both dataframes) - :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). - If not specified, then only built-in functions are used for the checks. 
- :return: list of data quality check rules - """ - status = DQEngine.validate_checks(checks, glbs) - if status.has_errors: - raise ValueError(str(status)) - - dq_rule_checks = [] - for check_def in checks: - logger.debug(f"Processing check definition: {check_def}") - check = check_def.get("check", {}) - func_name = check.get("function", None) - func = DQEngine.resolve_function(func_name, glbs, fail_on_missing=True) - assert func # should already be validated - func_args = check.get("arguments", {}) - criticality = check_def.get("criticality", "error") - - if "col_names" in func_args: - logger.debug(f"Adding DQRuleColSet with columns: {func_args['col_names']}") - dq_rule_checks += DQRuleColSet( - columns=func_args["col_names"], - check_func=func, - criticality=criticality, - # provide arguments without "col_names" - check_func_kwargs={k: func_args[k] for k in func_args.keys() - {"col_names"}}, - ).get_rules() - else: - name = check_def.get("name", None) - check_func = func(**func_args) - dq_rule_checks.append(DQRule(check=check_func, name=name, criticality=criticality)) - - logger.debug("Exiting build_checks_by_metadata function with dq_rule_checks") - return dq_rule_checks - - @staticmethod - def resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_on_missing=True) -> Callable | None: + def _resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_on_missing=True) -> Callable | None: logger.debug(f"Resolving function: {func_name}") if glbs: func = glbs.get(func_name) @@ -482,85 +377,48 @@ def resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_on logger.debug(f"Function {func_name} resolved successfully") return func - def apply_checks_by_metadata_and_split( - self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None - ) -> tuple[DataFrame, DataFrame]: - """Wrapper around `apply_checks_and_split` for use in the metadata-driven pipelines. The main difference - is how the checks are specified - instead of using functions directly, they are described as function name plus - arguments. - :param df: dataframe to check - :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: - * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - - it will be used as an error/warning message, or `null` if it's evaluated to `false` - * `name` - name that will be given to a resulting column. Autogenerated if not provided - * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), - and `warn` (data is going into both dataframes) - :param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module). - If not specified, then only built-in functions are used for the checks. 
- :return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having - error and warning rows and corresponding reporting columns - """ - dq_rule_checks = self.build_checks_by_metadata(checks, glbs) +class DQEngine(DQEngineBase): + """Data Quality Engine class to apply data quality checks to a given dataframe.""" + + def __init__( + self, + workspace_client: WorkspaceClient, + engine: DQEngineCoreBase | None = None, + extra_params: ExtraParams | None = None, + ): + super().__init__(workspace_client) + self._engine = engine or DQEngineCore(workspace_client, extra_params) - good_df, bad_df = self.apply_checks_and_split(df, dq_rule_checks) + def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame: + return self._engine.apply_checks(df, checks) - return good_df, bad_df + def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]: + return self._engine.apply_checks_and_split(df, checks) + + def apply_checks_by_metadata_and_split( + self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None + ) -> tuple[DataFrame, DataFrame]: + return self._engine.apply_checks_by_metadata_and_split(df, checks, glbs) def apply_checks_by_metadata( self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None ) -> DataFrame: - """Wrapper around `apply_checks` for use in the metadata-driven pipelines. The main difference - is how the checks are specified - instead of using functions directly, they are described as function name plus - arguments. - - :param df: dataframe to check - :param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields: - * `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true - - it will be used as an error/warning message, or `null` if it's evaluated to `false` - * `name` - name that will be given to a resulting column. Autogenerated if not provided - * `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe), - and `warn` (data is going into both dataframes) - :param glbs: dictionary with functions mapping (eg. ``globals()`` of calling module). - If not specified, then only built-in functions are used for the checks. - :return: dataframe with errors and warning reporting columns - """ - dq_rule_checks = self.build_checks_by_metadata(checks, glbs) - - return self.apply_checks(df, dq_rule_checks) + return self._engine.apply_checks_by_metadata(df, checks, glbs) @staticmethod - def build_checks(*rules_col_set: DQRuleColSet) -> list[DQRule]: - """ - Build rules from dq rules and rule sets. + def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus: + return DQEngineCore.validate_checks(checks, glbs) - :param rules_col_set: list of dq rules which define multiple columns for the same check function - :return: list of dq rules - """ - rules_nested = [rule_set.get_rules() for rule_set in rules_col_set] - flat_rules = list(itertools.chain(*rules_nested)) + def get_invalid(self, df: DataFrame) -> DataFrame: + return self._engine.get_invalid(df) - return list(filter(None, flat_rules)) + def get_valid(self, df: DataFrame) -> DataFrame: + return self._engine.get_valid(df) @staticmethod - def load_checks_from_local_file(filename: str) -> list[dict]: - """ - Load checks (dq rules) from a file (json or yml) in the local file system. - This does not require installation of DQX in the workspace. 
- The returning checks can be used as input for `apply_checks_by_metadata` function. - - :param filename: file name / path containing the checks. - :return: list of dq rules - """ - if not filename: - raise ValueError("filename must be provided") - - try: - checks = Installation.load_local(list[dict[str, str]], Path(filename)) - return DQEngine._deserialize_dicts(checks) - except FileNotFoundError: - msg = f"Checks file {filename} missing" - raise FileNotFoundError(msg) from None + def load_checks_from_local_file(path: str) -> list[dict]: + return DQEngineCore.load_checks_from_local_file(path) def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]: """Load checks (dq rules) from a file (json or yml) in the workspace. @@ -589,6 +447,70 @@ def load_checks_from_installation( :param assume_user: if True, assume user installation :return: list of dq rules """ + installation = self._get_installation(assume_user, product_name) + run_config = self._load_run_config(installation, run_config_name) + filename = run_config.checks_file or "checks.yml" + + logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.") + return self._load_checks_from_file(installation, filename) + + @staticmethod + def save_checks_in_local_file(checks: list[dict], path: str): + return DQEngineCore.save_checks_in_local_file(checks, path) + + def save_checks_in_installation( + self, + checks: list[dict], + run_config_name: str | None = "default", + product_name: str = "dqx", + assume_user: bool = True, + ): + """ + Save checks (dq rules) to yml file in the installation folder. + + :param checks: list of dq rules to save + :param run_config_name: name of the run (config) to use + :param product_name: name of the product/installation directory + :param assume_user: if True, assume user installation + """ + installation = self._get_installation(assume_user, product_name) + run_config = self._load_run_config(installation, run_config_name) + + logger.info( + f"Saving quality rules (checks) to {installation.install_folder()}/{run_config.checks_file} " + f"in the workspace." + ) + installation.upload(run_config.checks_file, yaml.safe_dump(checks).encode('utf-8')) + + def save_checks_in_workspace_file(self, checks: list[dict], workspace_path: str): + """Save checks (dq rules) to yml file in the workspace. + This does not require installation of DQX in the workspace. + + :param checks: list of dq rules to save + :param workspace_path: destination path to the file in the workspace. + """ + workspace_dir = os.path.dirname(workspace_path) + + logger.info(f"Saving quality rules (checks) to {workspace_path} in the workspace.") + self.ws.workspace.mkdirs(workspace_dir) + self.ws.workspace.upload( + workspace_path, yaml.safe_dump(checks).encode('utf-8'), format=ImportFormat.AUTO, overwrite=True + ) + + def load_run_config( + self, run_config_name: str | None = "default", assume_user: bool = True, product_name: str = "dqx" + ) -> RunConfig: + """ + Load run configuration from the installation. 
+ + :param run_config_name: name of the run configuration to use + :param assume_user: if True, assume user installation + :param product_name: name of the product + """ + installation = self._get_installation(assume_user, product_name) + return self._load_run_config(installation, run_config_name) + + def _get_installation(self, assume_user, product_name): if assume_user: installation = Installation.assume_user_home(self.ws, product_name) else: @@ -596,31 +518,19 @@ def load_checks_from_installation( # verify the installation installation.current(self.ws, product_name, assume_user=assume_user) + return installation + @staticmethod + def _load_run_config(installation, run_config_name): + """Load run configuration from the installation.""" config = installation.load(WorkspaceConfig) - run_config = config.get_run_config(run_config_name) - filename = run_config.checks_file # use check file from the config - - logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.") - return self._load_checks_from_file(installation, filename) + return config.get_run_config(run_config_name) - def _load_checks_from_file(self, installation: Installation, filename: str) -> list[dict]: + @staticmethod + def _load_checks_from_file(installation: Installation, filename: str) -> list[dict]: try: checks = installation.load(list[dict[str, str]], filename=filename) - return self._deserialize_dicts(checks) + return deserialize_dicts(checks) except NotFound: msg = f"Checks file {filename} missing" raise NotFound(msg) from None - - @classmethod - def _deserialize_dicts(cls, checks: list[dict[str, str]]) -> list[dict]: - """ - deserialize string fields instances containing dictionaries - @param checks: list of checks - @return: - """ - for item in checks: - for key, value in item.items(): - if value.startswith("{") and value.endswith("}"): - item[key] = yaml.safe_load(value.replace("'", '"')) - return checks From 65a9e039e4726c2dddcbba6c2433ed5f90310bb5 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:01:05 +0100 Subject: [PATCH 25/35] Update test_apply_checks.py --- tests/integration/test_apply_checks.py | 62 ++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 5f8c160..214f158 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -8,12 +8,11 @@ DQEngine, ExtraParams, ) -from databricks.labs.dqx.rule import DQRule, DQRuleColSet - +from databricks.labs.dqx.rule import DQRule, DQRuleColSet, ColumnArguments SCHEMA = "a: int, b: int, c: int" EXPECTED_SCHEMA = SCHEMA + ", _errors: map, _warnings: map" -EXPECTED_SCHEMA_WITH_CUSTOM_NAMES = SCHEMA + ", ERROR: map, WARN: map" +EXPECTED_SCHEMA_WITH_CUSTOM_NAMES = SCHEMA + ", dq_errors: map, dq_warnings: map" def test_apply_checks_on_empty_checks(ws, spark): @@ -566,18 +565,25 @@ def test_get_invalid_records(ws, spark): def test_apply_checks_with_custom_column_naming(ws, spark): - dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) + dq_engine = DQEngine( + ws, + extra_params=ExtraParams( + column_names={ColumnArguments.ERRORS.value: "dq_errors", ColumnArguments.WARNINGS.value: "dq_warnings"} + ), + ) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) - checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", 
"arguments": {"col_name": "a"}}}] + checks = [ + {"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "a"}}} + ] checked = dq_engine.apply_checks_by_metadata(test_df, checks) expected = spark.createDataFrame( [ [1, 3, 3, None, None], [2, None, 4, None, None], - [None, 4, None, None, {"col_a_is_null_or_empty": "new check failed"}], - [None, None, None, None, {"col_a_is_null_or_empty": "new check failed"}], + [None, 4, None, None, {"col_a_is_null_or_empty": "Column a is null or empty"}], + [None, None, None, None, {"col_a_is_null_or_empty": "Column a is null or empty"}], ], EXPECTED_SCHEMA_WITH_CUSTOM_NAMES, ) @@ -586,7 +592,12 @@ def test_apply_checks_with_custom_column_naming(ws, spark): def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): - dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'})) + dq_engine = DQEngine( + ws, + extra_params=ExtraParams( + column_names={ColumnArguments.ERRORS.value: "dq_errors", ColumnArguments.WARNINGS.value: "dq_warnings"} + ), + ) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) checks = [ @@ -614,3 +625,38 @@ def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): EXPECTED_SCHEMA_WITH_CUSTOM_NAMES, ), ) + +def test_apply_checks_by_metadata_with_custom_column_naming_fallback_to_default(ws, spark): + dq_engine = DQEngine( + ws, + extra_params=ExtraParams( + column_names={"errors_invalid": "dq_errors", "warnings_invalid": "dq_warnings"} + ), + ) + test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) + + checks = [ + {"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "a"}}}, + {"criticality": "error", "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}}, + ] + good, bad = dq_engine.apply_checks_by_metadata_and_split(test_df, checks) + + assert_df_equality(good, spark.createDataFrame([[1, 3, 3], [None, 4, None]], SCHEMA), ignore_nullable=True) + + assert_df_equality( + bad, + spark.createDataFrame( + [ + [2, None, 4, {"col_b_is_null_or_empty": "Column b is null or empty"}, None], + [None, 4, None, None, {"col_a_is_null_or_empty": "Column a is null or empty"}], + [ + None, + None, + None, + {"col_b_is_null_or_empty": "Column b is null or empty"}, + {"col_a_is_null_or_empty": "Column a is null or empty"}, + ], + ], + EXPECTED_SCHEMA, + ), + ) From 9de33fe701619fe8ed35f8c1cfc0832b66baf4fe Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:01:37 +0100 Subject: [PATCH 26/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index e030979..ad213dc 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -440,7 +440,7 @@ from databricks.labs.dqx.engine import ( ) # customize reporting column names -extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WARNINGS'}) +extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"}) ws = WorkspaceClient() dq_engine = DQEngine(ws, extra_params=extra_parameters) From 49200cbcb9126a4339f8457ceaf83c2143ea182f Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:15:07 +0100 Subject: [PATCH 27/35] Update test_apply_checks.py --- tests/integration/test_apply_checks.py | 5 ++--- 1 
file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 214f158..93d4a1e 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -626,12 +626,11 @@ def test_apply_checks_by_metadata_with_custom_column_naming(ws, spark): ), ) + def test_apply_checks_by_metadata_with_custom_column_naming_fallback_to_default(ws, spark): dq_engine = DQEngine( ws, - extra_params=ExtraParams( - column_names={"errors_invalid": "dq_errors", "warnings_invalid": "dq_warnings"} - ), + extra_params=ExtraParams(column_names={"errors_invalid": "dq_errors", "warnings_invalid": "dq_warnings"}), ) test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA) From 82d75c4e15bf5f32594844195cf576a3f7b594ab Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:20:03 +0100 Subject: [PATCH 28/35] Update demos/dqx_demo_library.py --- demos/dqx_demo_library.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/demos/dqx_demo_library.py b/demos/dqx_demo_library.py index 4bab10b..26a736c 100644 --- a/demos/dqx_demo_library.py +++ b/demos/dqx_demo_library.py @@ -375,7 +375,7 @@ def ends_with_foo(col_name: str) -> Column: from databricks.labs.dqx.col_functions import is_not_null_and_not_empty # using ExtraParams class to configure the engine with custom column names -extra_parameters = ExtraParams(column_names={'errors': 'ERRORS', 'warnings': 'WARNINGS'}) +extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"}) ws = WorkspaceClient() dq_engine = DQEngine(ws, extra_params=extra_parameters) From d4fc536bd43891b0435683fa5eee74342ca60db1 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:20:09 +0100 Subject: [PATCH 29/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index ad213dc..e114a05 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -246,7 +246,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once check_func=is_not_null).get_rules() + [ DQRule( # define rule for a single column name='col3_is_null_or_empty', - criticality='error', + criticality="error", check=is_not_null_and_not_empty('col3')), DQRule( # define rule with a filter name='col_4_is_null_or_empty', From c2462fc846bf490599505d3e0dffeb5686d6c137 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:20:14 +0100 Subject: [PATCH 30/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index e114a05..c7b1288 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -245,7 +245,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once criticality="error", check_func=is_not_null).get_rules() + [ DQRule( # define rule for a single column - name='col3_is_null_or_empty', + name="col3_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty('col3')), DQRule( # define rule with a filter From 3cf6f6eb9e7241995b5071c6590e048e9feb1480 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:20:19 +0100 Subject: [PATCH 31/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index c7b1288..659977c 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -249,7 +249,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once criticality="error", check=is_not_null_and_not_empty('col3')), DQRule( # define rule with a filter - name='col_4_is_null_or_empty', + name="col_4_is_null_or_empty", criticality='error', filter='col1<3', check=is_not_null_and_not_empty('col4')), From 5a3449fe1e84ea87fe76a5bb819cdd08efec7505 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:23:59 +0100 Subject: [PATCH 32/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 659977c..9af9d5d 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -252,7 +252,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once name="col_4_is_null_or_empty", criticality='error', filter='col1<3', - check=is_not_null_and_not_empty('col4')), + check=is_not_null_and_not_empty("col4")), DQRule( # name auto-generated if not provided criticality='warn', check=value_is_in_list('col4', ['1', '2'])) From 3abf7a6c1c0cbdf3eebb50372c3cefa693f6ef94 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:24:06 +0100 Subject: [PATCH 33/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 9af9d5d..e5351b7 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -251,7 +251,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once DQRule( # define rule with a filter name="col_4_is_null_or_empty", criticality='error', - filter='col1<3', + filter="col1<3", check=is_not_null_and_not_empty("col4")), DQRule( # name auto-generated if not provided criticality='warn', From d945c9b50e8a95967c07e8f8e37d211407c575e0 Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:24:11 +0100 Subject: [PATCH 34/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index e5351b7..140778a 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -250,7 +250,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once check=is_not_null_and_not_empty('col3')), DQRule( # define rule with a filter name="col_4_is_null_or_empty", - criticality='error', + criticality="error", filter="col1<3", check=is_not_null_and_not_empty("col4")), DQRule( # name auto-generated if not provided From ba26252137433cd94f0e575840c4eba2046dc69a Mon Sep 17 00:00:00 2001 From: Marcin Wojtyczka Date: Fri, 7 Feb 2025 18:24:16 +0100 Subject: [PATCH 35/35] Update docs/dqx/docs/guide.mdx --- docs/dqx/docs/guide.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dqx/docs/guide.mdx b/docs/dqx/docs/guide.mdx index 140778a..76e3269 100644 --- a/docs/dqx/docs/guide.mdx +++ b/docs/dqx/docs/guide.mdx @@ -247,7 +247,7 @@ checks = DQRuleColSet( # define rule for multiple columns at once DQRule( # define rule for a single column name="col3_is_null_or_empty", criticality="error", - check=is_not_null_and_not_empty('col3')), + check=is_not_null_and_not_empty("col3")), DQRule( # define rule with a filter name="col_4_is_null_or_empty", criticality="error",
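Taken together, the patches above make the reporting column names configurable through `ExtraParams`. A minimal usage sketch of the resulting API, assembled from the guide, demo, and test snippets in this series (`input_df` is a placeholder DataFrame, not part of the patches):

```python
# Sketch based on the ExtraParams / DQEngine API introduced in this series;
# `input_df` stands in for any Spark DataFrame and is not defined by the patches.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine, ExtraParams

# Rename the reporting columns from the defaults (_errors / _warnings).
extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"})
dq_engine = DQEngine(WorkspaceClient(), extra_params=extra_parameters)

checks = [
    {"criticality": "warn",
     "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "a"}}},
]

# Quality results are written to dq_errors / dq_warnings instead of _errors / _warnings.
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```

As the fallback test above shows, unrecognized keys in `column_names` are ignored and the engine falls back to the default `_errors` / `_warnings` names.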
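The check-persistence helpers refactored earlier in this series can be combined with the same engine. A hedged sketch, assuming the method signatures shown in the engine diff; the workspace path and run-config name are illustrative only:

```python
# Sketch of the DQEngine check-persistence helpers shown in this series;
# confirm exact signatures against the merged code. The path below is hypothetical.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

checks = [
    {"criticality": "error",
     "check": {"function": "is_not_null_and_not_empty", "arguments": {"col_name": "b"}}},
]

# Store the rules as YAML in the DQX installation folder (run config "default"),
# then read them back before applying.
dq_engine.save_checks_in_installation(checks, run_config_name="default", assume_user=True)
loaded_checks = dq_engine.load_checks_from_installation(run_config_name="default", assume_user=True)

# Alternatively, use an arbitrary workspace file; this does not require a DQX installation.
dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/dqx/checks.yml")
loaded_checks = dq_engine.load_checks_from_workspace_file("/Shared/dqx/checks.yml")
```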