diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py
index 4406825934..9b859c6448 100644
--- a/src/pudl/etl/__init__.py
+++ b/src/pudl/etl/__init__.py
@@ -64,7 +64,6 @@ core_module_groups = {
-    "_core_eia176": [pudl.transform.eia176],
     "core_assn": [glue_assets],
     "core_censusdp1tract": [
         pudl.convert.censusdp1tract_to_sqlite,
diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py
index 4ad9e59ef2..484fedaf83 100644
--- a/src/pudl/extract/eia176.py
+++ b/src/pudl/extract/eia176.py
@@ -35,15 +35,6 @@ def process_raw(
         selection = self._metadata._get_partition_selection(partition)
         return df.assign(report_year=selection)
 
-    def process_renamed(
-        self, df: pd.DataFrame, page: str, **partition: PartitionSelection
-    ) -> pd.DataFrame:
-        """Strip and lowercase raw text fields (except ID)."""
-        text_fields = ["area", "atype", "company", "item"]
-        for tf in text_fields:
-            df[tf] = df[tf].str.strip().str.lower()
-        return df
-
 
 
 raw_eia176__all_dfs = raw_df_factory(Extractor, name="eia176")
diff --git a/src/pudl/transform/__init__.py b/src/pudl/transform/__init__.py
index 68451cc9b9..a9faccbd6d 100644
--- a/src/pudl/transform/__init__.py
+++ b/src/pudl/transform/__init__.py
@@ -63,7 +63,6 @@ from . import (
     classes,
     eia,
-    eia176,
     eia860,
     eia860m,
     eia861,
diff --git a/src/pudl/transform/eia176.py b/src/pudl/transform/eia176.py
deleted file mode 100644
index ff6fa2c6ef..0000000000
--- a/src/pudl/transform/eia176.py
+++ /dev/null
@@ -1,113 +0,0 @@
-"""Module to perform data cleaning functions on EIA176 data tables."""
-
-import pandas as pd
-from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset
-
-from pudl.logging_helpers import get_logger
-
-logger = get_logger(__name__)
-
-
-@multi_asset(
-    outs={
-        "core_eia176__yearly_company_data": AssetOut(),
-        "core_eia861__yearly_aggregate_data": AssetOut(),
-    },
-)
-def _core_eia176__data(
-    raw_eia176__data: pd.DataFrame,
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """Take the raw long table and return two wide tables with primary keys and one column per variable.
-
-    One table with data for each year and company, one with state- and US-level aggregates per year.
-    """
-    raw_eia176__data = raw_eia176__data.astype({"report_year": int, "value": float})
-    raw_eia176__data["variable_name"] = (
-        raw_eia176__data["line"] + "_" + raw_eia176__data["atype"]
-    )
-
-    aggregate_primary_key = ["report_year", "area"]
-    company_primary_key = aggregate_primary_key + ["id"]
-    company_drop_columns = ["itemsort", "item", "atype", "line", "company"]
-    # We must drop 'id' here and can't use it as a primary key because it is arbitrary/duplicated
-    # in aggregate records; 'id' is a reliable ID only in the context of granular company data.
-    aggregate_drop_columns = company_drop_columns + ["id"]
-
-    long_company = raw_eia176__data.loc[
-        raw_eia176__data.company != "total of all companies"
-    ]
-    wide_company = get_wide_table(
-        long_table=long_company.drop(columns=company_drop_columns),
-        primary_key=company_primary_key,
-    )
-
-    long_aggregate = raw_eia176__data.loc[
-        raw_eia176__data.company == "total of all companies"
-    ]
-    wide_aggregate = get_wide_table(
-        long_table=long_aggregate.drop(columns=aggregate_drop_columns),
-        primary_key=aggregate_primary_key,
-    )
-
-    return wide_company, wide_aggregate
-
-
-def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
-    """Take a 'long' or entity-attribute-value table and return a wide table with one column per attribute/variable."""
-    unstacked = long_table.set_index(primary_key + ["variable_name"]).unstack(
-        level="variable_name"
-    )
-    unstacked.columns = unstacked.columns.droplevel(0)
-    unstacked.columns.name = None  # drop the "variable_name" label from the columns index
-    return unstacked.reset_index().fillna(0)
-
-
-@asset_check(
-    asset="core_eia176__yearly_company_data",
-    additional_ins={"core_eia861__yearly_aggregate_data": AssetIn()},
-    blocking=True,
-)
-def validate_totals(
-    core_eia176__yearly_company_data: pd.DataFrame,
-    core_eia861__yearly_aggregate_data: pd.DataFrame,
-) -> AssetCheckResult:
-    """Compare reported and calculated totals for different geographical aggregates, report any differences."""
-    # First make it so we can directly compare reported aggregates to groupings of granular data
-    comparable_aggregates = core_eia861__yearly_aggregate_data.sort_values(
-        ["report_year", "area"]
-    ).fillna(0)
-
-    # Group company data into state-level data and compare to reported totals
-    state_data = (
-        core_eia176__yearly_company_data.drop(columns="id")
-        .groupby(["report_year", "area"])
-        .sum()
-        .reset_index()
-    )
-    aggregate_state = comparable_aggregates[
-        comparable_aggregates.area != "u.s. total"
-    ].reset_index(drop=True)
-    # Compare using the same columns
-    state_diff = aggregate_state[state_data.columns].compare(state_data)
-
-    # Group calculated state-level data into US-level data and compare to reported totals
-    us_data = (
-        state_data.drop(columns="area")
-        .groupby("report_year")
-        .sum()
-        .sort_values("report_year")
-        .reset_index()
-    )
-    aggregate_us = (
-        comparable_aggregates[comparable_aggregates.area == "u.s. total"]
-        .drop(columns="area")
-        .sort_values("report_year")
-        .reset_index(drop=True)
-    )
-    # Compare using the same columns
-    us_diff = aggregate_us[us_data.columns].compare(us_data)
-
-    return AssetCheckResult(passed=bool(us_diff.empty and state_diff.empty))
-
-
-# TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed?
diff --git a/test/unit/transform/eia176_test.py b/test/unit/transform/eia176_test.py
deleted file mode 100644
index a0f5ced30c..0000000000
--- a/test/unit/transform/eia176_test.py
+++ /dev/null
@@ -1,207 +0,0 @@
-import pandas as pd
-from pytest import fixture
-
-from pudl.transform.eia176 import _core_eia176__data, get_wide_table, validate_totals
-
-COLUMN_NAMES = [
-    "area",
-    "atype",
-    "company",
-    "id",
-    "line",
-    "report_year",
-    "value",
-    "itemsort",
-    "item",
-]
-
-ID_1 = "17673850NM"
-VOLUME_1 = 30980426.0
-COMPANY_1 = [
-    "new mexico",
-    "vl",
-    "new mexico gas company",
-    ID_1,
-    "1010",
-    "2022",
-    VOLUME_1,
-    "[10.1]",
-    "residential sales volume",
-]
-
-ID_2 = "17635017NM"
-VOLUME_2 = 532842.0
-COMPANY_2 = [
-    "new mexico",
-    "vl",
-    "west texas gas inc",
-    ID_2,
-    "1010",
-    "2022",
-    VOLUME_2,
-    "[10.1]",
-    "residential sales volume",
-]
-
-NM_VOLUME = VOLUME_1 + VOLUME_2
-NM_AGGREGATE = [
-    "new mexico",
-    "vl",
-    "total of all companies",
-    # Aggregates appear to reuse an arbitrary company ID
-    ID_1,
-    "1010",
-    "2022",
-    NM_VOLUME,
-    "[10.1]",
-    "residential sales volume",
-]
-
-ID_3 = "17635017TX"
-VOLUME_3 = 1.0
-COMPANY_3 = [
-    "texas",
-    "vl",
-    "west texas gas inc",
-    ID_3,
-    "1010",
-    "2022",
-    VOLUME_3,
-    "[10.1]",
-    "residential sales volume",
-]
-
-TX_VOLUME = VOLUME_3
-TX_AGGREGATE = [
-    "texas",
-    "vl",
-    "total of all companies",
-    # Aggregates appear to reuse an arbitrary company ID
-    ID_3,
-    "1010",
-    "2022",
-    VOLUME_3,
-    "[10.1]",
-    "residential sales volume",
-]
-
-US_VOLUME = NM_VOLUME + TX_VOLUME
-US_AGGREGATE = [
-    "u.s. total",
-    "vl",
-    "total of all companies",
-    # Aggregates appear to reuse an arbitrary company ID
-    ID_1,
-    "1010",
-    "2022",
-    US_VOLUME,
-    "[10.1]",
-    "residential sales volume",
-]
-
-ID_4 = "4"
-VOLUME_4 = 4.0
-COMPANY_4 = [
-    "alaska",
-    "vl",
-    "alaska gas inc",
-    ID_4,
-    "1020",
-    "2022",
-    VOLUME_4,
-    "[10.2]",
-    "some other volume",
-]
-
-DROP_COLS = ["itemsort", "item", "atype", "line", "company"]
-
-
-@fixture
-def df():
-    df = pd.DataFrame(columns=COLUMN_NAMES)
-    df.loc[0] = COMPANY_1
-    df.loc[1] = COMPANY_2
-    df.loc[2] = NM_AGGREGATE
-    df.loc[3] = COMPANY_3
-    df.loc[4] = TX_AGGREGATE
-    df.loc[5] = US_AGGREGATE
-    df.loc[6] = COMPANY_4
-    df = df.set_index(["area", "company"])
-    return df
-
-
-def test_core_eia176__data(df):
-    eav_model = df.loc[
-        [
-            ("new mexico", "new mexico gas company"),
-            ("new mexico", "total of all companies"),
-        ]
-    ].reset_index()
-
-    wide_company, wide_aggregate = _core_eia176__data(eav_model)
-    assert wide_company.shape == (1, 4)
-
-    company_row = wide_company.loc[0]
-    assert list(company_row.index) == ["report_year", "area", "id", "1010_vl"]
-    assert list(company_row.values) == [2022, "new mexico", ID_1, VOLUME_1]
-
-    assert wide_aggregate.shape == (1, 3)
-    aggregate_row = wide_aggregate.loc[0]
-    assert list(aggregate_row.index) == ["report_year", "area", "1010_vl"]
-    assert list(aggregate_row.values) == [2022, "new mexico", NM_VOLUME]
-
-
-def test_get_wide_table(df):
-    long_table = df.loc[
-        [
-            ("new mexico", "new mexico gas company"),
-            ("new mexico", "west texas gas inc"),
-            # Row measuring a different variable to test filling NAs
-            ("alaska", "alaska gas inc"),
-        ]
-    ].reset_index()
-
-    long_table["variable_name"] = long_table["line"] + "_" + long_table["atype"]
-    long_table = long_table.drop(columns=DROP_COLS)
-
-    primary_key = ["report_year", "area", "id"]
-    wide_table = get_wide_table(long_table, primary_key)
-
-    assert wide_table.shape == (3, 5)
-    assert list(wide_table.loc[0].index) == [
-        "report_year",
-        "area",
-        "id",
-        "1010_vl",
-        "1020_vl",
-    ]
-    assert list(wide_table.loc[0].values) == ["2022", "alaska", ID_4, 0, VOLUME_4]
-    assert list(wide_table.loc[1].values) == ["2022", "new mexico", ID_2, VOLUME_2, 0]
-    assert list(wide_table.loc[2].values) == ["2022", "new mexico", ID_1, VOLUME_1, 0]
-
-
-def test_validate__totals(df):
-    # Our test data will have only measurements for this 1010_vl variable
-    company_data = df.loc[
-        [
-            ("new mexico", "new mexico gas company"),
-            ("new mexico", "west texas gas inc"),
-            ("texas", "west texas gas inc"),
-        ]
-    ].reset_index()
-    # Add the value for the 1010_vl variable
-    company_data["1010_vl"] = [str(v) for v in [VOLUME_1, VOLUME_2, VOLUME_3]]
-    company_data = company_data.drop(columns=DROP_COLS)
-
-    aggregate_data = df.loc[
-        [
-            ("new mexico", "total of all companies"),
-            ("texas", "total of all companies"),
-            ("u.s. total", "total of all companies"),
-        ]
-    ].reset_index()
-    # Add the value for the 1010_vl variable
-    aggregate_data["1010_vl"] = [str(v) for v in [NM_VOLUME, TX_VOLUME, US_VOLUME]]
-    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"])
-
-    validate_totals(company_data, aggregate_data)
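
The deleted asset check hinges on pandas.DataFrame.compare(), which returns only
the differing cells and is empty when two identically labeled frames agree. A
minimal sketch of that groupby-then-compare validation pattern, with made-up
numbers rather than real EIA-176 data:

    import pandas as pd

    # Granular company volumes and the reported per-area totals they should sum to.
    company = pd.DataFrame(
        {"area": ["nm", "nm", "tx"], "volume": [30980426.0, 532842.0, 1.0]}
    )
    reported = pd.DataFrame({"area": ["nm", "tx"], "volume": [31513268.0, 1.0]})

    # Recalculate totals from the granular rows...
    calculated = company.groupby("area").sum().reset_index()
    # ...and diff them against the reported totals. compare() requires identically
    # labeled frames, hence selecting the same columns in the same order.
    diff = reported[calculated.columns].compare(calculated)
    assert diff.empty  # an empty diff means the totals line up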