Commit 6cc735d

Merge pull request #2947 from catalyst-cooperative/ferc1-2022-report_year_fix

FERC1 2022 report year fix
zschira authored Oct 26, 2023
2 parents a34a3fe + 8fc0e81 commit 6cc735d
Showing 9 changed files with 216 additions and 74 deletions.
6 changes: 3 additions & 3 deletions src/pudl/helpers.py
@@ -1815,10 +1815,10 @@ def assert_cols_areclose(
     # instead of just whether or not there are matches.
     mismatch = df.loc[
         ~np.isclose(
-            df[a_cols],
-            df[b_cols],
+            np.ma.masked_where(np.isnan(df[a_cols]), df[a_cols]),
+            np.ma.masked_where(np.isnan(df[b_cols]), df[b_cols]),
             equal_nan=True,
-        )
+        ).filled()
     ]
     mismatch_ratio = len(mismatch) / len(df)
     if mismatch_ratio > mismatch_threshold:
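
The masking in `assert_cols_areclose` keeps rows where either side is NaN from being flagged as mismatches: masked comparisons come back masked, and `.filled()` on a boolean masked array fills them with the boolean default fill value, `True`. A minimal standalone sketch of the pattern (NumPy only, made-up values, not part of the diff):

    import numpy as np

    a = np.array([1.0, 2.0, np.nan])
    b = np.array([1.0, 9.0, 5.0])

    # Mask NaN positions, compare the rest, then fill the masked
    # comparisons with True so NaN rows count as matches, not mismatches.
    close = np.isclose(
        np.ma.masked_where(np.isnan(a), a),
        np.ma.masked_where(np.isnan(b), b),
        equal_nan=True,
    ).filled()
    print(close)  # [ True False  True]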
68 changes: 55 additions & 13 deletions src/pudl/io_managers.py
@@ -729,6 +729,48 @@ def filter_for_freshest_data(
 
         return deduped
 
+    @staticmethod
+    def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
+        """Set a fact's report year by its actual dates.
+
+        Sometimes a fact belongs to a context which has no ReportYear
+        associated with it; other times there are multiple ReportYears
+        associated with a single filing. In these cases the report year of a
+        specific fact may be associated with the other years in the filing.
+
+        In many cases we can infer the actual report year from the fact's
+        associated time period - either duration or instant.
+        """
+        is_duration = len({"start_date", "end_date"} - set(df.columns)) == 0
+        is_instant = "date" in df.columns
+
+        def get_year(df: pd.DataFrame, col: str) -> pd.Series:
+            datetimes = pd.to_datetime(df.loc[:, col])
+            if datetimes.isna().any():
+                raise ValueError(f"{col} has null values!")
+            return datetimes.apply(lambda x: x.year)
+
+        if is_duration:
+            start_years = get_year(df, "start_date")
+            end_years = get_year(df, "end_date")
+            if not (start_years == end_years).all():
+                raise ValueError("start_date and end_date are in different years!")
+            new_report_years = start_years
+        elif is_instant:
+            new_report_years = get_year(df, "date")
+        else:
+            raise ValueError("Attempted to read a non-instant, non-duration table.")
+
+        # we include XBRL data from before our "officially supported" XBRL
+        # range because we want to use it to set start-of-year values for the
+        # first XBRL year.
+        xbrl_years_plus_one_previous = [min(xbrl_years) - 1] + xbrl_years
+        return (
+            df.assign(report_year=new_report_years)
+            .loc[lambda df: df.report_year.isin(xbrl_years_plus_one_previous)]
+            .reset_index(drop=True)
+        )
+
     def _get_primary_key(self, sched_table_name: str) -> list[str]:
         # TODO (daz): as of 2023-10-13, our datapackage.json is merely
         # "frictionless-like" so we manually parse it as JSON. once we make our
@@ -769,26 +811,26 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
 
         engine = self.engine
 
-        id_table = "identification_001_duration"
-
         sched_table_name = re.sub("_instant|_duration", "", table_name)
         with engine.connect() as con:
            df = pd.read_sql(
-                f"""
-                SELECT {table_name}.*, {id_table}.report_year FROM {table_name}
-                JOIN {id_table} ON {id_table}.filing_name = {table_name}.filing_name
-                WHERE {id_table}.report_year BETWEEN :min_year AND :max_year;
-                """,  # noqa: S608 - table names not supplied by user
+                f"SELECT {table_name}.* FROM {table_name}",  # noqa: S608 - table names not supplied by user
                 con=con,
-                params={
-                    "min_year": min(ferc1_settings.xbrl_years),
-                    "max_year": max(ferc1_settings.xbrl_years),
-                },
             ).assign(sched_table_name=sched_table_name)
 
         primary_key = self._get_primary_key(table_name)
-        deduped = self.filter_for_freshest_data(df, primary_key=primary_key)
-        return deduped.drop(columns=["publication_time"])
+
+        return (
+            df.pipe(
+                FercXBRLSQLiteIOManager.filter_for_freshest_data,
+                primary_key=primary_key,
+            )
+            .pipe(
+                FercXBRLSQLiteIOManager.refine_report_year,
+                xbrl_years=ferc1_settings.xbrl_years,
+            )
+            .drop(columns=["publication_time"])
+        )
 
 
 @io_manager(required_resource_keys={"dataset_settings"})
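
The rewritten `load_input` now reads each raw table whole and pipes it through `filter_for_freshest_data` and the new `refine_report_year`, which rederives `report_year` from each fact's own dates and keeps one extra year before the supported range. A small sketch of that behavior on a toy duration table (standalone pandas; values made up, import path assumed from this file's location):

    import pandas as pd

    from pudl.io_managers import FercXBRLSQLiteIOManager

    df = pd.DataFrame(
        {
            "entity_id": ["123", "123"],
            "start_date": ["2020-01-01", "2021-01-01"],
            "end_date": ["2020-12-31", "2021-12-31"],
            "report_year": [2021, 2021],  # both facts arrived in the 2021 filing
        }
    )
    out = FercXBRLSQLiteIOManager.refine_report_year(df, xbrl_years=[2021, 2022])
    # report_year is rederived from the dates; 2020 survives because it is
    # min(xbrl_years) - 1, kept to seed start-of-year values for the first XBRL year.
    print(out.report_year.tolist())  # [2020, 2021]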
Binary file modified src/pudl/package_data/glue/pudl_id_mapping.xlsx (binary diff not shown)
49 changes: 22 additions & 27 deletions src/pudl/transform/ferc1.py
@@ -599,12 +599,8 @@ def unstack_balances_to_report_year_instant_xbrl(
     if not params.unstack_balances_to_report_year:
         return df
 
-    df["year"] = pd.to_datetime(df["date"]).dt.year
-    # Check that the originally reported records are annually unique.
-    # year and report_year aren't necessarily the same since previous year data
-    # is often reported in the current report year, but we're constructing a table
-    # where report_year is part of the primary key, so we have to do this:
-    unique_cols = [c for c in primary_key_cols if c != "report_year"] + ["year"]
+    # report year always corresponds to the year of "date"
+    unique_cols = set(primary_key_cols).union({"report_year"})
     if df.duplicated(unique_cols).any():
         raise AssertionError(
             "Looks like there are multiple entries per year--not sure which to use "
@@ -615,28 +611,26 @@ def unstack_balances_to_report_year_instant_xbrl(
             "Looks like there are some values in here that aren't from the end of "
             "the year. We can't use those to calculate start and end balances."
         )
-    df.loc[df.report_year == (df.year + 1), "balance_type"] = "starting_balance"
-    df.loc[df.report_year == df.year, "balance_type"] = "ending_balance"
-    if df.balance_type.isna().any():
-        # Remove rows from years that are not representative of start/end dates
-        # for a given report year (i.e., the report year and one year prior).
-        logger.warning(
-            f"Dropping unexpected years: "
-            f"{df.loc[df.balance_type.isna(), 'year'].unique()}"
-        )
-        df = df[df["balance_type"].notna()].copy()
-    df = (
-        df.drop(["year", "date"], axis="columns")
+
+    ending_balances = df.assign(balance_type="ending_balance")
+    starting_balances = df.assign(
+        report_year=df.report_year + 1, balance_type="starting_balance"
+    )
+    all_balances = pd.concat([starting_balances, ending_balances])
+    # for the first year, we expect no starting balances; for the last year, we expect no ending balances.
+    first_last_year_stripped = all_balances.loc[
+        lambda df: ~df.report_year.isin({df.report_year.min(), df.report_year.max()})
+    ]
+    unstacked_by_year = (
+        first_last_year_stripped.drop(columns=["date"])
         .set_index(primary_key_cols + ["balance_type", "sched_table_name"])
         .unstack("balance_type")
     )
-    # This turns a multi-index into a single-level index with tuples of strings
-    # as the keys, and then converts the tuples of strings into a single string
-    # by joining their values with an underscore. This results in column labels
-    # like boiler_plant_equipment_steam_production_starting_balance
-    df.columns = ["_".join(items) for items in df.columns.to_flat_index()]
-    df = df.reset_index()
-    return df
+    # munge multi-index into flat index, separated by _
+    unstacked_by_year.columns = [
+        "_".join(items) for items in unstacked_by_year.columns.to_flat_index()
+    ]
+    return unstacked_by_year.reset_index()
 
 
 class CombineAxisColumnsXbrl(TransformParams):
@@ -2163,7 +2157,8 @@ def merge_instant_and_duration_tables_xbrl(
             ],
             axis="columns",
         ).reset_index()
-        return out_df
+
+        return out_df.loc[out_df.report_year.isin(Ferc1Settings().xbrl_years)]
 
     @cache_df("process_instant_xbrl")
     def process_instant_xbrl(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -4697,7 +4692,7 @@ def check_double_year_earnings_types(self, df: pd.DataFrame) -> pd.DataFrame:
             df=intra_year_facts,
             a_cols=["ending_balance_previous"],
             b_cols=["starting_balance_current"],
-            mismatch_threshold=0.02,
+            mismatch_threshold=0.05,
             message="'Previous year ending balance' should be the same as "
            "'current year starting balance' for all years!",
         )
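
The new unstacking approach builds starting balances by shifting each year's ending balance forward one report year, concatenating, trimming the two boundary years, and unstacking on `balance_type`. A condensed standalone sketch with one primary-key column and made-up balances:

    import pandas as pd

    df = pd.DataFrame(
        {
            "entity_id": [1, 1, 1],
            "report_year": [2020, 2021, 2022],
            "balance": [100.0, 110.0, 125.0],
        }
    )
    ending = df.assign(balance_type="ending_balance")
    # last year's ending balance becomes this year's starting balance
    starting = df.assign(report_year=df.report_year + 1, balance_type="starting_balance")
    both = pd.concat([starting, ending])
    # the earliest year has no starting balance and the latest (shifted)
    # year has no ending balance, so drop both before unstacking
    trimmed = both.loc[
        ~both.report_year.isin({both.report_year.min(), both.report_year.max()})
    ]
    wide = trimmed.set_index(["entity_id", "report_year", "balance_type"]).unstack(
        "balance_type"
    )
    wide.columns = ["_".join(cols) for cols in wide.columns.to_flat_index()]
    # one row per (entity_id, report_year):
    #   2021 -> ending 110.0, starting 100.0
    #   2022 -> ending 125.0, starting 110.0
    print(wide.reset_index())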
4 changes: 2 additions & 2 deletions src/pudl/validate.py
@@ -832,9 +832,9 @@ def plot_vs_agg(orig_df, agg_df, validation_cases):
         "data_col": "capability_ratio",
         "weight_col": "",
     },
-    {
+    {  # XBRL data (post-2021) reports 0 capability for ~22% of plants, so we exclude.
         "title": "Capability Ratio (tails)",
-        "query": "",
+        "query": "report_year < 2021 | plant_capability_mw > 0",
         "low_q": 0.05,
         "low_bound": 0.5,
         "hi_q": 0.95,
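
The new `query` string is `pandas.DataFrame.query` syntax, with `|` as a row-wise OR: pre-XBRL rows pass unconditionally, while XBRL-era rows pass only when they report nonzero capability. A tiny sketch with made-up values:

    import pandas as pd

    vf = pd.DataFrame(
        {
            "report_year": [2020, 2021, 2021],
            "plant_capability_mw": [0.0, 0.0, 100.0],
        }
    )
    # keeps the 2020 row and the 2021 row with nonzero capability
    print(vf.query("report_year < 2021 | plant_capability_mw > 0"))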
2 changes: 1 addition & 1 deletion test/integration/etl_test.py
@@ -191,6 +191,6 @@ def test_extract_xbrl(self, ferc1_engine_dbf):
             for table_type, df in xbrl_tables.items():
                 # Some raw xbrl tables are empty
                 if not df.empty and table_type == "duration":
-                    assert (df.report_year >= 2021).all() and (
+                    assert (df.report_year >= 2020).all() and (
                         df.report_year < 2022
                     ).all(), f"Unexpected years found in table: {table_name}"
115 changes: 104 additions & 11 deletions test/unit/io_managers_test.py
@@ -334,17 +334,8 @@ def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path):
         ]
     )
 
-    id_table = pd.DataFrame.from_records(
-        [
-            {"filing_name": "Utility_Co_0001", "report_year": 2021},
-            {"filing_name": "Utility_Co_0002", "report_year": 2021},
-        ]
-    )
-
     conn = sa.create_engine(f"sqlite:///{db_path}")
     df.to_sql("test_table_instant", conn)
-    id_table.to_sql("identification_001_duration", conn)
-
     input_context = build_input_context(
         asset_key=AssetKey("test_table_instant"),
         resources={
@@ -401,12 +392,114 @@ def test_filter_for_freshest_data(df):
         suffixes=["_in", "_out"],
         indicator=True,
     ).set_index(xbrl_context_cols)
-    hypothesis.note(f"Found these contexts in input data: {original_contexts}")
-    hypothesis.note(f"The freshest data: {deduped}")
+    hypothesis.note(f"Found these contexts in input data:\n{original_contexts}")
+    hypothesis.note(f"The freshest data:\n{deduped}")
+    hypothesis.note(f"Paired by context:\n{paired_by_context}")
     assert (paired_by_context._merge == "both").all()
 
     # for every row in the output - its publication time is greater than or equal to all of the other ones for that [entity_id, utility_type, date] in the input data
     assert (
         paired_by_context["publication_time_out"]
         >= paired_by_context["publication_time_in"]
     ).all()
+
+
+def test_report_year_fixing_instant():
+    instant_df = pd.DataFrame.from_records(
+        [
+            {
+                "entity_id": "123",
+                "date": "2020-07-01",
+                "report_year": 3021,
+                "factoid": "replace report year with date year",
+            },
+        ]
+    )
+
+    observed = FercXBRLSQLiteIOManager.refine_report_year(
+        instant_df, xbrl_years=[2021, 2022]
+    ).report_year
+    expected = pd.Series([2020])
+    assert (observed == expected).all()
+
+
+def test_report_year_fixing_duration():
+    duration_df = pd.DataFrame.from_records(
+        [
+            {
+                "entity_id": "123",
+                "start_date": "2004-01-01",
+                "end_date": "2004-12-31",
+                "report_year": 3021,
+                "factoid": "filter out since the report year is out of bounds",
+            },
+            {
+                "entity_id": "123",
+                "start_date": "2021-01-01",
+                "end_date": "2021-12-31",
+                "report_year": 3021,
+                "factoid": "replace report year with date year",
+            },
+        ]
+    )
+
+    observed = FercXBRLSQLiteIOManager.refine_report_year(
+        duration_df, xbrl_years=[2021, 2022]
+    ).report_year
+    expected = pd.Series([2021])
+    assert (observed == expected).all()
+
+
+@pytest.mark.parametrize(
+    "df, match",
+    [
+        (
+            pd.DataFrame.from_records(
+                [
+                    {"entity_id": "123", "report_year": 3021, "date": ""},
+                ]
+            ),
+            "date has null values",
+        ),
+        (
+            pd.DataFrame.from_records(
+                [
+                    {
+                        "entity_id": "123",
+                        "report_year": 3021,
+                        "start_date": "",
+                        "end_date": "2020-12-31",
+                    },
+                ]
+            ),
+            "start_date has null values",
+        ),
+        (
+            pd.DataFrame.from_records(
+                [
+                    {
+                        "entity_id": "123",
+                        "report_year": 3021,
+                        "start_date": "2020-06-01",
+                        "end_date": "2021-05-31",
+                    },
+                ]
+            ),
+            "start_date and end_date are in different years",
+        ),
+        (
+            pd.DataFrame.from_records(
+                [
+                    {
+                        "entity_id": "123",
+                        "report_year": 3021,
+                    },
+                ]
+            ),
+            "Attempted to read a non-instant, non-duration table",
+        ),
+    ],
+)
+def test_report_year_fixing_bad_values(df, match):
+    with pytest.raises(ValueError, match=match):
+        FercXBRLSQLiteIOManager.refine_report_year(df, xbrl_years=[2021, 2022])
26 changes: 19 additions & 7 deletions test/unit/transform/ferc1_test.py
@@ -355,10 +355,11 @@ def test_unstack_balances_to_report_year_instant_xbrl():
         StringIO(
             """
 idx,entity_id,date,report_year,sched_table_name,test_value
-0,1,2021-12-31,2021,table_name,2000
-1,1,2020-12-31,2021,table_name,1000
-2,2,2021-12-31,2021,table_name,21000
-3,2,2020-12-31,2021,table_name,8000
+0,1,2022-12-31,2022,table_name,2022.1
+1,1,2021-12-31,2021,table_name,2021.1
+2,1,2020-12-31,2020,table_name,2020.1
+3,2,2021-12-31,2021,table_name,2021.2
+4,2,2020-12-31,2020,table_name,2020.2
 """
         ),
     )
@@ -371,12 +372,15 @@ def test_unstack_balances_to_report_year_instant_xbrl():
         params=params,
         primary_key_cols=pk_cols,
     )
+    # because there are NaNs in idx when we unstack, both idx balances are floats.
     df_expected = pd.read_csv(
         StringIO(
             """
 entity_id,report_year,sched_table_name,idx_ending_balance,idx_starting_balance,test_value_ending_balance,test_value_starting_balance
-1,2021,table_name,0,1,2000,1000
-2,2021,table_name,2,3,21000,8000
+1,2021,table_name,1.0,2.0,2021.1,2020.1
+1,2022,table_name,0.0,1.0,2022.1,2021.1
+2,2021,table_name,3.0,4.0,2021.2,2020.2
+2,2022,table_name,,3.0,,2021.2
 """
         ),
     )
@@ -385,7 +389,15 @@ def test_unstack_balances_to_report_year_instant_xbrl():
     # If there is more than one value per year (not report year) an AssertionError
     # should raise
     df_non_unique_years = df.copy()
-    df_non_unique_years.loc[4] = [4, 2, "2020-12-31", 2021, "table_name", 500]
+    df_non_unique_years.loc[len(df_non_unique_years.index)] = [
+        5,
+        2,
+        "2020-12-31",
+        2020,
+        "table_name",
+        2020.15,
+    ]
+
     with pytest.raises(AssertionError):
         unstack_balances_to_report_year_instant_xbrl(
             df_non_unique_years, params=params, primary_key_cols=pk_cols