Add exception handling for recent data scraping airflow operators #3607

Merged · 4 commits · Feb 4, 2025
95 changes: 78 additions & 17 deletions airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py
# ... (top of file unchanged) ...

import requests
from bs4 import BeautifulSoup
from pydantic import HttpUrl, ValidationError, parse_obj_as

from airflow.exceptions import AirflowException

xlsx_urls = {
    "ridership_url": "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release",
    # ... (remaining product URLs unchanged) ...
}


def push_url_to_xcom(key, scraped_url, context):
    """Push the scraped URL value to XCom with proper error handling."""
    task_instance = context.get("ti")
    if task_instance is None:
        raise AirflowException("Task instance not found in context")

    try:
        task_instance.xcom_push(key=key, value=scraped_url)
    except Exception as e:
        logging.error(f"Error pushing URL to XCom for key {key}: {e}")
        raise AirflowException(f"Failed to push URL to XCom: {e}")


def href_matcher(href):
    """Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'"""
    return (
        href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
    )


def make_http_request(url, key):
    """Make HTTP request with proper error handling."""
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response
    except requests.exceptions.HTTPError as e:
        logging.error(f"HTTP error occurred while fetching {url}: {e}")
        raise AirflowException(f"HTTP error for {key}: {e}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error occurred while fetching {url}: {e}")
        raise AirflowException(f"Request failed for {key}: {e}")


def parse_html_content(response_text, url, key):
    """Parse HTML content with error handling."""
    try:
        return BeautifulSoup(response_text, "html.parser")
    except Exception as e:
        logging.error(f"Error parsing HTML for {url}: {e}")
        raise AirflowException(f"HTML parsing failed for {key}: {e}")


def find_and_validate_xlsx_link(soup, key, url):
    """Find and validate XLSX download link."""
    link = soup.find("a", href=href_matcher)
    if not link:
        error_msg = f"No XLSX download link found for {key} at {url}"
        logging.error(error_msg)
        raise AirflowException(error_msg)

    file_link = link.get("href")
    if not file_link:
        error_msg = f"Found link for {key} but href attribute is missing"
        logging.error(error_msg)
        raise AirflowException(error_msg)

    updated_url = f"https://www.transit.dot.gov{file_link}"
    try:
        return parse_obj_as(HttpUrl, updated_url)
    except ValidationError as e:
        logging.error(f"URL validation failed for {updated_url}: {e}")
        raise AirflowException(f"Invalid URL constructed for {key}: {e}")


def scrape_ntd_xlsx_urls(**context):
    """Main function to scrape XLSX URLs and push them to XCom."""
    for key, url in xlsx_urls.items():
        try:
            # Make HTTP request
            response = make_http_request(url, key)

            # Parse HTML content
            soup = parse_html_content(response.text, url, key)

            # Find and validate XLSX link
            validated_url = find_and_validate_xlsx_link(soup, key, url)

            logging.info(f"Successfully validated URL for {key}: {validated_url}")

            # Push to XCom
            push_url_to_xcom(key=key, scraped_url=validated_url, context=context)

        except AirflowException:
            # Re-raise AirflowExceptions as they already have proper error messages
            raise
        except Exception as e:
            # Log any unhandled exceptions and re-raise as AirflowException
            logging.error(f"Unexpected error processing {key}: {e}")
            raise AirflowException(f"Failed to process {key}: {e}")
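
For orientation, here is a minimal sketch of how scrape_ntd_xlsx_urls could be wired into a DAG. The actual DAG definition is not part of this diff, so the dag_id, schedule, and import path below are assumptions made for illustration.

# Illustrative wiring only; dag_id, schedule, and import path are assumed.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

from scrape_ntd_xlsx_urls import scrape_ntd_xlsx_urls  # hypothetical import path

with DAG(
    dag_id="sync_ntd_data_xlsx_example",  # hypothetical dag_id
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    # Because the callable accepts **context, Airflow passes the template context
    # (including "ti") as keyword arguments, so push_url_to_xcom can reach the
    # TaskInstance and any AirflowException raised above fails the task cleanly.
    PythonOperator(
        task_id="scrape_ntd_xlsx_urls",
        python_callable=scrape_ntd_xlsx_urls,
    )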
2 changes: 2 additions & 0 deletions airflow/docker-compose.yaml
# within x-airflow-common
AIRFLOW__CORE__BASE_LOG_FOLDER: /opt/airflow/gcs/logs
AIRFLOW__CORE__PLUGINS_FOLDER: /opt/airflow/gcs/plugins
AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: 'true'
# AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 120

# this option prevents a DAG from trying to run all dagruns back to its
# start date. this lets you spin up docker, unpause a dag, and just ...
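
If the commented-out DAGBAG_IMPORT_TIMEOUT override is later enabled, it can be verified from inside a running container through Airflow's standard config API; a small sketch follows (the expected value assumes the line above is uncommented as written).

# Sketch: confirm the environment override is picked up inside the container.
# AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT maps to [core] dagbag_import_timeout;
# `airflow config get-value core dagbag_import_timeout` shows the same thing.
from airflow.configuration import conf

print(conf.getfloat("core", "dagbag_import_timeout"))  # 120.0 if the override is enabled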
108 changes: 77 additions & 31 deletions airflow/plugins/operators/scrape_ntd_api.py
# ... (top of file unchanged) ...

from pydantic import HttpUrl, parse_obj_as

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator  # type: ignore

API_BUCKET = os.environ["CALITP_BUCKET__NTD_API_DATA_PRODUCTS"]

# ... (unchanged code omitted) ...

    class Config:
        arbitrary_types_allowed = True

    def _make_api_request(self, url: str) -> bytes:
        """Make API request with proper error handling."""
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response.content
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP error occurred: {e}")
            raise AirflowException(f"HTTP error in NTD API request: {e}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Request error occurred: {e}")
            raise AirflowException(f"Error in NTD API request: {e}")

    def _validate_response_content(self, content: bytes) -> bytes:
        """Validate API response content."""
        if content is None or len(content) == 0:
            logging.info(
                f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
            )
            return None
        logging.info(
            f"Downloaded {self.product} data for {self.year} with {len(content)} rows!"
        )
        return content

    def fetch_from_ntd_api(self):
        """Fetch data from NTD API with proper error handling."""
        try:
            # Construct and validate URL
            url = (
                self.root_url + self.endpoint_id + self.file_format + "?$limit=5000000"
            )
            validated_url = parse_obj_as(HttpUrl, url)

            # Make API request
            response_content = self._make_api_request(validated_url)

            # Validate response content
            return self._validate_response_content(response_content)

        except AirflowException:
            # Re-raise AirflowExceptions as they already have proper error messages
            raise
        except Exception as e:
            logging.error(f"Unexpected error occurred: {e}")
            raise AirflowException(f"Unexpected error in NTD API request: {e}")


class JSONExtract(NtdDataProductAPIExtract):

# ... (unchanged code omitted; the methods below are added to the operator class, after its __init__ calls super().__init__(**kwargs)) ...

    def _process_api_content(self, api_content: bytes) -> pd.DataFrame:
        """Process API content into a DataFrame with error handling."""
        try:
            decode_api_content = api_content.decode("utf-8")
            df = pd.read_json(decode_api_content)
            return df.rename(make_name_bq_safe, axis="columns")
        except ValueError as e:
            logging.error(f"Error parsing JSON data: {e}")
            raise AirflowException(f"Failed to parse JSON data: {e}")
        except Exception as e:
            logging.error(f"Error processing API content: {e}")
            raise AirflowException(f"Failed to process API content: {e}")

    def _save_dataframe(self, df: pd.DataFrame) -> None:
        """Save DataFrame as compressed JSONL with error handling."""
        try:
            gzipped_content = gzip.compress(
                df.to_json(orient="records", lines=True).encode()
            )
            self.extract.save_content(fs=get_fs(), content=gzipped_content)
        except Exception as e:
            logging.error(f"Error saving processed data: {e}")
            raise AirflowException(f"Failed to save processed data: {e}")

    def execute(self, **kwargs):
        """Execute the operator with proper error handling."""
        try:
            # Fetch API content
            api_content = self.extract.fetch_from_ntd_api()
            if api_content is None:
                return None

            # Process API content
            df = self._process_api_content(api_content)

            # Save processed data
            self._save_dataframe(df)

        except AirflowException:
            # Re-raise AirflowExceptions as they already have proper error messages
            raise
        except Exception as e:
            logging.error(f"Error processing NTD API data: {e}")
            raise AirflowException(f"Failed to process NTD API data: {e}")
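
A minimal test sketch (not included in this PR) of the new error wrapping in _make_api_request; the module path and test layout are assumptions, on the premise that the plugins directory is importable as operators.scrape_ntd_api.

# Illustrative test only; import path is assumed.
from unittest import mock

import pytest
import requests

from airflow.exceptions import AirflowException
from operators.scrape_ntd_api import NtdDataProductAPIExtract  # assumed import path


def test_make_api_request_wraps_request_errors():
    # Any requests failure should surface as an AirflowException so the task fails loudly.
    with mock.patch(
        "requests.get",
        side_effect=requests.exceptions.ConnectionError("connection refused"),
    ):
        with pytest.raises(AirflowException):
            # Call the method unbound with a stand-in "self"; it only uses the url argument.
            NtdDataProductAPIExtract._make_api_request(
                mock.Mock(), "https://example.com/data.json"
            )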