Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Customizable column names and extra config placeholder #127

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a09b0fe
feat: customizable column names for DQEngine along with placeholer fo…
Jan 23, 2025
8cb6ab0
fix: linting issues fix
hrfmartins Feb 2, 2025
7b21fa2
Merge branch 'main' into feature/customizable-column-names
mwojtyczka Feb 3, 2025
ee032f4
Merge branch 'main' into feature/customizable-column-names
mwojtyczka Feb 3, 2025
1fdfab4
Update tests/integration/test_apply_checks.py
hrfmartins Feb 4, 2025
5c40db9
Update tests/integration/test_apply_checks.py
hrfmartins Feb 4, 2025
0e433b7
Merge branch 'main' into feature/customizable-column-names
mwojtyczka Feb 4, 2025
c63bafe
tests: added test_custom_column_name_mappings_split_metadata to test …
hrfmartins Feb 4, 2025
a62a6f1
Merge remote-tracking branch 'origin/feature/customizable-column-name…
hrfmartins Feb 4, 2025
f931b2f
docs: added documentation on custom column namings
hrfmartins Feb 4, 2025
fa0a9a5
Update docs/dqx/docs/guide.mdx
hrfmartins Feb 5, 2025
0c3da1e
Update tests/integration/test_apply_checks.py
hrfmartins Feb 5, 2025
3f1eab9
Update tests/integration/test_apply_checks.py
hrfmartins Feb 5, 2025
8e04b11
Update tests/integration/test_apply_checks.py
hrfmartins Feb 5, 2025
a37a575
Update docs/dqx/docs/guide.mdx
hrfmartins Feb 5, 2025
8628fe3
Update docs/dqx/docs/guide.mdx
hrfmartins Feb 5, 2025
2101023
Update docs/dqx/docs/guide.mdx
hrfmartins Feb 5, 2025
297ab7c
Update docs/dqx/docs/guide.mdx
hrfmartins Feb 5, 2025
41eb0ec
Merge branch 'main' into feature/customizable-column-names
mwojtyczka Feb 6, 2025
59fa496
Merge branch 'main' into feature/customizable-column-names
mwojtyczka Feb 7, 2025
411f221
Merge branch 'main' into feature/customizable-column-names
mwojtyczka Feb 7, 2025
98ceca4
Update demos/dqx_demo_library.py
mwojtyczka Feb 7, 2025
d78bbfd
Update demos/dqx_demo_library.py
mwojtyczka Feb 7, 2025
fbcb0f9
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
9be3f7d
Update tests/integration/test_apply_checks.py
mwojtyczka Feb 7, 2025
a604900
Update tests/integration/test_apply_checks.py
mwojtyczka Feb 7, 2025
a96ea96
Update tests/integration/test_apply_checks.py
mwojtyczka Feb 7, 2025
938b3b4
Update test_apply_checks.py
mwojtyczka Feb 7, 2025
f778209
Update test_apply_checks.py
mwojtyczka Feb 7, 2025
0561034
Update engine.py
mwojtyczka Feb 7, 2025
0dcea46
Update engine.py
mwojtyczka Feb 7, 2025
65a9e03
Update test_apply_checks.py
mwojtyczka Feb 7, 2025
9de33fe
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
49200cb
Update test_apply_checks.py
mwojtyczka Feb 7, 2025
82d75c4
Update demos/dqx_demo_library.py
mwojtyczka Feb 7, 2025
d4fc536
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
c2462fc
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
3cf6f6e
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
5a3449f
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
3abf7a6
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
d945c9b
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
ba26252
Update docs/dqx/docs/guide.mdx
mwojtyczka Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/databricks/labs/dqx/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,16 @@ def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> C
:return ValidationStatus: The validation status.
"""

@staticmethod
@abc.abstractmethod
def get_invalid(df: DataFrame) -> DataFrame:
def get_invalid(self, df: DataFrame) -> DataFrame:
"""
Get records that violate data quality checks (records with warnings and errors).
@param df: input DataFrame.
@return: dataframe with error and warning rows and corresponding reporting columns.
"""

@staticmethod
@abc.abstractmethod
def get_valid(df: DataFrame) -> DataFrame:
def get_valid(self, df: DataFrame) -> DataFrame:
"""
Get records that don't violate data quality checks (records with warnings but no errors).
@param df: input DataFrame.
Expand Down
76 changes: 54 additions & 22 deletions src/databricks/labs/dqx/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@
import yaml
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from databricks.labs.dqx.rule import DQRule, Criticality, Columns, DQRuleColSet, ChecksValidationStatus
from databricks.labs.dqx.rule import (
DQRule,
Criticality,
DQRuleColSet,
ChecksValidationStatus,
ColumnArguments,
ExtraParams,
DefaultColumnNames,
)
from databricks.labs.dqx.utils import deserialize_dicts
from databricks.labs.dqx import col_functions
from databricks.labs.blueprint.installation import Installation

from databricks.labs.dqx.base import DQEngineBase, DQEngineCoreBase
from databricks.labs.dqx.config import WorkspaceConfig, RunConfig
from databricks.sdk.errors import NotFound
Expand All @@ -24,16 +33,34 @@


class DQEngineCore(DQEngineCoreBase):
"""Data Quality Engine Core class to apply data quality checks to a given dataframe."""
"""Data Quality Engine Core class to apply data quality checks to a given dataframe.
Args:
workspace_client (WorkspaceClient): WorkspaceClient instance to use for accessing the workspace.
extra_params (ExtraParams): Extra parameters for the DQEngine.
"""

def __init__(self, workspace_client: WorkspaceClient, extra_params: ExtraParams | None = None):
super().__init__(workspace_client)

extra_params = extra_params or ExtraParams()

self._column_names = {
ColumnArguments.ERRORS: extra_params.column_names.get(
ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value
),
ColumnArguments.WARNINGS: extra_params.column_names.get(
ColumnArguments.WARNINGS.value, DefaultColumnNames.WARNINGS.value
),
}

def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame:
if not checks:
return self._append_empty_checks(df)

warning_checks = self._get_check_columns(checks, Criticality.WARN.value)
error_checks = self._get_check_columns(checks, Criticality.ERROR.value)
ndf = self._create_results_map(df, error_checks, Columns.ERRORS.value)
ndf = self._create_results_map(ndf, warning_checks, Columns.WARNINGS.value)
ndf = self._create_results_map(df, error_checks, self._column_names[ColumnArguments.ERRORS])
ndf = self._create_results_map(ndf, warning_checks, self._column_names[ColumnArguments.WARNINGS])

return ndf

Expand Down Expand Up @@ -77,13 +104,16 @@ def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> C

return status

@staticmethod
def get_invalid(df: DataFrame) -> DataFrame:
return df.where(F.col(Columns.ERRORS.value).isNotNull() | F.col(Columns.WARNINGS.value).isNotNull())
def get_invalid(self, df: DataFrame) -> DataFrame:
return df.where(
F.col(self._column_names[ColumnArguments.ERRORS]).isNotNull()
| F.col(self._column_names[ColumnArguments.WARNINGS]).isNotNull()
)

@staticmethod
def get_valid(df: DataFrame) -> DataFrame:
return df.where(F.col(Columns.ERRORS.value).isNull()).drop(Columns.ERRORS.value, Columns.WARNINGS.value)
def get_valid(self, df: DataFrame) -> DataFrame:
return df.where(F.col(self._column_names[ColumnArguments.ERRORS]).isNull()).drop(
self._column_names[ColumnArguments.ERRORS], self._column_names[ColumnArguments.WARNINGS]
)

@staticmethod
def load_checks_from_local_file(path: str) -> list[dict]:
Expand Down Expand Up @@ -177,17 +207,16 @@ def _get_check_columns(checks: list[DQRule], criticality: str) -> list[DQRule]:
"""
return [check for check in checks if check.rule_criticality == criticality]

@staticmethod
def _append_empty_checks(df: DataFrame) -> DataFrame:
def _append_empty_checks(self, df: DataFrame) -> DataFrame:
"""Append empty checks at the end of dataframe.

:param df: dataframe without checks
:return: dataframe with checks
"""
return df.select(
"*",
F.lit(None).cast("map<string, string>").alias(Columns.ERRORS.value),
F.lit(None).cast("map<string, string>").alias(Columns.WARNINGS.value),
F.lit(None).cast("map<string, string>").alias(self._column_names[ColumnArguments.ERRORS]),
F.lit(None).cast("map<string, string>").alias(self._column_names[ColumnArguments.WARNINGS]),
)

@staticmethod
Expand Down Expand Up @@ -350,9 +379,14 @@ def _resolve_function(func_name: str, glbs: dict[str, Any] | None = None, fail_o
class DQEngine(DQEngineBase):
"""Data Quality Engine class to apply data quality checks to a given dataframe."""

def __init__(self, workspace_client: WorkspaceClient, engine: DQEngineCoreBase | None = None):
def __init__(
self,
workspace_client: WorkspaceClient,
engine: DQEngineCoreBase | None = None,
extra_params: ExtraParams | None = None,
):
super().__init__(workspace_client)
self._engine = engine or DQEngineCore(workspace_client)
self._engine = engine or DQEngineCore(workspace_client, extra_params)

def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame:
return self._engine.apply_checks(df, checks)
Expand All @@ -374,13 +408,11 @@ def apply_checks_by_metadata(
def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus:
return DQEngineCore.validate_checks(checks, glbs)

@staticmethod
def get_invalid(df: DataFrame) -> DataFrame:
return DQEngineCore.get_invalid(df)
def get_invalid(self, df: DataFrame) -> DataFrame:
return self._engine.get_invalid(df)

@staticmethod
def get_valid(df: DataFrame) -> DataFrame:
return DQEngineCore.get_valid(df)
def get_valid(self, df: DataFrame) -> DataFrame:
return self._engine.get_valid(df)

@staticmethod
def load_checks_from_local_file(path: str) -> list[dict]:
Expand Down
25 changes: 19 additions & 6 deletions src/databricks/labs/dqx/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,32 @@
from databricks.labs.dqx.utils import get_column_name


# TODO: make this configurable
class Columns(Enum):
class Criticality(Enum):
"""Enum class to represent criticality of the check."""

WARN = "warn"
ERROR = "error"


class DefaultColumnNames(Enum):
"""Enum class to represent columns in the dataframe that will be used for error and warning reporting."""

ERRORS = "_errors"
WARNINGS = "_warnings"


class Criticality(Enum):
"""Enum class to represent criticality of the check."""
class ColumnArguments(Enum):
"""Enum class that is used as input parsing for custom column naming."""

WARN = "warn"
ERROR = "error"
ERRORS = "errors"
WARNINGS = "warnings"


@dataclass(frozen=True)
class ExtraParams:
"""Class to represent extra parameters for DQEngine."""

column_names: dict[str, str] = field(default_factory=dict)


@dataclass(frozen=True)
Expand Down
30 changes: 28 additions & 2 deletions tests/integration/test_apply_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
from pyspark.sql import Column
from chispa.dataframe_comparer import assert_df_equality # type: ignore
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty, make_condition
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.engine import (
DQEngine,
ExtraParams,
)
from databricks.labs.dqx.rule import DQRule


SCHEMA = "a: int, b: int, c: int"
EXPECTED_SCHEMA = SCHEMA + ", _errors: map<string,string>, _warnings: map<string,string>"
EXPECTED_SCHEMA_WITH_CUSTOM_NAMES = SCHEMA + ", ERROR: map<string,string>, WARN: map<string,string>"
mwojtyczka marked this conversation as resolved.
Show resolved Hide resolved


def test_apply_checks_on_empty_checks(ws, spark):
Expand Down Expand Up @@ -491,3 +494,26 @@ def test_get_invalid_records(ws, spark):
)

assert_df_equality(invalid_df, expected_invalid_df)


def test_apply_checks_with_custom_column_naming(ws, spark):
dq_engine = DQEngine(ws, extra_params=ExtraParams(column_names={'errors': 'ERROR', 'warnings': 'WARN'}))
test_df = spark.createDataFrame([[1, 3, 3], [2, None, 4], [None, 4, None], [None, None, None]], SCHEMA)

checks = [{"criticality": "warn", "check": {"function": "col_test_check_func", "arguments": {"col_name": "a"}}}]
hrfmartins marked this conversation as resolved.
Show resolved Hide resolved
mwojtyczka marked this conversation as resolved.
Show resolved Hide resolved
checked = dq_engine.apply_checks_by_metadata(test_df, checks, globals())
mwojtyczka marked this conversation as resolved.
Show resolved Hide resolved
hrfmartins marked this conversation as resolved.
Show resolved Hide resolved

assert 'ERROR' in checked.columns
hrfmartins marked this conversation as resolved.
Show resolved Hide resolved
assert 'WARN' in checked.columns
hrfmartins marked this conversation as resolved.
Show resolved Hide resolved

expected = spark.createDataFrame(
[
[1, 3, 3, None, None],
[2, None, 4, None, None],
[None, 4, None, None, {"col_a_is_null_or_empty": "new check failed"}],
[None, None, None, None, {"col_a_is_null_or_empty": "new check failed"}],
],
EXPECTED_SCHEMA_WITH_CUSTOM_NAMES,
)

assert_df_equality(checked, expected, ignore_nullable=True)
Loading