diff --git a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py
index fde7df568c..e4ea42d26b 100644
--- a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py
+++ b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py
@@ -6,7 +6,9 @@
 import requests
 from bs4 import BeautifulSoup
-from pydantic import HttpUrl, parse_obj_as
+from pydantic import HttpUrl, ValidationError, parse_obj_as
+
+from airflow.exceptions import AirflowException
 
 xlsx_urls = {
     "ridership_url": "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release",
@@ -19,34 +21,93 @@
 }
 
 
-# pushes the scraped URL value to XCom
 def push_url_to_xcom(key, scraped_url, context):
-    task_instance = context["ti"]
-    task_instance.xcom_push(key=key, value=scraped_url)
+    """Push the scraped URL value to XCom with proper error handling."""
+    task_instance = context.get("ti")
+    if task_instance is None:
+        raise AirflowException("Task instance not found in context")
+
+    try:
+        task_instance.xcom_push(key=key, value=scraped_url)
+    except Exception as e:
+        logging.error(f"Error pushing URL to XCom for key {key}: {e}")
+        raise AirflowException(f"Failed to push URL to XCom: {e}")
 
 
-# Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'
 def href_matcher(href):
+    """Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'"""
     return (
         href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
     )
 
 
-def scrape_ntd_xlsx_urls(**context):
-    for key, value in xlsx_urls.items():
-        url = value
-        req = requests.get(url)
-        soup = BeautifulSoup(req.text, "html.parser")
+def make_http_request(url, key):
+    """Make HTTP request with proper error handling."""
+    try:
+        response = requests.get(url)
+        response.raise_for_status()
+        return response
+    except requests.exceptions.HTTPError as e:
+        logging.error(f"HTTP error occurred while fetching {url}: {e}")
+        raise AirflowException(f"HTTP error for {key}: {e}")
+    except requests.exceptions.RequestException as e:
+        logging.error(f"Error occurred while fetching {url}: {e}")
+        raise AirflowException(f"Request failed for {key}: {e}")
+
+
+def parse_html_content(response_text, url, key):
+    """Parse HTML content with error handling."""
+    try:
+        return BeautifulSoup(response_text, "html.parser")
+    except Exception as e:
+        logging.error(f"Error parsing HTML for {url}: {e}")
+        raise AirflowException(f"HTML parsing failed for {key}: {e}")
+
 
-        link = soup.find("a", href=href_matcher)
+def find_and_validate_xlsx_link(soup, key, url):
+    """Find and validate XLSX download link."""
+    link = soup.find("a", href=href_matcher)
+    if not link:
+        error_msg = f"No XLSX download link found for {key} at {url}"
+        logging.error(error_msg)
+        raise AirflowException(error_msg)
+
+    file_link = link.get("href")
+    if not file_link:
+        error_msg = f"Found link for {key} but href attribute is missing"
+        logging.error(error_msg)
+        raise AirflowException(error_msg)
+
+    updated_url = f"https://www.transit.dot.gov{file_link}"
+    try:
+        return parse_obj_as(HttpUrl, updated_url)
+    except ValidationError as e:
+        logging.error(f"URL validation failed for {updated_url}: {e}")
+        raise AirflowException(f"Invalid URL constructed for {key}: {e}")
+
+
+def scrape_ntd_xlsx_urls(**context):
+    """Main function to scrape XLSX URLs and push them to XCom."""
+    for key, url in xlsx_urls.items():
+        try:
+            # Make HTTP request
+            response = make_http_request(url, key)
 
-        # Extract the href if the link is found
-        file_link = link["href"] if link else None
+            # Parse HTML content
+            soup = parse_html_content(response.text, url, key)
 
-        updated_url = f"https://www.transit.dot.gov{file_link}"
+            # Find and validate XLSX link
+            validated_url = find_and_validate_xlsx_link(soup, key, url)
 
-        validated_url = parse_obj_as(HttpUrl, updated_url)
+            logging.info(f"Successfully validated URL for {key}: {validated_url}")
 
-        logging.info(f"Validated URL: {validated_url}.")
+            # Push to XCom
+            push_url_to_xcom(key=key, scraped_url=validated_url, context=context)
 
-        push_url_to_xcom(key=key, scraped_url=validated_url, context=context)
+        except AirflowException:
+            # Re-raise AirflowExceptions as they already have proper error messages
+            raise
+        except Exception as e:
+            # Log any unhandled exceptions and re-raise as AirflowException
+            logging.error(f"Unexpected error processing {key}: {e}")
+            raise AirflowException(f"Failed to process {key}: {e}")
diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml
index eb106897a9..7f70111a5f 100644
--- a/airflow/docker-compose.yaml
+++ b/airflow/docker-compose.yaml
@@ -52,6 +52,8 @@ x-airflow-common:
     AIRFLOW__CORE__BASE_LOG_FOLDER: /opt/airflow/gcs/logs
     AIRFLOW__CORE__PLUGINS_FOLDER: /opt/airflow/gcs/plugins
     AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: 'true'
+    # AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 120
+
     # this option prevents a DAG from trying to run all dagruns back to its
     # start date. this lets you it spin up docker, unpause a dag, and just
diff --git a/airflow/plugins/operators/scrape_ntd_api.py b/airflow/plugins/operators/scrape_ntd_api.py
index cb541a63c2..75f5ac1cfe 100644
--- a/airflow/plugins/operators/scrape_ntd_api.py
+++ b/airflow/plugins/operators/scrape_ntd_api.py
@@ -13,6 +13,7 @@
 )
 from pydantic import HttpUrl, parse_obj_as
 
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator  # type: ignore
 
 API_BUCKET = os.environ["CALITP_BUCKET__NTD_API_DATA_PRODUCTS"]
@@ -40,37 +41,52 @@ def filename(self) -> str:
     class Config:
         arbitrary_types_allowed = True
 
-    def fetch_from_ntd_api(self):
-        """ """
-
-        logging.info(f"Downloading NTD data for {self.year} / {self.product}.")
+    def _make_api_request(self, url: str) -> bytes:
+        """Make API request with proper error handling."""
+        try:
+            response = requests.get(url)
+            response.raise_for_status()
+            return response.content
+        except requests.exceptions.HTTPError as e:
+            logging.error(f"HTTP error occurred: {e}")
+            raise AirflowException(f"HTTP error in NTD API request: {e}")
+        except requests.exceptions.RequestException as e:
+            logging.error(f"Request error occurred: {e}")
+            raise AirflowException(f"Error in NTD API request: {e}")
+
+    def _validate_response_content(self, content: bytes) -> bytes:
+        """Validate API response content."""
+        if content is None or len(content) == 0:
+            logging.info(
+                f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
+            )
+            return None
+        logging.info(
+            f"Downloaded {self.product} data for {self.year} with {len(content)} rows!"
+        )
+        return content
 
+    def fetch_from_ntd_api(self):
+        """Fetch data from NTD API with proper error handling."""
         try:
+            # Construct and validate URL
             url = (
                 self.root_url + self.endpoint_id + self.file_format + "?$limit=5000000"
             )
-
             validated_url = parse_obj_as(HttpUrl, url)
 
-            response = requests.get(validated_url).content
-
-            if response is None or len(response) == 0:
-                logging.info(
-                    f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
-                )
+            # Make API request
+            response_content = self._make_api_request(validated_url)
 
-                pass
-            else:
-                logging.info(
-                    f"Downloaded {self.product} data for {self.year} with {len(response)} rows!"
-                )
-
-                return response
-
-        except requests.exceptions.RequestException as e:
-            logging.info(f"An error occurred: {e}")
+            # Validate response content
+            return self._validate_response_content(response_content)
 
+        except AirflowException:
+            # Re-raise AirflowExceptions as they already have proper error messages
             raise
+        except Exception as e:
+            logging.error(f"Unexpected error occurred: {e}")
+            raise AirflowException(f"Unexpected error in NTD API request: {e}")
 
 
 class JSONExtract(NtdDataProductAPIExtract):
@@ -110,17 +126,47 @@ def __init__(
 
         super().__init__(**kwargs)
 
-    def execute(self, **kwargs):
-        api_content = self.extract.fetch_from_ntd_api()
-
-        decode_api_content = api_content.decode("utf-8")
+    def _process_api_content(self, api_content: bytes) -> pd.DataFrame:
+        """Process API content into a DataFrame with error handling."""
+        try:
+            decode_api_content = api_content.decode("utf-8")
+            df = pd.read_json(decode_api_content)
+            return df.rename(make_name_bq_safe, axis="columns")
+        except ValueError as e:
+            logging.error(f"Error parsing JSON data: {e}")
+            raise AirflowException(f"Failed to parse JSON data: {e}")
+        except Exception as e:
+            logging.error(f"Error processing API content: {e}")
+            raise AirflowException(f"Failed to process API content: {e}")
+
+    def _save_dataframe(self, df: pd.DataFrame) -> None:
+        """Save DataFrame as compressed JSONL with error handling."""
+        try:
+            gzipped_content = gzip.compress(
+                df.to_json(orient="records", lines=True).encode()
+            )
+            self.extract.save_content(fs=get_fs(), content=gzipped_content)
+        except Exception as e:
+            logging.error(f"Error saving processed data: {e}")
+            raise AirflowException(f"Failed to save processed data: {e}")
 
-        df = pd.read_json(decode_api_content)
+    def execute(self, **kwargs):
+        """Execute the operator with proper error handling."""
+        try:
+            # Fetch API content
+            api_content = self.extract.fetch_from_ntd_api()
+            if api_content is None:
+                return None
 
-        df = df.rename(make_name_bq_safe, axis="columns")
+            # Process API content
+            df = self._process_api_content(api_content)
 
-        self.gzipped_content = gzip.compress(
-            df.to_json(orient="records", lines=True).encode()
-        )
+            # Save processed data
+            self._save_dataframe(df)
 
-        self.extract.save_content(fs=get_fs(), content=self.gzipped_content)
+        except AirflowException:
+            # Re-raise AirflowExceptions as they already have proper error messages
+            raise
+        except Exception as e:
+            logging.error(f"Error processing NTD API data: {e}")
+            raise AirflowException(f"Failed to process NTD API data: {e}")
diff --git a/airflow/plugins/operators/scrape_ntd_xlsx.py b/airflow/plugins/operators/scrape_ntd_xlsx.py
index 4c19f21008..5e82ae6b5e 100644
--- a/airflow/plugins/operators/scrape_ntd_xlsx.py
+++ b/airflow/plugins/operators/scrape_ntd_xlsx.py
@@ -2,7 +2,7 @@
 import logging
 import os
 from io import BytesIO
-from typing import ClassVar, List  # Optional
+from typing import ClassVar, Dict, List, Optional, Tuple
 
 import pandas as pd  # type: ignore
 import pendulum
@@ -14,6 +14,7 @@
 )
 from pydantic import HttpUrl, parse_obj_as
 
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator  # type: ignore
 
 RAW_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"]
@@ -46,12 +47,22 @@
 }
 
 
-# pulls the URL from XCom
-def pull_url_from_xcom(key, context):
-    task_instance = context["ti"]
-    pulled_value = task_instance.xcom_pull(task_ids="scrape_ntd_xlsx_urls", key=key)
-    print(f"Pulled value from XCom: {pulled_value}")
-    return pulled_value
+def pull_url_from_xcom(key: str, context: dict) -> str:
+    """Pull URL from XCom with proper error handling."""
+    try:
+        task_instance = context["ti"]
+        if task_instance is None:
+            raise AirflowException("Task instance not found in context")
+
+        pulled_value = task_instance.xcom_pull(task_ids="scrape_ntd_xlsx_urls", key=key)
+        if pulled_value is None:
+            raise AirflowException(f"No URL found in XCom for key: {key}")
+
+        print(f"Pulled value from XCom: {pulled_value}")
+        return pulled_value
+    except Exception as e:
+        logging.error(f"Error pulling URL from XCom: {e}")
+        raise AirflowException(f"Failed to pull URL from XCom: {e}")
 
 
 class NtdDataProductXLSXExtract(PartitionedGCSArtifact):
@@ -74,36 +85,45 @@ def filename(self) -> str:
     class Config:
         arbitrary_types_allowed = True
 
-    def fetch_from_ntd_xlsx(self, file_url):
-        # As of now, the NTD XLSX download links change every time they update the file, so we have special handling for that here, which is dependent on
-        # another dag task called scrape_ntd_xlsx_urls.py. if we look to download other xlsx files from the DOT portal and they
-        # also change the file name every time they publish, they we will have to add the same handling for all of these files and make it programmatic
-
-        validated_url = parse_obj_as(HttpUrl, file_url)
-
-        logging.info(f"reading file from url {validated_url}")
-
+    def _make_request(self, url: str) -> bytes:
+        """Make HTTP request with proper error handling."""
         try:
-            excel_content = requests.get(validated_url).content
-
-            if excel_content is None or len(excel_content) == 0:
-                logging.info(
-                    f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
-                )
-
-                pass
-
-            else:
-                logging.info(
-                    f"Downloaded {self.product} data for {self.year} with {len(excel_content)} rows!"
-                )
+            response = requests.get(url)
+            response.raise_for_status()
+            return response.content
+        except requests.exceptions.HTTPError as e:
+            logging.error(f"HTTP error occurred: {e}")
+            raise AirflowException(f"HTTP error in XLSX download: {e}")
+        except requests.exceptions.RequestException as e:
+            logging.error(f"Request error occurred: {e}")
+            raise AirflowException(f"Error downloading XLSX file: {e}")
+
+    def _validate_content(self, content: bytes) -> Optional[bytes]:
+        """Validate downloaded content."""
+        if content is None or len(content) == 0:
+            logging.info(
+                f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
+            )
+            return None
+        logging.info(
+            f"Downloaded {self.product} data for {self.year} with {len(content)} rows!"
+        )
+        return content
 
-                return excel_content
+    def fetch_from_ntd_xlsx(self, file_url: str) -> Optional[bytes]:
+        """Fetch XLSX file with proper error handling."""
+        try:
+            validated_url = parse_obj_as(HttpUrl, file_url)
+            logging.info(f"reading file from url {validated_url}")
 
-        except requests.exceptions.RequestException as e:
-            logging.info(f"An error occurred: {e}")
+            excel_content = self._make_request(validated_url)
+            return self._validate_content(excel_content)
+        except AirflowException:
             raise
+        except Exception as e:
+            logging.error(f"Unexpected error occurred: {e}")
+            raise AirflowException(f"Unexpected error in XLSX download: {e}")
 
 
 class RawExtract(NtdDataProductXLSXExtract):
@@ -120,8 +140,8 @@ class NtdDataProductXLSXOperator(BaseOperator):
     def __init__(
         self,
         product: str,
-        xlsx_file_url,
-        year: int,
+        xlsx_file_url: str,
+        year: str,
         *args,
         **kwargs,
     ):
@@ -139,44 +159,78 @@ def __init__(
 
         super().__init__(*args, **kwargs)
 
-    def execute(self, context, *args, **kwargs):
+    def _get_download_url(self, context: dict) -> str:
+        """Get download URL from XCom if needed."""
         download_url = self.raw_excel_extract.file_url
-
         key = (self.product, self.year)
-
         if key in xcom_keys:
             download_url = pull_url_from_xcom(key=xcom_keys[key], context=context)
-
-        # see what is returned
         logging.info(f"reading {self.product} url as {download_url}")
+        return download_url
 
-        excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url)
-
-        self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content)
-
-        excel_data = BytesIO(excel_content)
-        df_dict = pd.read_excel(excel_data, sheet_name=None, engine="openpyxl")
-
-        for key, df in df_dict.items():
+    def _read_excel_file(self, excel_content: bytes) -> Dict[str, pd.DataFrame]:
+        """Read Excel file with proper error handling."""
+        try:
+            excel_data = BytesIO(excel_content)
+            return pd.read_excel(excel_data, sheet_name=None, engine="openpyxl")
+        except Exception as e:
+            logging.error(f"Error reading Excel file: {e}")
+            raise AirflowException(f"Failed to read Excel file: {e}")
+
+    def _process_sheet(self, sheet_name: str, df: pd.DataFrame) -> Tuple[str, bytes]:
+        """Process a single Excel sheet with proper error handling."""
+        try:
             df = df.rename(make_name_bq_safe, axis="columns")
-
             logging.info(f"read {df.shape[0]} rows and {df.shape[1]} columns")
-            self.clean_gzipped_content = gzip.compress(
+            gzipped_content = gzip.compress(
                 df.to_json(orient="records", lines=True).encode()
             )
-            tab_name = ""
+            tab_name = make_name_bq_safe(sheet_name)
+            return tab_name, gzipped_content
+        except Exception as e:
+            logging.error(f"Error processing sheet {sheet_name}: {e}")
+            raise AirflowException(f"Failed to process Excel sheet {sheet_name}: {e}")
 
-            tab_name = make_name_bq_safe(key)
-
-            # Save clean gzipped jsonl files to the clean bucket
-            self.clean_excel_extract = CleanExtract(
+    def _save_processed_sheet(self, tab_name: str, gzipped_content: bytes) -> None:
+        """Save processed sheet with proper error handling."""
+        try:
+            clean_excel_extract = CleanExtract(
                 year=self.year,
                 product=self.product + "/" + self.year + "/" + tab_name,
                 filename=f"{self.year}__{self.product}__{tab_name}.jsonl.gz",
             )
+            clean_excel_extract.save_content(fs=get_fs(), content=gzipped_content)
+        except Exception as e:
+            logging.error(f"Error saving processed sheet {tab_name}: {e}")
+            raise AirflowException(f"Failed to save processed sheet {tab_name}: {e}")
 
-            self.clean_excel_extract.save_content(
-                fs=get_fs(), content=self.clean_gzipped_content
-            )
+    def execute(self, context: dict, *args, **kwargs) -> None:
+        """Execute the operator with proper error handling."""
+        try:
+            # Get download URL
+            download_url = self._get_download_url(context)
+
+            # Download Excel file
+            excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url)
+            if excel_content is None:
+                return None
+
+            # Save raw content
+            self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content)
+
+            # Read Excel file
+            df_dict = self._read_excel_file(excel_content)
+
+            # Process each sheet
+            for sheet_name, df in df_dict.items():
+                tab_name, gzipped_content = self._process_sheet(sheet_name, df)
+                self._save_processed_sheet(tab_name, gzipped_content)
+
+        except AirflowException:
+            # Re-raise AirflowExceptions as they already have proper error messages
+            raise
+        except Exception as e:
+            logging.error(f"Error in XLSX operator execution: {e}")
+            raise AirflowException(f"XLSX operator execution failed: {e}")
diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py
index a538df5495..7ebbc389fe 100644
--- a/airflow/plugins/operators/scrape_state_geoportal.py
+++ b/airflow/plugins/operators/scrape_state_geoportal.py
@@ -1,7 +1,7 @@
 import gzip
 import logging
 import os
-from typing import ClassVar, List
+from typing import Any, ClassVar, Dict, List, Optional
 
 import pandas as pd  # type: ignore
 import pendulum
@@ -9,6 +9,7 @@
 from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs  # type: ignore
 from pydantic import HttpUrl, parse_obj_as
 
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator  # type: ignore
 
 API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"]
@@ -20,34 +21,25 @@ class StateGeoportalAPIExtract(PartitionedGCSArtifact):
     dt: pendulum.Date = execution_ts.date()
     partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]
 
-    # The name to be used in the data warehouse to refer to the data
-    # product.
+    # The name to be used in the data warehouse to refer to the data product.
     product: str
 
-    # The root of the ArcGIS services. As of Nov 2024, this should
-    # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/".
+    # The root of the ArcGIS services.
     root_url: str
 
-    # The name of the service being requested. In the feature service's
-    # URL, this will be everything between the root and "/FeatureServer".
-    # Don't include a leading or trailing slash.
+    # The name of the service being requested.
     service: str
 
-    # The layer to query. This will usually be "0", so that is the
-    # default.
+    # The layer to query. This will usually be "0".
     layer: str = "0"
 
-    # The query filter. By default, all rows will be returned from the
-    # service. Refer to the ArcGIS documentation for syntax:
-    # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters
+    # The query filter. By default, all rows will be returned.
     where: str = "1=1"
 
-    # A comma-separated list of fields to include in the results. Use
-    # "*" (the default) to include all fields.
+    # A comma-separated list of fields to include in the results.
     outFields: str = "*"
 
-    # The number of records to request for each API call (the operator
-    # will request all data from the layer in batches of this size).
+    # The number of records to request for each API call.
     resultRecordCount: int
 
     @property
@@ -61,13 +53,37 @@ def filename(self) -> str:
     class Config:
         arbitrary_types_allowed = True
 
-    def fetch_from_state_geoportal(self):
-        """ """
-
-        logging.info(f"Downloading state geoportal data for {self.product}.")
+    def _make_api_request(self, url: str, params: Dict[str, Any], offset: int) -> Dict:
+        """Make API request with proper error handling."""
+        try:
+            params["resultOffset"] = offset
+            response = requests.get(url, params=params)
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.HTTPError as e:
+            logging.error(f"HTTP error in batch request at offset {offset}: {e}")
+            raise AirflowException(f"HTTP error in geoportal request: {e}")
+        except requests.exceptions.RequestException as e:
+            logging.error(f"Request error in batch at offset {offset}: {e}")
+            raise AirflowException(f"Request error in geoportal request: {e}")
+        except Exception as e:
+            logging.error(f"Error processing batch at offset {offset}: {e}")
+            raise AirflowException(f"Error processing geoportal batch: {e}")
+
+    def _validate_features(self, data: Dict, offset: int) -> Optional[List]:
+        """Validate features from API response."""
+        if "features" not in data or not data["features"]:
+            if offset == 0:
+                logging.info(
+                    f"There is no data to download for {self.product}. Ending pipeline."
+                )
+            return None
+        return data["features"]
 
+    def fetch_from_state_geoportal(self) -> Optional[List]:
+        """Fetch data from state geoportal with proper error handling."""
         try:
-            # Set up the parameters for the request
+            # Set up the request URL and parameters
             url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query"
 
             validated_url = parse_obj_as(HttpUrl, url)
@@ -78,61 +94,54 @@ def fetch_from_state_geoportal(self):
                 "resultRecordCount": self.resultRecordCount,
             }
 
-            all_features = []  # To store all retrieved rows
+            all_features = []
             offset = 0
 
             while True:
-                # Update the resultOffset for each request
-                params["resultOffset"] = offset
-
-                # Make the request
-                response = requests.get(validated_url, params=params)
-                response.raise_for_status()
-                data = response.json()
+                # Make API request for current batch
+                data = self._make_api_request(validated_url, params, offset)
 
-                # Break the loop if there are no more features
-                if "features" not in data or not data["features"]:
+                # Validate and process features
+                features = self._validate_features(data, offset)
+                if features is None:
                     break
 
-                # Append the retrieved features
-                all_features.extend(data["features"])
-
-                # Increment the offset
+                all_features.extend(features)
                 offset += params["resultRecordCount"]
 
-            if all_features is None or len(all_features) == 0:
-                logging.info(
-                    f"There is no data to download for {self.product}. Ending pipeline."
-                )
-
-                pass
-            else:
-                logging.info(
-                    f"Downloaded {self.product} data with {len(all_features)} rows!"
-                )
+            if not all_features:
+                return None
 
-                return all_features
-
-        except requests.exceptions.RequestException as e:
-            logging.info(f"An error occurred: {e}")
+            logging.info(
+                f"Downloaded {self.product} data with {len(all_features)} rows!"
+            )
+            return all_features
+        except AirflowException:
+            # Re-raise AirflowExceptions as they already have proper error messages
             raise
-
-
-# # Function to convert coordinates to WKT format
-def to_wkt(geometry_type, coordinates):
-    if geometry_type == "LineString":
-        # Format as a LineString
-        coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates])
-        return f"LINESTRING({coords_str})"
-    elif geometry_type == "MultiLineString":
-        # Format as a MultiLineString
-        multiline_coords_str = ", ".join(
-            f"({', '.join([f'{lng} {lat}' for lng, lat in line])})"
-            for line in coordinates
-        )
-        return f"MULTILINESTRING({multiline_coords_str})"
-    else:
+        except Exception as e:
+            logging.error(f"Error in geoportal data fetch: {e}")
+            raise AirflowException(f"Failed to fetch geoportal data: {e}")
+
+
+def to_wkt(geometry_type: str, coordinates: List) -> Optional[str]:
+    """Convert coordinates to WKT format with proper error handling."""
+    try:
+        if geometry_type == "LineString":
+            coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates])
+            return f"LINESTRING({coords_str})"
+        elif geometry_type == "MultiLineString":
+            multiline_coords_str = ", ".join(
+                f"({', '.join([f'{lng} {lat}' for lng, lat in line])})"
+                for line in coordinates
+            )
+            return f"MULTILINESTRING({multiline_coords_str})"
+        else:
+            logging.warning(f"Unsupported geometry type: {geometry_type}")
+            return None
+    except Exception as e:
+        logging.error(f"Error converting coordinates to WKT: {e}")
         return None
 
 
@@ -151,11 +160,11 @@ class StateGeoportalAPIOperator(BaseOperator):
 
     def __init__(
         self,
-        product,
-        root_url,
-        service,
-        layer,
-        resultRecordCount,
+        product: str,
+        root_url: str,
+        service: str,
+        layer: str,
+        resultRecordCount: int,
         **kwargs,
     ):
         self.product = product
@@ -180,13 +189,18 @@ def __init__(
 
         super().__init__(**kwargs)
 
-    def execute(self, **kwargs):
-        api_content = self.extract.fetch_from_state_geoportal()
-
-        df = pd.json_normalize(api_content)
+    def _normalize_json_data(self, api_content: List) -> pd.DataFrame:
+        """Normalize JSON data with proper error handling."""
+        try:
+            return pd.json_normalize(api_content)
+        except Exception as e:
+            logging.error(f"Error normalizing JSON data: {e}")
+            raise AirflowException(f"Failed to normalize geoportal data: {e}")
 
-        if self.product == "state_highway_network":
-            # Select columns to keep, have to be explicit before renaming because there are duplicate values after normalizing
+    def _process_highway_network(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Process state highway network data with proper error handling."""
+        try:
+            # Select columns to keep
             df = df[
                 [
                     "properties.Route",
@@ -199,19 +213,51 @@ def execute(self, **kwargs):
                 ]
             ]
 
-            # Dynamically create a mapping by removing known prefixes
+            # Create dynamic column mapping
             columns = {col: col.split(".")[-1] for col in df.columns}
-
-            # Rename columns using the dynamically created mapping
             df = df.rename(columns=columns)
 
-            # Create new column with WKT format
+            # Create WKT coordinates
             df["wkt_coordinates"] = df.apply(
                 lambda row: to_wkt(row["type"], row["coordinates"]), axis=1
             )
+            return df
+        except Exception as e:
+            logging.error(f"Error processing state highway network data: {e}")
+            raise AirflowException(f"Failed to process state highway network data: {e}")
 
-        # Compress the DataFrame content and save it
-        self.gzipped_content = gzip.compress(
-            df.to_json(orient="records", lines=True).encode()
-        )
-        self.extract.save_content(fs=get_fs(), content=self.gzipped_content)
+    def _save_dataframe(self, df: pd.DataFrame) -> None:
+        """Save DataFrame as compressed JSONL with proper error handling."""
+        try:
+            gzipped_content = gzip.compress(
+                df.to_json(orient="records", lines=True).encode()
+            )
+            self.extract.save_content(fs=get_fs(), content=gzipped_content)
+        except Exception as e:
+            logging.error(f"Error saving processed data: {e}")
+            raise AirflowException(f"Failed to save processed geoportal data: {e}")
+
+    def execute(self, **kwargs) -> None:
+        """Execute the operator with proper error handling."""
+        try:
+            # Fetch API content
+            api_content = self.extract.fetch_from_state_geoportal()
+            if api_content is None:
+                return None
+
+            # Normalize JSON data
+            df = self._normalize_json_data(api_content)
+
+            # Process state highway network if applicable
+            if self.product == "state_highway_network":
+                df = self._process_highway_network(df)
+
+            # Save processed data
+            self._save_dataframe(df)
+
+        except AirflowException:
+            # Re-raise AirflowExceptions as they already have proper error messages
+            raise
+        except Exception as e:
+            logging.error(f"Error in geoportal operator execution: {e}")
+            raise AirflowException(f"Geoportal operator execution failed: {e}")