Update fars table #8

Open · wants to merge 8 commits into main
7 changes: 7 additions & 0 deletions .gitignore
@@ -37,3 +37,10 @@ vignettes/*.pdf

# R Environment Variables
.Renviron

# python venv
venv/
env/

# Environment variables
*.env
15 changes: 15 additions & 0 deletions general/update_fars/01_download_fars.py
@@ -0,0 +1,15 @@
from urllib import request
import os

# place to save data locally
data_folder = "/mnt/c/Users/tputta/OneDrive - Toole Design/Desktop/SSPF/FARS"

Review comment:
It probably doesn't matter, but since this is a public repo, do you think that you should put your local filepath in here? Maybe instead make a data folder, either in the top-level repo directory or just in this folder, and save it there?

Member Author reply:
Since it is a one-time thing, I didn't bother creating a data folder and adding it to gitignore.
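A minimal sketch of the reviewer's suggestion, a repo-relative and gitignored data folder (the folder name and layout are assumptions, not part of this PR):

import os

# hypothetical: save downloads in a data/ folder next to this script
data_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
os.makedirs(data_folder, exist_ok=True)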


start_yr = 2015
end_yr = 2021

for yr in range(start_yr, end_yr+1):
    print(f"Downloading {yr} FARS data")
    if not os.path.exists(f"{data_folder}/{yr}"):
        os.mkdir(f"{data_folder}/{yr}")
    request.urlretrieve(f"https://static.nhtsa.gov/nhtsa/downloads/FARS/{yr}/National/FARS{yr}NationalCSV.zip", f"{data_folder}/{yr}/FARS{yr}NationalCSV.zip")
    yr += 1

Review comment:
Why are we incrementing yr here? Won't it just go to the next value once it goes back to the top of the loop?

Member Author reply:
I started with a while loop and this is a vestige of that which I forgot to remove. It does not really harm anything in this case, but I will remove it.
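For context, the while-loop version the author describes would have looked roughly like this (a reconstruction, not code from the PR):

yr = start_yr
while yr <= end_yr:
    # ... download the FARS zip for yr ...
    yr += 1  # required in a while loop; redundant once it became a for loop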

66 changes: 66 additions & 0 deletions general/update_fars/02_combine_csv.py
@@ -0,0 +1,66 @@
import pandas as pd
import os
import psycopg2
from psycopg2 import sql
from dotenv import load_dotenv
from zipfile import ZipFile
from sqlalchemy import create_engine
from io import StringIO

# get environment variables
load_dotenv("rds_conn_vars.env")

Review comment:
Is there documentation somewhere that says how to create this? If not, should we add it here?

Member Author reply:
The .env file itself is not very different from how we define variables in a shell script. There are plenty of resources online for the proper syntax, and it is pretty straightforward. I don't think we need to add any additional documentation for this.
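For reference, the expected syntax in rds_conn_vars.env is plain KEY=value pairs. A sketch with placeholder values (the variable names are taken from these scripts; the values are made up):

# rds_conn_vars.env
TDG_HOST=your-db-host.example.com
TDG_DB_NAME=your_database
TDG_USER_NAME=your_user
TDG_PASSWORD=your_password

Scripts 04 and 05 additionally read HOST, DB_NAME, USER_NAME, and PASSWORD for the second server.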


# db connection
TDG_HOST = os.environ["TDG_HOST"]
TDG_DB_NAME = os.environ["TDG_DB_NAME"]
TDG_USER_NAME = os.environ["TDG_USER_NAME"]
TDG_PASSWORD = os.environ["TDG_PASSWORD"]

connection_string = f"host={TDG_HOST} dbname={TDG_DB_NAME} user={TDG_USER_NAME} password={TDG_PASSWORD}"
engine = create_engine(f'postgresql://{TDG_USER_NAME}:{TDG_PASSWORD}@{TDG_HOST}:5432/{TDG_DB_NAME}')

conn = psycopg2.connect(connection_string)

# place to save data locally
data_folder = "/mnt/c/Users/tputta/OneDrive - Toole Design/Desktop/SSPF/FARS"

for (start_yr, end_yr) in [(2015, 2019), (2017, 2021)]:
    # store csv as a list of dataframes
    for table_type in ["accident", "person"]:
    # for table_type in ["person"]:
        df_list = []
        for yr in range(start_yr, end_yr+1):
            print(f"Reading {yr} {table_type} data as a dataframe")
            with ZipFile(f"{data_folder}/{yr}/FARS{yr}NationalCSV.zip") as zip_file:
                # print(zip_file.namelist())
                for file_name in list(zip_file.namelist()):
                    if f"{table_type}.csv" in file_name.lower():
                        print(f" {file_name}")
                        df = pd.read_csv(zip_file.open(file_name), low_memory=False, encoding_errors="ignore")
                        if table_type == "person":
                            df['year'] = yr
                        df_list.append(df)

        # merge all the csvs into one dataframe
        print("Merging all dfs into one")
        df = pd.concat(df_list, ignore_index=True)
        del df_list

        # col names to lower case
        df.columns = map(str.lower, df.columns)

        print("Writing the dataframe to postgres table")
        # df.to_sql(name=f"fars_{table_type}_{start_yr}_{end_yr}", schema="received", con=engine, if_exists="replace", index=False, chunksize=10000, method="multi")

        # using sql copy function instead of pandas to_sql as it is much faster
        df.head(0).to_sql(name=f"fars_{table_type}_{start_yr}_{end_yr}", schema="received", con=engine, if_exists="replace")
        query = sql.SQL(f"copy received.fars_{table_type}_{start_yr}_{end_yr} from stdin with csv delimiter as ','")
        buffer = StringIO()
        df.to_csv(buffer, header=False)
        buffer.seek(0)
        cur = conn.cursor()

        cur.copy_expert(sql=query, file=buffer)
        conn.commit()

conn.close()
69 changes: 69 additions & 0 deletions general/update_fars/03_process_fars.py
@@ -0,0 +1,69 @@
import pandas as pd
import os
import psycopg2
from dotenv import load_dotenv

# get environment variables
load_dotenv("rds_conn_vars.env")

# db connection
TDG_HOST = os.environ["TDG_HOST"]
TDG_DB_NAME = os.environ["TDG_DB_NAME"]
TDG_USER_NAME = os.environ["TDG_USER_NAME"]
TDG_PASSWORD = os.environ["TDG_PASSWORD"]

connection_string = f"host={TDG_HOST} dbname={TDG_DB_NAME} user={TDG_USER_NAME} password={TDG_PASSWORD}"
conn = psycopg2.connect(connection_string)
cur = conn.cursor()

for (start_yr, end_yr) in [(2015, 2019), (2017, 2021)]:
    print(f"Processing {start_yr}_{end_yr} data")
    query = f"""
        DROP TABLE IF EXISTS automated.fars_processed_{start_yr}_{end_yr};
        CREATE TABLE automated.fars_processed_{start_yr}_{end_yr} AS (
            SELECT
                a.st_case,
                -- just go through person types in order of vulnerability
                CASE
                    WHEN per_typ && ARRAY[5, 8] THEN 'pedestrian'
                    WHEN per_typ && ARRAY[6, 7] THEN 'bicyclist'
                    WHEN per_typ && ARRAY[1, 2, 3] THEN 'motor vehicle'
                    ELSE 'other'
                END AS crash_mode,
                -- since it's FARS, hard coding in severity
                'fatality' AS crash_severity,
                a.year AS crash_year,
                LPAD(state::TEXT, 2, '0') AS state_fp,
                LPAD(county::TEXT, 3, '0') AS county_fp,
                CASE
                    WHEN city != 0 THEN LPAD(city::TEXT, 5, '0')
                    ELSE NULL
                END AS place_fp,
                a.func_sys AS functional_class,
                ST_SetSRID(ST_MakePoint(longitud, latitude), 4326) AS geom
            FROM
                received.fars_accident_{start_yr}_{end_yr} a
            LEFT JOIN
                (
                    SELECT
                        st_case,
                        year,
                        array_agg(per_typ::INT) AS per_typ

Review comment:
Might be overkill, but it could be worth replicating the logic shown in Table 3-39 on pg 560 of https://crashstats.nhtsa.dot.gov/Api/Public/ViewPublication/813417, since the per_typ codes change little year to year.

Also, according to that table, we might want to assign 'motor vehicle' to the 'Driver' (1) and 'Passenger' types (2 and 9) and remove 3, as it's listed as 'Other non-occupant'.

@Jacob816 (Aug 30, 2023):
FYI, I ran the numbers: removing 3 and adding 9 when assigning 'motor vehicle' removes all instances of 'other' and assigns them to 'motor vehicle'. For 2015-2019 this was 167 crashes (0.10% of all crashes), and for 2017-2021 it was 133 crashes (0.08% of all crashes). So pretty minor, but still might be worth changing.
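A sketch of the recode the reviewers suggest (codes per their reading of Table 3-39; a hypothetical revision, not part of this PR):

CASE
    WHEN per_typ && ARRAY[5, 8] THEN 'pedestrian'
    WHEN per_typ && ARRAY[6, 7] THEN 'bicyclist'
    WHEN per_typ && ARRAY[1, 2, 9] THEN 'motor vehicle'  -- 9 added, 3 dropped
    ELSE 'other'
END AS crash_mode,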

                    FROM
                        received.fars_person_{start_yr}_{end_yr}
                    GROUP BY
                        st_case,
                        year
                ) p
            ON a.st_case = p.st_case AND a.year = p.year
        )
        ;

        ALTER TABLE automated.fars_processed_{start_yr}_{end_yr} ADD pkey SERIAL PRIMARY KEY;
@Jacob816 (Aug 30, 2023):
Since the unique ID is composed of st_case (the 2-letter state code plus a sequential case number, which resets each year) and crash_year, should the primary key be (st_case, crash_year)?
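A sketch of that composite key as an alternative to the SERIAL pkey above (untested, not part of this PR):

ALTER TABLE automated.fars_processed_{start_yr}_{end_yr}
    ADD PRIMARY KEY (st_case, crash_year);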

        CREATE INDEX ON automated.fars_processed_{start_yr}_{end_yr} USING GIST(geom);
        ANALYZE automated.fars_processed_{start_yr}_{end_yr};
    """
    cur.execute(query)
    conn.commit()

conn.close()
29 changes: 29 additions & 0 deletions general/update_fars/04_backup_old_fars_data.py
@@ -0,0 +1,29 @@
import os
import psycopg2
from dotenv import load_dotenv

# get environment variables
load_dotenv("rds_conn_vars.env")

# db connection
HOST = os.environ["HOST"]
DB_NAME = os.environ["DB_NAME"]
USER_NAME = os.environ["USER_NAME"]
PASSWORD = os.environ["PASSWORD"]

connection_string = f"host={HOST} dbname={DB_NAME} user={USER_NAME} password={PASSWORD}"
conn = psycopg2.connect(connection_string)
cur = conn.cursor()

query = f"""
CREATE TABLE static.fars_processed_2015_2019_backup (LIKE static.fars_processed INCLUDING ALL);

Review comment:
If you use INCLUDING ALL when making the table, will that make static.fars_processed_2015_2019_backup a dependent and create an issue if static.fars_processed is deleted when the new dataset is uploaded in the next script?
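If that dependency turns out to be a problem, one dependency-free alternative is a plain CREATE TABLE AS (a sketch; note it copies the data but not defaults, constraints, or indexes, so nothing is inherited from the source table):

CREATE TABLE static.fars_processed_2015_2019_backup AS
SELECT * FROM static.fars_processed;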


INSERT INTO static.fars_processed_2015_2019_backup
SELECT *
FROM static.fars_processed
;
"""

cur.execute(query)
conn.commit()
conn.close()
16 changes: 16 additions & 0 deletions general/update_fars/05_copy_fars_from_tdg_server.sh
@@ -0,0 +1,16 @@
#!/bin/bash

# Export env vars
export $(grep -v '^#' rds_conn_vars.env | xargs)

ogr2ogr \

Review comment:
Is this script copying things from our server to the SSPF server just meant for testing, or is it meant to be how it's going to be done going forward?

-lco GEOMETRY_NAME=geom \
-lco precision=NO \
-f "PostgreSQL" \
PG:"host=$HOST dbname=$DB_NAME user=$USER_NAME password=$PASSWORD" \
PG:"host=$TDG_HOST dbname=$TDG_DB_NAME user=$TDG_USER_NAME password=$TDG_PASSWORD" "automated.fars_processed_2017_2021" \
-t_srs EPSG:4326 \
-nln "scratch._tmp_fars_processed_2017_2021" \
-overwrite \
-progress \
--config PG_USE_COPY YES
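A note on argument order: ogr2ogr writes to the first datasource listed and reads from the second, so this command copies automated.fars_processed_2017_2021 from the TDG server into scratch._tmp_fars_processed_2017_2021 on the $HOST server.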
4 changes: 4 additions & 0 deletions requirements.txt
@@ -0,0 +1,4 @@
python-dotenv
psycopg2-binary
SQLAlchemy
pandas
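Typical setup before running the scripts (a sketch; assumes Python 3 with the venv module, matching the venv/ entry added to .gitignore):

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt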