Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automate schedule downloads #61

Merged
merged 32 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a2af9bf
First commit for downloading and saving schedule data
dcjohnson24 Jul 18, 2023
4b38f62
Fix syntax error
dcjohnson24 Jul 18, 2023
50a8a4e
Change version constraint of mapclassify
dcjohnson24 Jul 19, 2023
f56d0d4
remove single quote
dcjohnson24 Jul 20, 2023
140ffbc
Run as a module
dcjohnson24 Jul 20, 2023
9f00363
Add print function for saving csv to public bucket
dcjohnson24 Jul 25, 2023
12f6b08
Download schedule daily at 5:30pm UTC
dcjohnson24 Jul 25, 2023
8aa3691
Save zipfile from transitchicago.com to s3
dcjohnson24 Jul 25, 2023
2ee3d05
Change method of uploading zipfile
dcjohnson24 Jul 26, 2023
7c6a42e
Check that objects exist in bucket
dcjohnson24 Jul 27, 2023
bc91766
Change yield to print
dcjohnson24 Jul 27, 2023
c0c153c
Separate downloading zip file and saving daily summaries
dcjohnson24 Aug 8, 2023
2dc18f3
remove job dependency
dcjohnson24 Aug 8, 2023
d35f310
Add args to same line
dcjohnson24 Aug 8, 2023
ee7b057
Save realtime summary file
dcjohnson24 Aug 13, 2023
461df42
Change to string
dcjohnson24 Aug 13, 2023
e1baeaa
Correct python version name
dcjohnson24 Aug 13, 2023
398d62a
Add quotes
dcjohnson24 Aug 13, 2023
77ef708
Add environment context
dcjohnson24 Aug 13, 2023
4842fa0
Remove quotes
dcjohnson24 Aug 13, 2023
3817614
Test without environment variables
dcjohnson24 Aug 13, 2023
cfb0960
Revert "Test without environment variables"
dcjohnson24 Aug 13, 2023
c08335a
Change python version
dcjohnson24 Aug 13, 2023
1eef5d2
Loosen constraint on pandas version
dcjohnson24 Aug 14, 2023
b56f0c8
Change cta_schedule_versions to cta_data_downloads
dcjohnson24 Aug 14, 2023
665e90e
Install libgeo-dev
dcjohnson24 Aug 14, 2023
6e287ec
Back to python 3.10
dcjohnson24 Aug 14, 2023
9b04970
Change back to version constraint
dcjohnson24 Aug 14, 2023
f0bd45a
Change timezone to America/Chicago
dcjohnson24 Aug 14, 2023
cebd713
Change to correct end date for realtime data
dcjohnson24 Aug 14, 2023
20c595f
rename schedule summary function
dcjohnson24 Aug 15, 2023
4c06991
remove on push
lauriemerrell Sep 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions .github/workflows/cta_data_downloads.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
name: Automate CTA schedule and realtime downloads

on:
push:
branches:
- 'automate-schedule-downloads'

schedule:
# Run every day at 5:30pm UTC, which is 12:30pm CDT (11:30am CST)
- cron: 30 17 * * *

env:
PYTHON_VERSION: 3.10.6
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

jobs:
download-cta-schedule-data:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: Download and save CTA schedule data

run: |
pip install -r requirements.txt
python -c 'from scrape_data.cta_data_downloads import save_cta_zip; \
save_cta_zip()' \
$AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY


save-schedule-daily-summary:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: 'Save schedule summaries'
run: |
pip install -r requirements.txt
python -c 'from scrape_data.cta_data_downloads import save_sched_daily_summary; \
save_sched_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY


save-realtime-daily-summary:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: 'Save realtime summaries'

run: |
pip install -r requirements.txt

python -c 'from scrape_data.cta_data_downloads import save_realtime_daily_summary; \
save_realtime_daily_summary()' $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY

70 changes: 51 additions & 19 deletions data_analysis/static_gtfs_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import os
from pathlib import Path
from dataclasses import dataclass
from typing import List
from typing import Tuple

import logging
import calendar
Expand All @@ -38,7 +38,6 @@
datefmt='%m/%d/%Y %I:%M:%S %p'
)


@dataclass
class GTFSFeed:
"""Class for storing GTFSFeed data.
Expand All @@ -53,24 +52,32 @@ class GTFSFeed:

@classmethod
def extract_data(cls, gtfs_zipfile: zipfile.ZipFile,
version_id: str = None) -> GTFSFeed:
version_id: str = None, cta_download: bool = True) -> GTFSFeed:
"""Load each text file in zipfile into a DataFrame

Args:
gtfs_zipfile (zipfile.ZipFile): Zipfile downloaded from
CTA transit feeds e.g.
transitfeeds.com or transitchicago.com e.g.
https://transitfeeds.com/p/chicago-transit-authority/
165/20220718/download"
165/20220718/download or https://www.transitchicago.com/downloads/sch_data/
version_id (str, optional): The schedule version in use.
Defaults to None.

Returns:
GTFSFeed: A GTFSFeed object containing multiple DataFrames
accessible by name.
"""
if version_id is None:
version_id = VERSION_ID
logging.info(f"Extracting data from CTA zipfile version {version_id}")
if cta_download:
if version_id is not None:
raise ValueError("version_id is not used for downloads directly from CTA")
else:
logging.info(f"Extracting data from transitchicago.com zipfile")

else:
if version_id is None:
version_id = VERSION_ID
logging.info(f"Extracting data from transitfeeds.com zipfile version {version_id}")

data_dict = {}
pbar = tqdm(cls.__annotations__.keys())
for txt_file in pbar:
Expand Down Expand Up @@ -140,14 +147,16 @@ def format_dates_hours(data: GTFSFeed) -> GTFSFeed:

def make_trip_summary(
data: GTFSFeed,
feed_start_date: pendulum.datetime,
feed_end_date: pendulum.datetime) -> pd.DataFrame:
feed_start_date: pendulum.datetime = None,
feed_end_date: pendulum.datetime = None) -> pd.DataFrame:
"""Create a summary of trips with one row per date

Args:
data (GTFSFeed): GTFS data from CTA
feed_start_date (datetime): Date from which this feed is valid (inclusive)
feed_end_date (datetime): Date until which this feed is valid (inclusive)
feed_start_date (datetime): Date from which this feed is valid (inclusive).
Defaults to None
feed_end_date (datetime): Date until which this feed is valid (inclusive).
Defaults to None

Returns:
pd.DataFrame: A DataFrame with each trip that occurred per row.
Expand All @@ -161,7 +170,7 @@ def make_trip_summary(
),
columns=["raw_date"],
)

# cross join calendar index with actual calendar to get all combos of
# possible dates & services
calendar_cross = calendar_date_range.merge(data.calendar, how="cross")
Expand Down Expand Up @@ -244,9 +253,10 @@ def make_trip_summary(
trip_stop_hours, how="left", on="trip_id")

# filter to only the rows for the period where this specific feed version was in effect
trip_summary = trip_summary.loc[
(trip_summary['raw_date'] >= feed_start_date)
& (trip_summary['raw_date'] <= feed_end_date), :]
if feed_start_date is not None and feed_end_date is not None:
trip_summary = trip_summary.loc[
(trip_summary['raw_date'] >= feed_start_date)
& (trip_summary['raw_date'] <= feed_end_date), :]

return trip_summary

Expand Down Expand Up @@ -321,6 +331,23 @@ def make_linestring_of_points(
return shapely.geometry.LineString(list(sorted_df["pt"]))


def download_cta_zip() -> Tuple[zipfile.ZipFile, BytesIO]:
    """Download the latest CTA GTFS schedule data from transitchicago.com.

    Returns:
        Tuple[zipfile.ZipFile, BytesIO]: The opened zipfile of the latest
            GTFS schedule data, and the in-memory buffer holding the raw
            downloaded bytes (useful for callers that need to store the
            archive unchanged).

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server stops responding.
        zipfile.BadZipFile: If the downloaded content is not a valid zipfile.
    """
    logger.info('Downloading CTA data')
    response = requests.get(
        "https://www.transitchicago.com/downloads/sch_data/google_transit.zip",
        # Without a timeout an unresponsive server would hang the job forever.
        timeout=60,
    )
    # Surface HTTP failures directly instead of letting an error page be
    # handed to ZipFile and fail with a less informative BadZipFile.
    response.raise_for_status()
    zip_bytes_io = BytesIO(response.content)
    CTA_GTFS = zipfile.ZipFile(zip_bytes_io)
    logging.info('Download complete')
    return CTA_GTFS, zip_bytes_io



def download_zip(version_id: str) -> zipfile.ZipFile:
"""Download a version schedule from transitfeeds.com

Expand All @@ -344,17 +371,22 @@ def download_zip(version_id: str) -> zipfile.ZipFile:
return CTA_GTFS


def download_extract_format(version_id: str) -> GTFSFeed:
def download_extract_format(version_id: str = None) -> GTFSFeed:
"""Download a zipfile of GTFS data for a given version_id,
extract data, and format date column.

Args:
version_id (str): The version of the GTFS schedule data to download
version_id (str): The version of the GTFS schedule data to download. Defaults to None
If version_id is None, data will be downloaded from the CTA directly (transitchicago.com)
instead of transitfeeds.com

Returns:
GTFSFeed: A GTFSFeed object with formatted dates
"""
CTA_GTFS = download_zip(version_id)
if version_id is None:
CTA_GTFS, _ = download_cta_zip()
else:
CTA_GTFS = download_zip(version_id)
data = GTFSFeed.extract_data(CTA_GTFS, version_id=version_id)
data = format_dates_hours(data)
return data
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ python-dotenv==0.20.0
seaborn==0.12.0
PyQt5==5.15.7
folium==0.12.1.post1
mapclassify==2.4.2+55.g0155c6e
mapclassify>=2.4.2+55.g0155c6e
plotly==5.11.0
kaleido==0.2.1
pre-commit==2.20.0
115 changes: 115 additions & 0 deletions scrape_data/cta_data_downloads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import boto3
import sys
import data_analysis.static_gtfs_analysis as sga
import data_analysis.compare_scheduled_and_rt as csrt
import pendulum
from io import StringIO
import pandas as pd


# AWS credentials are read from the first two command-line arguments.
# The workflow in .github/workflows/cta_data_downloads.yml passes
# $AWS_ACCESS_KEY_ID and $AWS_SECRET_ACCESS_KEY positionally after the
# `python -c` snippet, so importing this module without them raises IndexError.
ACCESS_KEY = sys.argv[1]
SECRET_KEY = sys.argv[2]

# Low-level S3 client: used for uploading file objects and listing bucket keys.
client = boto3.client(
    's3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

# Resource-level S3 handle: used for writing CSV bodies via s3.Object(...).put.
s3 = boto3.resource(
    's3',
    region_name='us-east-1',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

# Current date in Chicago local time, as a string; embedded in saved filenames
# and compared against the schedule summary's 'date' column.
today = pendulum.now('America/Chicago').to_date_string()

# NOTE: this download runs at import time, so every entry point in this module
# (including ones that do not use the schedule zip) triggers it.
CTA_GTFS, zipfile_bytes_io = sga.download_cta_zip()

def save_cta_zip() -> None:
    """Upload the downloaded CTA schedule zipfile to the public S3 bucket.

    The raw bytes held in the module-level ``zipfile_bytes_io`` buffer are
    stored under ``cta_schedule_zipfiles_raw/google_transit_<today>.zip``,
    after which the bucket is listed to confirm that the object exists.
    """
    print('Saving zipfile available at '
          'https://www.transitchicago.com/downloads/sch_data/google_transit.zip '
          f'on {today} to public bucket')
    filename = f'cta_schedule_zipfiles_raw/google_transit_{today}.zip'
    # The buffer was already consumed when the ZipFile was opened, so rewind
    # it to make sure the whole archive is uploaded.
    zipfile_bytes_io.seek(0)
    client.upload_fileobj(
        zipfile_bytes_io,
        csrt.BUCKET_PUBLIC,
        filename
    )
    print(f'Confirm that {filename} exists in bucket')
    # Check the same bucket the file was uploaded to rather than a hard-coded
    # name, so the confirmation cannot drift from the upload target.
    keys(csrt.BUCKET_PUBLIC, [filename])


def save_csv_to_bucket(df: pd.DataFrame, filename: str) -> None:
    """Write a DataFrame as a CSV object in the public S3 bucket.

    Args:
        df (pd.DataFrame): The table to serialize.
        filename (str): Key of the object to create in s3.
            Should contain the .csv suffix.
    """
    # Serialize in memory first so nothing touches the local filesystem.
    buffer = StringIO()
    df.to_csv(buffer)

    print(f'Saving {filename} to public bucket')
    destination = s3.Object(csrt.BUCKET_PUBLIC, f'{filename}')
    destination.put(Body=buffer.getvalue())


def save_sched_daily_summary() -> None:
    """Save today's rows of the schedule route summary to the public bucket.

    Builds a trip summary from the module-level ``CTA_GTFS`` zipfile,
    summarizes it with ``sga.summarize_date_rt``, keeps only the rows whose
    ``date`` equals ``today``, writes them to
    ``schedule_summaries/daily_job/cta_route_daily_summary_<today>.csv``
    and then lists the bucket to confirm the object exists.
    """
    feed = sga.format_dates_hours(sga.GTFSFeed.extract_data(CTA_GTFS))
    trips = sga.make_trip_summary(feed)
    daily = sga.summarize_date_rt(trips)

    # Dates are compared as strings because `today` is a date string.
    daily['date'] = daily['date'].astype(str)
    is_today = daily['date'] == today
    todays_rows = daily.loc[is_today]

    print(f'Saving cta_route_daily_summary_{today}.csv to public bucket')
    filename = f'schedule_summaries/daily_job/cta_route_daily_summary_{today}.csv'
    save_csv_to_bucket(todays_rows, filename=filename)

    print(f'Confirm that {filename} exists in bucket')
    keys(csrt.BUCKET_PUBLIC, [filename])


def save_realtime_daily_summary() -> None:
    """Summarize the most recent full day of realtime data and save it.

    Reads ``bus_full_day_data_v2/<end_date>.csv`` from ``csrt.BASE_PATH``,
    summarizes it with ``csrt.make_daily_summary``, writes the result to
    ``realtime_summaries/daily_job/bus_full_day_data_v2/<end_date>.csv`` in
    the public bucket and then lists the bucket to confirm the object exists.

    ``end_date`` is yesterday (Chicago time) when run at or after 11am,
    otherwise two days ago.
    """
    tz = "America/Chicago"
    # NOTE(review): presumably the previous day's file is only available
    # from about 11am Chicago time - confirm against the upstream job.
    if pendulum.now(tz).hour < 11:
        end_date = pendulum.now(tz).subtract(days=2)
    else:
        end_date = pendulum.yesterday(tz)
    end_date = end_date.to_date_string()

    source_uri = (csrt.BASE_PATH / f"bus_full_day_data_v2/{end_date}.csv").as_uri()
    daily_data = pd.read_csv(source_uri, low_memory=False)
    daily_data = csrt.make_daily_summary(daily_data)

    filename = f'realtime_summaries/daily_job/bus_full_day_data_v2/{end_date}.csv'
    save_csv_to_bucket(daily_data, filename=filename)

    print(f'Confirm that {filename} exists in bucket')
    keys(csrt.BUCKET_PUBLIC, [filename])

# https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
def keys(bucket_name: str, filenames: list,
         prefix: str='/', delimiter: str='/',
         start_after: str='') -> None:
    """Report which of the given object keys exist in an S3 bucket.

    Lists the bucket page by page and prints ``"<key> exists"`` for every
    listed key that appears in ``filenames``. Any requested name that was not
    seen is reported as not found, so a failed upload is visible in the log.

    Args:
        bucket_name (str): Name of the bucket to list.
        filenames (list): Object keys to look for.
        prefix (str): Only keys under this prefix are listed. Leading
            delimiters are stripped, so the default ``'/'`` becomes ``''``
            and the whole bucket is listed.
        delimiter (str): Path separator used when normalizing ``prefix``.
        start_after (str): Key after which listing starts.
    """
    s3_paginator = client.get_paginator('list_objects_v2')
    prefix = prefix.lstrip(delimiter)
    start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after

    # Set membership keeps the per-key check O(1) on large buckets.
    wanted = set(filenames)
    found = set()
    for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
        for content in page.get('Contents', ()):
            key = content['Key']
            if key in wanted:
                print(f"{key} exists")
                found.add(key)
        # Keys are unique within a bucket, so once everything has been seen
        # the remaining pages cannot add anything.
        if found == wanted:
            break

    for name in filenames:
        if name not in found:
            print(f"{name} not found in bucket {bucket_name}")