diff --git a/src/pudl/helpers.py b/src/pudl/helpers.py
index ddf8d6666d..343cabda5a 100644
--- a/src/pudl/helpers.py
+++ b/src/pudl/helpers.py
@@ -1815,10 +1815,10 @@ def assert_cols_areclose(
     # instead of just whether or not there are matches.
     mismatch = df.loc[
         ~np.isclose(
-            df[a_cols],
-            df[b_cols],
+            np.ma.masked_where(np.isnan(df[a_cols]), df[a_cols]),
+            np.ma.masked_where(np.isnan(df[b_cols]), df[b_cols]),
             equal_nan=True,
-        )
+        ).filled()
     ]
     mismatch_ratio = len(mismatch) / len(df)
     if mismatch_ratio > mismatch_threshold:
diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py
index 289398aaba..a8b8512c61 100644
--- a/src/pudl/io_managers.py
+++ b/src/pudl/io_managers.py
@@ -729,6 +729,48 @@ def filter_for_freshest_data(
         return deduped
+
+    @staticmethod
+    def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
+        """Set a fact's report year by its actual dates.
+
+        Sometimes a fact belongs to a context which has no ReportYear
+        associated with it; other times there are multiple ReportYears
+        associated with a single filing. In these cases the report year of a
+        specific fact may be associated with the other years in the filing.
+
+        In many cases we can infer the actual report year from the fact's
+        associated time period - either duration or instant.
+        """
+        is_duration = len({"start_date", "end_date"} - set(df.columns)) == 0
+        is_instant = "date" in df.columns
+
+        def get_year(df: pd.DataFrame, col: str) -> pd.Series:
+            datetimes = pd.to_datetime(df.loc[:, col])
+            if datetimes.isna().any():
+                raise ValueError(f"{col} has null values!")
+            return datetimes.apply(lambda x: x.year)
+
+        if is_duration:
+            start_years = get_year(df, "start_date")
+            end_years = get_year(df, "end_date")
+            if not (start_years == end_years).all():
+                raise ValueError("start_date and end_date are in different years!")
+            new_report_years = start_years
+        elif is_instant:
+            new_report_years = get_year(df, "date")
+        else:
+            raise ValueError("Attempted to read a non-instant, non-duration table.")
+
+        # we include XBRL data from before our "officially supported" XBRL
+        # range because we want to use it to set start-of-year values for the
+        # first XBRL year.
+        xbrl_years_plus_one_previous = [min(xbrl_years) - 1] + xbrl_years
+        return (
+            df.assign(report_year=new_report_years)
+            .loc[lambda df: df.report_year.isin(xbrl_years_plus_one_previous)]
+            .reset_index(drop=True)
+        )
+
     def _get_primary_key(self, sched_table_name: str) -> list[str]:
         # TODO (daz): as of 2023-10-13, our datapackage.json is merely
         # "frictionless-like" so we manually parse it as JSON. once we make our
@@ -769,26 +811,26 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
         engine = self.engine
 
-        id_table = "identification_001_duration"
-
         sched_table_name = re.sub("_instant|_duration", "", table_name)
         with engine.connect() as con:
             df = pd.read_sql(
-                f"""
-                SELECT {table_name}.*, {id_table}.report_year FROM {table_name}
-                JOIN {id_table} ON {id_table}.filing_name = {table_name}.filing_name
-                WHERE {id_table}.report_year BETWEEN :min_year AND :max_year;
-                """,  # noqa: S608 - table names not supplied by user
+                f"SELECT {table_name}.* FROM {table_name}",  # noqa: S608 - table names not supplied by user
                 con=con,
-                params={
-                    "min_year": min(ferc1_settings.xbrl_years),
-                    "max_year": max(ferc1_settings.xbrl_years),
-                },
             ).assign(sched_table_name=sched_table_name)
 
         primary_key = self._get_primary_key(table_name)
-        deduped = self.filter_for_freshest_data(df, primary_key=primary_key)
-        return deduped.drop(columns=["publication_time"])
+
+        return (
+            df.pipe(
+                FercXBRLSQLiteIOManager.filter_for_freshest_data,
+                primary_key=primary_key,
+            )
+            .pipe(
+                FercXBRLSQLiteIOManager.refine_report_year,
+                xbrl_years=ferc1_settings.xbrl_years,
+            )
+            .drop(columns=["publication_time"])
+        )
 
 
 @io_manager(required_resource_keys={"dataset_settings"})
diff --git a/src/pudl/package_data/glue/pudl_id_mapping.xlsx b/src/pudl/package_data/glue/pudl_id_mapping.xlsx
index cb43d619fe..c181119ea2 100644
Binary files a/src/pudl/package_data/glue/pudl_id_mapping.xlsx and b/src/pudl/package_data/glue/pudl_id_mapping.xlsx differ
diff --git a/src/pudl/transform/ferc1.py b/src/pudl/transform/ferc1.py
index c1202d5d90..6ad17c9990 100644
--- a/src/pudl/transform/ferc1.py
+++ b/src/pudl/transform/ferc1.py
@@ -599,12 +599,8 @@ def unstack_balances_to_report_year_instant_xbrl(
     if not params.unstack_balances_to_report_year:
         return df
 
-    df["year"] = pd.to_datetime(df["date"]).dt.year
-    # Check that the originally reported records are annually unique.
-    # year and report_year aren't necessarily the same since previous year data
-    # is often reported in the current report year, but we're constructing a table
-    # where report_year is part of the primary key, so we have to do this:
-    unique_cols = [c for c in primary_key_cols if c != "report_year"] + ["year"]
+    # report year always corresponds to the year of "date"
+    unique_cols = set(primary_key_cols).union({"report_year"})
     if df.duplicated(unique_cols).any():
         raise AssertionError(
             "Looks like there are multiple entries per year--not sure which to use "
@@ -615,28 +611,26 @@ def unstack_balances_to_report_year_instant_xbrl(
             "Looks like there are some values in here that aren't from the end of "
             "the year. We can't use those to calculate start and end balances."
         )
-    df.loc[df.report_year == (df.year + 1), "balance_type"] = "starting_balance"
-    df.loc[df.report_year == df.year, "balance_type"] = "ending_balance"
-    if df.balance_type.isna().any():
-        # Remove rows from years that are not representative of start/end dates
-        # for a given report year (i.e., the report year and one year prior).
-        logger.warning(
-            f"Dropping unexpected years: "
-            f"{df.loc[df.balance_type.isna(), 'year'].unique()}"
-        )
-        df = df[df["balance_type"].notna()].copy()
-    df = (
-        df.drop(["year", "date"], axis="columns")
+
+    ending_balances = df.assign(balance_type="ending_balance")
+    starting_balances = df.assign(
+        report_year=df.report_year + 1, balance_type="starting_balance"
+    )
+    all_balances = pd.concat([starting_balances, ending_balances])
+    # for the first year, we expect no starting balances; for the last year, we expect no ending balances.
+    first_last_year_stripped = all_balances.loc[
+        lambda df: ~df.report_year.isin({df.report_year.min(), df.report_year.max()})
+    ]
+    unstacked_by_year = (
+        first_last_year_stripped.drop(columns=["date"])
         .set_index(primary_key_cols + ["balance_type", "sched_table_name"])
         .unstack("balance_type")
     )
-    # This turns a multi-index into a single-level index with tuples of strings
-    # as the keys, and then converts the tuples of strings into a single string
-    # by joining their values with an underscore. This results in column labels
-    # like boiler_plant_equipment_steam_production_starting_balance
-    df.columns = ["_".join(items) for items in df.columns.to_flat_index()]
-    df = df.reset_index()
-    return df
+    # munge multi-index into flat index, separated by _
+    unstacked_by_year.columns = [
+        "_".join(items) for items in unstacked_by_year.columns.to_flat_index()
+    ]
+    return unstacked_by_year.reset_index()
 
 
 class CombineAxisColumnsXbrl(TransformParams):
@@ -2163,7 +2157,8 @@ def merge_instant_and_duration_tables_xbrl(
             ],
             axis="columns",
         ).reset_index()
-        return out_df
+
+        return out_df.loc[out_df.report_year.isin(Ferc1Settings().xbrl_years)]
 
     @cache_df("process_instant_xbrl")
     def process_instant_xbrl(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -4697,7 +4692,7 @@ def check_double_year_earnings_types(self, df: pd.DataFrame) -> pd.DataFrame:
             df=intra_year_facts,
             a_cols=["ending_balance_previous"],
            b_cols=["starting_balance_current"],
-            mismatch_threshold=0.02,
+            mismatch_threshold=0.05,
             message="'Previous year ending balance' should be the same as "
             "'current year starting balance' for all years!",
         )
diff --git a/src/pudl/validate.py b/src/pudl/validate.py
index 3fe248f680..7e3d7f79d8 100644
--- a/src/pudl/validate.py
+++ b/src/pudl/validate.py
@@ -832,9 +832,9 @@ def plot_vs_agg(orig_df, agg_df, validation_cases):
         "data_col": "capability_ratio",
         "weight_col": "",
     },
-    {
+    {  # XBRL data (post-2021) reports 0 capability for ~22% of plants, so we exclude.
"title": "Capability Ratio (tails)", - "query": "", + "query": "report_year < 2021 | plant_capability_mw > 0", "low_q": 0.05, "low_bound": 0.5, "hi_q": 0.95, diff --git a/test/integration/etl_test.py b/test/integration/etl_test.py index f5781a4abe..21ea53dd13 100644 --- a/test/integration/etl_test.py +++ b/test/integration/etl_test.py @@ -191,6 +191,6 @@ def test_extract_xbrl(self, ferc1_engine_dbf): for table_type, df in xbrl_tables.items(): # Some raw xbrl tables are empty if not df.empty and table_type == "duration": - assert (df.report_year >= 2021).all() and ( + assert (df.report_year >= 2020).all() and ( df.report_year < 2022 ).all(), f"Unexpected years found in table: {table_name}" diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py index 79c5c5caa1..28042a7373 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -334,17 +334,8 @@ def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path): ] ) - id_table = pd.DataFrame.from_records( - [ - {"filing_name": "Utility_Co_0001", "report_year": 2021}, - {"filing_name": "Utility_Co_0002", "report_year": 2021}, - ] - ) - conn = sa.create_engine(f"sqlite:///{db_path}") df.to_sql("test_table_instant", conn) - id_table.to_sql("identification_001_duration", conn) - input_context = build_input_context( asset_key=AssetKey("test_table_instant"), resources={ @@ -401,8 +392,9 @@ def test_filter_for_freshest_data(df): suffixes=["_in", "_out"], indicator=True, ).set_index(xbrl_context_cols) - hypothesis.note(f"Found these contexts in input data: {original_contexts}") - hypothesis.note(f"The freshest data: {deduped}") + hypothesis.note(f"Found these contexts in input data:\n{original_contexts}") + hypothesis.note(f"The freshest data:\n{deduped}") + hypothesis.note(f"Paired by context:\n{paired_by_context}") assert (paired_by_context._merge == "both").all() # for every row in the output - its publication time is greater than or equal to all of the other ones for that [entity_id, utility_type, date] in the input data @@ -410,3 +402,104 @@ def test_filter_for_freshest_data(df): paired_by_context["publication_time_out"] >= paired_by_context["publication_time_in"] ).all() + + +def test_report_year_fixing_instant(): + instant_df = pd.DataFrame.from_records( + [ + { + "entity_id": "123", + "date": "2020-07-01", + "report_year": 3021, + "factoid": "replace report year with date year", + }, + ] + ) + + observed = FercXBRLSQLiteIOManager.refine_report_year( + instant_df, xbrl_years=[2021, 2022] + ).report_year + expected = pd.Series([2020]) + assert (observed == expected).all() + + +def test_report_year_fixing_duration(): + duration_df = pd.DataFrame.from_records( + [ + { + "entity_id": "123", + "start_date": "2004-01-01", + "end_date": "2004-12-31", + "report_year": 3021, + "factoid": "filter out since the report year is out of bounds", + }, + { + "entity_id": "123", + "start_date": "2021-01-01", + "end_date": "2021-12-31", + "report_year": 3021, + "factoid": "replace report year with date year", + }, + ] + ) + + observed = FercXBRLSQLiteIOManager.refine_report_year( + duration_df, xbrl_years=[2021, 2022] + ).report_year + expected = pd.Series([2021]) + assert (observed == expected).all() + + +@pytest.mark.parametrize( + "df, match", + [ + ( + pd.DataFrame.from_records( + [ + {"entity_id": "123", "report_year": 3021, "date": ""}, + ] + ), + "date has null values", + ), + ( + pd.DataFrame.from_records( + [ + { + "entity_id": "123", + "report_year": 3021, + "start_date": "", + "end_date": "2020-12-31", + }, + 
] + ), + "start_date has null values", + ), + ( + pd.DataFrame.from_records( + [ + { + "entity_id": "123", + "report_year": 3021, + "start_date": "2020-06-01", + "end_date": "2021-05-31", + }, + ] + ), + "start_date and end_date are in different years", + ), + ( + pd.DataFrame.from_records( + [ + { + "entity_id": "123", + "report_year": 3021, + }, + ] + ), + "Attempted to read a non-instant, non-duration table", + ), + ], +) +def test_report_year_fixing_bad_values(df, match): + with pytest.raises(ValueError, match=match): + FercXBRLSQLiteIOManager.refine_report_year(df, xbrl_years=[2021, 2022]) diff --git a/test/unit/transform/ferc1_test.py b/test/unit/transform/ferc1_test.py index a942c436c3..8fd67a10c3 100644 --- a/test/unit/transform/ferc1_test.py +++ b/test/unit/transform/ferc1_test.py @@ -355,10 +355,11 @@ def test_unstack_balances_to_report_year_instant_xbrl(): StringIO( """ idx,entity_id,date,report_year,sched_table_name,test_value -0,1,2021-12-31,2021,table_name,2000 -1,1,2020-12-31,2021,table_name,1000 -2,2,2021-12-31,2021,table_name,21000 -3,2,2020-12-31,2021,table_name,8000 +0,1,2022-12-31,2022,table_name,2022.1 +1,1,2021-12-31,2021,table_name,2021.1 +2,1,2020-12-31,2020,table_name,2020.1 +3,2,2021-12-31,2021,table_name,2021.2 +4,2,2020-12-31,2020,table_name,2020.2 """ ), ) @@ -371,12 +372,15 @@ def test_unstack_balances_to_report_year_instant_xbrl(): params=params, primary_key_cols=pk_cols, ) + # because there are NaNs in idx when we unstack, both idx balances are floats. df_expected = pd.read_csv( StringIO( """ entity_id,report_year,sched_table_name,idx_ending_balance,idx_starting_balance,test_value_ending_balance,test_value_starting_balance -1,2021,table_name,0,1,2000,1000 -2,2021,table_name,2,3,21000,8000 +1,2021,table_name,1.0,2.0,2021.1,2020.1 +1,2022,table_name,0.0,1.0,2022.1,2021.1 +2,2021,table_name,3.0,4.0,2021.2,2020.2 +2,2022,table_name,,3.0,,2021.2 """ ), ) @@ -385,7 +389,15 @@ def test_unstack_balances_to_report_year_instant_xbrl(): # If there is more than one value per year (not report year) an AssertionError # should raise df_non_unique_years = df.copy() - df_non_unique_years.loc[4] = [4, 2, "2020-12-31", 2021, "table_name", 500] + df_non_unique_years.loc[len(df_non_unique_years.index)] = [ + 5, + 2, + "2020-12-31", + 2020, + "table_name", + 2020.15, + ] + with pytest.raises(AssertionError): unstack_balances_to_report_year_instant_xbrl( df_non_unique_years, params=params, primary_key_cols=pk_cols diff --git a/test/validate/ferc1_test.py b/test/validate/ferc1_test.py index 074f61f636..de3f1e5d1b 100644 --- a/test/validate/ferc1_test.py +++ b/test/validate/ferc1_test.py @@ -84,16 +84,16 @@ def test_no_null_cols_ferc1(pudl_out_ferc1, live_dbs, cols, df_name): @pytest.mark.parametrize( "df_name,expected_rows", [ - ("fbp_ferc1", 25_406), - ("fuel_ferc1", 48_815), - ("plant_in_service_ferc1", 315_112), - ("plants_all_ferc1", 54_415), - ("plants_hydro_ferc1", 6_798), - ("plants_pumped_storage_ferc1", 544), - ("plants_small_ferc1", 16_248), - ("plants_steam_ferc1", 30_825), - ("pu_ferc1", 7_528), - ("purchased_power_ferc1", 197_947), + ("fbp_ferc1", 26_188), + ("fuel_ferc1", 50_039), + ("plant_in_service_ferc1", 335_750), + ("plants_all_ferc1", 56_409), + ("plants_hydro_ferc1", 6_979), + ("plants_pumped_storage_ferc1", 562), + ("plants_small_ferc1", 16_989), + ("plants_steam_ferc1", 31_879), + ("pu_ferc1", 7_698), + ("purchased_power_ferc1", 204_720), ], ) def test_minmax_rows(pudl_out_ferc1, live_dbs, expected_rows, df_name):