Skip to content

Commit

Permalink
Clean EIA 860 and 923 FGD operation and maintenance data (#3403)
Browse files Browse the repository at this point in the history
* Stash changes

* Stash changes to 923 FGD table

* Fix drop NA behavior

* Fix docstrings

* Add fields to PUDL metadata

* Fix drop nas, encode columns, fix booleans and combine raw maintenance columns

* Add alembic migration for harvested FGD table

* Add FGD operational status to encoding FKs

* Update alembic

* Add table to core and address PR comments

* Stash changes

* Change EIA 923 FGD table to _core, write asset checks, fix for fast ETL

* Add WIP 860 transform

* Stash changes

* Add fields, rename raw vars to conform to existing fields better

* Add 923 and 860 to pudl.sqlite, encode all tables, fix PK and dtype issues

* Update 860 docstrings

* Restore environments, update release notes, update schedule in 860 description

* Add sorbent type coding table to PUDL

* Fix FK for fgd operational status

* Fix docs build indentation failure

* [pre-commit.ci] auto fixes from pre-commit.com hooks

For more information, see https://pre-commit.ci

* Clean up migrations, remove crufty resource def

* Fix encoding, fix typos, spell out FGD

* Fix docs build ref failure

* Add year range to ratio helper, add unit tests for helper functions, trim returns

* No cover for asset checks

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
e-belfer and pre-commit-ci[bot] authored Mar 19, 2024
1 parent 565bf00 commit 69edbc5
Show file tree
Hide file tree
Showing 14 changed files with 1,089 additions and 34 deletions.
9 changes: 9 additions & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ PUDL Release Notes
v2024.X.X (2024-XX-XX)
---------------------------------------------------------------------------------------

New Data Coverage
^^^^^^^^^^^^^^^^^
* Added cleaned EIA860 Schedule 8E FGD Equipment and EIA923 Schedule 8C FGD Operation
and Maintenance data to the PUDL database as
:ref:`i_core_eia923__fgd_operation_maintenance` and
:ref:`i_core_eia860__fgd_equipment`. Once harvested, these tables will eventually be
removed from the database, but they are being published until then. See :issue:`3394`,
:issue:`3392`, and :pr:`3403`.

Data Cleaning
^^^^^^^^^^^^^

Expand Down
119 changes: 119 additions & 0 deletions migrations/versions/b8ae440a2d32_add_923_and_860_fgd_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Add 923 and 860 FGD tables
Revision ID: b8ae440a2d32
Revises: 5eb340696bf0
Create Date: 2024-03-14 11:51:29.710624
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
# ``revision`` is this migration's own ID and ``down_revision`` is the ID of the
# migration it revises; both match the "Revision ID" / "Revises" lines in the
# module docstring.
revision = 'b8ae440a2d32'
down_revision = '5eb340696bf0'
# No branch labels or dependencies are declared for this revision.
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the sorbent type code table and the EIA 860 / EIA 923 FGD tables.

    Three tables are created, in this order:

    * ``core_eia__codes_sorbent_types``: code / label / description lookup table.
    * ``_core_eia860__fgd_equipment``: keyed on ``plant_id_eia``,
      ``so2_control_id_eia`` and ``report_date``.
    * ``_core_eia923__fgd_operation_maintenance``: keyed on ``plant_id_eia``,
      ``report_date`` and ``so2_control_id_eia``.
    """
    # NOTE(review): the column comments for ``byproduct_recovery`` ("Is salable
    # byproduct is recovered...") and ``fgd_manufacturer`` ("Nname of ...") contain
    # typos. Presumably they mirror the field metadata this migration was generated
    # from -- confirm, and fix them there rather than hand-editing this file.
    # ### commands auto generated by Alembic - please adjust! ###
    # The sorbent type code table is created first because the four sorbent_type_N
    # columns of _core_eia860__fgd_equipment declare foreign keys to it.
    op.create_table('core_eia__codes_sorbent_types',
    sa.Column('code', sa.Text(), nullable=False, comment='Originally reported short code.'),
    sa.Column('label', sa.Text(), nullable=True, comment='Longer human-readable code using snake_case'),
    sa.Column('description', sa.Text(), nullable=True, comment='Long human-readable description of the meaning of a code/label.'),
    sa.PrimaryKeyConstraint('code', name=op.f('pk_core_eia__codes_sorbent_types'))
    )
    # EIA 860 FGD equipment table. Only the three primary key columns are
    # non-nullable; every other column allows NULL.
    op.create_table('_core_eia860__fgd_equipment',
    sa.Column('report_date', sa.Date(), nullable=False, comment='Date reported.'),
    sa.Column('plant_id_eia', sa.Integer(), nullable=False, comment='The unique six-digit facility identification number, also called an ORISPL, assigned by the Energy Information Administration.'),
    sa.Column('so2_control_id_eia', sa.Text(), nullable=False, comment='Sulfur dioxide control identification number. This ID is not a unique identifier.'),
    sa.Column('utility_id_eia', sa.Integer(), nullable=True, comment='The EIA Utility Identification number.'),
    sa.Column('utility_name_eia', sa.Text(), nullable=True, comment='The name of the utility.'),
    sa.Column('state', sa.Text(), nullable=True, comment='Two letter US state abbreviation.'),
    sa.Column('state_id_fips', sa.Text(), nullable=True, comment='Two digit state FIPS code.'),
    sa.Column('county', sa.Text(), nullable=True, comment='County name.'),
    sa.Column('county_id_fips', sa.Text(), nullable=True, comment='County ID from the Federal Information Processing Standard Publication 6-4.'),
    sa.Column('fgd_operating_date', sa.Date(), nullable=True, comment='The actual or projected in-service datetime of this flue gas desulfurization system'),
    sa.Column('fgd_operational_status_code', sa.Text(), nullable=True, comment='Operating status code for flue gas desulfurization equipment.'),
    sa.Column('flue_gas_bypass_fgd', sa.Boolean(), nullable=True, comment='Indicates whether flue gas can bypass the flue gas desulfurization unit.'),
    sa.Column('byproduct_recovery', sa.Boolean(), nullable=True, comment='Is salable byproduct is recovered by the unit?'),
    sa.Column('sludge_pond', sa.Boolean(), nullable=True, comment='Indicates if there is a sludge pond associated with this unit.'),
    sa.Column('sludge_pond_lined', sa.Boolean(), nullable=True, comment='Indicates whether the sludge pond is lined.'),
    sa.Column('pond_landfill_requirements_acre_foot_per_year', sa.Float(), nullable=True, comment='Annual pond and land fill requirements for flue gas desulfurization equipment.'),
    sa.Column('fgd_structure_cost', sa.Float(), nullable=True, comment='Actual installed costs for the existing systems or the anticipated costs of structures and equipment to bring a planned flue gas desulfurization system into commercial operation.'),
    sa.Column('fgd_other_cost', sa.Float(), nullable=True, comment='Other actual installed costs for installation of a flue gas desulfurization unit or the anticipated other costs pertaining to the installation of a flue gas desulfurization unit.'),
    sa.Column('sludge_disposal_cost', sa.Float(), nullable=True, comment='Actual installed costs for the existing sludge transport and disposal systems or the anticipated costs of sludge transport and disposal systems to bring a planned system into commercial operation.'),
    sa.Column('total_fgd_equipment_cost', sa.Float(), nullable=True, comment='Total actual installed costs for the existing flue gas desulfurization unit or the anticipated costs to bring a planned flue gas desulfurization unit into commercial operation.'),
    sa.Column('fgd_trains_100pct', sa.Float(), nullable=True, comment='Total number of flue gas desulfurization unit scrubber trains operated at 100 percent load.'),
    sa.Column('fgd_trains_total', sa.Float(), nullable=True, comment='Total number of flue gas desulfurization unit scrubber trains.'),
    sa.Column('flue_gas_entering_fgd_pct_of_total', sa.Float(), nullable=True, comment='Ratio of all flue gas that is entering the flue gas desulfurization unit.'),
    sa.Column('flue_gas_exit_rate_cubic_feet_per_minute', sa.Float(), nullable=True, comment='Actual flue gas exit rate, in cubic feet per minute.'),
    sa.Column('flue_gas_exit_temperature_fahrenheit', sa.Float(), nullable=True, comment='Flue gas exit temperature, in degrees Fahrenheit.'),
    sa.Column('so2_emission_rate_lbs_per_hour', sa.Float(), nullable=True, comment='Sulfur dioxide emission rate when operating at 100 percent load (pounds per hour).'),
    sa.Column('so2_equipment_type_1', sa.Text(), nullable=True, comment='Type of sulfur dioxide control equipment.'),
    sa.Column('so2_equipment_type_2', sa.Text(), nullable=True, comment='Type of sulfur dioxide control equipment.'),
    sa.Column('so2_equipment_type_3', sa.Text(), nullable=True, comment='Type of sulfur dioxide control equipment.'),
    sa.Column('so2_equipment_type_4', sa.Text(), nullable=True, comment='Type of sulfur dioxide control equipment.'),
    sa.Column('so2_removal_efficiency_design', sa.Float(), nullable=True, comment='Designed removal efficiency for sulfur dioxide when operating at 100 percent load. Reported at the nearest 0.1 percent by weight of gases removed from the flue gas.'),
    sa.Column('specifications_of_coal_ash', sa.Float(), nullable=True, comment='Design fuel specifications for ash when burning coal or petroleum coke (nearest 0.1 percent by weight).'),
    sa.Column('specifications_of_coal_sulfur', sa.Float(), nullable=True, comment='Design fuel specifications for sulfur when burning coal or petroleum coke (nearest 0.1 percent by weight).'),
    sa.Column('sorbent_type_1', sa.Text(), nullable=True, comment='Type of sorbent used by this sulfur dioxide control equipment.'),
    sa.Column('sorbent_type_2', sa.Text(), nullable=True, comment='Type of sorbent used by this sulfur dioxide control equipment.'),
    sa.Column('sorbent_type_3', sa.Text(), nullable=True, comment='Type of sorbent used by this sulfur dioxide control equipment.'),
    sa.Column('sorbent_type_4', sa.Text(), nullable=True, comment='Type of sorbent used by this sulfur dioxide control equipment.'),
    sa.Column('fgd_manufacturer', sa.Text(), nullable=True, comment='Nname of flue gas desulfurization equipment manufacturer.'),
    sa.Column('fgd_manufacturer_code', sa.Text(), nullable=True, comment='Code corresponding to name of flue gas desulfurization equipment manufacturer.'),
    sa.Column('steam_plant_type_code', sa.Integer(), nullable=True, comment='Code that describes types of steam plants from EIA 860. See steam_plant_types_eia table for more details.'),
    sa.Column('plant_summer_capacity_mw', sa.Float(), nullable=True, comment='The plant summer capacity associated with the operating generators at the plant'),
    sa.Column('water_source', sa.Text(), nullable=True, comment='Name of water source associated with the plant.'),
    sa.Column('data_maturity', sa.Text(), nullable=True, comment='Level of maturity of the data record. Some data sources report less-than-final data. PUDL sometimes includes this data, but use at your own risk.'),
    # Foreign keys: coded columns point at their code tables; the plant and
    # utility IDs are paired with report_date to reference the yearly
    # core_eia860__scd_plants / core_eia860__scd_utilities tables.
    sa.ForeignKeyConstraint(['data_maturity'], ['core_pudl__codes_data_maturities.code'], name=op.f('fk__core_eia860__fgd_equipment_data_maturity_core_pudl__codes_data_maturities')),
    sa.ForeignKeyConstraint(['fgd_manufacturer_code'], ['core_eia__codes_environmental_equipment_manufacturers.code'], name=op.f('fk__core_eia860__fgd_equipment_fgd_manufacturer_code_core_eia__codes_environmental_equipment_manufacturers')),
    sa.ForeignKeyConstraint(['fgd_operational_status_code'], ['core_eia__codes_operational_status.code'], name=op.f('fk__core_eia860__fgd_equipment_fgd_operational_status_code_core_eia__codes_operational_status')),
    sa.ForeignKeyConstraint(['plant_id_eia', 'report_date'], ['core_eia860__scd_plants.plant_id_eia', 'core_eia860__scd_plants.report_date'], name=op.f('fk__core_eia860__fgd_equipment_plant_id_eia_core_eia860__scd_plants')),
    sa.ForeignKeyConstraint(['so2_equipment_type_1'], ['core_eia__codes_emission_control_equipment_types.code'], name=op.f('fk__core_eia860__fgd_equipment_so2_equipment_type_1_core_eia__codes_emission_control_equipment_types')),
    sa.ForeignKeyConstraint(['so2_equipment_type_2'], ['core_eia__codes_emission_control_equipment_types.code'], name=op.f('fk__core_eia860__fgd_equipment_so2_equipment_type_2_core_eia__codes_emission_control_equipment_types')),
    sa.ForeignKeyConstraint(['so2_equipment_type_3'], ['core_eia__codes_emission_control_equipment_types.code'], name=op.f('fk__core_eia860__fgd_equipment_so2_equipment_type_3_core_eia__codes_emission_control_equipment_types')),
    sa.ForeignKeyConstraint(['so2_equipment_type_4'], ['core_eia__codes_emission_control_equipment_types.code'], name=op.f('fk__core_eia860__fgd_equipment_so2_equipment_type_4_core_eia__codes_emission_control_equipment_types')),
    sa.ForeignKeyConstraint(['sorbent_type_1'], ['core_eia__codes_sorbent_types.code'], name=op.f('fk__core_eia860__fgd_equipment_sorbent_type_1_core_eia__codes_sorbent_types')),
    sa.ForeignKeyConstraint(['sorbent_type_2'], ['core_eia__codes_sorbent_types.code'], name=op.f('fk__core_eia860__fgd_equipment_sorbent_type_2_core_eia__codes_sorbent_types')),
    sa.ForeignKeyConstraint(['sorbent_type_3'], ['core_eia__codes_sorbent_types.code'], name=op.f('fk__core_eia860__fgd_equipment_sorbent_type_3_core_eia__codes_sorbent_types')),
    sa.ForeignKeyConstraint(['sorbent_type_4'], ['core_eia__codes_sorbent_types.code'], name=op.f('fk__core_eia860__fgd_equipment_sorbent_type_4_core_eia__codes_sorbent_types')),
    sa.ForeignKeyConstraint(['steam_plant_type_code'], ['core_eia__codes_steam_plant_types.code'], name=op.f('fk__core_eia860__fgd_equipment_steam_plant_type_code_core_eia__codes_steam_plant_types')),
    sa.ForeignKeyConstraint(['utility_id_eia', 'report_date'], ['core_eia860__scd_utilities.utility_id_eia', 'core_eia860__scd_utilities.report_date'], name=op.f('fk__core_eia860__fgd_equipment_utility_id_eia_core_eia860__scd_utilities')),
    sa.PrimaryKeyConstraint('plant_id_eia', 'so2_control_id_eia', 'report_date', name=op.f('pk__core_eia860__fgd_equipment'))
    )
    # EIA 923 FGD operation and maintenance table. The opex_* columns and
    # fgd_hours_in_service are stored as integers; the remaining measurements
    # are floats.
    op.create_table('_core_eia923__fgd_operation_maintenance',
    sa.Column('report_date', sa.Date(), nullable=False, comment='Date reported.'),
    sa.Column('plant_id_eia', sa.Integer(), nullable=False, comment='The unique six-digit facility identification number, also called an ORISPL, assigned by the Energy Information Administration.'),
    sa.Column('so2_control_id_eia', sa.Text(), nullable=False, comment='Sulfur dioxide control identification number. This ID is not a unique identifier.'),
    sa.Column('opex_fgd_feed_materials_chemical', sa.Integer(), nullable=True, comment='Annual operation and maintenance expenditures for feed materials and chemicals for flue gas desulfurization equipment, excluding electricity.'),
    sa.Column('opex_fgd_labor_supervision', sa.Integer(), nullable=True, comment='Annual operation and maintenance expenditures for labor and supervision of flue gas desulfurization equipment, excluding electricity.'),
    sa.Column('opex_fgd_land_acquisition', sa.Integer(), nullable=True, comment='Annual operation and maintenance expenditures for land acquisition for flue gas desulfurization equipment, excluding electricity.'),
    sa.Column('opex_fgd_maintenance_material_other', sa.Integer(), nullable=True, comment='Annual operation and maintenance expenditures for maintenance, materials and all other costs of flue gas desulfurization equipment, excluding electricity'),
    sa.Column('opex_fgd_waste_disposal', sa.Integer(), nullable=True, comment='Annual operation and maintenance expenditures for waste disposal, excluding electricity.'),
    sa.Column('opex_fgd_total_cost', sa.Integer(), nullable=True, comment='Annual total cost of operation and maintenance expenditures on flue gas desulfurization equipment, excluding electricity'),
    sa.Column('fgd_control_flag', sa.Boolean(), nullable=True, comment='Indicates whether or not a plant has a flue gas desulfurization control unit.'),
    sa.Column('fgd_operational_status_code', sa.Text(), nullable=True, comment='Operating status code for flue gas desulfurization equipment.'),
    sa.Column('fgd_hours_in_service', sa.Integer(), nullable=True, comment='Number of hours the flue gas desulfurization equipment was in operation during the year.'),
    sa.Column('fgd_electricity_consumption_mwh', sa.Float(), nullable=True, comment='Electric power consumed by the flue gas desulfurization unit (in MWh).'),
    sa.Column('fgd_sorbent_consumption_1000_tons', sa.Float(), nullable=True, comment='Quantity of flue gas desulfurization sorbent used, to the nearest 0.1 thousand tons.'),
    sa.Column('so2_removal_efficiency_tested', sa.Float(), nullable=True, comment='Removal efficiency for sulfur dioxide (to the nearest 0.1 percent by weight) at tested rate at 100 percent load.'),
    sa.Column('so2_removal_efficiency_annual', sa.Float(), nullable=True, comment='Removal efficiency for sulfur dioxide (to the nearest 0.1 percent by weight) based on designed firing rate and hours in operation (listed as a percentage).'),
    sa.Column('so2_test_date', sa.Date(), nullable=True, comment='Date of most recent test for sulfur dioxide removal efficiency.'),
    sa.Column('data_maturity', sa.Text(), nullable=True, comment='Level of maturity of the data record. Some data sources report less-than-final data. PUDL sometimes includes this data, but use at your own risk.'),
    sa.ForeignKeyConstraint(['data_maturity'], ['core_pudl__codes_data_maturities.code'], name=op.f('fk__core_eia923__fgd_operation_maintenance_data_maturity_core_pudl__codes_data_maturities')),
    sa.ForeignKeyConstraint(['fgd_operational_status_code'], ['core_eia__codes_operational_status.code'], name=op.f('fk__core_eia923__fgd_operation_maintenance_fgd_operational_status_code_core_eia__codes_operational_status')),
    sa.ForeignKeyConstraint(['plant_id_eia', 'report_date'], ['core_eia860__scd_plants.plant_id_eia', 'core_eia860__scd_plants.report_date'], name=op.f('fk__core_eia923__fgd_operation_maintenance_plant_id_eia_core_eia860__scd_plants')),
    sa.PrimaryKeyConstraint('plant_id_eia', 'report_date', 'so2_control_id_eia', name=op.f('pk__core_eia923__fgd_operation_maintenance'))
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the three tables that ``upgrade`` creates.

    The two FGD tables are dropped before ``core_eia__codes_sorbent_types``,
    i.e. in the reverse of the order in which ``upgrade`` creates them, so the
    code table is removed only after the table whose foreign keys reference it.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    tables_in_drop_order = (
        '_core_eia923__fgd_operation_maintenance',
        '_core_eia860__fgd_equipment',
        'core_eia__codes_sorbent_types',
    )
    for table_name in tables_in_drop_order:
        op.drop_table(table_name)
    # ### end Alembic commands ###
70 changes: 70 additions & 0 deletions src/pudl/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,76 @@ def dedupe_on_category(
return dedup_df.drop_duplicates(subset=base_cols, keep="first")


def dedupe_and_drop_nas(
    dedup_df: pd.DataFrame,
    primary_key_cols: list[str],
) -> pd.DataFrame:
    """Deduplicate a df by comparing primary key columns and dropping null rows.

    When a primary key appears twice in a dataframe, and one record is all null other
    than the primary keys, drop the null row. More generally, the records sharing a
    primary key are collapsed into a single record that takes the first non-null
    value of each column, provided the records never disagree on a non-null value.

    Args:
        dedup_df: the dataframe with the records to deduplicate.
        primary_key_cols: list of columns which must not be duplicated.

    Returns:
        The deduplicated dataframe, with a fresh ``RangeIndex``. Records whose
        primary key was unique come first, in their original order, followed by
        the collapsed records.

    Raises:
        AssertionError: If records sharing a primary key report more than one
            distinct non-null value in any column.
    """
    is_dupe = dedup_df.duplicated(subset=primary_key_cols, keep=False)
    if not is_dupe.any():
        # Nothing to collapse: skip the groupby and the concatenation with an
        # empty frame, but still reset the index like the general path does.
        return dedup_df.reset_index(drop=True)

    dupes = dedup_df.loc[is_dupe]
    # NOTE: groupby() drops groups whose key contains NA by default, so the
    # primary key columns are expected to be fully populated.
    dupe_groups = dupes.groupby(primary_key_cols)
    # nunique() ignores nulls, so a group only counts as conflicting when it
    # holds two or more *different* non-null values in some column.
    conflicting = (dupe_groups.nunique() > 1).any(axis="columns")  # noqa: PD101
    if conflicting.any():
        conflicting_keys = conflicting[conflicting].index
        # Report only the records that actually disagree, not every duplicate.
        conflicting_rows = dupes[
            dupes.set_index(primary_key_cols).index.isin(conflicting_keys)
        ]
        raise AssertionError(
            f"Duplicate records with disagreeing data: {conflicting_rows}"
        )
    # first() takes the first non-null value per column within each group.
    deduped = dupe_groups.first().reset_index()
    # replace the duplicated rows with the deduped versions
    return pd.concat([dedup_df.loc[~is_dupe], deduped], ignore_index=True)


def standardize_percentages_ratio(
    frac_df: pd.DataFrame,
    mixed_cols: list[str],
    years_to_standardize: list[int],
) -> pd.DataFrame:
    """Standardize year-to-year changes in mixed percentage/ratio reporting in a column.

    When a column uses both 0-1 and 0-100 scales to describe percentages, standardize
    the years using 0-100 scales to 0-1 ratios/fractions.

    Args:
        frac_df: the dataframe with the columns to standardize. It must contain
            either a ``report_year`` column or a datetime ``report_date`` column
            (``report_year`` takes precedence). The dataframe is modified in
            place and also returned.
        mixed_cols: list of columns which should get standardized to the 0-1 scale.
        years_to_standardize: years over which the standardization should occur.
            Every record whose year falls between the minimum and the maximum of
            this list (inclusive) is divided by 100.

    Returns:
        The standardized dataframe (the same object as ``frac_df``).

    Raises:
        AssertionError: If any of ``mixed_cols`` is not numeric, if the dataframe
            has neither a ``report_year`` nor a ``report_date`` column, or if a
            column still contains values greater than 1 after standardization.
    """
    logger.info(f"Standardizing ratios and percentages for {mixed_cols}")
    if not mixed_cols:
        return frac_df

    # Validate every column before touching the data so that a bad column can
    # never leave the dataframe partially rescaled.
    for col in mixed_cols:
        if not pd.api.types.is_numeric_dtype(frac_df[col]):
            raise AssertionError(
                f"{col}: Standardization method requires numeric dtype."
            )

    if "report_year" in frac_df:
        years = frac_df.report_year
    elif "report_date" in frac_df:
        years = frac_df.report_date.dt.year
    else:
        raise AssertionError(
            "Standardization method requires a report_year or report_date column."
        )

    # The year mask does not depend on the column, so build it once.
    dates = (years >= min(years_to_standardize)) & (
        years <= max(years_to_standardize)
    )
    for col in mixed_cols:
        frac_df.loc[dates, col] /= 100
        # Anything above 1 now means a 0-100 value was reported outside of the
        # years that were rescaled.
        if frac_df[col].max() > 1:
            raise AssertionError(
                f"{col}: Values >100pct observed: {frac_df.loc[frac_df[col]>1][col].unique()}"
            )
    return frac_df


def calc_capacity_factor(
df: pd.DataFrame,
freq: Literal["YS", "MS"],
Expand Down
2 changes: 1 addition & 1 deletion src/pudl/metadata/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def encode(
unknown_codes = set(col.dropna()).difference(self.code_map)
if unknown_codes:
raise ValueError(
f"Found unknown codes while encoding {self.name}: {unknown_codes=}"
f"Found unknown codes while encoding {col.name}: {unknown_codes=}"
)
col = col.map(self.code_map)
if dtype:
Expand Down
Loading

0 comments on commit 69edbc5

Please sign in to comment.