Skip to content

Commit

Permalink
Merge pull request #3096 from catalyst-cooperative/cems-quarterly
Browse files Browse the repository at this point in the history
Update CEMS partitions to handle year-quarter files
  • Loading branch information
cmgosnell authored Dec 12, 2023
2 parents 4babaaf + bfe6203 commit f020a07
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 201 deletions.
6 changes: 6 additions & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ v2023.12.XX
outputs describing historical utility and balancing authority service territories. See
:issue:`1174` and :pr:`3086`.

Data Coverage
^^^^^^^^^^^^^
* Updated :doc:`data_sources/epacems` to switch to pulling the quarterly updates of
CEMS instead of the annual files. Integrates CEMS through 2023q3. See issue
:issue:`2973` & PR :pr:`3096`.

---------------------------------------------------------------------------------------
v2023.12.01
---------------------------------------------------------------------------------------
Expand Down
24 changes: 22 additions & 2 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@
"epacems_io_manager": epacems_io_manager,
}

# By default, limit CEMS year processing concurrency to prevent memory overload.
default_config = {
"execution": {
"config": {
"multiprocess": {
"tag_concurrency_limits": [
{
"key": "datasource",
"value": "epacems",
"limit": 2,
}
],
},
}
}
}


def create_non_cems_selection(all_assets: list[AssetsDefinition]) -> AssetSelection:
"""Create a selection of assets excluding CEMS and all downstream assets.
Expand Down Expand Up @@ -127,7 +144,9 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict:
resources=default_resources,
jobs=[
define_asset_job(
name="etl_full", description="This job executes all years of all assets."
name="etl_full",
description="This job executes all years of all assets.",
config=default_config,
),
define_asset_job(
name="etl_full_no_cems",
Expand All @@ -137,7 +156,8 @@ def load_dataset_settings_from_file(setting_filename: str) -> dict:
),
define_asset_job(
name="etl_fast",
config={
config=default_config
| {
"resources": {
"dataset_settings": {
"config": load_dataset_settings_from_file("etl_fast")
Expand Down
49 changes: 36 additions & 13 deletions src/pudl/etl/epacems_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from dagster import AssetIn, DynamicOut, DynamicOutput, asset, graph_asset, op

import pudl
from pudl.extract.epacems import EpaCemsPartition
from pudl.metadata.classes import Resource
from pudl.metadata.enums import EPACEMS_STATES
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)


YearPartitions = namedtuple("YearPartitions", ["year", "states"])
YearPartitions = namedtuple("YearPartitions", ["year_quarters"])


@op(
Expand All @@ -37,11 +39,17 @@ def get_years_from_settings(context):
parallel.
"""
epacems_settings = context.resources.dataset_settings.epacems
for year in epacems_settings.years:
years = {
EpaCemsPartition(year_quarter=yq).year for yq in epacems_settings.year_quarters
}
for year in years:
yield DynamicOutput(year, mapping_key=str(year))


@op(required_resource_keys={"datastore", "dataset_settings"})
@op(
required_resource_keys={"datastore", "dataset_settings"},
tags={"datasource": "epacems"},
)
def process_single_year(
context,
year,
Expand All @@ -64,23 +72,29 @@ def process_single_year(
partitioned_path = PudlPaths().output_dir / "hourly_emissions_epacems"
partitioned_path.mkdir(exist_ok=True)

for state in epacems_settings.states:
logger.info(f"Processing EPA CEMS hourly data for {year}-{state}")
df = pudl.extract.epacems.extract(year=year, state=state, ds=ds)
year_quarters_in_year = {
yq
for yq in epacems_settings.year_quarters
if EpaCemsPartition(year_quarter=yq).year == year
}

for year_quarter in year_quarters_in_year:
logger.info(f"Processing EPA CEMS hourly data for {year_quarter}")
df = pudl.extract.epacems.extract(year_quarter=year_quarter, ds=ds)
if not df.empty: # If state-year combination has data
df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# Write to a directory of partitioned parquet files
with pq.ParquetWriter(
where=partitioned_path / f"epacems-{year}-{state}.parquet",
where=partitioned_path / f"epacems-{year_quarter}.parquet",
schema=schema,
compression="snappy",
version="2.6",
) as partitioned_writer:
partitioned_writer.write_table(table)

return YearPartitions(year, epacems_settings.states)
return YearPartitions(year_quarters_in_year)


@op
Expand All @@ -98,12 +112,21 @@ def consolidate_partitions(context, partitions: list[YearPartitions]) -> None:
with pq.ParquetWriter(
where=monolithic_path, schema=schema, compression="snappy", version="2.6"
) as monolithic_writer:
for year, states in partitions:
for state in states:
for year_partition in partitions:
for state in EPACEMS_STATES:
monolithic_writer.write_table(
pq.read_table(
source=partitioned_path / f"epacems-{year}-{state}.parquet",
schema=schema,
# Concat a slice of each state's data from all quarters in a year
# and write to parquet to create year-state row groups
pa.concat_tables(
[
pq.read_table(
source=partitioned_path
/ f"epacems-{year_quarter}.parquet",
filters=[[("state", "=", state.upper())]],
schema=schema,
)
for year_quarter in year_partition.year_quarters
]
)
)

Expand Down
68 changes: 32 additions & 36 deletions src/pudl/extract/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
during the transform process with help from the crosswalk.
"""
from pathlib import Path
from typing import NamedTuple
from typing import Annotated

import pandas as pd
from pydantic import BaseModel, StringConstraints

import pudl.logging_helpers
from pudl.metadata.classes import Resource
Expand Down Expand Up @@ -97,56 +98,52 @@
"""Set: The set of EPA CEMS columns to ignore when reading data."""


class EpaCemsPartition(NamedTuple):
class EpaCemsPartition(BaseModel):
"""Represents EpaCems partition identifying unique resource file."""

year: str
state: str
year_quarter: Annotated[
str, StringConstraints(strict=True, pattern=r"^(19|20)\d{2}[q][1-4]$")
]

def get_key(self):
"""Returns hashable key for use with EpaCemsDatastore."""
return (self.year, self.state.lower())
@property
def year(self):
"""Return the year associated with the year_quarter."""
return pd.to_datetime(self.year_quarter).year

def get_filters(self):
"""Returns filters for retrieving given partition resource from Datastore."""
return {"year": self.year, "state": self.state.lower()}
return {"year_quarter": self.year_quarter}

def get_annual_file(self) -> Path:
def get_quarterly_file(self) -> Path:
"""Return the name of the CSV file that holds annual hourly data."""
return Path(f"epacems-{self.year}-{self.state.lower()}.csv")
return Path(
f"epacems-{self.year}-{pd.to_datetime(self.year_quarter).quarter}.csv"
)


class EpaCemsDatastore:
"""Helper class to extract EpaCems resources from datastore.
EpaCems resources are identified by a year and a state. Each of these zip files
contain monthly zip files that in turn contain csv files. This class implements
get_data_frame method that will concatenate tables for a given state and month
across all months.
EpaCems resources are identified by a year and a quarter. Each of these zip files
contains one csv file. This class implements get_data_frame method that will
rename columns for a quarterly CSV file.
"""

def __init__(self, datastore: Datastore):
"""Construct datastore wrapper for loading raw EPA CEMS data into dataframes."""
self.datastore = datastore

def get_data_frame(self, partition: EpaCemsPartition) -> pd.DataFrame:
"""Constructs dataframe from a zipfile for a given (year, state) partition."""
"""Constructs dataframe from a zipfile for a given (year_quarter) partition."""
archive = self.datastore.get_zipfile_resource(
"epacems", **partition.get_filters()
)

# Get names of files in zip file
files = self.datastore.get_zipfile_file_names(archive)

# If archive has one csv file in it, this is a yearly CSV (archived after 08/23)
# and this CSV does not need to be concatenated.
if len(files) == 1 and files[0].endswith(".csv"):
with archive.open(str(partition.get_annual_file()), "r") as csv_file:
df = self._csv_to_dataframe(
csv_file, ignore_cols=API_IGNORE_COLS, rename_dict=API_RENAME_DICT
)
return df
raise AssertionError(f"Unexpected archive format. Found files: {files}.")
with archive.open(str(partition.get_quarterly_file()), "r") as csv_file:
df = self._csv_to_dataframe(
csv_file, ignore_cols=API_IGNORE_COLS, rename_dict=API_RENAME_DICT
)
return df

def _csv_to_dataframe(
self, csv_file: Path, ignore_cols: dict[str, str], rename_dict: dict[str, str]
Expand All @@ -167,25 +164,24 @@ def _csv_to_dataframe(
).rename(columns=rename_dict)


def extract(year: int, state: str, ds: Datastore):
def extract(year_quarter: str, ds: Datastore) -> pd.DataFrame:
"""Coordinate the extraction of EPA CEMS hourly DataFrames.
Args:
year: report year of the data to extract
state: report state of the data to extract
year_quarter: report year and quarter of the data to extract
ds: Initialized datastore
Yields:
pandas.DataFrame: A single state-year of EPA CEMS hourly emissions data.
A single quarter of EPA CEMS hourly emissions data.
"""
ds = EpaCemsDatastore(ds)
partition = EpaCemsPartition(state=state, year=year)
partition = EpaCemsPartition(year_quarter=year_quarter)
year = partition.year
# We have to assign the reporting year for partitioning purposes
try:
df = ds.get_data_frame(partition).assign(year=year)
except KeyError: # If no state-year combination found, return empty df.
logger.warning(
f"No data found for {state} in {year}. Returning empty dataframe."
)
# If the requested quarter is not found, return an empty df with expected columns:
except KeyError:
logger.warning(f"No data found for {year_quarter}. Returning empty dataframe.")
res = Resource.from_id("hourly_emissions_epacems")
df = res.format_df(pd.DataFrame())
return df
9 changes: 6 additions & 3 deletions src/pudl/metadata/sources.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Metadata and operational constants."""
from typing import Any

import pandas as pd

from pudl.metadata.constants import CONTRIBUTORS, KEYWORDS, LICENSES
from pudl.metadata.enums import EPACEMS_STATES

SOURCES: dict[str, Any] = {
"censusdp1tract": {
Expand Down Expand Up @@ -272,8 +273,10 @@
},
"field_namespace": "epacems",
"working_partitions": {
"years": sorted(set(range(1995, 2023))),
"states": sorted(EPACEMS_STATES),
"year_quarters": [
str(q).lower()
for q in pd.period_range(start="1995q1", end="2023q3", freq="Q")
]
},
"contributors": [
CONTRIBUTORS["catalyst-cooperative"],
Expand Down
10 changes: 4 additions & 6 deletions src/pudl/output/epacems.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import dask.dataframe as dd

from pudl.settings import EpaCemsSettings
from pudl.workspace.setup import PudlPaths


Expand Down Expand Up @@ -123,10 +122,9 @@ def epacems(
from :mod:`pudl.workspace`
Returns:
The requested epacems data
The requested epacems data. If requested states or years are not available, no
error will be raised.
"""
epacems_settings = EpaCemsSettings(states=states, years=years)

# columns=None is handled by dd.read_parquet; gives all columns
if columns is not None:
# nonexistent columns are handled by dd.read_parquet; raises ValueError
Expand All @@ -142,8 +140,8 @@ def epacems(
index=False,
split_row_groups=True,
filters=year_state_filter(
states=epacems_settings.states,
years=epacems_settings.years,
states=states,
years=years,
),
)
return epacems
3 changes: 1 addition & 2 deletions src/pudl/package_data/settings/etl_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,4 @@ datasets:
# Note that the CEMS data relies on EIA 860 data for plant locations,
# so if you're loading CEMS data for a particular year, you should
# also load the EIA 860 data for that year if possible
states: [ID, ME]
years: [2020, 2022]
year_quarters: ["2022q1"]
33 changes: 1 addition & 32 deletions src/pudl/package_data/settings/etl_full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,35 +270,4 @@ datasets:
# Note that the CEMS data relies on EIA 860 data for plant locations,
# so if you're loading CEMS data for a particular year, you should
# also load the EIA 860 data for that year if possible
states: [all]
years:
[
1995,
1996,
1997,
1998,
1999,
2000,
2001,
2002,
2003,
2004,
2005,
2006,
2007,
2008,
2009,
2010,
2011,
2012,
2013,
2014,
2015,
2016,
2017,
2018,
2019,
2020,
2021,
2022,
]
year_quarters: ["all"]
Loading

0 comments on commit f020a07

Please sign in to comment.