Skip to content

Commit

Permalink
Merge pull request #10 from hrfmartins/refactor/refactored-to-classes
Browse files Browse the repository at this point in the history
refactor: refactored to classes definition on validators, allowing fo…
  • Loading branch information
hrfmartins authored Sep 3, 2024
2 parents ae4d50e + 3bbd519 commit 9bce009
Show file tree
Hide file tree
Showing 35 changed files with 518 additions and 325 deletions.
19 changes: 11 additions & 8 deletions dqframework/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import polars as pl

from dqframework.validators import Validator

columns = [
"id"
"check_id"
Expand Down Expand Up @@ -37,7 +39,7 @@ def __init__(
):
self.level = level
self.check_name: str = check_name
self.validations = []
self.validations: list[Validator] = []
self.pass_threshold = pass_threshold
self.check_id = str(uuid.uuid4())

Expand All @@ -46,16 +48,16 @@ def __call__(
) -> (pl.DataFrame, pl.DataFrame, pl.DataFrame):
if not self.validations:
raise ValueError("No validations added to the check")
correct_acc = df
correct_acc = pl.DataFrame()
incorrect_acc = pl.DataFrame()

dq_metrics = pl.DataFrame()
for validation in self.validations:
val_id = str(uuid.uuid4())
rule = validation[0]
column = validation[1]
rule = validation.__class__.__name__
column = validation.column

correct, incorrect = rule(correct_acc, *validation[1:])
correct, incorrect = validation.execute(df)
correct_acc = correct

# tag the incorrect with the check_id that failed
Expand All @@ -73,8 +75,8 @@ def __call__(
df,
incorrect,
column,
rule.__name__,
str(*validation[2:]),
rule,
str(validation.get_value()),
str(val_id),
),
]
Expand Down Expand Up @@ -144,7 +146,7 @@ def execute(self, df: pl.DataFrame):
self.results = results_df
self.status = Pipeline.Status.EXECUTED

return PipelineResults(aux_df, invalid_records, results_df)
return PipelineResults(df, aux_df, invalid_records, results_df)

def results_to_csv(self, path: str):
if self.status == Pipeline.Status.NOT_EXECUTED:
Expand All @@ -167,6 +169,7 @@ def results_to_json(self, path: str):

@dataclass
class PipelineResults:
    """Bundle of the DataFrames returned by ``Pipeline.execute``."""

    # The DataFrame that was passed to ``Pipeline.execute``.
    original_records: pl.DataFrame
    # NOTE(review): presumably the rows that passed the checks; how this
    # frame is built is not visible from here - confirm against execute().
    valid_records: pl.DataFrame
    # NOTE(review): presumably the rows that failed at least one check - confirm.
    invalid_records: pl.DataFrame
    # The results frame that ``Pipeline.execute`` also stores on ``self.results``.
    results: pl.DataFrame
275 changes: 35 additions & 240 deletions dqframework/validators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,240 +1,35 @@
from datetime import datetime

import polars as pl


def has_min(df: pl.DataFrame, column: str, value: int) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a column reaches a lower bound.

    :param df: DataFrame to split
    :param column: name of the column compared against the bound
    :param value: inclusive lower bound
    :return: (pl.DataFrame, pl.DataFrame) - rows with column >= value,
        rows with column < value
    """
    target = pl.col(column)

    at_or_above = df.filter(target >= value)
    below = df.filter(target < value)

    return at_or_above, below


def has_string_pattern(
    df: pl.DataFrame, column: str, value: str
) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a string column contains a pattern.

    :param df: DataFrame to split
    :param column: name of the string column to search
    :param value: pattern handed to polars ``str.contains``
    :return: (pl.DataFrame, pl.DataFrame) - rows where the pattern is found,
        rows where it is not
    """
    found = pl.col(column).str.contains(value)

    matching = df.filter(found)
    non_matching = df.filter(~found)

    return matching, non_matching


def has_date_pattern(
    df: pl.DataFrame, column: str, value: str = "%Y-%m-%d"
) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a string column parses as a date.

    :param df: DataFrame to split
    :param column: name of the string column to parse
    :param value: date format passed to ``str.to_date`` (non-strict parsing)
    :return: (pl.DataFrame, pl.DataFrame) - rows where parsing yields a
        non-null date, rows where it yields null
    """
    # Non-strict parsing is used so the result can be tested for null
    # instead of the call raising on the first bad value.
    parsed = pl.col(column).str.to_date(value, strict=False)

    parseable = df.filter(parsed.is_not_null())
    unparseable = df.filter(parsed.is_null())

    return parseable, unparseable


def has_max(df: pl.DataFrame, column: str, value: int) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a column stays within an upper bound.

    :param df: DataFrame to split
    :param column: name of the column compared against the bound
    :param value: inclusive upper bound
    :return: (pl.DataFrame, pl.DataFrame) - rows with column <= value,
        rows with column > value
    """
    target = pl.col(column)

    at_or_below = df.filter(target <= value)
    above = df.filter(target > value)

    return at_or_below, above


def has_string_length(
    df: pl.DataFrame, column: str, value: int
) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a string column has an exact character count.

    :param df: DataFrame to split
    :param column: name of the string column to measure
    :param value: required number of characters
    :return: (pl.DataFrame, pl.DataFrame) - rows whose length equals value,
        rows whose length differs
    """
    length = pl.col(column).str.len_chars()

    exact = df.filter(length == value)
    different = df.filter(length != value)

    return exact, different


def has_string_length_between(
    df: pl.DataFrame, column: str, min_value: int, max_value: int
) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a string column's length lies in a range.

    :param df: DataFrame to split
    :param column: name of the string column to measure
    :param min_value: inclusive minimum number of characters
    :param max_value: inclusive maximum number of characters
    :return: (pl.DataFrame, pl.DataFrame) - rows whose length is inside
        [min_value, max_value], rows whose length is outside it
    """
    length = pl.col(column).str.len_chars()

    inside_range = (length >= min_value) & (length <= max_value)
    outside_range = (length < min_value) | (length > max_value)

    within = df.filter(inside_range)
    beyond = df.filter(outside_range)

    return within, beyond


def has_between(
    df: pl.DataFrame, column: str, min_value: int, max_value: int
) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a column's value lies in a range.

    :param df: DataFrame to split
    :param column: name of the column to compare
    :param min_value: inclusive lower bound
    :param max_value: inclusive upper bound
    :return: (pl.DataFrame, pl.DataFrame) - rows inside
        [min_value, max_value], rows outside it
    """
    target = pl.col(column)

    inside_range = (target >= min_value) & (target <= max_value)
    outside_range = (target < min_value) | (target > max_value)

    within = df.filter(inside_range)
    beyond = df.filter(outside_range)

    return within, beyond


def has_max_length(
    df: pl.DataFrame, column: str, value: int
) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a string column fits a maximum length.

    :param df: DataFrame to split
    :param column: name of the string column to measure
    :param value: inclusive maximum number of characters
    :return: (pl.DataFrame, pl.DataFrame) - rows whose length is <= value,
        rows whose length is > value
    """
    length = pl.col(column).str.len_chars()

    fitting = df.filter(length <= value)
    too_long = df.filter(length > value)

    return fitting, too_long


def is_complete(df: pl.DataFrame, column: str) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a column holds a value (is not null).

    :param df: DataFrame to split
    :param column: name of the column to inspect
    :return: (pl.DataFrame, pl.DataFrame) - rows where the column is not
        null, rows where it is null
    """
    target = pl.col(column)

    populated = df.filter(target.is_not_null())
    missing = df.filter(target.is_null())

    return populated, missing


def is_unique(df: pl.DataFrame, column: str) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether each row's column value is unique.

    :param df: DataFrame to split
    :param column: name of the column to inspect
    :return: (pl.DataFrame, pl.DataFrame) - rows flagged by ``is_unique``,
        rows not flagged (every occurrence of a repeated value)
    """
    unique_flag = pl.col(column).is_unique()

    singles = df.filter(unique_flag)
    repeated = df.filter(~unique_flag)

    return singles, repeated


def is_composite_key(df: pl.DataFrame, columns: list) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a set of columns identifies each row uniquely.

    :param df: DataFrame to split
    :param columns: list of column names forming the candidate key
    :return: (pl.DataFrame, pl.DataFrame) - rows whose combination of values
        in ``columns`` is flagged unique, rows whose combination is not
    """
    # Row-level mask computed on the key columns only, then applied to the
    # full frame so the returned rows keep every column.
    key_is_unique = df.select(columns).is_unique()

    keyed = df.filter(key_is_unique)
    clashing = df.filter(~key_is_unique)

    return keyed, clashing


def no_future_dates(
    df: pl.DataFrame, column: str, date: datetime | None = None
) -> (pl.DataFrame, pl.DataFrame):
    """
    Check if a DataFrame column has no dates later than a cutoff
    Parameters
    :param df: DataFrame
    :param column: name of the column to check
    :param date: cutoff to compare against; when omitted (None) the current
        time is taken at call time
    :return: (pl.DataFrame, pl.DataFrame) - correct, incorrect
    """

    # The default is resolved per call: a `datetime.today()` default in the
    # signature would be frozen at import time. The cutoff is also computed
    # exactly once so both partitions are split on the same instant.
    cutoff = pl.lit(datetime.now() if date is None else date)
    target = pl.col(column)

    correct = df.filter(target <= cutoff)
    incorrect = df.filter(target > cutoff)

    return correct, incorrect


def is_in(df: pl.DataFrame, column: str, values: list) -> (pl.DataFrame, pl.DataFrame):
    """
    Split a DataFrame on whether a column's value belongs to an allowed list.

    :param df: DataFrame to split
    :param column: name of the column to inspect
    :param values: list of accepted values
    :return: (pl.DataFrame, pl.DataFrame) - rows whose value is in
        ``values``, rows whose value is not
    """
    membership = pl.col(column).is_in(values)

    allowed = df.filter(membership)
    rejected = df.filter(~membership)

    return allowed, rejected
from .base_validator import Validator
from .base_validator import Validator
from .has_between import HasBetween
from .has_between import HasBetween
from .has_date_pattern import HasDatePattern
from .has_max import HasMax
from .has_min import HasMin
from .has_str_length import HasStrLength
from .has_str_length_between import HasStrLengthBetween
from .has_str_max_length import HasStrMaxLength
from .has_str_min_length import HasStrMinLength
from .has_str_pattern import HasStrPattern
from .is_complete import IsComplete
from .is_composite_key import IsCompositeKey
from .is_in import IsIn
from .is_unique import IsUnique
from .no_future_dates import NoFutureDates

# Public API of the validators package: every name imported above is listed
# exactly once. "HasStrMinLength" was previously missing (its slot held a
# second "HasStrMaxLength"), so star-imports did not expose it.
__all__ = [
    "HasBetween",
    "Validator",
    "HasDatePattern",
    "HasMax",
    "HasMin",
    "HasStrLength",
    "HasStrLengthBetween",
    "HasStrMaxLength",
    "HasStrMinLength",
    "HasStrPattern",
    "IsComplete",
    "IsCompositeKey",
    "IsIn",
    "IsUnique",
    "NoFutureDates",
]
16 changes: 16 additions & 0 deletions dqframework/validators/base_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import abc
from typing import Optional

import polars as pl


class Validator(abc.ABC):
    """
    Base class for column validators.

    Stores the column (or columns) a rule applies to and an optional
    rule parameter. Subclasses override :meth:`execute`.
    """

    def __init__(self, column: str | list[str], value: Optional = None):
        """
        :param column: name of the column, or list of column names, to check
        :param value: optional parameter of the rule (defaults to None)
        """
        self.column = column
        self.value = value

    def execute(self, df: pl.DataFrame):
        """
        Run the validation on a DataFrame.

        Callers unpack the result as ``correct, incorrect``, so an override
        must return a pair. The base implementation raises instead of
        returning None, which would otherwise surface later as an opaque
        unpacking error.

        :param df: DataFrame to validate
        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement execute(df)"
        )

    def get_value(self):
        """
        :return: the rule parameter given at construction (None if absent)
        """
        return self.value
9 changes: 9 additions & 0 deletions dqframework/validators/check_operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from enum import Enum


class CheckOperator(Enum):
    """Comparison operators that a check can be expressed with."""

    GT = "BIGGER_THAN"
    ST = "SMALLER_THAN"
    GE = "GREATER_OR_EQUAL_THAN"
    # Must differ from GE: Enum turns members with equal values into aliases,
    # so the previous duplicate value made SE resolve to GE.
    SE = "SMALLER_OR_EQUAL_THAN"
    EQ = "EQUAL"
22 changes: 22 additions & 0 deletions dqframework/validators/has_between.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import polars as pl

from .base_validator import Validator


class HasBetween(Validator):
    """Validator that checks a column's values lie inside an inclusive range."""

    def __init__(self, column: str, min_value: int, max_value: int):
        """
        :param column: name of the column to check
        :param min_value: inclusive lower bound
        :param max_value: inclusive upper bound
        """
        # The base class keeps the bounds as a (min, max) tuple so that
        # get_value() can report both of them.
        super().__init__(column, (min_value, max_value))
        self.min_value = min_value
        self.max_value = max_value

    def execute(self, df: pl.DataFrame) -> (pl.DataFrame, pl.DataFrame):
        """
        Split a DataFrame into rows inside and outside the range.

        This overrides ``Validator.execute``, which is the method the
        pipeline invokes on every validation.

        :param df: DataFrame to validate
        :return: (pl.DataFrame, pl.DataFrame) - correct, incorrect
        """
        target = pl.col(self.column)

        correct = df.filter((target >= self.min_value) & (target <= self.max_value))
        incorrect = df.filter((target < self.min_value) | (target > self.max_value))

        return correct, incorrect

    def validate(self, df: pl.DataFrame) -> (pl.DataFrame, pl.DataFrame):
        """
        Backward-compatible alias of :meth:`execute`.

        :param df: DataFrame to validate
        :return: (pl.DataFrame, pl.DataFrame) - correct, incorrect
        """
        return self.execute(df)
Loading

0 comments on commit 9bce009

Please sign in to comment.