Automate schedule downloads #61

Merged: 32 commits, Sep 20, 2023

Changes from 11 commits

Commits
a2af9bf First commit for downloading and saving schedule data (dcjohnson24, Jul 18, 2023)
4b38f62 Fix syntax error (dcjohnson24, Jul 18, 2023)
50a8a4e Change version constraint of mapclassify (dcjohnson24, Jul 19, 2023)
f56d0d4 remove single quote (dcjohnson24, Jul 20, 2023)
140ffbc Run as a module (dcjohnson24, Jul 20, 2023)
9f00363 Add print function for saving csv to public bucket (dcjohnson24, Jul 25, 2023)
12f6b08 Download schedule daily at 5:30pm UTC (dcjohnson24, Jul 25, 2023)
8aa3691 Save zipfile from transitchicago.com to s3 (dcjohnson24, Jul 25, 2023)
2ee3d05 Change method of uploading zipfile (dcjohnson24, Jul 26, 2023)
7c6a42e Check that objects exist in bucket (dcjohnson24, Jul 27, 2023)
bc91766 Change yield to print (dcjohnson24, Jul 27, 2023)
c0c153c Separate downloading zip file and saving daily summaries (dcjohnson24, Aug 8, 2023)
2dc18f3 remove job dependency (dcjohnson24, Aug 8, 2023)
d35f310 Add args to same line (dcjohnson24, Aug 8, 2023)
ee7b057 Save realtime summary file (dcjohnson24, Aug 13, 2023)
461df42 Change to string (dcjohnson24, Aug 13, 2023)
e1baeaa Correct python version name (dcjohnson24, Aug 13, 2023)
398d62a Add quotes (dcjohnson24, Aug 13, 2023)
77ef708 Add environment context (dcjohnson24, Aug 13, 2023)
4842fa0 Remove quotes (dcjohnson24, Aug 13, 2023)
3817614 Test without environment variables (dcjohnson24, Aug 13, 2023)
cfb0960 Revert "Test without environment variables" (dcjohnson24, Aug 13, 2023)
c08335a Change python version (dcjohnson24, Aug 13, 2023)
1eef5d2 Loosen constraint on pandas version (dcjohnson24, Aug 14, 2023)
b56f0c8 Change cta_schedule_versions to cta_data_downloads (dcjohnson24, Aug 14, 2023)
665e90e Install libgeo-dev (dcjohnson24, Aug 14, 2023)
6e287ec Back to python 3.10 (dcjohnson24, Aug 14, 2023)
9b04970 Change back to version constraint (dcjohnson24, Aug 14, 2023)
f0bd45a Change timezone to America/Chicago (dcjohnson24, Aug 14, 2023)
cebd713 Change to correct end date for realtime data (dcjohnson24, Aug 14, 2023)
20c595f rename schedule summary function (dcjohnson24, Aug 15, 2023)
4c06991 remove on push (lauriemerrell, Sep 20, 2023)
30 changes: 30 additions & 0 deletions .github/workflows/cta_schedule_data.yml
@@ -0,0 +1,30 @@
name: Automated job

on:
  push:
    branches:
      - 'automate-schedule-downloads'

  schedule:
    # Run every day at 12:30pm Chicago time (CDT), which is 5:30pm UTC
    - cron: 30 17 * * *

jobs:
  download-cta-schedule-data:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Download and save CTA schedule data
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        run: |
          pip install -r requirements.txt
          python -m scrape_data.cta_schedule_versions $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
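
As a quick sanity check on that cron expression, the schedule can be simulated locally (a sketch, assuming the third-party croniter package, which is not part of this PR):

from datetime import datetime

from croniter import croniter  # assumption: third-party package, not used by this repo

# '30 17 * * *' fires once a day at 17:30 UTC,
# i.e. 12:30pm in Chicago while daylight saving time is in effect.
it = croniter('30 17 * * *', datetime(2023, 9, 20))
print(it.get_next(datetime))  # 2023-09-20 17:30:00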
70 changes: 51 additions & 19 deletions data_analysis/static_gtfs_analysis.py
@@ -13,7 +13,7 @@
import os
from pathlib import Path
from dataclasses import dataclass
from typing import List
from typing import Tuple

import logging
import calendar
@@ -38,7 +38,6 @@
    datefmt='%m/%d/%Y %I:%M:%S %p'
)


@dataclass
class GTFSFeed:
"""Class for storing GTFSFeed data.
Expand All @@ -53,24 +52,32 @@ class GTFSFeed:

    @classmethod
    def extract_data(cls, gtfs_zipfile: zipfile.ZipFile,
                     version_id: str = None) -> GTFSFeed:
                     version_id: str = None, cta_download: bool = True) -> GTFSFeed:
        """Load each text file in zipfile into a DataFrame

        Args:
            gtfs_zipfile (zipfile.ZipFile): Zipfile downloaded from
                CTA transit feeds e.g.
                transitfeeds.com or transitchicago.com e.g.
                https://transitfeeds.com/p/chicago-transit-authority/
                165/20220718/download"
                165/20220718/download or https://www.transitchicago.com/downloads/sch_data/
            version_id (str, optional): The schedule version in use.
                Defaults to None.

        Returns:
            GTFSFeed: A GTFSFeed object containing multiple DataFrames
                accessible by name.
        """
        if version_id is None:
            version_id = VERSION_ID
        logging.info(f"Extracting data from CTA zipfile version {version_id}")
        if cta_download:
            if version_id is not None:
                raise ValueError("version_id is not used for downloads directly from CTA")
            else:
                logging.info("Extracting data from transitchicago.com zipfile")

        else:
            if version_id is None:
                version_id = VERSION_ID
            logging.info(f"Extracting data from transitfeeds.com zipfile version {version_id}")

        data_dict = {}
        pbar = tqdm(cls.__annotations__.keys())
        for txt_file in pbar:
@@ -140,14 +147,16 @@ def format_dates_hours(data: GTFSFeed) -> GTFSFeed:

def make_trip_summary(
        data: GTFSFeed,
        feed_start_date: pendulum.datetime,
        feed_end_date: pendulum.datetime) -> pd.DataFrame:
        feed_start_date: pendulum.datetime = None,
        feed_end_date: pendulum.datetime = None) -> pd.DataFrame:
    """Create a summary of trips with one row per date

    Args:
        data (GTFSFeed): GTFS data from CTA
        feed_start_date (datetime): Date from which this feed is valid (inclusive)
        feed_end_date (datetime): Date until which this feed is valid (inclusive)
        feed_start_date (datetime): Date from which this feed is valid (inclusive).
            Defaults to None.
        feed_end_date (datetime): Date until which this feed is valid (inclusive).
            Defaults to None.

    Returns:
        pd.DataFrame: A DataFrame with each trip that occurred per row.
@@ -161,7 +170,7 @@ def make_trip_summary(
        ),
        columns=["raw_date"],
    )

    # cross join calendar index with actual calendar to get all combos of
    # possible dates & services
    calendar_cross = calendar_date_range.merge(data.calendar, how="cross")
@@ -244,9 +253,10 @@ def make_trip_summary(
        trip_stop_hours, how="left", on="trip_id")

    # filter to only the rows for the period where this specific feed version was in effect
    trip_summary = trip_summary.loc[
        (trip_summary['raw_date'] >= feed_start_date)
        & (trip_summary['raw_date'] <= feed_end_date), :]
    if feed_start_date is not None and feed_end_date is not None:
        trip_summary = trip_summary.loc[
            (trip_summary['raw_date'] >= feed_start_date)
            & (trip_summary['raw_date'] <= feed_end_date), :]

    return trip_summary

@@ -321,6 +331,23 @@ def make_linestring_of_points(
    return shapely.geometry.LineString(list(sorted_df["pt"]))


def download_cta_zip() -> Tuple[zipfile.ZipFile, BytesIO]:
    """Download CTA schedule data from transitchicago.com

    Returns:
        Tuple[zipfile.ZipFile, BytesIO]: A zipfile of the latest GTFS schedule
            data from transitchicago.com, plus the underlying bytes buffer.
    """
    logging.info('Downloading CTA data')
    zip_bytes_io = BytesIO(
        requests.get("https://www.transitchicago.com/downloads/sch_data/google_transit.zip"
                     ).content
    )
    CTA_GTFS = zipfile.ZipFile(zip_bytes_io)
    logging.info('Download complete')
    return CTA_GTFS, zip_bytes_io
def download_zip(version_id: str) -> zipfile.ZipFile:
"""Download a version schedule from transitfeeds.com

Expand All @@ -344,17 +371,22 @@ def download_zip(version_id: str) -> zipfile.ZipFile:
return CTA_GTFS


def download_extract_format(version_id: str) -> GTFSFeed:
def download_extract_format(version_id: str = None) -> GTFSFeed:
    """Download a zipfile of GTFS data for a given version_id,
    extract data, and format date column.

    Args:
        version_id (str): The version of the GTFS schedule data to download
        version_id (str, optional): The version of the GTFS schedule data to download.
            Defaults to None. If version_id is None, data is downloaded directly
            from the CTA (transitchicago.com) instead of transitfeeds.com.

    Returns:
        GTFSFeed: A GTFSFeed object with formatted dates
    """
    CTA_GTFS = download_zip(version_id)
    if version_id is None:
        CTA_GTFS, _ = download_cta_zip()
    else:
        CTA_GTFS = download_zip(version_id)
    data = GTFSFeed.extract_data(CTA_GTFS, version_id=version_id,
                                 cta_download=version_id is None)
    data = format_dates_hours(data)
    return data
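
Taken together, the two code paths above can be exercised as follows (a usage sketch based on the signatures in this diff; the version string is illustrative, copied from the docstring example URL):

import data_analysis.static_gtfs_analysis as sga

# Default path: no version_id, so the zip comes straight from
# transitchicago.com via the new download_cta_zip() helper.
latest = sga.download_extract_format()

# Legacy path: an explicit version_id still goes through transitfeeds.com.
pinned = sga.download_extract_format(version_id='20220718')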
2 changes: 1 addition & 1 deletion requirements.txt
@@ -9,7 +9,7 @@ python-dotenv==0.20.0
seaborn==0.12.0
PyQt5==5.15.7
folium==0.12.1.post1
mapclassify==2.4.2+55.g0155c6e
mapclassify>=2.4.2+55.g0155c6e
plotly==5.11.0
kaleido==0.2.1
pre-commit==2.20.0
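
One quick way to confirm which mapclassify the loosened pin resolved to (a sketch using only the standard library; the floor version is copied from the line above):

import importlib.metadata as metadata

# Should print a version satisfying the new >= floor of 2.4.2.
print(metadata.version('mapclassify'))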
59 changes: 59 additions & 0 deletions scrape_data/cta_schedule_versions.py
@@ -0,0 +1,59 @@
import boto3
import sys
import data_analysis.static_gtfs_analysis as sga
import pendulum
from io import StringIO

ACCESS_KEY = sys.argv[1]
SECRET_KEY = sys.argv[2]

client = boto3.client(
    's3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

s3 = boto3.resource(
    's3',
    region_name='us-east-1',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

date = pendulum.now().to_date_string()

zipfile, zipfile_bytes_io = sga.download_cta_zip()
print(f'Saving zipfile available at '
      f'https://www.transitchicago.com/downloads/sch_data/google_transit.zip '
      f'on {date} to public bucket')
zipfile_bytes_io.seek(0)
client.upload_fileobj(zipfile_bytes_io, 'chn-ghost-buses-public', f'google_transit_{date}.zip')
Member:
Can we add another directory level in this path? Right now the zipfiles are just being written to the root directory, which doesn't pose a technical problem but will probably get a bit messy quickly. Maybe something like f'cta_schedule_zipfiles_raw/google_transit_{date}.zip'?

Collaborator (author):
Good point, I can add a new directory.

data = sga.download_extract_format()
trip_summary = sga.make_trip_summary(data)

route_daily_summary = (
    sga.summarize_date_rt(trip_summary)
)

csv_buffer = StringIO()
route_daily_summary.to_csv(csv_buffer)

print(f'Saving cta_route_daily_summary_{date}.csv to public bucket')
s3.Object('chn-ghost-buses-public', f'cta_route_daily_summary_{date}.csv')\
Member:
Same comment as above regarding an intermediate directory level for this path, and this one is a bit trickier since we already generate these files from the current batched process (currently in schedule_summaries/route_level)... We probably will need to figure out how to do a cutover from the old process to the new one.

Maybe f'schedule_summaries/daily_job/'?

Another question here would be whether we want to only save that day's activity (i.e., route_daily_summary[route_daily_summary.date == date]), because otherwise these files are pretty big to just save literally every day.

Collaborator (author):
Saving only that date would probably be good since we could concatenate the data later.
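
A minimal sketch of that day-level filter (hypothetical; assumes the summary's date column is comparable to the date string produced by pendulum's to_date_string()):

# Hypothetical: trim the route-level summary to the current day before
# uploading, so each daily file stays small and days concatenate cleanly.
today_only = route_daily_summary[route_daily_summary['date'] == date]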

    .put(Body=csv_buffer.getvalue())


# https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
print('Confirm that objects exist in bucket')
s3_paginator = client.get_paginator('list_objects_v2')

def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
    prefix = prefix.lstrip(delimiter)
    start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
    for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
        for content in page.get('Contents', ()):
            if content['Key'] in [f'cta_route_daily_summary_{date}.csv', f'google_transit_{date}.zip']:
                print(f"{content['Key']} exists")

keys('chn-ghost-buses-public')
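
An alternative spot-check to the paginator approach above (a sketch reusing the client and date defined earlier in this script; head_object is a standard boto3 client call that raises ClientError when the key is absent):

import botocore.exceptions


def object_exists(bucket: str, key: str) -> bool:
    # Hypothetical helper: True if the key exists in the bucket.
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except botocore.exceptions.ClientError:
        return False


for key in (f'google_transit_{date}.zip', f'cta_route_daily_summary_{date}.csv'):
    print(key, 'exists' if object_exists('chn-ghost-buses-public', key) else 'missing')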