Data Validation Framework: Source + Product data #1241

Merged · 14 commits · Jan 14, 2025
16 changes: 15 additions & 1 deletion admin/run_environment/constraints.txt
@@ -336,7 +336,9 @@ mypy==1.14.1
mypy-boto3-s3==1.35.93
# via boto3-stubs
mypy-extensions==1.0.0
# via mypy
# via
# mypy
# typing-inspect
nest-asyncio==1.6.0
# via ipykernel
networkx==3.4.2
@@ -359,6 +361,7 @@ numpy==2.2.1
# matplotlib
# pandas
# pandas-stubs
# pandera
# pydeck
# pyogrio
# rasterio
@@ -381,6 +384,7 @@ packaging==24.2
# geopandas
# ipykernel
# matplotlib
# pandera
# plotly
# pyogrio
# pytest
@@ -393,10 +397,13 @@ pandas==2.2.3
# geopandas
# leafmap
# mapclassify
# pandera
# streamlit
# streamlit-aggrid
pandas-stubs==2.2.3.241126
# via -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
pandera==0.22.1
# via -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
parsedatetime==2.6
# via agate
parso==0.8.4
@@ -467,6 +474,7 @@ pydantic==2.10.5
# via
# -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
# dbt-semantic-interfaces
# pandera
pydantic-core==2.27.2
# via pydantic
pydeck==0.9.1
@@ -675,6 +683,8 @@ traittypes==0.2.1
# via
# bqplot
# ipyleaflet
typeguard==4.4.1
# via pandera
typer==0.15.1
# via -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
types-awscrt==0.23.6
@@ -719,7 +729,11 @@ typing-extensions==4.12.2
# sqlalchemy
# sqlalchemy-stubs
# streamlit
# typeguard
# typer
# typing-inspect
typing-inspect==0.9.0
# via pandera
tzdata==2024.2
# via pandas
urllib3==2.3.0
1 change: 1 addition & 0 deletions admin/run_environment/requirements.in
@@ -27,6 +27,7 @@ openpyxl
openpyxl-stubs
pandas
pandas-stubs
pandera
plotly
pre-commit
psycopg2-binary
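pandera is the one new direct dependency here; the other pins above are transitive. A minimal sketch of the kind of schema validation it enables (toy column and data, not code from this PR):

import pandas as pd
import pandera as pa

# Toy schema: one integer column whose values must be non-negative.
schema = pa.DataFrameSchema({"count": pa.Column(int, pa.Check.ge(0))})

schema.validate(pd.DataFrame({"count": [1, 2, 3]}))  # raises SchemaError on violations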
16 changes: 15 additions & 1 deletion admin/run_environment/requirements.txt
@@ -336,7 +336,9 @@ mypy==1.14.1
mypy-boto3-s3==1.35.93
# via boto3-stubs
mypy-extensions==1.0.0
# via mypy
# via
# mypy
# typing-inspect
nest-asyncio==1.6.0
# via ipykernel
networkx==3.4.2
@@ -359,6 +361,7 @@ numpy==2.2.1
# matplotlib
# pandas
# pandas-stubs
# pandera
# pydeck
# pyogrio
# rasterio
@@ -381,6 +384,7 @@ packaging==24.2
# geopandas
# ipykernel
# matplotlib
# pandera
# plotly
# pyogrio
# pytest
@@ -393,10 +397,13 @@ pandas==2.2.3
# geopandas
# leafmap
# mapclassify
# pandera
# streamlit
# streamlit-aggrid
pandas-stubs==2.2.3.241126
# via -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
pandera==0.22.1
# via -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
parsedatetime==2.6
# via agate
parso==0.8.4
@@ -467,6 +474,7 @@ pydantic==2.10.5
# via
# -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
# dbt-semantic-interfaces
# pandera
pydantic-core==2.27.2
# via pydantic
pydeck==0.9.1
@@ -675,6 +683,8 @@ traittypes==0.2.1
# via
# bqplot
# ipyleaflet
typeguard==4.4.1
# via pandera
typer==0.15.1
# via -r /__w/data-engineering/data-engineering/admin/ops/../run_environment/requirements.in
types-awscrt==0.23.6
@@ -719,7 +729,11 @@ typing-extensions==4.12.2
# sqlalchemy
# sqlalchemy-stubs
# streamlit
# typeguard
# typer
# typing-inspect
typing-inspect==0.9.0
# via pandera
tzdata==2024.2
# via pandas
urllib3==2.3.0
9 changes: 7 additions & 2 deletions dcpy/connectors/socrata/publish.py
@@ -23,6 +23,7 @@
from dcpy.utils.logging import logger

import dcpy.models.product.dataset.metadata as md
import dcpy.models.dataset as dataset
from .utils import SOCRATA_USER, SOCRATA_PASSWORD, _socrata_request

SOCRATA_REVISION_APPLY_TIMEOUT_SECS = 10 * 60 # Ten Mins
@@ -176,7 +177,9 @@ def __init__(self, col: md.DatasetColumn):
self.display_name = col.name
self.description = col.description
self.is_primary_key = (
bool(col.checks.is_primary_key) if col.checks else False
bool(col.checks.is_primary_key)
if isinstance(col.checks, dataset.Checks)
else False
)

class Attachment(TypedDict):
@@ -298,7 +301,9 @@ def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):
new_col["initial_output_column_id"] = new_col["id"]

new_col["is_primary_key"] = (
True if (our_col.checks and our_col.checks.is_primary_key) else False
bool(our_col.checks.is_primary_key)
if isinstance(our_col.checks, dataset.Checks)
else False
)

new_col["display_name"] = our_col.name
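Both hunks above replace a truthiness test on col.checks with an isinstance test, since checks can now also hold the new list-of-checks format. A condensed, standalone sketch of the guard (a simplified stand-in for dcpy.models.dataset.Checks, not the real model):

from dataclasses import dataclass

@dataclass
class Checks:  # stand-in for dcpy.models.dataset.Checks
    is_primary_key: bool | None = None

def primary_key_flag(checks) -> bool:
    # Only the legacy Checks model carries is_primary_key; the new
    # list-of-checks format (and None) maps to False.
    return bool(checks.is_primary_key) if isinstance(checks, Checks) else False

assert primary_key_flag(Checks(is_primary_key=True)) is True
assert primary_key_flag(["is_geom_point"]) is False
assert primary_key_flag(None) is False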
2 changes: 2 additions & 0 deletions dcpy/lifecycle/package/validate.py
@@ -218,6 +218,8 @@
)
)

if isinstance(col.checks, list): # TODO: delete after refactoring
raise NotImplementedError("Must be old dataset.Checks format to run checks")

# Check Nulls
if col.checks and col.checks.non_nullable:
if not df_only_col_nulls.empty:
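For reference, the two shapes this guard distinguishes, sketched as plain data (the legacy flags are the ones used in this file; the new-format entries mirror what pandera_utils.create_check expects below):

# Legacy format: a structured dataset.Checks model on the column,
# carrying flags such as is_primary_key and non_nullable.
legacy_checks = {"is_primary_key": True, "non_nullable": True}

# New format: a list of registered pandera check names, either bare
# or mapped to a CheckAttributes payload with arguments.
new_checks = [
    "is_geom_point",
    {"greater_than_or_equal_to": {"args": {"min_value": 0}}},
]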
3 changes: 3 additions & 0 deletions dcpy/lifecycle/validate/__init__.py
@@ -0,0 +1,3 @@
from . import pandera_custom_checks

# from .data import run
14 changes: 14 additions & 0 deletions dcpy/lifecycle/validate/data.py
@@ -0,0 +1,14 @@
from pathlib import Path


def run(
dataset_id: str,
input_path: Path,
):
# TODO: read in data from input_path to pandas dataframe

# TODO: get dataset template

# TODO: run data checks

raise NotImplementedError

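A hedged sketch of how this stub might be wired up once the TODOs land; the parquet input format and passing columns directly (rather than resolving a dataset template by id) are assumptions for illustration:

from pathlib import Path

import pandas as pd

from dcpy.lifecycle.validate import pandera_utils
from dcpy.models.dataset import Column

def run_sketch(input_path: Path, columns: list[Column]) -> pd.DataFrame:
    # Read in data (assuming a parquet artifact for illustration).
    df = pd.read_parquet(input_path)
    # Run the data checks added in this PR; raises on schema violations.
    return pandera_utils.run_data_checks(df, columns)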
9 changes: 9 additions & 0 deletions dcpy/lifecycle/validate/pandera_custom_checks.py
@@ -0,0 +1,9 @@
from pandera import extensions


@extensions.register_check_method(check_type="element_wise")
def is_geom_point(s):
    # Element-wise check: each `s` is a single geometry value.
    try:
        return s.geom_type == "Point"
    except ValueError:
        return False

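Once this module is imported (the package __init__ above does so for its registration side effect), the check becomes available by name on pa.Check. A small usage sketch with a toy geometry column:

import geopandas as gpd
import pandera as pa
from shapely.geometry import Point

import dcpy.lifecycle.validate  # noqa: F401  (registers is_geom_point)

schema = pa.DataFrameSchema({"geom": pa.Column(checks=pa.Check.is_geom_point())})
gdf = gpd.GeoDataFrame({"geom": gpd.GeoSeries([Point(0, 0), Point(1, 1)])})
schema.validate(gdf)  # passes; a non-Point geometry would fail the check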
132 changes: 132 additions & 0 deletions dcpy/lifecycle/validate/pandera_utils.py
@@ -0,0 +1,132 @@
import pandera as pa
import pandas as pd
import geopandas as gpd
from inspect import signature  # used to inspect a check constructor's expected parameters

from dcpy.models.dataset import Column, CheckAttributes, Checks


def create_check(check: str | dict[str, CheckAttributes]) -> pa.Check:
"""
Creates a Pandera `Check` object from a given check definition.

Args:
check:
A string representing the name of the check or a dictionary with the
check name as the key and check attributes as the value.
Returns:
pa.Check:
A Pandera `Check` object constructed with the specified parameters.
Raises:
AssertionError:
If the `check` dictionary does not contain exactly one key-value pair.
ValueError:
If the check name is not registered or if attributes cannot be parsed
or used to create a valid `Check`.
"""
allowed_check_names = {
**pa.Check.CHECK_FUNCTION_REGISTRY,
**pa.Check.REGISTERED_CUSTOM_CHECKS,
}

if isinstance(check, str):
check_name = check
check_args = None
elif isinstance(check, dict):
        assert len(check) == 1, (
            "`create_check` expects exactly one key-value pair in the `check` param."
        )
check_name, check_args = next(iter(check.items()))

if check_name not in allowed_check_names:
raise ValueError(f"Unregistered check name: '{check_name}'.")

    # Retrieve the constructor for the specified check name from pandera.
    # The constructor takes check-specific parameters and also accepts **kwargs
    # for generic parameters shared across all Check objects, such as `description`.
check_constructor = getattr(pa.Check, check_name)

if check_args:
Reviewer (Contributor): if you want to make use of it, you could call dcpy.utils.introspect.validate_kwargs here; it would validate types of arguments as well. You'd still need to raise the error based on the returned object.

check_expected_params = signature(check_constructor).parameters
invalid_check_keys = set(check_args.args.keys()) - set(
check_expected_params.keys()
)
if invalid_check_keys:
raise ValueError(
f"Invalid argument keys found for check '{check_name}': {invalid_check_keys}. "
f"Valid argument keys are: {sorted(check_expected_params.keys())}."
)

try:
check_obj = (
check_constructor(
**check_args.args,
raise_warning=check_args.warn_only,
description=check_args.description,
name=check_args.name,
title=check_args.title,
n_failure_cases=check_args.n_failure_cases,
groups=check_args.groups,
groupby=check_args.groupby,
ignore_na=check_args.ignore_na,
)
if check_args
else check_constructor()
)
except Exception as e:
raise ValueError(
f"Check '{check_name}' couldn't be created. Error message: {e}"
)

return check_obj


def create_checks(checks: list[str | dict[str, CheckAttributes]]) -> list[pa.Check]:
"""Create Pandera checks."""
pandera_checks = [create_check(check) for check in checks]
return pandera_checks


def create_column_with_checks(column: Column) -> pa.Column:
"""Create Pandera column validator object."""
if isinstance(column.checks, Checks):
        raise NotImplementedError(
"Pandera checks are not implemented for old Column.checks format"
)
data_checks = create_checks(column.checks) if column.checks else None
return pa.Column(
# TODO: implement `dtype` param
coerce=True, # coerce column to defined data type. This decision is up for debate
checks=data_checks,
required=column.is_required,
description=column.description,
nullable=True, # TODO: temp solution. Need to figure out what to do with this (equivalent to can be null)
)


def run_data_checks(
df: pd.DataFrame | gpd.GeoDataFrame, columns: list[Column]
) -> pd.DataFrame | gpd.GeoDataFrame:
"""
Validate a DataFrame or GeoDataFrame against a schema defined by a list of columns with Pandera.

Args:
df (pd.DataFrame | gpd.GeoDataFrame): The input DataFrame to validate.
columns (list[Column]): List of column definitions specifying validation rules.

Raises:
AssertionError: If column names in `columns` are not unique.
"""

column_names = [column.id for column in columns]
assert len(column_names) == len(set(column_names)), (
"Columns should have unique names"
)

dataframe_checks = {}
for column in columns:
dataframe_checks[column.id] = create_column_with_checks(column)

return pa.DataFrameSchema(dataframe_checks).validate(df)
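A quick usage sketch of create_check with the custom check registered above; the bare-string form needs no arguments, while the dict form would carry a CheckAttributes payload as the function body shows:

import dcpy.lifecycle.validate  # noqa: F401  (registers is_geom_point)
from dcpy.lifecycle.validate.pandera_utils import create_check

check = create_check("is_geom_point")  # bare registered name, no args
print(check)
# A parameterized built-in would instead use the dict form, e.g. a
# "greater_than_or_equal_to" key mapped to CheckAttributes with args={"min_value": 0}.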