Update fars table #8
base: main
@@ -37,3 +37,10 @@ vignettes/*.pdf

# R Environment Variables
.Renviron

# python venv
venv/
env/

# Environment variables
*.env
@@ -0,0 +1,15 @@
from urllib import request
import os

# place to save data locally
data_folder = "/mnt/c/Users/tputta/OneDrive - Toole Design/Desktop/SSPF/FARS"

start_yr = 2015
end_yr = 2021

for yr in range(start_yr, end_yr+1):
    print(f"Downloading {yr} FARS data")
    if not os.path.exists(f"{data_folder}/{yr}"):
        os.mkdir(f"{data_folder}/{yr}")
    request.urlretrieve(f"https://static.nhtsa.gov/nhtsa/downloads/FARS/{yr}/National/FARS{yr}NationalCSV.zip", f"{data_folder}/{yr}/FARS{yr}NationalCSV.zip")
    yr += 1
Review comment: Why are we incrementing yr?

Reply: I started with a while loop and this is a vestige of that that I forgot to remove. It does not really harm anything in this case, but I will remove it.
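To the reviewer's point: in Python, a for loop rebinds the loop variable from the iterator at the top of each iteration, so a manual `yr += 1` inside the body never changes which years are visited. A quick sketch:

```python
visited = []
for yr in range(2015, 2018):
    visited.append(yr)
    yr += 1  # rebinding is discarded; range() supplies the next value

# every year appears exactly once despite the extra increment
print(visited)  # [2015, 2016, 2017]
```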
@@ -0,0 +1,66 @@
import pandas as pd
import os
import psycopg2
from psycopg2 import sql
from dotenv import load_dotenv
from zipfile import ZipFile
from sqlalchemy import create_engine
from io import StringIO

# get environment variables
load_dotenv("rds_conn_vars.env")
Review comment: Is there documentation somewhere that says how to create this? If not, should we add it here?

Reply: The .env file itself is not very different from how we define variables in a shell script. There are plenty of resources online for the proper syntax and it is pretty straightforward. I don't think we need to add any additional documentation for this.
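To illustrate the reply above: a .env file is just KEY=value lines, with `#` starting a comment, much like shell variable assignments. The sketch below uses placeholder values and a minimal stdlib stand-in for python-dotenv's parsing; the real script should keep using `load_dotenv`.

```python
import os

# hypothetical contents of rds_conn_vars.env (placeholder values only)
env_text = """\
# connection settings for the TDG database
TDG_HOST=db.example.com
TDG_DB_NAME=sspf
TDG_USER_NAME=tdg_user
TDG_PASSWORD=not-a-real-password
"""

def parse_env(text):
    """Minimal stand-in for python-dotenv: KEY=value lines, '#' comments."""
    result = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        result[key.strip()] = value.strip()
    return result

env = parse_env(env_text)
os.environ.update(env)
print(os.environ["TDG_HOST"])  # db.example.com
```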
# db connection
TDG_HOST = os.environ["TDG_HOST"]
TDG_DB_NAME = os.environ["TDG_DB_NAME"]
TDG_USER_NAME = os.environ["TDG_USER_NAME"]
TDG_PASSWORD = os.environ["TDG_PASSWORD"]

connection_string = f"host={TDG_HOST} dbname={TDG_DB_NAME} user={TDG_USER_NAME} password={TDG_PASSWORD}"
engine = create_engine(f'postgresql://{TDG_USER_NAME}:{TDG_PASSWORD}@{TDG_HOST}:5432/{TDG_DB_NAME}')

conn = psycopg2.connect(connection_string)

# place to save data locally
data_folder = "/mnt/c/Users/tputta/OneDrive - Toole Design/Desktop/SSPF/FARS"

for (start_yr, end_yr) in [(2015, 2019), (2017, 2021)]:
    # store csv as a list of dataframes
    for table_type in ["accident", "person"]:
    # for table_type in ["person"]:
        df_list = []
        for yr in range(start_yr, end_yr+1):
            print(f"Reading {yr} {table_type} data as a dataframe")
            with ZipFile(f"{data_folder}/{yr}/FARS{yr}NationalCSV.zip") as zip_file:
                # print(zip_file.namelist())
                for file_name in list(zip_file.namelist()):
                    if f"{table_type}.csv" in file_name.lower():
                        print(f" {file_name}")
                        df = pd.read_csv(zip_file.open(file_name), low_memory=False, encoding_errors="ignore")
                        if table_type == "person":
                            df['year'] = yr
                        df_list.append(df)

        # merge all the csvs into one dataframe
        print("Merging all dfs into one")
        df = pd.concat(df_list, ignore_index=True)
        del df_list

        # col names to lower case
        df.columns = map(str.lower, df.columns)

        print("Writing the dataframe to postgres table")
        # df.to_sql(name=f"fars_{table_type}_{start_yr}_{end_yr}", schema="received", con=engine, if_exists="replace", index=False, chunksize=10000, method="multi")

        # using sql copy function instead of pandas to_sql as it is much faster
        df.head(0).to_sql(name=f"fars_{table_type}_{start_yr}_{end_yr}", schema="received", con=engine, if_exists="replace")
        query = sql.SQL(f"copy received.fars_{table_type}_{start_yr}_{end_yr} from stdin with csv delimiter as ','")
        buffer = StringIO()
        df.to_csv(buffer, header=False)
        buffer.seek(0)
        cur = conn.cursor()

        cur.copy_expert(sql=query, file=buffer)
        conn.commit()

conn.close()
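The `copy_expert` call above relies on a simple stdlib pattern: write CSV text into an in-memory `StringIO`, then `seek(0)` so the consumer reads from the start. The same pattern, isolated with made-up rows and no database needed:

```python
import csv
from io import StringIO

rows = [(1, "pedestrian"), (2, "bicyclist")]  # stand-in data

buffer = StringIO()
csv.writer(buffer).writerows(rows)

# without seek(0), the next read would start at the end of the buffer
buffer.seek(0)

read_back = list(csv.reader(buffer))
print(read_back)  # [['1', 'pedestrian'], ['2', 'bicyclist']]
```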
@@ -0,0 +1,69 @@
import pandas as pd
import os
import psycopg2
from dotenv import load_dotenv

# get environment variables
load_dotenv("rds_conn_vars.env")

# db connection
TDG_HOST = os.environ["TDG_HOST"]
TDG_DB_NAME = os.environ["TDG_DB_NAME"]
TDG_USER_NAME = os.environ["TDG_USER_NAME"]
TDG_PASSWORD = os.environ["TDG_PASSWORD"]

connection_string = f"host={TDG_HOST} dbname={TDG_DB_NAME} user={TDG_USER_NAME} password={TDG_PASSWORD}"
conn = psycopg2.connect(connection_string)
cur = conn.cursor()

for (start_yr, end_yr) in [(2015, 2019), (2017, 2021)]:
    print(f"Processing {start_yr}_{end_yr} data")
    query = f"""
        DROP TABLE IF EXISTS automated.fars_processed_{start_yr}_{end_yr};
        CREATE TABLE automated.fars_processed_{start_yr}_{end_yr} AS (
            SELECT
                a.st_case,
                -- just go through person types in order of vulnerability
                CASE
                    WHEN per_typ && ARRAY[5, 8] THEN 'pedestrian'
                    WHEN per_typ && ARRAY[6, 7] THEN 'bicyclist'
                    WHEN per_typ && ARRAY[1, 2, 3] THEN 'motor vehicle'
                    ELSE 'other'
                END AS crash_mode,
                -- since it's FARS, hard coding in severity
                'fatality' AS crash_severity,
                a.year AS crash_year,
                LPAD(state::TEXT, 2, '0') AS state_fp,
                LPAD(county::TEXT, 3, '0') AS county_fp,
                CASE
                    WHEN city != 0 THEN LPAD(city::TEXT, 5, '0')
                    ELSE NULL
                END AS place_fp,
                a.func_sys AS functional_class,
                ST_SetSRID(ST_MakePoint(longitud, latitude), 4326) AS geom
            FROM
                received.fars_accident_{start_yr}_{end_yr} a
            LEFT JOIN
                (
                    SELECT
                        st_case,
                        year,
                        array_agg(per_typ::INT) AS per_typ
Review comment: Might be overkill, but it could be worth replicating the logic shown in Table 3-39 on pg 560 of https://crashstats.nhtsa.dot.gov/Api/Public/ViewPublication/813417, since the … Also, according to that table, we might want to assign 'motor vehicle' to the 'Driver' (…

Reply: FYI, ran the numbers, the removing …
                    FROM
                        received.fars_person_{start_yr}_{end_yr}
                    GROUP BY
                        st_case,
                        year
                ) p
                ON a.st_case = p.st_case AND a.year = p.year
        )
        ;

        ALTER TABLE automated.fars_processed_{start_yr}_{end_yr} ADD pkey SERIAL PRIMARY KEY;
Review comment: Since the unique ID is comprised of …
        CREATE INDEX ON automated.fars_processed_{start_yr}_{end_yr} USING GIST(geom);
        ANALYZE automated.fars_processed_{start_yr}_{end_yr};
    """
    cur.execute(query)
    conn.commit()

conn.close()
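Two pieces of the SQL above are easy to sanity-check outside the database. The CASE expression is an ordered test — the first per_typ bucket that overlaps the crash's aggregated person types wins, so a crash involving both a driver and a pedestrian classifies as 'pedestrian' — and LPAD is plain zero-padding. A Python sketch (the per_typ codes come from the SQL above; everything else is illustrative):

```python
def crash_mode(per_typ):
    """Mirror the SQL CASE: first overlapping bucket wins, ordered by vulnerability."""
    codes = set(per_typ)
    if codes & {5, 8}:          # pedestrian, other pedestrian-type
        return "pedestrian"
    if codes & {6, 7}:          # bicyclist, other cyclist-type
        return "bicyclist"
    if codes & {1, 2, 3}:       # vehicle occupants
        return "motor vehicle"
    return "other"

print(crash_mode([1, 5]))  # pedestrian (pedestrian bucket outranks driver)
print(crash_mode([1, 2]))  # motor vehicle
print(crash_mode([9]))     # other

# LPAD(state::TEXT, 2, '0') in Python terms
print(str(6).zfill(2), str(37).zfill(3))  # 06 037
```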
@@ -0,0 +1,29 @@
import os
import psycopg2
from dotenv import load_dotenv

# get environment variables
load_dotenv("rds_conn_vars.env")

# db connection
HOST = os.environ["HOST"]
DB_NAME = os.environ["DB_NAME"]
USER_NAME = os.environ["USER_NAME"]
PASSWORD = os.environ["PASSWORD"]

connection_string = f"host={HOST} dbname={DB_NAME} user={USER_NAME} password={PASSWORD}"
conn = psycopg2.connect(connection_string)
cur = conn.cursor()

query = f"""
    CREATE TABLE static.fars_processed_2015_2019_backup (LIKE static.fars_processed INCLUDING ALL);
Review comment: If you use …
    INSERT INTO static.fars_processed_2015_2019_backup
    SELECT *
    FROM static.fars_processed
    ;
"""

cur.execute(query)
conn.commit()
conn.close()
@@ -0,0 +1,16 @@
#!/bin/bash

# Export env vars
export $(grep -v '^#' rds_conn_vars.env | xargs)

ogr2ogr \
Review comment: Is this script copying things from our server to the SSPF server just meant for testing, or is it meant to be how it's going to be done going forward?
    -lco GEOMETRY_NAME=geom \
    -lco precision=NO \
    -f "PostgreSQL" \
    PG:"host=$HOST dbname=$DB_NAME user=$USER_NAME password=$PASSWORD" \
    PG:"host=$TDG_HOST dbname=$TDG_DB_NAME user=$TDG_USER_NAME password=$TDG_PASSWORD" "automated.fars_processed_2017_2021" \
    -t_srs EPSG:4326 \
    -nln "scratch._tmp_fars_processed_2017_2021" \
    -overwrite \
    -progress \
    --config PG_USE_COPY YES
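The `export $(grep -v '^#' rds_conn_vars.env | xargs)` line can be exercised on its own with a throwaway file. One caveat, worth knowing since passwords pass through it: the values go through shell word splitting, so the trick only works when no value contains spaces. Filenames and values below are placeholders:

```shell
#!/bin/bash
# build a throwaway env file (placeholder values)
cat > /tmp/demo_conn_vars.env <<'EOF'
# comment lines are filtered out by grep -v '^#'
HOST=db.example.com
DB_NAME=sspf
EOF

# same export pattern as the script above
export $(grep -v '^#' /tmp/demo_conn_vars.env | xargs)

echo "$HOST"     # db.example.com
echo "$DB_NAME"  # sspf
```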
@@ -0,0 +1,4 @@
python-dotenv
psycopg2-binary
SQLAlchemy
pandas
Review comment: It probably doesn't matter, but since this is a public repo, do you think that you should put your local filepath in here? Maybe instead make a data folder either in the top level repo directory or just in this folder, and save it there?

Reply: Since it is a one-time thing I didn't bother creating a data folder and adding it to gitignore.