From f30d1c3846cf135cec6c3ba70fd3892f8e1c58f7 Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Sun, 27 Aug 2023 21:38:01 -0600
Subject: [PATCH 1/8] gitignore

---
 .gitignore | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/.gitignore b/.gitignore
index fae8299..9e18ea6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,3 +37,10 @@ vignettes/*.pdf
 
 # R Environment Variables
 .Renviron
+
+# python venv
+venv/
+env/
+
+# Environment variables
+*.env
\ No newline at end of file
From cecc919f5a150deac4f96c7618200b8bb8272645 Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Sun, 27 Aug 2023 21:38:15 -0600
Subject: [PATCH 2/8] reqs

---
 requirements.txt | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 requirements.txt

diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..f7751f4
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+python-dotenv
+psycopg2-binary
+SQLAlchemy
+pandas
\ No newline at end of file
From aa39eb2bdfe499154001dcdfba2e5ab1c1d36043 Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Sun, 27 Aug 2023 21:38:35 -0600
Subject: [PATCH 3/8] download fars data

---
 general/update_fars/01_download_fars.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
 create mode 100644 general/update_fars/01_download_fars.py

diff --git a/general/update_fars/01_download_fars.py b/general/update_fars/01_download_fars.py
new file mode 100644
index 0000000..4e4ced5
--- /dev/null
+++ b/general/update_fars/01_download_fars.py
@@ -0,0 +1,15 @@
+from urllib import request
+import os
+
+# place to save data locally
+data_folder = "/mnt/c/Users/tputta/OneDrive - Toole Design/Desktop/SSPF/FARS"
+
+# first and last FARS years to download (inclusive)
+start_yr = 2015
+end_yr = 2021
+
+for yr in range(start_yr, end_yr+1):
+    print(f"Downloading {yr} FARS data")
+    # makedirs creates missing parent folders and is a no-op when the folder exists
+    os.makedirs(f"{data_folder}/{yr}", exist_ok=True)
+    request.urlretrieve(f"https://static.nhtsa.gov/nhtsa/downloads/FARS/{yr}/National/FARS{yr}NationalCSV.zip", f"{data_folder}/{yr}/FARS{yr}NationalCSV.zip")
\ No newline at end of file
From 48483b6800a17cd54d18192cd85111a5809d411d Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Sun, 27 Aug 2023 21:39:00 -0600
Subject: [PATCH 4/8] combine csv and load to db

---
 general/update_fars/02_combine_csv.py | 72 +++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)
 create mode 100644 general/update_fars/02_combine_csv.py

diff --git a/general/update_fars/02_combine_csv.py b/general/update_fars/02_combine_csv.py
new file mode 100644
index 0000000..29ed51b
--- /dev/null
+++ b/general/update_fars/02_combine_csv.py
@@ -0,0 +1,72 @@
+import pandas as pd
+import os
+import psycopg2
+from psycopg2 import sql
+from dotenv import load_dotenv
+from zipfile import ZipFile
+from sqlalchemy import create_engine
+from sqlalchemy.engine import URL
+from io import StringIO
+
+# get environment variables
+load_dotenv("rds_conn_vars.env")
+
+# db connection
+TDG_HOST = os.environ["TDG_HOST"]
+TDG_DB_NAME = os.environ["TDG_DB_NAME"]
+TDG_USER_NAME = os.environ["TDG_USER_NAME"]
+TDG_PASSWORD = os.environ["TDG_PASSWORD"]
+
+# build the URL from its parts so special characters in the password are escaped
+engine = create_engine(URL.create("postgresql", username=TDG_USER_NAME, password=TDG_PASSWORD, host=TDG_HOST, port=5432, database=TDG_DB_NAME))
+
+# keyword arguments avoid quoting problems of a hand-built connection string
+conn = psycopg2.connect(host=TDG_HOST, dbname=TDG_DB_NAME, user=TDG_USER_NAME, password=TDG_PASSWORD)
+
+# place to save data locally
+data_folder = "/mnt/c/Users/tputta/OneDrive - Toole Design/Desktop/SSPF/FARS"
+
+for (start_yr, end_yr) in [(2015, 2019), (2017, 2021)]:
+    for table_type in ["accident", "person"]:
+        table_name = f"fars_{table_type}_{start_yr}_{end_yr}"
+
+        # store csv as a list of dataframes
+        df_list = []
+        for yr in range(start_yr, end_yr+1):
+            print(f"Reading {yr} {table_type} data as a dataframe")
+            with ZipFile(f"{data_folder}/{yr}/FARS{yr}NationalCSV.zip") as zip_file:
+                for file_name in zip_file.namelist():
+                    if f"{table_type}.csv" in file_name.lower():
+                        print(f" {file_name}")
+                        # close the archive member as soon as it has been parsed
+                        with zip_file.open(file_name) as csv_file:
+                            df = pd.read_csv(csv_file, low_memory=False, encoding_errors="ignore")
+                        if table_type == "person":
+                            df['year'] = yr
+                        df_list.append(df)
+
+        # merge all the csvs into one dataframe
+        print("Merging all dfs into one")
+        df = pd.concat(df_list, ignore_index=True)
+        del df_list
+
+        # col names to lower case
+        df.columns = map(str.lower, df.columns)
+
+        print("Writing the dataframe to postgres table")
+        # df.to_sql(name=f"fars_{table_type}_{start_yr}_{end_yr}", schema="received", con=engine, if_exists="replace", index=False, chunksize=10000, method="multi")
+
+        # using sql copy function instead of pandas to_sql as it is much faster
+        # the empty frame (re)creates the table; to_sql and to_csv both keep the dataframe index as the first column
+        df.head(0).to_sql(name=table_name, schema="received", con=engine, if_exists="replace")
+        query = sql.SQL("copy {} from stdin with csv delimiter as ','").format(sql.Identifier("received", table_name))
+        buffer = StringIO()
+        df.to_csv(buffer, header=False)
+        buffer.seek(0)
+
+        # the cursor is closed as soon as the copy has finished
+        with conn.cursor() as cur:
+            cur.copy_expert(sql=query, file=buffer)
+        conn.commit()
+
+conn.close()
\ No newline at end of file
From e5b911f46ee2ce6b13ded14c0e62090ce78144de Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Sun, 27 Aug 2023 21:39:17 -0600
Subject: [PATCH 5/8] process fars in db

---
 general/update_fars/03_process_fars.py | 70 ++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 general/update_fars/03_process_fars.py

diff --git a/general/update_fars/03_process_fars.py b/general/update_fars/03_process_fars.py
new file mode 100644
index 0000000..4f98dfb
--- /dev/null
+++ b/general/update_fars/03_process_fars.py
@@ -0,0 +1,70 @@
+import pandas as pd
+import os
+import psycopg2
+from dotenv import load_dotenv
+
+# get environment variables
+load_dotenv("rds_conn_vars.env")
+
+# db connection
+TDG_HOST = os.environ["TDG_HOST"]
+TDG_DB_NAME = os.environ["TDG_DB_NAME"]
+TDG_USER_NAME = os.environ["TDG_USER_NAME"]
+TDG_PASSWORD = os.environ["TDG_PASSWORD"]
+
+# keyword arguments avoid quoting problems of a hand-built connection string
+conn = psycopg2.connect(host=TDG_HOST, dbname=TDG_DB_NAME, user=TDG_USER_NAME, password=TDG_PASSWORD)
+cur = conn.cursor()
+
+for (start_yr, end_yr) in [(2015, 2019), (2017, 2021)]:
+    print(f"Processing {start_yr}_{end_yr} data")
+    # NOTE(review): check how FARS encodes missing latitude/longitud values; ST_MakePoint turns any filler codes into real-looking points
+    query = f"""
+    DROP TABLE IF EXISTS automated.fars_processed_{start_yr}_{end_yr};
+    CREATE TABLE automated.fars_processed_{start_yr}_{end_yr} AS (
+        SELECT
+            a.st_case,
+            -- just go through person types in order of vulnerability
+            CASE
+                WHEN per_typ && ARRAY[5, 8] THEN 'pedestrian'
+                WHEN per_typ && ARRAY[6, 7] THEN 'bicyclist'
+                WHEN per_typ && ARRAY[1, 2, 3] THEN 'motor vehicle'
+                ELSE 'other'
+            END AS crash_mode,
+            -- since its FARS, hard coding in severity
+            'fatality' AS crash_severity,
+            a.year AS crash_year,
+            LPAD(state::TEXT, 2, '0') AS state_fp,
+            LPAD(county::TEXT, 3, '0') AS county_fp,
+            CASE
+                WHEN city != 0 THEN LPAD(city::TEXT, 5, '0')
+                ELSE NULL
+            END AS place_fp,
+            a.func_sys AS functional_class,
+            ST_SetSRID(ST_MakePoint(longitud, latitude), 4326) AS geom
+        FROM
+            received.fars_accident_{start_yr}_{end_yr} a
+        LEFT JOIN
+            (
+                SELECT
+                    st_case,
+                    year,
+                    array_agg(per_typ::INT) AS per_typ
+                FROM
+                    received.fars_person_{start_yr}_{end_yr}
+                GROUP BY
+                    st_case,
+                    year
+            ) p
+            ON a.st_case = p.st_case AND a.year = p.year
+    )
+    ;
+
+    ALTER TABLE automated.fars_processed_{start_yr}_{end_yr} ADD pkey SERIAL PRIMARY KEY;
+    CREATE INDEX ON automated.fars_processed_{start_yr}_{end_yr} USING GIST(geom);
+    ANALYZE automated.fars_processed_{start_yr}_{end_yr};
+    """
+    cur.execute(query)
+    conn.commit()
+
+conn.close()
\ No newline at end of file
From 6b2f852984e5496ed9f8ad9eaafe7c0d62dc7697 Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Mon, 28 Aug 2023 11:49:38 -0600
Subject: [PATCH 6/8] backup 2015_2019 FARS table

---
 general/update_fars/04_backup_old_fars_data.sql | 7 +++++++
 1 file changed, 7 insertions(+)
 create mode 100644 general/update_fars/04_backup_old_fars_data.sql

diff --git a/general/update_fars/04_backup_old_fars_data.sql b/general/update_fars/04_backup_old_fars_data.sql
new file mode 100644
index 0000000..31b2dd0
--- /dev/null
+++ b/general/update_fars/04_backup_old_fars_data.sql
@@ -0,0 +1,7 @@
+-- This is run on the main SSPF database
+CREATE TABLE static.fars_processed_2015_2019_backup (LIKE static.fars_processed INCLUDING ALL);
+
+INSERT INTO static.fars_processed_2015_2019_backup
+    SELECT *
+    FROM static.fars_processed
+;
\ No newline at end of file
From 1d1787a65e4a0b9348b448ec971563a7f09c5973 Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Mon, 28 Aug 2023 11:50:05 -0600
Subject: [PATCH 7/8] TDG server to AWS

---
 .../update_fars/05_copy_fars_from_rdg_server.sh | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
 create mode 100644 general/update_fars/05_copy_fars_from_rdg_server.sh

diff --git a/general/update_fars/05_copy_fars_from_rdg_server.sh b/general/update_fars/05_copy_fars_from_rdg_server.sh
new file mode 100644
index 0000000..370e89a
--- /dev/null
+++ b/general/update_fars/05_copy_fars_from_rdg_server.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+# Export env vars
+export $(grep -v '^#' rds_conn_vars.env | xargs)
+
+ogr2ogr \
+    -lco GEOMETRY_NAME=geom \
+    -lco precision=NO \
+    -f "PostgreSQL" \
+    PG:"host=$HOST dbname=$DB_NAME user=$USER_NAME password=$PASSWORD" \
+    PG:"host=$TDG_HOST dbname=$TDG_DB_NAME user=$TDG_USER_NAME password=$TDG_PASSWORD" "automated.fars_processed_2017_2021" \
+    -t_srs EPSG:4326 \
+    -nln "scratch._tmp_fars_processed_2017_2021" \
+    -overwrite \
+    -progress \
+    --config PG_USE_COPY YES
\ No newline at end of file
From 14f6d57cc3273f6f9074026df0e3971a82482401 Mon Sep 17 00:00:00 2001
From: Theja Putta
Date: Mon, 28 Aug 2023 13:30:30 -0600
Subject: [PATCH 8/8] sql to python

---
 .../update_fars/04_backup_old_fars_data.py    | 29 +++++++++++++++++++
 .../update_fars/04_backup_old_fars_data.sql   |  7 -----
 ...ver.sh => 05_copy_fars_from_tdg_server.sh} |  0
 3 files changed, 29 insertions(+), 7 deletions(-)
 create mode 100644 general/update_fars/04_backup_old_fars_data.py
 delete mode 100644 general/update_fars/04_backup_old_fars_data.sql
 rename general/update_fars/{05_copy_fars_from_rdg_server.sh => 05_copy_fars_from_tdg_server.sh} (100%)

diff --git a/general/update_fars/04_backup_old_fars_data.py b/general/update_fars/04_backup_old_fars_data.py
new file mode 100644
index 0000000..185a33e
--- /dev/null
+++ b/general/update_fars/04_backup_old_fars_data.py
@@ -0,0 +1,29 @@
+import os
+import psycopg2
+from dotenv import load_dotenv
+
+# get environment variables
+load_dotenv("rds_conn_vars.env")
+
+# db connection - this is run on the main SSPF database
+HOST = os.environ["HOST"]
+DB_NAME = os.environ["DB_NAME"]
+USER_NAME = os.environ["USER_NAME"]
+PASSWORD = os.environ["PASSWORD"]
+
+# keyword arguments avoid quoting problems of a hand-built connection string
+conn = psycopg2.connect(host=HOST, dbname=DB_NAME, user=USER_NAME, password=PASSWORD)
+cur = conn.cursor()
+
+query = """
+    CREATE TABLE static.fars_processed_2015_2019_backup (LIKE static.fars_processed INCLUDING ALL);
+
+    INSERT INTO static.fars_processed_2015_2019_backup
+        SELECT *
+        FROM static.fars_processed
+    ;
+    """
+
+cur.execute(query)
+conn.commit()
+conn.close()
\ No newline at end of file
diff --git a/general/update_fars/04_backup_old_fars_data.sql b/general/update_fars/04_backup_old_fars_data.sql
deleted file mode 100644
index 31b2dd0..0000000
--- a/general/update_fars/04_backup_old_fars_data.sql
+++ /dev/null
@@ -1,7 +0,0 @@
--- This is run on the main SSPF database
-CREATE TABLE static.fars_processed_2015_2019_backup (LIKE static.fars_processed INCLUDING ALL);
-
-INSERT INTO static.fars_processed_2015_2019_backup
-    SELECT *
-    FROM static.fars_processed
-;
\ No newline at end of file
diff --git a/general/update_fars/05_copy_fars_from_rdg_server.sh b/general/update_fars/05_copy_fars_from_tdg_server.sh
similarity index 100%
rename from general/update_fars/05_copy_fars_from_rdg_server.sh
rename to general/update_fars/05_copy_fars_from_tdg_server.sh