From 608adc78a4ff044b1ea1575b942aac14c6ec95cf Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 19 Dec 2024 13:41:29 -0500 Subject: [PATCH 1/4] add exception handling for recent data scraping airflow operators --- airflow/plugins/operators/scrape_ntd_api.py | 56 +++-- airflow/plugins/operators/scrape_ntd_xlsx.py | 142 +++++++------ .../operators/scrape_state_geoportal.py | 191 +++++++++++------- 3 files changed, 230 insertions(+), 159 deletions(-) diff --git a/airflow/plugins/operators/scrape_ntd_api.py b/airflow/plugins/operators/scrape_ntd_api.py index cb541a63c2..a4fbf8e7af 100644 --- a/airflow/plugins/operators/scrape_ntd_api.py +++ b/airflow/plugins/operators/scrape_ntd_api.py @@ -13,6 +13,7 @@ ) from pydantic import HttpUrl, parse_obj_as +from airflow.exceptions import AirflowException from airflow.models import BaseOperator # type: ignore API_BUCKET = os.environ["CALITP_BUCKET__NTD_API_DATA_PRODUCTS"] @@ -52,25 +53,30 @@ def fetch_from_ntd_api(self): validated_url = parse_obj_as(HttpUrl, url) - response = requests.get(validated_url).content + response = requests.get(validated_url) + response.raise_for_status() # Raises an HTTPError for bad responses + response_content = response.content - if response is None or len(response) == 0: + if response_content is None or len(response_content) == 0: logging.info( f"There is no data to download for {self.year} / {self.product}. Ending pipeline." ) + return None - pass - else: - logging.info( - f"Downloaded {self.product} data for {self.year} with {len(response)} rows!" - ) - - return response + logging.info( + f"Downloaded {self.product} data for {self.year} with {len(response_content)} rows!" + ) + return response_content + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error occurred: {e}") + raise AirflowException(f"HTTP error in NTD API request: {e}") except requests.exceptions.RequestException as e: - logging.info(f"An error occurred: {e}") - - raise + logging.error(f"Request error occurred: {e}") + raise AirflowException(f"Error in NTD API request: {e}") + except Exception as e: + logging.error(f"Unexpected error occurred: {e}") + raise AirflowException(f"Unexpected error in NTD API request: {e}") class JSONExtract(NtdDataProductAPIExtract): @@ -111,16 +117,24 @@ def __init__( super().__init__(**kwargs) def execute(self, **kwargs): - api_content = self.extract.fetch_from_ntd_api() - - decode_api_content = api_content.decode("utf-8") + try: + api_content = self.extract.fetch_from_ntd_api() + if api_content is None: + return None - df = pd.read_json(decode_api_content) + decode_api_content = api_content.decode("utf-8") + df = pd.read_json(decode_api_content) + df = df.rename(make_name_bq_safe, axis="columns") - df = df.rename(make_name_bq_safe, axis="columns") + self.gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) - self.gzipped_content = gzip.compress( - df.to_json(orient="records", lines=True).encode() - ) + self.extract.save_content(fs=get_fs(), content=self.gzipped_content) - self.extract.save_content(fs=get_fs(), content=self.gzipped_content) + except ValueError as e: + logging.error(f"Error parsing JSON data: {e}") + raise AirflowException(f"Failed to parse JSON data: {e}") + except Exception as e: + logging.error(f"Error processing NTD API data: {e}") + raise AirflowException(f"Failed to process NTD API data: {e}") diff --git a/airflow/plugins/operators/scrape_ntd_xlsx.py b/airflow/plugins/operators/scrape_ntd_xlsx.py index 4c19f21008..c2a01552ad 100644 --- 
a/airflow/plugins/operators/scrape_ntd_xlsx.py +++ b/airflow/plugins/operators/scrape_ntd_xlsx.py @@ -14,6 +14,7 @@ ) from pydantic import HttpUrl, parse_obj_as +from airflow.exceptions import AirflowException from airflow.models import BaseOperator # type: ignore RAW_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"] @@ -48,10 +49,16 @@ # pulls the URL from XCom def pull_url_from_xcom(key, context): - task_instance = context["ti"] - pulled_value = task_instance.xcom_pull(task_ids="scrape_ntd_xlsx_urls", key=key) - print(f"Pulled value from XCom: {pulled_value}") - return pulled_value + try: + task_instance = context["ti"] + pulled_value = task_instance.xcom_pull(task_ids="scrape_ntd_xlsx_urls", key=key) + if pulled_value is None: + raise AirflowException(f"No URL found in XCom for key: {key}") + print(f"Pulled value from XCom: {pulled_value}") + return pulled_value + except Exception as e: + logging.error(f"Error pulling URL from XCom: {e}") + raise AirflowException(f"Failed to pull URL from XCom: {e}") class NtdDataProductXLSXExtract(PartitionedGCSArtifact): @@ -75,35 +82,34 @@ class Config: arbitrary_types_allowed = True def fetch_from_ntd_xlsx(self, file_url): - # As of now, the NTD XLSX download links change every time they update the file, so we have special handling for that here, which is dependent on - # another dag task called scrape_ntd_xlsx_urls.py. if we look to download other xlsx files from the DOT portal and they - # also change the file name every time they publish, they we will have to add the same handling for all of these files and make it programmatic - validated_url = parse_obj_as(HttpUrl, file_url) - logging.info(f"reading file from url {validated_url}") try: - excel_content = requests.get(validated_url).content + response = requests.get(validated_url) + response.raise_for_status() # Raises an HTTPError for bad responses + excel_content = response.content if excel_content is None or len(excel_content) == 0: logging.info( f"There is no data to download for {self.year} / {self.product}. Ending pipeline." ) + return None - pass - - else: - logging.info( - f"Downloaded {self.product} data for {self.year} with {len(excel_content)} rows!" - ) - - return excel_content + logging.info( + f"Downloaded {self.product} data for {self.year} with {len(excel_content)} rows!" 
+ ) + return excel_content + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error occurred: {e}") + raise AirflowException(f"HTTP error in XLSX download: {e}") except requests.exceptions.RequestException as e: - logging.info(f"An error occurred: {e}") - - raise + logging.error(f"Request error occurred: {e}") + raise AirflowException(f"Error downloading XLSX file: {e}") + except Exception as e: + logging.error(f"Unexpected error occurred: {e}") + raise AirflowException(f"Unexpected error in XLSX download: {e}") class RawExtract(NtdDataProductXLSXExtract): @@ -140,43 +146,57 @@ def __init__( super().__init__(*args, **kwargs) def execute(self, context, *args, **kwargs): - download_url = self.raw_excel_extract.file_url - - key = (self.product, self.year) - - if key in xcom_keys: - download_url = pull_url_from_xcom(key=xcom_keys[key], context=context) - - # see what is returned - logging.info(f"reading {self.product} url as {download_url}") - - excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url) - - self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content) - - excel_data = BytesIO(excel_content) - df_dict = pd.read_excel(excel_data, sheet_name=None, engine="openpyxl") - - for key, df in df_dict.items(): - df = df.rename(make_name_bq_safe, axis="columns") - - logging.info(f"read {df.shape[0]} rows and {df.shape[1]} columns") - - self.clean_gzipped_content = gzip.compress( - df.to_json(orient="records", lines=True).encode() - ) - - tab_name = "" - - tab_name = make_name_bq_safe(key) - - # Save clean gzipped jsonl files to the clean bucket - self.clean_excel_extract = CleanExtract( - year=self.year, - product=self.product + "/" + self.year + "/" + tab_name, - filename=f"{self.year}__{self.product}__{tab_name}.jsonl.gz", - ) - - self.clean_excel_extract.save_content( - fs=get_fs(), content=self.clean_gzipped_content - ) + try: + download_url = self.raw_excel_extract.file_url + + key = (self.product, self.year) + if key in xcom_keys: + download_url = pull_url_from_xcom(key=xcom_keys[key], context=context) + + logging.info(f"reading {self.product} url as {download_url}") + + excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url) + if excel_content is None: + return None + + # Save raw content + self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content) + + # Process Excel file + excel_data = BytesIO(excel_content) + try: + df_dict = pd.read_excel(excel_data, sheet_name=None, engine="openpyxl") + except Exception as e: + logging.error(f"Error reading Excel file: {e}") + raise AirflowException(f"Failed to read Excel file: {e}") + + # Process each sheet + for key, df in df_dict.items(): + try: + df = df.rename(make_name_bq_safe, axis="columns") + logging.info(f"read {df.shape[0]} rows and {df.shape[1]} columns") + + self.clean_gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + + tab_name = make_name_bq_safe(key) + + # Save clean gzipped jsonl files to the clean bucket + self.clean_excel_extract = CleanExtract( + year=self.year, + product=self.product + "/" + self.year + "/" + tab_name, + filename=f"{self.year}__{self.product}__{tab_name}.jsonl.gz", + ) + + self.clean_excel_extract.save_content( + fs=get_fs(), content=self.clean_gzipped_content + ) + + except Exception as e: + logging.error(f"Error processing sheet {key}: {e}") + raise AirflowException(f"Failed to process Excel sheet {key}: {e}") + + except Exception as e: + logging.error(f"Error in XLSX operator execution: {e}") + 
raise AirflowException(f"XLSX operator execution failed: {e}") diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index a538df5495..e8fb636fec 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -9,6 +9,7 @@ from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore from pydantic import HttpUrl, parse_obj_as +from airflow.exceptions import AirflowException from airflow.models import BaseOperator # type: ignore API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] @@ -82,57 +83,72 @@ def fetch_from_state_geoportal(self): offset = 0 while True: - # Update the resultOffset for each request - params["resultOffset"] = offset - - # Make the request - response = requests.get(validated_url, params=params) - response.raise_for_status() - data = response.json() - - # Break the loop if there are no more features - if "features" not in data or not data["features"]: - break - - # Append the retrieved features - all_features.extend(data["features"]) - - # Increment the offset - offset += params["resultRecordCount"] - - if all_features is None or len(all_features) == 0: + try: + # Update the resultOffset for each request + params["resultOffset"] = offset + + # Make the request + response = requests.get(validated_url, params=params) + response.raise_for_status() # Raises an HTTPError for bad responses + data = response.json() + + # Break the loop if there are no more features + if "features" not in data or not data["features"]: + break + + # Append the retrieved features + all_features.extend(data["features"]) + + # Increment the offset + offset += params["resultRecordCount"] + + except requests.exceptions.HTTPError as e: + logging.error( + f"HTTP error in batch request at offset {offset}: {e}" + ) + raise AirflowException(f"HTTP error in geoportal request: {e}") + except requests.exceptions.RequestException as e: + logging.error(f"Request error in batch at offset {offset}: {e}") + raise AirflowException(f"Request error in geoportal request: {e}") + except Exception as e: + logging.error(f"Error processing batch at offset {offset}: {e}") + raise AirflowException(f"Error processing geoportal batch: {e}") + + if not all_features: logging.info( f"There is no data to download for {self.product}. Ending pipeline." ) + return None - pass - else: - logging.info( - f"Downloaded {self.product} data with {len(all_features)} rows!" - ) - - return all_features - - except requests.exceptions.RequestException as e: - logging.info(f"An error occurred: {e}") + logging.info( + f"Downloaded {self.product} data with {len(all_features)} rows!" 
+ ) + return all_features - raise + except Exception as e: + logging.error(f"Error in geoportal data fetch: {e}") + raise AirflowException(f"Failed to fetch geoportal data: {e}") -# # Function to convert coordinates to WKT format +# Function to convert coordinates to WKT format def to_wkt(geometry_type, coordinates): - if geometry_type == "LineString": - # Format as a LineString - coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates]) - return f"LINESTRING({coords_str})" - elif geometry_type == "MultiLineString": - # Format as a MultiLineString - multiline_coords_str = ", ".join( - f"({', '.join([f'{lng} {lat}' for lng, lat in line])})" - for line in coordinates - ) - return f"MULTILINESTRING({multiline_coords_str})" - else: + try: + if geometry_type == "LineString": + # Format as a LineString + coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates]) + return f"LINESTRING({coords_str})" + elif geometry_type == "MultiLineString": + # Format as a MultiLineString + multiline_coords_str = ", ".join( + f"({', '.join([f'{lng} {lat}' for lng, lat in line])})" + for line in coordinates + ) + return f"MULTILINESTRING({multiline_coords_str})" + else: + logging.warning(f"Unsupported geometry type: {geometry_type}") + return None + except Exception as e: + logging.error(f"Error converting coordinates to WKT: {e}") return None @@ -181,37 +197,58 @@ def __init__( super().__init__(**kwargs) def execute(self, **kwargs): - api_content = self.extract.fetch_from_state_geoportal() - - df = pd.json_normalize(api_content) - - if self.product == "state_highway_network": - # Select columns to keep, have to be explicit before renaming because there are duplicate values after normalizing - df = df[ - [ - "properties.Route", - "properties.County", - "properties.District", - "properties.RouteType", - "properties.Direction", - "geometry.type", - "geometry.coordinates", - ] - ] - - # Dynamically create a mapping by removing known prefixes - columns = {col: col.split(".")[-1] for col in df.columns} - - # Rename columns using the dynamically created mapping - df = df.rename(columns=columns) - - # Create new column with WKT format - df["wkt_coordinates"] = df.apply( - lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 - ) - - # Compress the DataFrame content and save it - self.gzipped_content = gzip.compress( - df.to_json(orient="records", lines=True).encode() - ) - self.extract.save_content(fs=get_fs(), content=self.gzipped_content) + try: + api_content = self.extract.fetch_from_state_geoportal() + if api_content is None: + return None + + try: + df = pd.json_normalize(api_content) + except Exception as e: + logging.error(f"Error normalizing JSON data: {e}") + raise AirflowException(f"Failed to normalize geoportal data: {e}") + + if self.product == "state_highway_network": + try: + # Select columns to keep, have to be explicit before renaming because there are duplicate values after normalizing + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Dynamically create a mapping by removing known prefixes + columns = {col: col.split(".")[-1] for col in df.columns} + + # Rename columns using the dynamically created mapping + df = df.rename(columns=columns) + + # Create new column with WKT format + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) + except Exception as e: + logging.error(f"Error 
processing state highway network data: {e}") + raise AirflowException( + f"Failed to process state highway network data: {e}" + ) + + # Compress the DataFrame content and save it + try: + self.gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + self.extract.save_content(fs=get_fs(), content=self.gzipped_content) + except Exception as e: + logging.error(f"Error saving processed data: {e}") + raise AirflowException(f"Failed to save processed geoportal data: {e}") + + except Exception as e: + logging.error(f"Error in geoportal operator execution: {e}") + raise AirflowException(f"Geoportal operator execution failed: {e}") From 60b57efb486c2f3ab0ab40f35b9567c2fe8a72b9 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 19 Dec 2024 13:42:01 -0500 Subject: [PATCH 2/4] add placeholder dag import timeout variable for local development --- airflow/docker-compose.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index eb106897a9..7f70111a5f 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -52,6 +52,8 @@ x-airflow-common: AIRFLOW__CORE__BASE_LOG_FOLDER: /opt/airflow/gcs/logs AIRFLOW__CORE__PLUGINS_FOLDER: /opt/airflow/gcs/plugins AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: 'true' + # AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 120 + # this option prevents a DAG from trying to run all dagruns back to its # start date. this lets you it spin up docker, unpause a dag, and just From 2bc5d979856fc389cc05508112fa359a7f3ead7a Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 19 Dec 2024 13:53:55 -0500 Subject: [PATCH 3/4] add exception handling for dynamic xlsx ursl scraping --- .../scrape_ntd_xlsx_urls.py | 70 +++++++++++++++---- 1 file changed, 56 insertions(+), 14 deletions(-) diff --git a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py index fde7df568c..1606ce02f0 100644 --- a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py +++ b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py @@ -6,7 +6,9 @@ import requests from bs4 import BeautifulSoup -from pydantic import HttpUrl, parse_obj_as +from pydantic import HttpUrl, ValidationError, parse_obj_as + +from airflow.exceptions import AirflowException xlsx_urls = { "ridership_url": "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release", @@ -21,8 +23,14 @@ # pushes the scraped URL value to XCom def push_url_to_xcom(key, scraped_url, context): - task_instance = context["ti"] - task_instance.xcom_push(key=key, value=scraped_url) + try: + task_instance = context.get("ti") + if task_instance is None: + raise AirflowException("Task instance not found in context") + task_instance.xcom_push(key=key, value=scraped_url) + except Exception as e: + logging.error(f"Error pushing URL to XCom for key {key}: {e}") + raise AirflowException(f"Failed to push URL to XCom: {e}") # Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/' @@ -33,20 +41,54 @@ def href_matcher(href): def scrape_ntd_xlsx_urls(**context): - for key, value in xlsx_urls.items(): - url = value - req = requests.get(url) - soup = BeautifulSoup(req.text, "html.parser") + for key, url in xlsx_urls.items(): + try: + # Make HTTP request with proper error handling + try: + response = requests.get(url) + response.raise_for_status() # Raises HTTPError for bad responses + except requests.exceptions.HTTPError as e: + 
logging.error(f"HTTP error occurred while fetching {url}: {e}") + raise AirflowException(f"HTTP error for {key}: {e}") + except requests.exceptions.RequestException as e: + logging.error(f"Error occurred while fetching {url}: {e}") + raise AirflowException(f"Request failed for {key}: {e}") + + # Parse HTML with error handling + try: + soup = BeautifulSoup(response.text, "html.parser") + except Exception as e: + logging.error(f"Error parsing HTML for {url}: {e}") + raise AirflowException(f"HTML parsing failed for {key}: {e}") - link = soup.find("a", href=href_matcher) + # Find link with error handling + link = soup.find("a", href=href_matcher) + if not link: + error_msg = f"No XLSX download link found for {key} at {url}" + logging.error(error_msg) + raise AirflowException(error_msg) - # Extract the href if the link is found - file_link = link["href"] if link else None + # Extract href with error handling + file_link = link.get("href") + if not file_link: + error_msg = f"Found link for {key} but href attribute is missing" + logging.error(error_msg) + raise AirflowException(error_msg) - updated_url = f"https://www.transit.dot.gov{file_link}" + # Construct and validate URL + updated_url = f"https://www.transit.dot.gov{file_link}" + try: + validated_url = parse_obj_as(HttpUrl, updated_url) + except ValidationError as e: + logging.error(f"URL validation failed for {updated_url}: {e}") + raise AirflowException(f"Invalid URL constructed for {key}: {e}") - validated_url = parse_obj_as(HttpUrl, updated_url) + logging.info(f"Successfully validated URL for {key}: {validated_url}") - logging.info(f"Validated URL: {validated_url}.") + # Push to XCom + push_url_to_xcom(key=key, scraped_url=validated_url, context=context) - push_url_to_xcom(key=key, scraped_url=validated_url, context=context) + except Exception as e: + # Log any unhandled exceptions and re-raise as AirflowException + logging.error(f"Unexpected error processing {key}: {e}") + raise AirflowException(f"Failed to process {key}: {e}") From 71809590a8ab80436251e5e3719cdf61e545ac40 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Tue, 14 Jan 2025 10:47:27 -0500 Subject: [PATCH 4/4] unnest exception handling, break out operations to be more focused, add description --- .../scrape_ntd_xlsx_urls.py | 107 +++++---- airflow/plugins/operators/scrape_ntd_api.py | 100 +++++--- airflow/plugins/operators/scrape_ntd_xlsx.py | 158 +++++++----- .../operators/scrape_state_geoportal.py | 227 +++++++++--------- 4 files changed, 343 insertions(+), 249 deletions(-) diff --git a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py index 1606ce02f0..e4ea42d26b 100644 --- a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py +++ b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py @@ -21,73 +21,92 @@ } -# pushes the scraped URL value to XCom def push_url_to_xcom(key, scraped_url, context): + """Push the scraped URL value to XCom with proper error handling.""" + task_instance = context.get("ti") + if task_instance is None: + raise AirflowException("Task instance not found in context") + try: - task_instance = context.get("ti") - if task_instance is None: - raise AirflowException("Task instance not found in context") task_instance.xcom_push(key=key, value=scraped_url) except Exception as e: logging.error(f"Error pushing URL to XCom for key {key}: {e}") raise AirflowException(f"Failed to push URL to XCom: {e}") -# Look for an anchor tag where the href ends with '.xlsx' and starts with 
'/sites/fta.dot.gov/files/' def href_matcher(href): + """Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'""" return ( href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx") ) +def make_http_request(url, key): + """Make HTTP request with proper error handling.""" + try: + response = requests.get(url) + response.raise_for_status() + return response + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error occurred while fetching {url}: {e}") + raise AirflowException(f"HTTP error for {key}: {e}") + except requests.exceptions.RequestException as e: + logging.error(f"Error occurred while fetching {url}: {e}") + raise AirflowException(f"Request failed for {key}: {e}") + + +def parse_html_content(response_text, url, key): + """Parse HTML content with error handling.""" + try: + return BeautifulSoup(response_text, "html.parser") + except Exception as e: + logging.error(f"Error parsing HTML for {url}: {e}") + raise AirflowException(f"HTML parsing failed for {key}: {e}") + + +def find_and_validate_xlsx_link(soup, key, url): + """Find and validate XLSX download link.""" + link = soup.find("a", href=href_matcher) + if not link: + error_msg = f"No XLSX download link found for {key} at {url}" + logging.error(error_msg) + raise AirflowException(error_msg) + + file_link = link.get("href") + if not file_link: + error_msg = f"Found link for {key} but href attribute is missing" + logging.error(error_msg) + raise AirflowException(error_msg) + + updated_url = f"https://www.transit.dot.gov{file_link}" + try: + return parse_obj_as(HttpUrl, updated_url) + except ValidationError as e: + logging.error(f"URL validation failed for {updated_url}: {e}") + raise AirflowException(f"Invalid URL constructed for {key}: {e}") + + def scrape_ntd_xlsx_urls(**context): + """Main function to scrape XLSX URLs and push them to XCom.""" for key, url in xlsx_urls.items(): try: - # Make HTTP request with proper error handling - try: - response = requests.get(url) - response.raise_for_status() # Raises HTTPError for bad responses - except requests.exceptions.HTTPError as e: - logging.error(f"HTTP error occurred while fetching {url}: {e}") - raise AirflowException(f"HTTP error for {key}: {e}") - except requests.exceptions.RequestException as e: - logging.error(f"Error occurred while fetching {url}: {e}") - raise AirflowException(f"Request failed for {key}: {e}") - - # Parse HTML with error handling - try: - soup = BeautifulSoup(response.text, "html.parser") - except Exception as e: - logging.error(f"Error parsing HTML for {url}: {e}") - raise AirflowException(f"HTML parsing failed for {key}: {e}") - - # Find link with error handling - link = soup.find("a", href=href_matcher) - if not link: - error_msg = f"No XLSX download link found for {key} at {url}" - logging.error(error_msg) - raise AirflowException(error_msg) - - # Extract href with error handling - file_link = link.get("href") - if not file_link: - error_msg = f"Found link for {key} but href attribute is missing" - logging.error(error_msg) - raise AirflowException(error_msg) - - # Construct and validate URL - updated_url = f"https://www.transit.dot.gov{file_link}" - try: - validated_url = parse_obj_as(HttpUrl, updated_url) - except ValidationError as e: - logging.error(f"URL validation failed for {updated_url}: {e}") - raise AirflowException(f"Invalid URL constructed for {key}: {e}") + # Make HTTP request + response = make_http_request(url, key) + + # Parse HTML content + soup = 
parse_html_content(response.text, url, key) + + # Find and validate XLSX link + validated_url = find_and_validate_xlsx_link(soup, key, url) logging.info(f"Successfully validated URL for {key}: {validated_url}") # Push to XCom push_url_to_xcom(key=key, scraped_url=validated_url, context=context) + except AirflowException: + # Re-raise AirflowExceptions as they already have proper error messages + raise except Exception as e: # Log any unhandled exceptions and re-raise as AirflowException logging.error(f"Unexpected error processing {key}: {e}") diff --git a/airflow/plugins/operators/scrape_ntd_api.py b/airflow/plugins/operators/scrape_ntd_api.py index a4fbf8e7af..75f5ac1cfe 100644 --- a/airflow/plugins/operators/scrape_ntd_api.py +++ b/airflow/plugins/operators/scrape_ntd_api.py @@ -41,39 +41,49 @@ def filename(self) -> str: class Config: arbitrary_types_allowed = True - def fetch_from_ntd_api(self): - """ """ + def _make_api_request(self, url: str) -> bytes: + """Make API request with proper error handling.""" + try: + response = requests.get(url) + response.raise_for_status() + return response.content + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error occurred: {e}") + raise AirflowException(f"HTTP error in NTD API request: {e}") + except requests.exceptions.RequestException as e: + logging.error(f"Request error occurred: {e}") + raise AirflowException(f"Error in NTD API request: {e}") - logging.info(f"Downloading NTD data for {self.year} / {self.product}.") + def _validate_response_content(self, content: bytes) -> bytes: + """Validate API response content.""" + if content is None or len(content) == 0: + logging.info( + f"There is no data to download for {self.year} / {self.product}. Ending pipeline." + ) + return None + logging.info( + f"Downloaded {self.product} data for {self.year} with {len(content)} rows!" + ) + return content + def fetch_from_ntd_api(self): + """Fetch data from NTD API with proper error handling.""" try: + # Construct and validate URL url = ( self.root_url + self.endpoint_id + self.file_format + "?$limit=5000000" ) - validated_url = parse_obj_as(HttpUrl, url) - response = requests.get(validated_url) - response.raise_for_status() # Raises an HTTPError for bad responses - response_content = response.content - - if response_content is None or len(response_content) == 0: - logging.info( - f"There is no data to download for {self.year} / {self.product}. Ending pipeline." - ) - return None + # Make API request + response_content = self._make_api_request(validated_url) - logging.info( - f"Downloaded {self.product} data for {self.year} with {len(response_content)} rows!" 
- ) - return response_content + # Validate response content + return self._validate_response_content(response_content) - except requests.exceptions.HTTPError as e: - logging.error(f"HTTP error occurred: {e}") - raise AirflowException(f"HTTP error in NTD API request: {e}") - except requests.exceptions.RequestException as e: - logging.error(f"Request error occurred: {e}") - raise AirflowException(f"Error in NTD API request: {e}") + except AirflowException: + # Re-raise AirflowExceptions as they already have proper error messages + raise except Exception as e: logging.error(f"Unexpected error occurred: {e}") raise AirflowException(f"Unexpected error in NTD API request: {e}") @@ -116,25 +126,47 @@ def __init__( super().__init__(**kwargs) - def execute(self, **kwargs): + def _process_api_content(self, api_content: bytes) -> pd.DataFrame: + """Process API content into a DataFrame with error handling.""" try: - api_content = self.extract.fetch_from_ntd_api() - if api_content is None: - return None - decode_api_content = api_content.decode("utf-8") df = pd.read_json(decode_api_content) - df = df.rename(make_name_bq_safe, axis="columns") + return df.rename(make_name_bq_safe, axis="columns") + except ValueError as e: + logging.error(f"Error parsing JSON data: {e}") + raise AirflowException(f"Failed to parse JSON data: {e}") + except Exception as e: + logging.error(f"Error processing API content: {e}") + raise AirflowException(f"Failed to process API content: {e}") - self.gzipped_content = gzip.compress( + def _save_dataframe(self, df: pd.DataFrame) -> None: + """Save DataFrame as compressed JSONL with error handling.""" + try: + gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode() ) + self.extract.save_content(fs=get_fs(), content=gzipped_content) + except Exception as e: + logging.error(f"Error saving processed data: {e}") + raise AirflowException(f"Failed to save processed data: {e}") - self.extract.save_content(fs=get_fs(), content=self.gzipped_content) + def execute(self, **kwargs): + """Execute the operator with proper error handling.""" + try: + # Fetch API content + api_content = self.extract.fetch_from_ntd_api() + if api_content is None: + return None - except ValueError as e: - logging.error(f"Error parsing JSON data: {e}") - raise AirflowException(f"Failed to parse JSON data: {e}") + # Process API content + df = self._process_api_content(api_content) + + # Save processed data + self._save_dataframe(df) + + except AirflowException: + # Re-raise AirflowExceptions as they already have proper error messages + raise except Exception as e: logging.error(f"Error processing NTD API data: {e}") raise AirflowException(f"Failed to process NTD API data: {e}") diff --git a/airflow/plugins/operators/scrape_ntd_xlsx.py b/airflow/plugins/operators/scrape_ntd_xlsx.py index c2a01552ad..5e82ae6b5e 100644 --- a/airflow/plugins/operators/scrape_ntd_xlsx.py +++ b/airflow/plugins/operators/scrape_ntd_xlsx.py @@ -2,7 +2,7 @@ import logging import os from io import BytesIO -from typing import ClassVar, List # Optional +from typing import ClassVar, Dict, List, Optional, Tuple import pandas as pd # type: ignore import pendulum @@ -47,13 +47,17 @@ } -# pulls the URL from XCom -def pull_url_from_xcom(key, context): +def pull_url_from_xcom(key: str, context: dict) -> str: + """Pull URL from XCom with proper error handling.""" try: task_instance = context["ti"] + if task_instance is None: + raise AirflowException("Task instance not found in context") + pulled_value = 
task_instance.xcom_pull(task_ids="scrape_ntd_xlsx_urls", key=key) if pulled_value is None: raise AirflowException(f"No URL found in XCom for key: {key}") + print(f"Pulled value from XCom: {pulled_value}") return pulled_value except Exception as e: @@ -81,32 +85,42 @@ def filename(self) -> str: class Config: arbitrary_types_allowed = True - def fetch_from_ntd_xlsx(self, file_url): - validated_url = parse_obj_as(HttpUrl, file_url) - logging.info(f"reading file from url {validated_url}") - + def _make_request(self, url: str) -> bytes: + """Make HTTP request with proper error handling.""" try: - response = requests.get(validated_url) - response.raise_for_status() # Raises an HTTPError for bad responses - excel_content = response.content - - if excel_content is None or len(excel_content) == 0: - logging.info( - f"There is no data to download for {self.year} / {self.product}. Ending pipeline." - ) - return None - - logging.info( - f"Downloaded {self.product} data for {self.year} with {len(excel_content)} rows!" - ) - return excel_content - + response = requests.get(url) + response.raise_for_status() + return response.content except requests.exceptions.HTTPError as e: logging.error(f"HTTP error occurred: {e}") raise AirflowException(f"HTTP error in XLSX download: {e}") except requests.exceptions.RequestException as e: logging.error(f"Request error occurred: {e}") raise AirflowException(f"Error downloading XLSX file: {e}") + + def _validate_content(self, content: bytes) -> Optional[bytes]: + """Validate downloaded content.""" + if content is None or len(content) == 0: + logging.info( + f"There is no data to download for {self.year} / {self.product}. Ending pipeline." + ) + return None + logging.info( + f"Downloaded {self.product} data for {self.year} with {len(content)} rows!" 
+ ) + return content + + def fetch_from_ntd_xlsx(self, file_url: str) -> Optional[bytes]: + """Fetch XLSX file with proper error handling.""" + try: + validated_url = parse_obj_as(HttpUrl, file_url) + logging.info(f"reading file from url {validated_url}") + + excel_content = self._make_request(validated_url) + return self._validate_content(excel_content) + + except AirflowException: + raise except Exception as e: logging.error(f"Unexpected error occurred: {e}") raise AirflowException(f"Unexpected error in XLSX download: {e}") @@ -126,8 +140,8 @@ class NtdDataProductXLSXOperator(BaseOperator): def __init__( self, product: str, - xlsx_file_url, - year: int, + xlsx_file_url: str, + year: str, *args, **kwargs, ): @@ -145,16 +159,60 @@ def __init__( super().__init__(*args, **kwargs) - def execute(self, context, *args, **kwargs): + def _get_download_url(self, context: dict) -> str: + """Get download URL from XCom if needed.""" + download_url = self.raw_excel_extract.file_url + key = (self.product, self.year) + if key in xcom_keys: + download_url = pull_url_from_xcom(key=xcom_keys[key], context=context) + logging.info(f"reading {self.product} url as {download_url}") + return download_url + + def _read_excel_file(self, excel_content: bytes) -> Dict[str, pd.DataFrame]: + """Read Excel file with proper error handling.""" + try: + excel_data = BytesIO(excel_content) + return pd.read_excel(excel_data, sheet_name=None, engine="openpyxl") + except Exception as e: + logging.error(f"Error reading Excel file: {e}") + raise AirflowException(f"Failed to read Excel file: {e}") + + def _process_sheet(self, sheet_name: str, df: pd.DataFrame) -> Tuple[str, bytes]: + """Process a single Excel sheet with proper error handling.""" try: - download_url = self.raw_excel_extract.file_url + df = df.rename(make_name_bq_safe, axis="columns") + logging.info(f"read {df.shape[0]} rows and {df.shape[1]} columns") - key = (self.product, self.year) - if key in xcom_keys: - download_url = pull_url_from_xcom(key=xcom_keys[key], context=context) + gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) - logging.info(f"reading {self.product} url as {download_url}") + tab_name = make_name_bq_safe(sheet_name) + return tab_name, gzipped_content + except Exception as e: + logging.error(f"Error processing sheet {sheet_name}: {e}") + raise AirflowException(f"Failed to process Excel sheet {sheet_name}: {e}") + + def _save_processed_sheet(self, tab_name: str, gzipped_content: bytes) -> None: + """Save processed sheet with proper error handling.""" + try: + clean_excel_extract = CleanExtract( + year=self.year, + product=self.product + "/" + self.year + "/" + tab_name, + filename=f"{self.year}__{self.product}__{tab_name}.jsonl.gz", + ) + clean_excel_extract.save_content(fs=get_fs(), content=gzipped_content) + except Exception as e: + logging.error(f"Error saving processed sheet {tab_name}: {e}") + raise AirflowException(f"Failed to save processed sheet {tab_name}: {e}") + def execute(self, context: dict, *args, **kwargs) -> None: + """Execute the operator with proper error handling.""" + try: + # Get download URL + download_url = self._get_download_url(context) + + # Download Excel file excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url) if excel_content is None: return None @@ -162,41 +220,17 @@ def execute(self, context, *args, **kwargs): # Save raw content self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content) - # Process Excel file - excel_data = 
BytesIO(excel_content) - try: - df_dict = pd.read_excel(excel_data, sheet_name=None, engine="openpyxl") - except Exception as e: - logging.error(f"Error reading Excel file: {e}") - raise AirflowException(f"Failed to read Excel file: {e}") + # Read Excel file + df_dict = self._read_excel_file(excel_content) # Process each sheet - for key, df in df_dict.items(): - try: - df = df.rename(make_name_bq_safe, axis="columns") - logging.info(f"read {df.shape[0]} rows and {df.shape[1]} columns") - - self.clean_gzipped_content = gzip.compress( - df.to_json(orient="records", lines=True).encode() - ) - - tab_name = make_name_bq_safe(key) - - # Save clean gzipped jsonl files to the clean bucket - self.clean_excel_extract = CleanExtract( - year=self.year, - product=self.product + "/" + self.year + "/" + tab_name, - filename=f"{self.year}__{self.product}__{tab_name}.jsonl.gz", - ) - - self.clean_excel_extract.save_content( - fs=get_fs(), content=self.clean_gzipped_content - ) - - except Exception as e: - logging.error(f"Error processing sheet {key}: {e}") - raise AirflowException(f"Failed to process Excel sheet {key}: {e}") + for sheet_name, df in df_dict.items(): + tab_name, gzipped_content = self._process_sheet(sheet_name, df) + self._save_processed_sheet(tab_name, gzipped_content) + except AirflowException: + # Re-raise AirflowExceptions as they already have proper error messages + raise except Exception as e: logging.error(f"Error in XLSX operator execution: {e}") raise AirflowException(f"XLSX operator execution failed: {e}") diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index e8fb636fec..7ebbc389fe 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,7 +1,7 @@ import gzip import logging import os -from typing import ClassVar, List +from typing import Any, ClassVar, Dict, List, Optional import pandas as pd # type: ignore import pendulum @@ -21,34 +21,25 @@ class StateGeoportalAPIExtract(PartitionedGCSArtifact): dt: pendulum.Date = execution_ts.date() partition_names: ClassVar[List[str]] = ["dt", "execution_ts"] - # The name to be used in the data warehouse to refer to the data - # product. + # The name to be used in the data warehouse to refer to the data product. product: str - # The root of the ArcGIS services. As of Nov 2024, this should - # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/". + # The root of the ArcGIS services. root_url: str - # The name of the service being requested. In the feature service's - # URL, this will be everything between the root and "/FeatureServer". - # Don't include a leading or trailing slash. + # The name of the service being requested. service: str - # The layer to query. This will usually be "0", so that is the - # default. + # The layer to query. This will usually be "0". layer: str = "0" - # The query filter. By default, all rows will be returned from the - # service. Refer to the ArcGIS documentation for syntax: - # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters + # The query filter. By default, all rows will be returned. where: str = "1=1" - # A comma-separated list of fields to include in the results. Use - # "*" (the default) to include all fields. + # A comma-separated list of fields to include in the results. 
outFields: str = "*" - # The number of records to request for each API call (the operator - # will request all data from the layer in batches of this size). + # The number of records to request for each API call. resultRecordCount: int @property @@ -62,13 +53,37 @@ def filename(self) -> str: class Config: arbitrary_types_allowed = True - def fetch_from_state_geoportal(self): - """ """ + def _make_api_request(self, url: str, params: Dict[str, Any], offset: int) -> Dict: + """Make API request with proper error handling.""" + try: + params["resultOffset"] = offset + response = requests.get(url, params=params) + response.raise_for_status() + return response.json() + except requests.exceptions.HTTPError as e: + logging.error(f"HTTP error in batch request at offset {offset}: {e}") + raise AirflowException(f"HTTP error in geoportal request: {e}") + except requests.exceptions.RequestException as e: + logging.error(f"Request error in batch at offset {offset}: {e}") + raise AirflowException(f"Request error in geoportal request: {e}") + except Exception as e: + logging.error(f"Error processing batch at offset {offset}: {e}") + raise AirflowException(f"Error processing geoportal batch: {e}") - logging.info(f"Downloading state geoportal data for {self.product}.") + def _validate_features(self, data: Dict, offset: int) -> Optional[List]: + """Validate features from API response.""" + if "features" not in data or not data["features"]: + if offset == 0: + logging.info( + f"There is no data to download for {self.product}. Ending pipeline." + ) + return None + return data["features"] + def fetch_from_state_geoportal(self) -> Optional[List]: + """Fetch data from state geoportal with proper error handling.""" try: - # Set up the parameters for the request + # Set up the request URL and parameters url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query" validated_url = parse_obj_as(HttpUrl, url) @@ -79,45 +94,22 @@ def fetch_from_state_geoportal(self): "resultRecordCount": self.resultRecordCount, } - all_features = [] # To store all retrieved rows + all_features = [] offset = 0 while True: - try: - # Update the resultOffset for each request - params["resultOffset"] = offset - - # Make the request - response = requests.get(validated_url, params=params) - response.raise_for_status() # Raises an HTTPError for bad responses - data = response.json() - - # Break the loop if there are no more features - if "features" not in data or not data["features"]: - break - - # Append the retrieved features - all_features.extend(data["features"]) - - # Increment the offset - offset += params["resultRecordCount"] - - except requests.exceptions.HTTPError as e: - logging.error( - f"HTTP error in batch request at offset {offset}: {e}" - ) - raise AirflowException(f"HTTP error in geoportal request: {e}") - except requests.exceptions.RequestException as e: - logging.error(f"Request error in batch at offset {offset}: {e}") - raise AirflowException(f"Request error in geoportal request: {e}") - except Exception as e: - logging.error(f"Error processing batch at offset {offset}: {e}") - raise AirflowException(f"Error processing geoportal batch: {e}") + # Make API request for current batch + data = self._make_api_request(validated_url, params, offset) + + # Validate and process features + features = self._validate_features(data, offset) + if features is None: + break + + all_features.extend(features) + offset += params["resultRecordCount"] if not all_features: - logging.info( - f"There is no data to download for 
{self.product}. Ending pipeline." - ) return None logging.info( @@ -125,20 +117,21 @@ def fetch_from_state_geoportal(self): ) return all_features + except AirflowException: + # Re-raise AirflowExceptions as they already have proper error messages + raise except Exception as e: logging.error(f"Error in geoportal data fetch: {e}") raise AirflowException(f"Failed to fetch geoportal data: {e}") -# Function to convert coordinates to WKT format -def to_wkt(geometry_type, coordinates): +def to_wkt(geometry_type: str, coordinates: List) -> Optional[str]: + """Convert coordinates to WKT format with proper error handling.""" try: if geometry_type == "LineString": - # Format as a LineString coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates]) return f"LINESTRING({coords_str})" elif geometry_type == "MultiLineString": - # Format as a MultiLineString multiline_coords_str = ", ".join( f"({', '.join([f'{lng} {lat}' for lng, lat in line])})" for line in coordinates @@ -167,11 +160,11 @@ class StateGeoportalAPIOperator(BaseOperator): def __init__( self, - product, - root_url, - service, - layer, - resultRecordCount, + product: str, + root_url: str, + service: str, + layer: str, + resultRecordCount: int, **kwargs, ): self.product = product @@ -196,59 +189,75 @@ def __init__( super().__init__(**kwargs) - def execute(self, **kwargs): + def _normalize_json_data(self, api_content: List) -> pd.DataFrame: + """Normalize JSON data with proper error handling.""" + try: + return pd.json_normalize(api_content) + except Exception as e: + logging.error(f"Error normalizing JSON data: {e}") + raise AirflowException(f"Failed to normalize geoportal data: {e}") + + def _process_highway_network(self, df: pd.DataFrame) -> pd.DataFrame: + """Process state highway network data with proper error handling.""" + try: + # Select columns to keep + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Create dynamic column mapping + columns = {col: col.split(".")[-1] for col in df.columns} + df = df.rename(columns=columns) + + # Create WKT coordinates + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) + return df + except Exception as e: + logging.error(f"Error processing state highway network data: {e}") + raise AirflowException(f"Failed to process state highway network data: {e}") + + def _save_dataframe(self, df: pd.DataFrame) -> None: + """Save DataFrame as compressed JSONL with proper error handling.""" try: + gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + self.extract.save_content(fs=get_fs(), content=gzipped_content) + except Exception as e: + logging.error(f"Error saving processed data: {e}") + raise AirflowException(f"Failed to save processed geoportal data: {e}") + + def execute(self, **kwargs) -> None: + """Execute the operator with proper error handling.""" + try: + # Fetch API content api_content = self.extract.fetch_from_state_geoportal() if api_content is None: return None - try: - df = pd.json_normalize(api_content) - except Exception as e: - logging.error(f"Error normalizing JSON data: {e}") - raise AirflowException(f"Failed to normalize geoportal data: {e}") + # Normalize JSON data + df = self._normalize_json_data(api_content) + # Process state highway network if applicable if self.product == "state_highway_network": - try: - # Select columns to keep, have to be 
explicit before renaming because there are duplicate values after normalizing - df = df[ - [ - "properties.Route", - "properties.County", - "properties.District", - "properties.RouteType", - "properties.Direction", - "geometry.type", - "geometry.coordinates", - ] - ] - - # Dynamically create a mapping by removing known prefixes - columns = {col: col.split(".")[-1] for col in df.columns} - - # Rename columns using the dynamically created mapping - df = df.rename(columns=columns) - - # Create new column with WKT format - df["wkt_coordinates"] = df.apply( - lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 - ) - except Exception as e: - logging.error(f"Error processing state highway network data: {e}") - raise AirflowException( - f"Failed to process state highway network data: {e}" - ) - - # Compress the DataFrame content and save it - try: - self.gzipped_content = gzip.compress( - df.to_json(orient="records", lines=True).encode() - ) - self.extract.save_content(fs=get_fs(), content=self.gzipped_content) - except Exception as e: - logging.error(f"Error saving processed data: {e}") - raise AirflowException(f"Failed to save processed geoportal data: {e}") + df = self._process_highway_network(df) + + # Save processed data + self._save_dataframe(df) + except AirflowException: + # Re-raise AirflowExceptions as they already have proper error messages + raise except Exception as e: logging.error(f"Error in geoportal operator execution: {e}") raise AirflowException(f"Geoportal operator execution failed: {e}")