diff --git a/src/pudl/extract/phmsagas.py b/src/pudl/extract/phmsagas.py index 92b924d1cd..e21d465ea2 100644 --- a/src/pudl/extract/phmsagas.py +++ b/src/pudl/extract/phmsagas.py @@ -1,23 +1,11 @@ -"""Retrieve data from PHMSA natural gas spreadsheets for analysis. +"""Retrieves data from PHMSA natural gas spreadsheets for analysis. This modules pulls data from PHMSA's published Excel spreadsheets. - -This code is for use analyzing PHMSA data. """ -from collections import defaultdict +import zipfile as zf import pandas as pd -from dagster import ( - AssetOut, - DynamicOut, - DynamicOutput, - Output, - graph_asset, - multi_asset, - op, -) - -import pudl + import pudl.logging_helpers from pudl.extract import excel @@ -25,9 +13,7 @@ class Extractor(excel.GenericExtractor): - """Extractor for the excel dataset EIA860.""" - - # TODO (e-belfer): Handle partitions, which aren't yearly. + """Extractor for the excel dataset PHMSA.""" def __init__(self, *args, **kwargs): """Initialize the module. @@ -47,97 +33,80 @@ def process_raw(self, df, page, **partition): """ df = df.rename(columns=self._metadata.get_column_map(page, **partition)) if "report_year" not in df.columns: - df["report_year"] = list(partition.values())[0] + df["report_year"] = list(partition.values())[0] # Fix self.cols_added = ["report_year"] + # Eventually we should probably make this a transform + # for col in ["generator_id", "boiler_id"]: + # if col in df.columns: + # df = remove_leading_zeros_from_numeric_strings(df=df, col_name=col) + df = self.add_data_maturity(df, page, **partition) return df + @staticmethod + def get_dtypes(page, **partition): + """Returns dtypes for plant id columns.""" + return { + "Plant ID": pd.Int64Dtype(), + "Plant Id": pd.Int64Dtype(), + } + + def load_excel_file(self, page, **partition): + """Produce the ExcelFile object for the given (partition, page). + + We adapt this method because PHMSA has multiple files per partition. + + Args: + page (str): pudl name for the dataset contents, eg + "boiler_generator_assn" or "coal_stocks" + partition: partition to load. (ex: 2009 for year partition or + "2020-08" for year_month partition) + + Returns: + pd.ExcelFile instance with the parsed excel spreadsheet frame + """ + # Get all zipfiles for partitions + files = self.ds.get_zipfile_resources(self._dataset_name, **partition) + + # For each zipfile, get a list of file names. + for file_name, file in files: + file_names = self.ds.get_zipfile_file_names(file) + for xlsx_filename in file_names: + if xlsx_filename not in self._file_cache and file.endswith(".xlsx"): + excel_file = pd.ExcelFile(zf.read(xlsx_filename)) + self._file_cache[xlsx_filename] = excel_file + + return self._file_cache[xlsx_filename] # FIX THIS, obviously. + # TODO (bendnorman): Add this information to the metadata -raw_table_names = ("raw_phmsa__distribution",) - - -@op( - out=DynamicOut(), - required_resource_keys={"dataset_settings"}, -) -def phmsa_years_from_settings(context): - """Return set of years for PHMSA in settings. - - These will be used to kick off worker processes to load each year of data in - parallel. - """ - phmsa_settings = context.resources.dataset_settings.phmsagas - for year in phmsa_settings.years: - yield DynamicOutput(year, mapping_key=str(year)) - - -@op( - required_resource_keys={"datastore", "dataset_settings"}, -) -def load_single_phmsa_year(context, year: int) -> dict[str, pd.DataFrame]: - """Load a single year of PHMSA data from file. - - Args: - context: - context: dagster keyword that provides access to resources and config. - year: - Year to load. - - Returns: - Loaded data in a dataframe. - """ - ds = context.resources.datastore - return Extractor(ds).extract(year=[year]) - - -@op -def merge_phmsa_years( - yearly_dfs: list[dict[str, pd.DataFrame]] -) -> dict[str, pd.DataFrame]: - """Merge yearly PHMSA dataframes.""" - merged = defaultdict(list) - for dfs in yearly_dfs: - for page in dfs: - merged[page].append(dfs[page]) - - for page in merged: - merged[page] = pd.concat(merged[page]) - - return merged - - -@graph_asset -def phmsa_raw_dfs() -> dict[str, pd.DataFrame]: - """All loaded PHMSA dataframes. - - This asset creates a dynamic graph of ops to load EIA860 data in parallel. - """ - years = phmsa_years_from_settings() - dfs = years.map(lambda year: load_single_phmsa_year(year)) - return merge_phmsa_years(dfs.collect()) - - -# TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return -@multi_asset( - outs={table_name: AssetOut() for table_name in sorted(raw_table_names)}, - required_resource_keys={"datastore", "dataset_settings"}, -) -def extract_phmsa(context, phmsa_raw_dfs): - """Extract raw PHMSA data from excel sheets into dataframes. - - Args: - context: dagster keyword that provides access to resources and config. - - Returns: - A tuple of extracted EIA dataframes. - """ - # create descriptive table_names - phmsa_raw_dfs = { - "raw_phmsa__" + table_name: df for table_name, df in phmsa_raw_dfs.items() - } - dict(sorted(phmsa_raw_dfs.items())) - - return ( - Output(output_name=table_name, value=df) - for table_name, df in phmsa_raw_dfs.items() - ) +raw_table_names = ("raw_phmsagas__distribution",) + +# phmsa_raw_dfs = excel.raw_df_factory(Extractor, name="phmsagas") + + +# # TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return +# @multi_asset( +# outs={table_name: AssetOut() for table_name in sorted(raw_table_names)}, +# required_resource_keys={"datastore", "dataset_settings"}, +# ) +# def extract_phmsagas(context, phmsa_raw_dfs): +# """Extract raw PHMSA gas data from excel sheets into dataframes. + +# Args: +# context: dagster keyword that provides access to resources and config. + +# Returns: +# A tuple of extracted PHMSA gas dataframes. +# """ +# ds = context.resources.datastore + +# # create descriptive table_names +# phmsa_raw_dfs = { +# "raw_phmsagas__" + table_name: df for table_name, df in phmsa_raw_dfs.items() +# } +# phmsa_raw_dfs = dict(sorted(phmsa_raw_dfs.items())) + +# return ( +# Output(output_name=table_name, value=df) +# for table_name, df in phmsa_raw_dfs.items() +# ) diff --git a/src/pudl/metadata/sources.py b/src/pudl/metadata/sources.py index f60a286d98..a6fd2d7736 100644 --- a/src/pudl/metadata/sources.py +++ b/src/pudl/metadata/sources.py @@ -566,7 +566,7 @@ "working_partitions": { # 1970 - 1989 are all in one CSV in multiple tabs with multi-column headers # and will need to be more extensively processed, not currently integrated. - "years": sorted(set(range(1990, 2023))), + "start_years": {1970, 2001, 2004, 2010}, }, "keywords": sorted(set(KEYWORDS["phmsa"] + KEYWORDS["us_govt"])), "license_raw": LICENSES["us-govt"], diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml index 43692ceb98..dd14afbd7d 100644 --- a/src/pudl/package_data/settings/etl_fast.yml +++ b/src/pudl/package_data/settings/etl_fast.yml @@ -78,4 +78,4 @@ datasets: states: [ID, ME] years: [2020, 2022] phmsagas: - years: [2020, 2022] + start_years: [2010] diff --git a/src/pudl/package_data/settings/etl_full.yml b/src/pudl/package_data/settings/etl_full.yml index 186c6d9132..2f5ea6ddad 100644 --- a/src/pudl/package_data/settings/etl_full.yml +++ b/src/pudl/package_data/settings/etl_full.yml @@ -301,39 +301,4 @@ datasets: 2022, ] phmsagas: - years: - [ - 1990, - 1991, - 1992, - 1993, - 1994, - 1995, - 1996, - 1997, - 1998, - 1999, - 2000, - 2001, - 2002, - 2003, - 2004, - 2005, - 2006, - 2007, - 2008, - 2009, - 2010, - 2011, - 2012, - 2013, - 2014, - 2015, - 2016, - 2017, - 2018, - 2019, - 2020, - 2021, - 2022, - ] + start_years: [1970, 2001, 2004, 2010] diff --git a/src/pudl/settings.py b/src/pudl/settings.py index 5ecaadd2e9..96c66aaeeb 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -163,11 +163,11 @@ class PhmsaGasSettings(GenericDatasetSettings): Args: data_source: DataSource metadata object - years: list of years to validate. + start_years: list of zipped data start years to validate. """ data_source: ClassVar[DataSource] = DataSource.from_id("phmsagas") - years: list[int] = data_source.working_partitions["years"] + start_years: list[int] = data_source.working_partitions["start_years"] class Eia923Settings(GenericDatasetSettings):