#!/usr/bin/env python
# coding: utf-8

# Import Python modules
import argparse, os, psycopg2
import pandas as pd
from psycopg2.extensions import AsIs
from loguru import logger
# This function takes a dataset name as input and uses it to query the drf_harvest_data_file_meta table, creating a list
# of file names. The list is converted to a DataFrame and returned.
def getInputFiles(inputDataset):
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query
        cur.execute("""SELECT dir_path, file_name
                       FROM drf_harvest_data_file_meta
                       WHERE source = %(source)s AND ingested = False
                       ORDER BY data_date_time""",
                    {'source': inputDataset})

        # Convert query output to Pandas DataFrame
        df = pd.DataFrame(cur.fetchall(), columns=['dir_path','file_name'])

        # Close cursor and database connection
        cur.close()
        conn.close()

        # Return Pandas DataFrame, limited to the first 40 files for ADCIRC data and the first 20 otherwise
        if inputDataset == 'adcirc':
            return(df.head(40))
        else:
            return(df.head(20))

    # If exception print error
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
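# Example usage of getInputFiles (a sketch; assumes the apsviz_gauges database above is
# reachable and has un-ingested rows for the 'adcirc' source):
#   dfFiles = getInputFiles('adcirc')
#   print(dfFiles.columns.tolist())   # ['dir_path', 'file_name']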
# This function takes as input the source_archive (noaa, contrails) and a tuple of station names, and returns source_id(s) for
# observation data from the drf_gauge_source table in the apsviz_gauges database. This function specifically gets source_id(s) for
# observation data, such as from NOAA and NCEM.
def getObsSourceID(source_archive,station_tuples):
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query
        cur.execute("""SELECT s.source_id AS source_id, g.station_id AS station_id, g.station_name AS station_name,
                              s.data_source AS data_source, s.source_name AS source_name
                       FROM drf_gauge_station g INNER JOIN drf_gauge_source s ON s.station_id=g.station_id
                       WHERE source_archive = %(sourcearchive)s AND station_name IN %(stationtuples)s
                       ORDER BY station_name""",
                    {'sourcearchive': source_archive, 'stationtuples': AsIs(station_tuples)})

        # Convert query output to Pandas DataFrame
        dfstations = pd.DataFrame(cur.fetchall(), columns=['source_id','station_id','station_name',
                                                           'data_source','source_name'])

        # Close cursor and database connection
        cur.close()
        conn.close()

        # Return Pandas DataFrame
        return(dfstations)

    # If exception print error
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
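# Example usage of getObsSourceID (hypothetical station names; note that AsIs interpolates the
# tuple's Python repr directly into the IN clause, so station_tuples must be a tuple of strings):
#   station_tuples = ('8410140', '8411060')
#   dfstations = getObsSourceID('noaa', station_tuples)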
# This function takes as input the data_source (hsofs, ...) and a tuple of station names, and returns source_id(s) for
# model data from the drf_gauge_source table in the apsviz_gauges database. This function specifically gets source_id(s) for
# model data, such as from ADCIRC. The data_source, such as hsofs, is the grid that is used in the ADCIRC run.
def getModelSourceID(data_source,station_tuples):
    try:
        # Create connection to database and get cursor
        conn = psycopg2.connect("dbname='apsviz_gauges' user='apsviz_gauges' host='localhost' port='5432' password='apsviz_gauges'")
        cur = conn.cursor()

        # Set environment
        cur.execute("""SET CLIENT_ENCODING TO UTF8""")
        cur.execute("""SET STANDARD_CONFORMING_STRINGS TO ON""")
        cur.execute("""BEGIN""")

        # Run query
        cur.execute("""SELECT s.source_id AS source_id, g.station_id AS station_id, g.station_name AS station_name,
                              s.data_source AS data_source, s.source_name AS source_name
                       FROM drf_gauge_station g INNER JOIN drf_gauge_source s ON s.station_id=g.station_id
                       WHERE data_source = %(datasource)s AND station_name IN %(stationtuples)s
                       ORDER BY station_name""",
                    {'datasource': data_source, 'stationtuples': AsIs(station_tuples)})

        # Convert query output to Pandas DataFrame
        dfstations = pd.DataFrame(cur.fetchall(), columns=['source_id','station_id','station_name',
                                                           'data_source','source_name'])

        # Close cursor and database connection
        cur.close()
        conn.close()

        # Return Pandas DataFrame
        return(dfstations)

    # If exception print error
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
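# Example usage of getModelSourceID (hypothetical grid/run-type value; the real data_source strings
# are built in addMeta below from the third and fourth '_'-separated fields of the input file name):
#   dfstations = getModelSourceID('hsofs_namforecast', ('8410140', '8411060'))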
# This function takes as input a directory input path, a directory output path and a file name, and writes a CSV
# file that contains gauge data. The function uses the getObsSourceID and getModelSourceID functions above to get
# a list of existing source_id(s) that it includes in the gauge data, to enable joining the gauge data table with the
# gauge source table. The function adds a timemark, which it gets from the input file name. The timemark values can
# be used to uniquely query an ADCIRC model run.
def addMeta(inputDir, outputDir, inputFile):
    # Read input file, convert column names to lower case, rename the station column to station_name, convert its data
    # type to string, and add timemark and source_id columns
    df = pd.read_csv(inputDir+inputFile)
    df.columns = df.columns.str.lower()
    df = df.rename(columns={'station': 'station_name'})
    df = df.astype({"station_name": str})
    df.insert(0,'timemark', '')
    df.insert(0,'source_id', '')

    # Extract list of stations from DataFrame for querying the database, and get source_archive name from the file name
    station_tuples = tuple(sorted([str(x) for x in df['station_name'].unique().tolist()]))
    source_archive = inputFile.split('_')[0].lower().strip()

    # Check if source archive name is ADCIRC
    if source_archive == 'adcirc':
        # Get data_source from the file name, and use it along with the list of stations to run
        # the getModelSourceID function to get the source_id(s)
        data_source = inputFile.split('_')[2].lower().strip()+'_'+inputFile.split('_')[3].lower().strip()
        dfstations = getModelSourceID(data_source,station_tuples)

        # Get the timemark for the forecast and nowcast data
        df['timemark'] = inputFile.split('_')[-1].split('.')[0].lower().strip()
    else:
        # Use source_archive and list of stations to get source_id(s) for the observation gauge data
        dfstations = getObsSourceID(source_archive,station_tuples)
        df['timemark'] = inputFile.split('_')[-1].split('.')[0].lower().strip()

    # Add source_id(s) to DataFrame
    for index, row in dfstations.iterrows():
        df.loc[df['station_name'] == row['station_name'], 'source_id'] = row['source_id']

    # Drop station_name column from DataFrame
    df.drop(columns=['station_name'], inplace=True)

    # Write DataFrame to CSV file
    df.to_csv(outputDir+'data_copy_'+inputFile, index=False)
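# Example of the file-name parsing addMeta relies on (hypothetical file name; the real naming
# convention must place source_archive, grid, run type, and timemark at these '_' positions):
#   inputFile = 'adcirc_stationdata_hsofs_namforecast_2022-01-01T00:00:00.csv'
#   source_archive -> 'adcirc'
#   data_source    -> 'hsofs_namforecast'
#   timemark       -> '2022-01-01t00:00:00'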
# This function takes as input a directory output path and a dataset name. It gets a DataFrame of input
# directory paths and file names from getInputFiles, and uses them to run the addMeta function above.
def processData(outputDir, inputDataset):
    dfDirFiles = getInputFiles(inputDataset)
    for index, row in dfDirFiles.iterrows():
        inputDir = row['dir_path']
        inputFile = row['file_name']
        addMeta(inputDir, outputDir, inputFile)
# Main program function takes args as input, which contain the outputDir and inputDataset values.
@logger.catch
def main(args):
    # Add logger
    logger.remove()
    log_path = os.getenv('LOG_PATH', os.path.join(os.path.dirname(__file__), 'logs'))
    logger.add(os.path.join(log_path, 'createIngestData.log'), level='DEBUG')

    # Extract args variables
    outputDir = args.outputDir
    inputDataset = args.inputDataset

    logger.info('Start processing data for dataset '+inputDataset+'.')
    processData(outputDir, inputDataset)
    logger.info('Finished processing data for dataset '+inputDataset+'.')
# Run main function, which takes outputDir and inputDataset as input.
if __name__ == "__main__":
    """ This is executed when run from the command line """
    parser = argparse.ArgumentParser()

    # Optional arguments, each of which requires a parameter (e.g. --inputDataset adcirc)
    parser.add_argument("--outputDir", action="store", dest="outputDir")
    parser.add_argument("--inputDataset", action="store", dest="inputDataset")

    args = parser.parse_args()
    main(args)
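# Example invocation (hypothetical output path; the dataset name must match a source value
# in the drf_harvest_data_file_meta table):
#   python createIngestData.py --outputDir /data/ingest/ --inputDataset adcirc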