WIP migrate to start_year partition
e-belfer committed Oct 26, 2023
1 parent 33220e9 commit b06585e
Showing 5 changed files with 81 additions and 147 deletions.
183 changes: 76 additions & 107 deletions src/pudl/extract/phmsagas.py
@@ -1,33 +1,19 @@
"""Retrieve data from PHMSA natural gas spreadsheets for analysis.
"""Retrieves data from PHMSA natural gas spreadsheets for analysis.

This module pulls data from PHMSA's published Excel spreadsheets.

This code is for use in analyzing PHMSA data.
"""
from collections import defaultdict
import zipfile as zf

import pandas as pd
from dagster import (
    AssetOut,
    DynamicOut,
    DynamicOutput,
    Output,
    graph_asset,
    multi_asset,
    op,
)

import pudl

import pudl.logging_helpers
from pudl.extract import excel

logger = pudl.logging_helpers.get_logger(__name__)


class Extractor(excel.GenericExtractor):
"""Extractor for the excel dataset EIA860."""

# TODO (e-belfer): Handle partitions, which aren't yearly.
"""Extractor for the excel dataset PHMSA."""

def __init__(self, *args, **kwargs):
"""Initialize the module.
@@ -47,97 +33,80 @@ def process_raw(self, df, page, **partition):
"""
df = df.rename(columns=self._metadata.get_column_map(page, **partition))
if "report_year" not in df.columns:
df["report_year"] = list(partition.values())[0]
df["report_year"] = list(partition.values())[0] # Fix
self.cols_added = ["report_year"]
# Eventually we should probably make this a transform
# for col in ["generator_id", "boiler_id"]:
# if col in df.columns:
# df = remove_leading_zeros_from_numeric_strings(df=df, col_name=col)
df = self.add_data_maturity(df, page, **partition)
return df

@staticmethod
def get_dtypes(page, **partition):
    """Returns dtypes for plant id columns."""
    return {
        "Plant ID": pd.Int64Dtype(),
        "Plant Id": pd.Int64Dtype(),
    }

def load_excel_file(self, page, **partition):
    """Produce the ExcelFile object for the given (partition, page).

    We adapt this method because PHMSA has multiple files per partition.

    Args:
        page (str): pudl name for the dataset contents, e.g.
            "boiler_generator_assn" or "coal_stocks"
        partition: partition to load. (ex: 2009 for year partition or
            "2020-08" for year_month partition)

    Returns:
        pd.ExcelFile instance with the parsed excel spreadsheet frame
    """
    # Get all zipfiles for partitions
    files = self.ds.get_zipfile_resources(self._dataset_name, **partition)

    # For each zipfile, get a list of file names and cache an ExcelFile for
    # every .xlsx member we haven't seen yet.
    for file_name, file in files:
        file_names = self.ds.get_zipfile_file_names(file)
        for xlsx_filename in file_names:
            if xlsx_filename not in self._file_cache and xlsx_filename.endswith(
                ".xlsx"
            ):
                # ZipFile.open() returns a file-like object that pd.ExcelFile
                # can parse; the original zf.read() call referenced the
                # zipfile module rather than this open archive.
                excel_file = pd.ExcelFile(file.open(xlsx_filename))
                self._file_cache[xlsx_filename] = excel_file

    return self._file_cache[xlsx_filename]  # FIX THIS, obviously.


# TODO (bendnorman): Add this information to the metadata
raw_table_names = ("raw_phmsa__distribution",)


@op(
    out=DynamicOut(),
    required_resource_keys={"dataset_settings"},
)
def phmsa_years_from_settings(context):
    """Return set of years for PHMSA in settings.

    These will be used to kick off worker processes to load each year of data in
    parallel.
    """
    phmsa_settings = context.resources.dataset_settings.phmsagas
    for year in phmsa_settings.years:
        yield DynamicOutput(year, mapping_key=str(year))


@op(
    required_resource_keys={"datastore", "dataset_settings"},
)
def load_single_phmsa_year(context, year: int) -> dict[str, pd.DataFrame]:
    """Load a single year of PHMSA data from file.

    Args:
        context:
        context: dagster keyword that provides access to resources and config.
        year:
            Year to load.

    Returns:
        Loaded data in a dataframe.
    """
    ds = context.resources.datastore
    return Extractor(ds).extract(year=[year])


@op
def merge_phmsa_years(
    yearly_dfs: list[dict[str, pd.DataFrame]]
) -> dict[str, pd.DataFrame]:
    """Merge yearly PHMSA dataframes."""
    merged = defaultdict(list)
    for dfs in yearly_dfs:
        for page in dfs:
            merged[page].append(dfs[page])

    for page in merged:
        merged[page] = pd.concat(merged[page])

    return merged


@graph_asset
def phmsa_raw_dfs() -> dict[str, pd.DataFrame]:
    """All loaded PHMSA dataframes.

    This asset creates a dynamic graph of ops to load PHMSA data in parallel.
    """
    years = phmsa_years_from_settings()
    dfs = years.map(lambda year: load_single_phmsa_year(year))
    return merge_phmsa_years(dfs.collect())


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
@multi_asset(
    outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
    required_resource_keys={"datastore", "dataset_settings"},
)
def extract_phmsa(context, phmsa_raw_dfs):
    """Extract raw PHMSA data from excel sheets into dataframes.

    Args:
        context: dagster keyword that provides access to resources and config.

    Returns:
        A tuple of extracted PHMSA dataframes.
    """
    # create descriptive table_names
    phmsa_raw_dfs = {
        "raw_phmsa__" + table_name: df for table_name, df in phmsa_raw_dfs.items()
    }
    phmsa_raw_dfs = dict(sorted(phmsa_raw_dfs.items()))

    return (
        Output(output_name=table_name, value=df)
        for table_name, df in phmsa_raw_dfs.items()
    )
raw_table_names = ("raw_phmsagas__distribution",)

# phmsa_raw_dfs = excel.raw_df_factory(Extractor, name="phmsagas")


# # TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
# @multi_asset(
#     outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
#     required_resource_keys={"datastore", "dataset_settings"},
# )
# def extract_phmsagas(context, phmsa_raw_dfs):
#     """Extract raw PHMSA gas data from excel sheets into dataframes.

#     Args:
#         context: dagster keyword that provides access to resources and config.

#     Returns:
#         A tuple of extracted PHMSA gas dataframes.
#     """
#     ds = context.resources.datastore

#     # create descriptive table_names
#     phmsa_raw_dfs = {
#         "raw_phmsagas__" + table_name: df for table_name, df in phmsa_raw_dfs.items()
#     }
#     phmsa_raw_dfs = dict(sorted(phmsa_raw_dfs.items()))

#     return (
#         Output(output_name=table_name, value=df)
#         for table_name, df in phmsa_raw_dfs.items()
#     )
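
The WIP load_excel_file above caches every spreadsheet in the partition but still returns whichever file the inner loop saw last (hence the "FIX THIS" flag). One way to finish it is to resolve each page to its spreadsheet by matching the page name against the archive's member names. A minimal sketch, assuming plain zipfile.ZipFile archives and an invented PAGE_PATTERNS mapping; PHMSA's actual file naming may differ:

import zipfile

import pandas as pd

# Hypothetical mapping from pudl page names to substrings that identify the
# matching spreadsheet inside a PHMSA zip archive (invented for illustration).
PAGE_PATTERNS = {
    "yearly_distribution": "distribution",
    "yearly_transmission": "transmission",
}


def excel_file_for_page(archive: zipfile.ZipFile, page: str) -> pd.ExcelFile:
    """Return the ExcelFile in the archive whose member name matches the page."""
    pattern = PAGE_PATTERNS[page]
    for name in archive.namelist():
        if name.endswith(".xlsx") and pattern in name.lower():
            # ZipFile.open() returns a seekable file-like object that
            # pd.ExcelFile can parse directly.
            return pd.ExcelFile(archive.open(name))
    raise KeyError(f"no spreadsheet matching {page!r} in {archive.filename}")

With a helper along these lines, the method could return the file matched to the requested page instead of the last one cached.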
2 changes: 1 addition & 1 deletion src/pudl/metadata/sources.py
@@ -566,7 +566,7 @@
"working_partitions": {
# 1970 - 1989 are all in one CSV in multiple tabs with multi-column headers
# and will need to be more extensively processed, not currently integrated.
"years": sorted(set(range(1990, 2023))),
"start_years": {1970, 2001, 2004, 2010},
},
"keywords": sorted(set(KEYWORDS["phmsa"] + KEYWORDS["us_govt"])),
"license_raw": LICENSES["us-govt"],
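
For context: this migration replaces the 1990-2022 report-year list with four start_years. PHMSA publishes its gas data as a handful of zip archives, each spanning several report years and identified by the first year of the span. Assuming each archive covers the years from its start_year up to the next one (an inference from this diff, not from PHMSA documentation), a report year resolves to its partition like so:

# Working start_years from the diff above; each names the first report year
# covered by one PHMSA zip archive (coverage ranges are assumed here).
START_YEARS = [1970, 2001, 2004, 2010]


def start_year_partition(report_year: int) -> int:
    """Return the start_year of the archive assumed to contain report_year."""
    eligible = [s for s in START_YEARS if s <= report_year]
    if not eligible:
        raise ValueError(f"no PHMSA partition covers {report_year}")
    return max(eligible)


assert start_year_partition(1995) == 1970
assert start_year_partition(2003) == 2001
assert start_year_partition(2022) == 2010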
2 changes: 1 addition & 1 deletion src/pudl/package_data/settings/etl_fast.yml
@@ -78,4 +78,4 @@ datasets:
    states: [ID, ME]
    years: [2020, 2022]
  phmsagas:
    years: [2020, 2022]
    start_years: [2010]
37 changes: 1 addition & 36 deletions src/pudl/package_data/settings/etl_full.yml
@@ -301,39 +301,4 @@
      2022,
    ]
  phmsagas:
    years:
      [
        1990,
        1991,
        1992,
        1993,
        1994,
        1995,
        1996,
        1997,
        1998,
        1999,
        2000,
        2001,
        2002,
        2003,
        2004,
        2005,
        2006,
        2007,
        2008,
        2009,
        2010,
        2011,
        2012,
        2013,
        2014,
        2015,
        2016,
        2017,
        2018,
        2019,
        2020,
        2021,
        2022,
      ]
    start_years: [1970, 2001, 2004, 2010]
4 changes: 2 additions & 2 deletions src/pudl/settings.py
@@ -163,11 +163,11 @@ class PhmsaGasSettings(GenericDatasetSettings):
    Args:
        data_source: DataSource metadata object
        years: list of years to validate.
        start_years: list of zipped data start years to validate.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("phmsagas")
    years: list[int] = data_source.working_partitions["years"]
    start_years: list[int] = data_source.working_partitions["start_years"]


class Eia923Settings(GenericDatasetSettings):
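
With the working partitions renamed, PhmsaGasSettings now validates start_years rather than years. A standalone sketch of the kind of check GenericDatasetSettings performs, written with pydantic v2 and a hard-coded working set instead of PUDL's actual DataSource plumbing:

from pydantic import BaseModel, field_validator

# Hard-coded stand-in for DataSource.from_id("phmsagas").working_partitions.
WORKING_START_YEARS = {1970, 2001, 2004, 2010}


class PhmsaGasSettingsSketch(BaseModel):
    """Illustrative stand-in for PhmsaGasSettings, not PUDL's real class."""

    start_years: list[int] = sorted(WORKING_START_YEARS)

    @field_validator("start_years")
    @classmethod
    def start_years_are_working(cls, value: list[int]) -> list[int]:
        """Reject any start_year that is not a working partition."""
        bad = set(value) - WORKING_START_YEARS
        if bad:
            raise ValueError(f"unrecognized PHMSA start_years: {sorted(bad)}")
        return sorted(value)

Under this sketch, PhmsaGasSettingsSketch(start_years=[2010]) passes, matching the etl_fast.yml setting above, while start_years=[1990] raises a validation error.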
