Merge pull request #3258 from catalyst-cooperative/phmsa-transmission-k
PHMSA gas extract step for transmission part k
cmgosnell authored Jan 19, 2024
2 parents 72ce726 + eb739ed commit a996d30
Showing 9 changed files with 195 additions and 33 deletions.
101 changes: 86 additions & 15 deletions devtools/debug-column-mapping.ipynb
@@ -18,28 +18,37 @@
"First, select the raw dataset you're going to be mapping and locate all relevant file directories."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pudl\n",
"from pudl.workspace.datastore import ZenodoDoiSettings\n",
"import os\n",
"import importlib\n",
"from pathlib import Path\n",
"import pandas as pd\n",
"from zipfile import ZipFile\n",
"import logging\n",
"import sys\n",
"import types\n",
"\n",
"import pudl\n",
"from pudl.workspace.datastore import ZenodoDoiSettings\n",
"from pudl.extract.phmsagas import Extractor\n",
"\n",
"logger = logging.getLogger()\n",
"logger.setLevel(logging.INFO)\n",
"handler = logging.StreamHandler(stream=sys.stdout)\n",
"formatter = logging.Formatter('%(message)s')\n",
"handler.setFormatter(formatter)\n",
"logger.handlers = [handler]"
"logger = pudl.logging_helpers.get_logger(\"__name__\")"
]
},
{
@@ -50,8 +59,19 @@
"source": [
"dataset = \"phmsagas\"\n",
"doi_path = getattr(ZenodoDoiSettings(), dataset).replace(\"/\", \"-\")\n",
"data_path = os.path.join(os.getenv(\"PUDL_INPUT\"),dataset,doi_path) # Get path to raw data\n",
"map_path = os.path.join(Path(pudl.package_data.__file__).parents[0], dataset) # Get path to mapping CSVs"
"pudl_paths = pudl.workspace.setup.PudlPaths()\n",
"data_path = os.path.join(pudl_paths.pudl_input,dataset,doi_path) # Get path to raw data\n",
"map_path = os.path.join(Path(pudl.package_data.__file__).parents[0], dataset) # Get path to mapping CSVs\n",
"ds = pudl.workspace.datastore.Datastore(pudl_paths.pudl_input)"
]
},
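Note: the cell above turns the dataset's Zenodo DOI into a directory name and joins it onto the PUDL input path. A minimal sketch of that path logic, with a hypothetical DOI and input root:

from pathlib import Path

doi = "10.5281/zenodo.1234567"       # e.g. getattr(ZenodoDoiSettings(), "phmsagas")
doi_path = doi.replace("/", "-")     # -> "10.5281-zenodo.1234567"
data_path = Path("/pudl/input") / "phmsagas" / doi_path
print(data_path)                     # /pudl/input/phmsagas/10.5281-zenodo.1234567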
{
"cell_type": "markdown",
"metadata": {
"jp-MarkdownHeadingCollapsed": true
},
"source": [
"## File Check"
]
},
{
@@ -107,6 +127,13 @@
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Column Map Check"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -139,7 +166,7 @@
" continue\n",
" return match[0]\n",
"\n",
"ds = pudl.workspace.datastore.Datastore()\n",
"\n",
"for page in file_map.index:\n",
" if not table_subset or page in table_subset:\n",
" column_maps = pd.read_csv(\n",
@@ -178,11 +205,55 @@
"source": [
"Go back and fix any incorrectly labelled columns. Then run the cell above again, until all columns are correctly labelled."
]
},
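The truncated check cell above appears to fuzzy-match raw spreadsheet headers against the mapped column names (hence the match[0] fragment). A sketch of that kind of check using difflib — an assumption, since the actual matching helper is not shown in this hunk:

import difflib

def closest_mapped_name(raw_col: str, mapped_cols: list[str]) -> str | None:
    """Return the closest fuzzy match for a raw header, or None."""
    match = difflib.get_close_matches(raw_col, mapped_cols, n=1, cutoff=0.6)
    return match[0] if match else None

mapped = ["report_year", "operator_id_phmsa", "commodity"]
print(closest_mapped_name("report year", mapped))  # -> report_year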
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Extractor Check"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## SETTINGS FOR EXTRACTOR\n",
"extractor_phmsagas = Extractor(ds=ds)\n",
"\n",
"# recommend changing the loglevel here to warning to only get the baddies\n",
"pudl.logging_helpers.configure_root_logger(loglevel=\"WARNING\")\n",
"\n",
"# IF you wanna restrict the years\n",
"working_years = list(range(1990,2023))\n",
"# IF you want to restrict the pages to extract here is a lil way to do that\n",
"# you give pages_you_want_to_extract a lil of pages you want to extract\n",
"# if pages_you_want_to_extract if nothing, you'll get the standard pages\n",
"pages_you_want_to_extract = []\n",
"all_pages = extractor_phmsagas._metadata.get_all_pages()\n",
"def _new_page_getter(self):\n",
" if pages_you_want_to_extract:\n",
" return pages_you_want_to_extract\n",
" else:\n",
" return all_pages\n",
"extractor_phmsagas._metadata.get_all_pages = types.MethodType(_new_page_getter, extractor_phmsagas)"
]
},
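The last two lines of the settings cell swap in a new get_all_pages at runtime. A standalone sketch of the same trick (class and page names hypothetical); since self is never used, it does not matter which instance the method is bound to, which is why binding it to the extractor rather than its _metadata still works:

import types

class Metadata:
    def get_all_pages(self):
        return ["page_a", "page_b", "page_c"]

md = Metadata()
all_pages = md.get_all_pages()  # capture the full list before overriding
wanted = ["page_b"]

def _restricted_pages(self):
    # Fall back to every page when no subset is requested.
    return wanted if wanted else all_pages

md.get_all_pages = types.MethodType(_restricted_pages, md)
print(md.get_all_pages())  # ['page_b']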
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## RUN THE EXTRACTOR\n",
"extracted_dfs = extractor_phmsagas.extract(year=working_years)"
]
}
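Once the cell above has run, extract() should hand back a dict mapping page name to concatenated DataFrame (per the excel.py extractor below). A quick way to eyeball the result:

for page, df in extracted_dfs.items():
    print(f"{page}: {len(df)} rows x {len(df.columns)} columns")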
],
"metadata": {
"kernelspec": {
"display_name": "pudl-dev",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
@@ -196,9 +267,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
"nbformat_minor": 4
}
13 changes: 8 additions & 5 deletions src/pudl/extract/excel.py
@@ -125,6 +125,10 @@ def get_all_pages(self):
"""Returns list of all known pages."""
return sorted(self._column_map.keys())

def get_form(self, page) -> str:
"""Returns the form name for a given page."""
return self._page_part_map.loc[page, "form"]

@staticmethod
def _load_csv(package, filename):
"""Load metadata from a filename that is found in a package."""
@@ -200,7 +204,6 @@ def __init__(self, ds):
def process_raw(self, df, page, **partition):
"""Transforms raw dataframe and rename columns."""
df = self.add_data_maturity(df, page, **partition)
self.cols_added.append("data_label")
return df.rename(columns=self._metadata.get_column_map(page, **partition))

def add_data_maturity(self, df: pd.DataFrame, page, **partition) -> pd.DataFrame:
@@ -321,13 +324,13 @@ def extract(self, **partitions):
missing_raw_cols = set(expected_cols).difference(newdata.columns)
if extra_raw_cols:
logger.warning(
f"Extra columns found in extracted table of "
f"{page}/{str_part}: {extra_raw_cols}"
f"{page}/{str_part}:Extra columns found in extracted table:"
f"\n{extra_raw_cols}"
)
if missing_raw_cols:
logger.warning(
"Expected columns not found in extracted table of "
f"{page}/{str_part}: {missing_raw_cols}"
f"{page}/{str_part}: Expected columns not found in extracted table:"
f"\n{missing_raw_cols}"
)
df = pd.concat(dfs, sort=True, ignore_index=True)

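The new get_form helper is a plain lookup into the page/part map. A minimal sketch of the pattern with a hypothetical two-page map:

import pandas as pd

page_part_map = pd.DataFrame(
    {"form": ["gas_transmission_gathering", "gas_distribution"]},
    index=pd.Index(["yearly_transmission_pipe", "yearly_distribution"], name="page"),
)

def get_form(page: str) -> str:
    """Return the form name for a given page."""
    return page_part_map.loc[page, "form"]

print(get_form("yearly_transmission_pipe"))  # -> gas_transmission_gathering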
33 changes: 20 additions & 13 deletions src/pudl/extract/phmsagas.py
@@ -4,6 +4,7 @@
"""


import pandas as pd
from dagster import AssetOut, Output, multi_asset

import pudl.logging_helpers
@@ -25,7 +26,7 @@ def __init__(self, *args, **kwargs):
self.cols_added = []
super().__init__(*args, **kwargs)

def process_final_page(self, df, page):
def process_renamed(self, newdata: pd.DataFrame, page: str, **partition):
"""Drop columns that get mapped to other assets.
Older years of PHMSA data have one Excel tab in the raw data, while newer data
@@ -35,18 +36,23 @@ def process_final_page(self, df, page):
older years, filter by the list of columns specified for the page, with a
warning.
"""
to_drop = [
c
for c in df.columns
if c not in self._metadata.get_all_columns(page)
and c not in self.cols_added
]
if to_drop:
logger.warning(
f"Dropping columns {to_drop} that are not mapped to this asset."
)
df = df.drop(columns=to_drop, errors="ignore")
return df
if (int(partition["year"]) < 2010) and (
self._metadata.get_form(page) == "gas_transmission_gathering"
):
to_drop = [
c
for c in newdata.columns
if c not in self._metadata.get_all_columns(page)
and c not in self.cols_added
]
str_part = str(list(partition.values())[0])
if to_drop:
logger.info(
f"{page}/{str_part}: Dropping columns that are not mapped to this asset:"
f"\n{to_drop}"
)
newdata = newdata.drop(columns=to_drop, errors="ignore")
return newdata


# TODO (bendnorman): Add this information to the metadata
@@ -56,6 +62,7 @@ def process_final_page(self, df, page):
"raw_phmsagas__yearly_miles_of_gathering_pipe_by_nps",
"raw_phmsagas__yearly_miles_of_transmission_pipe_by_nps",
"raw_phmsagas__yearly_inspections_and_assessments",
"raw_phmsagas__yearly_miles_of_transmission_pipe_by_smys",
)

phmsagas_raw_dfs = excel.raw_df_factory(Extractor, name="phmsagas")
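For reference, the guard added to process_renamed boils down to: for pre-2010 gas_transmission_gathering pages (where all parts share one Excel tab), drop any column that is neither mapped to this page nor added by the extractor. A simplified restatement with hypothetical column sets:

import pandas as pd

mapped_cols = {"report_year", "operator_id_phmsa", "miles_by_nps"}  # hypothetical
cols_added = {"data_maturity"}

def drop_unmapped(df: pd.DataFrame, year: int, form: str) -> pd.DataFrame:
    # Only the shared-tab, pre-2010 transmission/gathering data needs filtering.
    if year < 2010 and form == "gas_transmission_gathering":
        to_drop = [c for c in df.columns if c not in mapped_cols | cols_added]
        df = df.drop(columns=to_drop, errors="ignore")
    return df

df = pd.DataFrame(columns=["report_year", "miles_by_nps", "leaks_per_mile"])
print(drop_unmapped(df, 2005, "gas_transmission_gathering").columns.tolist())
# -> ['report_year', 'miles_by_nps']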