Commit b78b53a: Finish validation prototype
zschira committed Jan 3, 2025 (1 parent: a90fbf0)
Showing 1 changed file with 301 additions and 9 deletions.
310 changes: 301 additions & 9 deletions notebooks/work-in-progress/validation_prototypes.ipynb
@@ -24,8 +24,8 @@
"id": "07ae2e83-02ea-420f-88ea-b74047d92177",
"metadata": {},
"source": [
"## Read assets\n",
"In production, Dagster will handle loading assets, so we use a helper function for testing purposes."
]
},
{
@@ -35,11 +35,10 @@
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"def _get_asset(asset_key: str) -> pd.DataFrame:\n",
"    return pd.read_parquet(f\"https://s3.us-west-2.amazonaws.com/pudl.catalyst.coop/nightly/{asset_key}.parquet\")"
]
},
{
@@ -61,26 +60,31 @@
"import great_expectations as gx\n",
"from typing import Type, Any\n",
"from dataclasses import dataclass\n",
"from collections.abc import Callable\n",
"\n",
"@dataclass\n",
"class ValidationResult:\n",
"    \"\"\"This would be an AssetCheckResult in production.\"\"\"\n",
"    passed: bool\n",
"    metadata: dict\n",
"    description: str | None\n",
"    context: Any = None\n",
"\n",
"def validation_factory(\n",
"    asset_name: str,\n",
"    expectation: Type[gx.expectations.Expectation],\n",
"    expectation_config: dict,\n",
"    preprocessing_func: Callable | None = None,\n",
"    fast_etl_expectation_config: dict | None = None,\n",
"    description: str | None = None,\n",
"):\n",
"    \"\"\"Return a function which will execute a Great Expectations expectation.\"\"\"\n",
"    def _validation():\n",
"        df = _get_asset(asset_name)\n",
"\n",
"        # Apply preprocessing\n",
"        if preprocessing_func is not None:\n",
"            df = preprocessing_func(df)\n",
"\n",
"        # Connect to data\n",
"        context = gx.get_context()\n",
"        batch = context.data_sources.pandas_default.read_dataframe(df)\n",
@@ -95,8 +99,17 @@
"            passed=validation_result.success,\n",
"            description=description,\n",
"            metadata=validation_result.result,\n",
"        )\n",
"    return _validation"
]
},
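{
"cell_type": "markdown",
"id": "b1f4c2aa-1c3e-4d6b-9f2a-7c8e5d4a1b2c",
"metadata": {},
"source": [
"For context, here's a hedged sketch (untested; the helper and check names are hypothetical) of how a validation function produced by `validation_factory` could be wired into Dagster's check API, translating our `ValidationResult` into Dagster's `AssetCheckResult`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b1f4c2aa-1c3e-4d6b-9f2a-7c8e5d4a1b2d",
"metadata": {},
"outputs": [],
"source": [
"import dagster\n",
"\n",
"# Hypothetical wrapper: run a generated validation function inside a Dagster\n",
"# asset check and translate ValidationResult -> AssetCheckResult.\n",
"def as_asset_check(asset_name: str, validation_func):\n",
"    @dagster.asset_check(asset=asset_name, name=f\"{asset_name}_gx_check\")\n",
"    def _check() -> dagster.AssetCheckResult:\n",
"        result = validation_func()\n",
"        return dagster.AssetCheckResult(\n",
"            passed=result.passed,\n",
"            # metadata values must be JSON-serializable (or MetadataValue)\n",
"            metadata={\"gx_result\": str(result.metadata)},\n",
"        )\n",
"    return _check"
]
},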
{
"cell_type": "markdown",
"id": "3045f14d-aa95-46c7-9db9-e49e9264c2af",
"metadata": {},
"source": [
"### Prototype FERC1 bounds check\n",
"Even with a fairly basic example test case, this API feels a bit clunky and confusing. There's a lot you have to understand to use it effectively."
]
},
{
@@ -108,8 +121,287 @@
"source": [
"validation = validation_factory(\n",
"    \"out_ferc1__yearly_steam_plants_fuel_by_plant_sched402\",\n",
"    gx.expectations.ExpectColumnQuantileValuesToBeBetween,\n",
"    expectation_config={\n",
"        \"column\": \"gas_cost_per_mmbtu\",\n",
"        \"quantile_ranges\": {\n",
"            \"quantiles\": [0.05, 0.50, 0.90],\n",
"            \"value_ranges\": [[1.5, 2.1], [2.0, 10.0], [8.0, 15.0]],\n",
"        },\n",
"    },\n",
"    preprocessing_func=lambda df: pd.DataFrame(\n",
"        {\n",
"            \"gas_cost_per_mmbtu\": (\n",
"                df[\"gas_fraction_cost\"] * df[\"fuel_cost\"]\n",
"            ) / (df[\"gas_fraction_mmbtu\"] * df[\"fuel_mmbtu\"])\n",
"        }\n",
"    ),\n",
")()"
]
},
{
"cell_type": "markdown",
"id": "99cadf8c-9747-44a0-bc4d-aacaa71a677d",
"metadata": {},
"source": [
"## Utility function API\n",
"An alternate approach to the factory-function-based API is to just provide a selection of utility/helper functions and have developers create `asset_check`s directly. This allows for a clear and straightforward place to manipulate assets before executing an expectation.\n",
"\n",
"The first obvious utility function we would need is one that takes a dataframe and a preconfigured expectation, then handles the boilerplate GX setup and executes the expectation."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2c0e7852-b901-4f92-a017-0fc6b8b8bb3e",
"metadata": {},
"outputs": [],
"source": [
"def validate_expectation(df: pd.DataFrame, expectation: gx.expectations.Expectation, description: str | None = None) -> ValidationResult:\n",
"    # Connect to data\n",
"    context = gx.get_context()\n",
"    batch = context.data_sources.pandas_default.read_dataframe(df)\n",
"\n",
"    # Execute\n",
"    validation_result = batch.validate(expectation)\n",
"\n",
"    return ValidationResult(\n",
"        passed=validation_result.success,\n",
"        description=description,\n",
"        metadata=validation_result.result,\n",
"    )"
]
},
{
"cell_type": "markdown",
"id": "763d957f-a6b6-4b4c-8fe3-a1a51a20063d",
"metadata": {},
"source": [
"### Prototype FERC1 bounds check\n",
"We'll re-implement the FERC bounds check from above using this new API. This approach feels much more readable to me."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "087cebf9-ceb3-4f1f-8157-645f05e407a3",
"metadata": {},
"outputs": [],
"source": [
"# Apply the asset_check decorator here in production\n",
"def ferc1_fbp_bounds_check() -> ValidationResult:\n",
"    df = _get_asset(\"out_ferc1__yearly_steam_plants_fuel_by_plant_sched402\")\n",
"    df = pd.DataFrame(\n",
"        {\n",
"            \"gas_cost_per_mmbtu\": (\n",
"                df[\"gas_fraction_cost\"] * df[\"fuel_cost\"]\n",
"            ) / (df[\"gas_fraction_mmbtu\"] * df[\"fuel_mmbtu\"])\n",
"        }\n",
"    )\n",
"    expectation = gx.expectations.ExpectColumnQuantileValuesToBeBetween(\n",
"        # Get configuration based on fast/full ETL in production\n",
"        column=\"gas_cost_per_mmbtu\",\n",
"        quantile_ranges={\n",
"            \"quantiles\": [0.05, 0.50, 0.90],\n",
"            \"value_ranges\": [[1.5, 2.1], [2.0, 10.0], [8.0, 15.0]],\n",
"        },\n",
"    )\n",
"    return validate_expectation(df, expectation)\n",
"\n",
"ferc1_fbp_bounds_check()"
]
},
{
"cell_type": "markdown",
"id": "9bc5da0e-c896-4bf6-9e48-678a5495da25",
"metadata": {},
"source": [
"### Prototype `vs_historical` checks\n",
"#### Problem 1: Access multiple tables\n",
"To demonstrate a more complex use case, we'll prototype the `test_agg_vs_historical` validation function for EIA 923 boiler fuel data (found in `test/validate/bf_eia923_test.py`). This method compares the `out_eia923__boiler_fuel` table to aggregated versions of this table. This means that the `asset_check` needs access to multiple assets, which is not how `asset_check`s normally work.\n",
"\n",
"There are a couple options I see for handling this multi-asset need:\n",
"\n",
"1. Apply `asset_check` to the downstream aggregated asset and just read the upstream asset from parquet.\n",
"\n",
"This approach should work fine for this use case, but it embeds an implicit dependency between required assets. If we were to write a validation that looks at two assets that don't have a direct dependency, then you could end up running the `asset_check` before one of the assets has been materialized.\n",
"\n",
"2. Create stub assets that depend on all necessary upstream assets and apply `asset_check` to this downstream stub. This asset wouldn't actually do anything, but would guarantee that upstream assets have been run before running the `asset_check` (a minimal sketch follows this cell).\n",
"\n",
"Option 2 seems more robust and clearer to me. Either way, for the purpose of this notebook we'll simply load both assets from parquet.\n",
"\n",
"#### Problem 2: Computing weighted quantile\n",
"GX has tooling right out of the box for checking quantiles, but nothing for working with weighted quantiles. Perhaps there's a way to preprocess the data and then apply the basic quantile check tools, but this doesn't seem possible to me. Another option would be to develop a custom SQL-based expectation that checks weighted quantiles. I spent quite a bit of time trying to get this working with no luck. I was able to develop a query that mostly mimics the behavior of the existing `vs_historical` function and works as expected using duckdb. However, when trying to use this query with GX I couldn't get it working, and the `UnexpectedRowsExpectation` returns very minimal feedback as to what went wrong.\n",
"\n",
"Fortunately, by using `asset_check`s as the basis for the framework, we aren't explicitly tied to GX and can simply write plain Python validations. For the time being, I would recommend simply wrapping the existing method in an `asset_check` (a sketch of such a wrapper follows the SQL demonstration below).\n",
"\n",
"Below is a demonstration of the SQL query if we ever want to try to get the `UnexpectedRowsExpectation` version working:"
]
]
},
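{
"cell_type": "markdown",
"id": "c7d8e9f0-2a3b-4c5d-8e9f-0a1b2c3d4e5f",
"metadata": {},
"source": [
"Here's the minimal sketch of the stub-asset approach from option 2 (hedged: untested, and the asset/check names are hypothetical):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c7d8e9f0-2a3b-4c5d-8e9f-0a1b2c3d4e60",
"metadata": {},
"outputs": [],
"source": [
"import dagster\n",
"\n",
"# Hypothetical stub: does nothing itself, but depends on both upstream assets,\n",
"# so Dagster won't run the attached check until both have materialized.\n",
"@dagster.asset(deps=[\"out_eia923__boiler_fuel\", \"out_eia923__monthly_boiler_fuel\"])\n",
"def bf_eia923_vs_historical_stub() -> None:\n",
"    return None\n",
"\n",
"@dagster.asset_check(asset=bf_eia923_vs_historical_stub)\n",
"def bf_eia923_vs_historical_check() -> dagster.AssetCheckResult:\n",
"    # Load both assets from parquet, run the comparison, return the result\n",
"    return dagster.AssetCheckResult(passed=True)"
]
},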
{
"cell_type": "code",
"execution_count": null,
"id": "3dc0d45f-ddaf-4efd-97fd-12d079ea50e3",
"metadata": {},
"outputs": [],
"source": [
"# Prep parameters\n",
"\n",
"from pudl.validate import historical_distribution\n",
"\n",
"bf_eia923 = _get_asset(\"out_eia923__boiler_fuel\")\n",
"bf_eia923_agg = _get_asset(\"out_eia923__monthly_boiler_fuel\")\n",
"\n",
"bf_eia923 = bf_eia923[bf_eia923[\"fuel_type_code_pudl\"] == \"coal\"]\n",
"bf_eia923_agg = bf_eia923_agg[bf_eia923_agg[\"fuel_type_code_pudl\"] == \"coal\"]\n",
"\n",
"lower_bound = min(historical_distribution(\n",
"    bf_eia923,\n",
"    data_col=\"ash_content_pct\",\n",
"    weight_col=\"fuel_consumed_units\",\n",
"    quantile=0.2,\n",
"))\n",
"\n",
"data_col = \"ash_content_pct\"\n",
"weight_col = \"fuel_consumed_units\"\n",
"quantile = 0.2"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3dce0c4a-3967-4454-bf4f-8d8a15bb2cdb",
"metadata": {},
"outputs": [],
"source": [
"# Create query with parameters\n",
"query = (\n",
"    \"WITH CumulativeWeights AS ( \"\n",
"    \"    SELECT \"\n",
"    f\"        {data_col}, \"\n",
"    f\"        {weight_col}, \"\n",
"    f\"        SUM({weight_col}) OVER (ORDER BY {data_col}) AS cumulative_weight, \"\n",
"    f\"        SUM({weight_col}) OVER () AS total_weight \"\n",
"    \"    FROM bf \"\n",
"    \"    WHERE fuel_type_code_pudl='coal' \"\n",
"    \"), \"\n",
"    \"QuantileData AS ( \"\n",
"    \"    SELECT \"\n",
"    f\"        {data_col}, \"\n",
"    f\"        {weight_col}, \"\n",
"    \"        cumulative_weight, \"\n",
"    \"        total_weight, \"\n",
"    \"        cumulative_weight / total_weight AS cumulative_probability \"\n",
"    \"    FROM CumulativeWeights \"\n",
"    \") \"\n",
"    f\"SELECT {data_col} \"\n",
"    \"FROM QuantileData \"\n",
"    f\"WHERE cumulative_probability >= {quantile} AND {data_col} < {lower_bound} \"\n",
"    f\"ORDER BY {data_col}\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "586ef024-c139-458d-a039-30c3cc2b702a",
"metadata": {},
"outputs": [],
"source": [
"# Execute query using duckdb\n",
"import duckdb\n",
"\n",
"asset_key = \"out_eia923__monthly_boiler_fuel\"\n",
"# duckdb's replacement scan resolves the `bf` table referenced in the query\n",
"# to this local relation\n",
"bf = duckdb.read_parquet(f\"https://s3.us-west-2.amazonaws.com/pudl.catalyst.coop/nightly/{asset_key}.parquet\")\n",
"duckdb.query(query).fetchall()"
]
},
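{
"cell_type": "markdown",
"id": "d4e5f6a7-3b4c-4d5e-9f0a-1b2c3d4e5f60",
"metadata": {},
"source": [
"And here's the wrapper sketch promised above: the interim recommendation for Problem 2 is to wrap the existing weighted-quantile comparison in a plain Python check. This is hedged: the check name is hypothetical, it reuses the variables from the prep cell, and it assumes `weighted_quantile` is available in `pudl.validate` alongside `historical_distribution`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d4e5f6a7-3b4c-4d5e-9f0a-1b2c3d4e5f61",
"metadata": {},
"outputs": [],
"source": [
"from pudl.validate import weighted_quantile\n",
"\n",
"# Hypothetical plain-Python check: compare the aggregated table's weighted\n",
"# quantile against the minimum of that quantile across historical years.\n",
"def bf_eia923_agg_vs_historical() -> ValidationResult:\n",
"    agg_value = weighted_quantile(\n",
"        bf_eia923_agg[data_col], bf_eia923_agg[weight_col], quantile\n",
"    )\n",
"    return ValidationResult(\n",
"        passed=agg_value >= lower_bound,\n",
"        description=\"Aggregated weighted quantile vs. historical range\",\n",
"        metadata={\"agg_value\": agg_value, \"lower_bound\": lower_bound},\n",
"    )\n",
"\n",
"bf_eia923_agg_vs_historical()"
]
},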
{
"cell_type": "markdown",
"id": "45dae615-972c-489d-9461-147e169eb9e5",
"metadata": {},
"source": [
"### Prototype high memory usage validations\n",
"GX has flexible options for connecting to data, which could provide support for high-memory use cases, but unfortunately none of these options quite fits our architecture. In these cases, we can once again fall back on the `asset_check`s underlying the framework and use duckdb for highly efficient validations. A simple and flexible way to enable this would be to provide a factory function that takes an asset name and a SQL query that is expected to return no rows. The factory then generates an `asset_check`, which handles boilerplate setup, executes the query on the asset, and returns a failure with details if the query returns one or more rows."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0cf1ec40-fb8c-434f-a901-12b2dc36283c",
"metadata": {},
"outputs": [],
"source": [
"def duckdb_asset_check_factory(\n",
"    asset_key: str,\n",
"    query: str,\n",
"    description: str | None = None,\n",
"    limit: int | None = 10,\n",
"):\n",
"    # Apply the `asset_check` decorator here in production\n",
"    def _asset_check():\n",
"        # Cap how many failing rows get pulled into the result metadata\n",
"        modified_query = f\"{query} LIMIT {limit};\" if limit is not None else f\"{query};\"\n",
"        # duckdb's replacement scan resolves the `asset` table referenced in\n",
"        # the query to this local relation\n",
"        asset = duckdb.read_parquet(f\"https://s3.us-west-2.amazonaws.com/pudl.catalyst.coop/nightly/{asset_key}.parquet\")\n",
"        returned_rows = duckdb.query(modified_query).fetchall()\n",
"\n",
"        return ValidationResult(\n",
"            passed=len(returned_rows) == 0,\n",
"            description=description,\n",
"            metadata={\"extra_rows\": returned_rows, \"num_extra_rows\": len(returned_rows)},\n",
"        )\n",
"    return _asset_check"
]
},
{
"cell_type": "markdown",
"id": "84786d5b-d17f-4dad-8068-c0b60b95d4a9",
"metadata": {},
"source": [
"#### Demonstrate with VCERARE asset"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8fa39861-ad6d-472c-8fec-15e3b020e42b",
"metadata": {},
"outputs": [],
"source": [
"duckdb_asset_check_factory(\n",
" asset_key=\"out_vcerare__hourly_available_capacity_factor\",\n",
" query=\"SELECT county_or_lake_name FROM asset WHERE county_or_lake_name IS NULL\",\n",
")()"
]
},
{
"cell_type": "markdown",
"id": "ff17c261-a5ea-4b29-a28d-84f70045124c",
"metadata": {},
"source": [
"And a failing case:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b58cfb8f-45f4-41a2-9502-0ea33788348b",
"metadata": {},
"outputs": [],
"source": [
"duckdb_asset_check_factory(\n",
"    asset_key=\"out_vcerare__hourly_available_capacity_factor\",\n",
"    query=\"SELECT * FROM asset WHERE capacity_factor_solar_pv > 1.0\",\n",
")()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "34a3380d-6ec8-4ff6-b21d-04c8c0f6ad16",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand All @@ -128,7 +420,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.7"
"version": "3.12.8"
}
},
"nbformat": 4,