From 7dbeaa028d1b1689c464277b233f397f9dce614c Mon Sep 17 00:00:00 2001 From: hailey <188331+haileyplusplus@users.noreply.github.com> Date: Tue, 2 Apr 2024 23:04:06 -0500 Subject: [PATCH 1/3] Add simple library to cache downloaded files locally. --- data_analysis/cache_manager.py | 51 +++++++++++++++++++++++++++ data_analysis/static_gtfs_analysis.py | 10 +++--- utils/s3_csv_reader.py | 12 +++++-- 3 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 data_analysis/cache_manager.py diff --git a/data_analysis/cache_manager.py b/data_analysis/cache_manager.py new file mode 100644 index 0000000..80446ae --- /dev/null +++ b/data_analysis/cache_manager.py @@ -0,0 +1,51 @@ +from pathlib import Path + +import logging + +import requests +from io import BytesIO + +DATA_DIR = Path(__file__).parent.parent / "data_output" / "scratch" + + +class CacheManager: + def __init__(self, ignore_cached_calculation=False, verbose=False): + self.data_dir: Path = DATA_DIR + self.objects = {} + self.ignore_cached_calculation = ignore_cached_calculation + self.verbose = verbose + + def log(self, *args): + if self.verbose: + logging.info(args) + + def retrieve_object(self, name, func): + obj = self.objects.get(name) + if obj is None: + obj = func() + self.objects[name] = obj + return obj + + def retrieve(self, subdir: str, filename: str, url: str) -> BytesIO: + """Retrieve data from the local filesystem cache or a remote URL. + + Args: + subdir (str): subdirectory under DATA_DIR. + filename (str): filename in subdir. + url (str): fetch data from this URL if the file does not exist locally. + + Returns: + BytesIO: buffer containing payload data. + """ + cache_dir = self.data_dir / subdir + if not cache_dir.exists(): + cache_dir.mkdir() + filepath = cache_dir / filename + if filepath.exists(): + self.log(f'Retrieved cached {url} from {filename}') + return BytesIO(filepath.open('rb').read()) + bytes_io = BytesIO(requests.get(url).content) + with filepath.open('wb') as ofh: + ofh.write(bytes_io.getvalue()) + self.log(f'Stored cached {url} in {filename}') + return bytes_io diff --git a/data_analysis/static_gtfs_analysis.py b/data_analysis/static_gtfs_analysis.py index 65bba88..df4febd 100644 --- a/data_analysis/static_gtfs_analysis.py +++ b/data_analysis/static_gtfs_analysis.py @@ -27,6 +27,7 @@ from tqdm import tqdm from scrape_data.scrape_schedule_versions import create_schedule_list +fron data_analysis.cache_manager import CacheManager VERSION_ID = "20220718" BUCKET = os.getenv('BUCKET_PUBLIC', 'chn-ghost-buses-public') @@ -360,11 +361,10 @@ def download_zip(version_id: str) -> zipfile.ZipFile: """ logger.info('Downloading CTA data') CTA_GTFS = zipfile.ZipFile( - BytesIO( - requests.get( - f"https://transitfeeds.com/p/chicago-transit-authority" - f"/165/{version_id}/download" - ).content + CacheManager().retrieve( + "transitfeeds_schedules", + f"{version_id}.zip", + f"https://transitfeeds.com/p/chicago-transit-authority/165/{version_id}/download" ) ) logging.info('Download complete') diff --git a/utils/s3_csv_reader.py b/utils/s3_csv_reader.py index ae1d63c..8324fb3 100644 --- a/utils/s3_csv_reader.py +++ b/utils/s3_csv_reader.py @@ -1,6 +1,9 @@ import pandas as pd from pathlib import Path import data_analysis.compare_scheduled_and_rt as csrt +from data_analysis.cache_manager import CacheManager + +CACHE_MANAGER = CacheManager() def read_csv(filename: str | Path) -> pd.DataFrame: """Read pandas csv from S3 @@ -14,9 +17,14 @@ def read_csv(filename: str | Path) -> pd.DataFrame: if isinstance(filename, str): filename = Path(filename) s3_filename = '/'.join(filename.parts[-2:]) + cache_filename = f'{filename.stem}.csv' df = pd.read_csv( - f'https://{csrt.BUCKET_PUBLIC}.s3.us-east-2.amazonaws.com/{s3_filename}', - low_memory=False + CACHE_MANAGER.retrieve( + 's3csv', + cache_filename, + f'https://{csrt.BUCKET_PUBLIC}.s3.us-east-2.amazonaws.com/{s3_filename}', + ), + low_memory=False ) return df \ No newline at end of file From 37fb33cef8bc4ba01bfafdc50a5e42db885b85b9 Mon Sep 17 00:00:00 2001 From: hailey <188331+haileyplusplus@users.noreply.github.com> Date: Tue, 2 Apr 2024 23:12:46 -0500 Subject: [PATCH 2/3] Fix typo and clean up. --- data_analysis/cache_manager.py | 24 +++++++----------------- data_analysis/static_gtfs_analysis.py | 4 ++-- utils/s3_csv_reader.py | 2 +- 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/data_analysis/cache_manager.py b/data_analysis/cache_manager.py index 80446ae..89e9c2f 100644 --- a/data_analysis/cache_manager.py +++ b/data_analysis/cache_manager.py @@ -5,26 +5,16 @@ import requests from io import BytesIO -DATA_DIR = Path(__file__).parent.parent / "data_output" / "scratch" - class CacheManager: - def __init__(self, ignore_cached_calculation=False, verbose=False): - self.data_dir: Path = DATA_DIR - self.objects = {} - self.ignore_cached_calculation = ignore_cached_calculation + DATA_DIR = Path(__file__).parent.parent / "data_output" / "scratch" + + def __init__(self, verbose=False): self.verbose = verbose def log(self, *args): if self.verbose: - logging.info(args) - - def retrieve_object(self, name, func): - obj = self.objects.get(name) - if obj is None: - obj = func() - self.objects[name] = obj - return obj + logging.info(*args) def retrieve(self, subdir: str, filename: str, url: str) -> BytesIO: """Retrieve data from the local filesystem cache or a remote URL. @@ -37,15 +27,15 @@ def retrieve(self, subdir: str, filename: str, url: str) -> BytesIO: Returns: BytesIO: buffer containing payload data. """ - cache_dir = self.data_dir / subdir + cache_dir = self.DATA_DIR / subdir if not cache_dir.exists(): cache_dir.mkdir() filepath = cache_dir / filename if filepath.exists(): - self.log(f'Retrieved cached {url} from {filename}') + self.log(f'Retrieved cached {url} from {subdir}/{filename}') return BytesIO(filepath.open('rb').read()) bytes_io = BytesIO(requests.get(url).content) with filepath.open('wb') as ofh: ofh.write(bytes_io.getvalue()) - self.log(f'Stored cached {url} in {filename}') + self.log(f'Stored cached {url} in {subdir}/{filename}') return bytes_io diff --git a/data_analysis/static_gtfs_analysis.py b/data_analysis/static_gtfs_analysis.py index df4febd..759966f 100644 --- a/data_analysis/static_gtfs_analysis.py +++ b/data_analysis/static_gtfs_analysis.py @@ -27,7 +27,7 @@ from tqdm import tqdm from scrape_data.scrape_schedule_versions import create_schedule_list -fron data_analysis.cache_manager import CacheManager +from data_analysis.cache_manager import CacheManager VERSION_ID = "20220718" BUCKET = os.getenv('BUCKET_PUBLIC', 'chn-ghost-buses-public') @@ -361,7 +361,7 @@ def download_zip(version_id: str) -> zipfile.ZipFile: """ logger.info('Downloading CTA data') CTA_GTFS = zipfile.ZipFile( - CacheManager().retrieve( + CacheManager(verbose=True).retrieve( "transitfeeds_schedules", f"{version_id}.zip", f"https://transitfeeds.com/p/chicago-transit-authority/165/{version_id}/download" diff --git a/utils/s3_csv_reader.py b/utils/s3_csv_reader.py index 8324fb3..07a683f 100644 --- a/utils/s3_csv_reader.py +++ b/utils/s3_csv_reader.py @@ -3,7 +3,7 @@ import data_analysis.compare_scheduled_and_rt as csrt from data_analysis.cache_manager import CacheManager -CACHE_MANAGER = CacheManager() +CACHE_MANAGER = CacheManager(verbose=True) def read_csv(filename: str | Path) -> pd.DataFrame: """Read pandas csv from S3 From 7e02eae8e86223f878b70a2c2721c99e33ebcf51 Mon Sep 17 00:00:00 2001 From: hailey <188331+haileyplusplus@users.noreply.github.com> Date: Tue, 2 Apr 2024 23:17:31 -0500 Subject: [PATCH 3/3] Change default verbosity to work better with progress bar. --- utils/s3_csv_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/s3_csv_reader.py b/utils/s3_csv_reader.py index 07a683f..704feef 100644 --- a/utils/s3_csv_reader.py +++ b/utils/s3_csv_reader.py @@ -3,7 +3,7 @@ import data_analysis.compare_scheduled_and_rt as csrt from data_analysis.cache_manager import CacheManager -CACHE_MANAGER = CacheManager(verbose=True) +CACHE_MANAGER = CacheManager(verbose=False) def read_csv(filename: str | Path) -> pd.DataFrame: """Read pandas csv from S3