#!/usr/bin/env python
'''
io_utils - routines for reading and writing the main files
'''
import os
import errno
import pandas as pd
import numpy as np
import setup
import datetime as dt
import csv
import logging
logger = logging.getLogger(__name__)

from qc_utils import Station, populate_station, MDI, QC_TESTS
#************************************************************************
def count_skip_rows(infile: str) -> list:
    """
    Read through the file, recording lines which match the expected header
    but appear on unexpected rows (i.e. not row 0).  Return these line
    numbers as a list (zero-indexed).

    :param str infile: file to process

    :returns: list of line numbers
    """
    skip_rows = []
    with open(infile, "r") as data_file:
        reader = csv.reader(data_file)
        for r, row in enumerate(reader):
            if r == 0:
                continue
            if "Station_ID|Station_name" in row[0]:
                logger.warning(f"Extra header row at line {r}")
                skip_rows += [r]

    return skip_rows  # count_skip_rows
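
# A minimal usage sketch for count_skip_rows (the file name and line number
# are hypothetical, for illustration only):
#
#     skip_rows = count_skip_rows("station_record.psv")
#     # e.g. [5617] if a second "Station_ID|Station_name..." header row was
#     # concatenated into the file at that (zero-indexed) line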
#************************************************************************
def read_psv(infile: str, separator: str) -> pd.DataFrame:
    '''
    http://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-read-csv-table
    https://stackoverflow.com/questions/64302419/what-are-all-of-the-exceptions-that-pandas-read-csv-throw

    :param str infile: location and name of infile (without extension)
    :param str separator: separating character (e.g. ",", "|")

    :returns: df - DataFrame
    '''
    try:
        df = pd.read_csv(infile, sep=separator, compression="infer",
                         dtype=setup.DTYPE_DICT, na_values="Null",
                         quoting=3, index_col=False)
    except FileNotFoundError as e:
        logger.warning(f"psv file not found: {str(e)}")
        print(str(e))
        raise
    # NOTE: pd.errors.ParserError subclasses ValueError, so it must be
    #       caught first, else this handler is unreachable
    except pd.errors.ParserError as e:
        logger.warning(f"Parser Error: {str(e)}")
        print(str(e))
        raise
    except ValueError as e:
        logger.warning(f"Error in psv rows: {str(e)}")
        print(str(e))
        # Presuming that there is an extra header line somewhere in the file
        # Find the location(s) of the extra header line(s)
        skip_rows = count_skip_rows(infile)
        # Now re-read the file
        df = pd.read_csv(infile, sep=separator, compression="infer",
                         dtype=setup.DTYPE_DICT, na_values="Null", quoting=3,
                         index_col=False, skiprows=skip_rows)
    except EOFError as e:
        logger.warning(f"End of File Error (gzip): {str(e)}")
        print(str(e))
        raise

    # Number of columns at August 2023, or after adding flag columns
    assert len(df.columns) in [238, 238 + len(setup.obs_var_list)]

    return df  # read_psv
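
# A minimal usage sketch for read_psv (the path is hypothetical; column
# dtypes come from setup.DTYPE_DICT):
#
#     df = read_psv("/path/to/station.psv", "|")
#     # a duplicated in-file header triggers the ValueError branch and a
#     # re-read with those rows skipped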
#************************************************************************
def read(infile: str) -> pd.DataFrame:
    """
    Wrapper for read functions to allow remainder to be file format agnostic.

    :param str infile: location and name of infile (without extension)

    :returns: df - DataFrame
    """
    # for .psv
    if os.path.exists(infile):
        df = read_psv(infile, "|")
    else:
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), infile)

    return df  # read
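
# A minimal usage sketch for read (hypothetical path):
#
#     df = read("/path/to/station.mff")
#     # raises FileNotFoundError (with errno.ENOENT) if the file is absent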
#************************************************************************
def calculate_datetimes(station_df: pd.DataFrame) -> pd.Series:
    """
    Convert the separate Y-M-D H-M values into datetime objects

    :param pd.DataFrame station_df: dataframe for the station record

    :returns: pd.Series of datetime64 values
    """
    try:
        datetimes = pd.to_datetime(station_df[["Year", "Month", "Day", "Hour", "Minute"]])
    except ValueError as e:
        if str(e) == "cannot assemble the datetimes: day is out of range for month":
            # scan row by row to find and report the offending date
            year = station_df["Year"]
            month = station_df["Month"]
            day = station_df["Day"]
            for y, yy in enumerate(year):
                try:
                    # if datetime doesn't throw an error here, then it's valid
                    _ = dt.datetime(yy, month[y], day[y])
                except ValueError:
                    print(f"Bad date: {yy}-{month[y]}-{day[y]}\n")
                    logger.warning(f"Bad date: {yy}-{month[y]}-{day[y]}\n")
                    raise ValueError(f"Bad date - {yy}-{month[y]}-{day[y]}")
        # any other ValueError, or a scan which finds no bad date, still
        # propagates (datetimes was never assigned)
        raise

    return datetimes  # calculate_datetimes
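
# A minimal usage sketch (station_df as returned by read above):
#
#     station_df = read("/path/to/station.psv")
#     datetimes = calculate_datetimes(station_df)
#     # a row such as 2021-02-30 raises ValueError("Bad date - 2021-2-30")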
#************************************************************************
def convert_wind_flags(station_df: pd.DataFrame) -> None:
    """
    Explicitly remove any missing data indicators - wind direction only

    :param pd.DataFrame station_df: dataframe for the station record
    """
    for wind_flag in ["C-Calm", "V-Variable"]:
        combined_mask = ((station_df["wind_direction_Measurement_Code"] == wind_flag) &
                         (station_df["wind_direction"] == 999))
        station_df.loc[combined_mask, "wind_direction"] = np.nan

    return  # convert_wind_flags
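
# A minimal usage sketch: modifies station_df in place, so rows coded
# "C-Calm" or "V-Variable" which carry the 999 direction placeholder
# become NaN:
#
#     convert_wind_flags(station_df)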
#************************************************************************
def read_station(stationfile: str, station: Station,
                 read_flags: bool = False) -> tuple[Station, pd.DataFrame]:
    """
    Read station info, and populate with data.

    :param str stationfile: full path to station file
    :param Station station: station object with locational metadata only
    :param bool read_flags: incorporate any pre-existing flags

    :returns: station & station_df
    """
    #*************************
    # read MFF
    try:
        station_df = read(stationfile)
    except FileNotFoundError:
        logger.warning(f"Missing station file {stationfile}")
        raise

    # calculate datetime series
    datetimes = calculate_datetimes(station_df)

    # convert any remaining wind flags
    convert_wind_flags(station_df)

    # convert dataframe to station and MetVar objects for internal processing
    populate_station(station, station_df, setup.obs_var_list, read_flags=read_flags)
    station.times = datetimes

    # store extra information to enable easy extraction later
    station.years = station_df["Year"].fillna(MDI).to_numpy()
    station.months = station_df["Month"].fillna(MDI).to_numpy()
    station.days = station_df["Day"].fillna(MDI).to_numpy()
    station.hours = station_df["Hour"].fillna(MDI).to_numpy()

    return station, station_df  # read_station
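
# A minimal usage sketch (the path is hypothetical; station is a
# qc_utils.Station carrying locational metadata only, its construction
# not shown here):
#
#     station, station_df = read_station("/path/to/USW00094728.psv", station)
#     # station.times, station.years etc. are now populated alongside the
#     # raw dataframe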
#************************************************************************
def write_psv(outfile: str, df: pd.DataFrame, separator: str) -> None:
    '''
    http://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-read-csv-table

    :param str outfile: location and name of outfile
    :param DataFrame df: data frame to write
    :param str separator: separating character (e.g. ",", "|")
    '''
    df.to_csv(outfile, index=False, sep=separator, compression="infer")

    return  # write_psv
#************************************************************************
def write(outfile: str, df: pd.DataFrame, formatters: dict = {}) -> None:
    """
    Wrapper for write functions to allow remainder to be file format agnostic.

    :param str outfile: location and name of outfile
    :param DataFrame df: data frame to write
    :param dict formatters: dictionary of column name to format string mappings
    """
    # need to adjust formatting for certain columns before writing, e.g.
    #   Latitude & Longitude : "{:7.4f}"
    #   Month, Day, Hour & Minute : "{:02d}"
    for column, fmt in formatters.items():
        df[column] = pd.Series([fmt.format(val) for val in df[column]],
                               index=df.index)

    # for .psv
    write_psv(outfile, df, "|")

    return  # write
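
# A minimal usage sketch, using the formats noted above (path hypothetical):
#
#     write("/path/to/station_qc.psv", df,
#           formatters={"Latitude": "{:7.4f}", "Longitude": "{:7.4f}"})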
#************************************************************************
def flag_write(outfilename: str, df: pd.DataFrame, diagnostics: bool = False) -> None:
    """
    Write out flag summary files to enable quicker plotting

    :param str outfilename: location and name of outfile
    :param DataFrame df: data frame to write
    :param bool diagnostics: verbose output
    """
    with open(outfilename, "w") as outfile:
        for var in setup.obs_var_list:
            flags = df[f"{var}_QC_flag"].fillna("")

            # Pull out the actual observations
            this_var_data = df[var].fillna(MDI).to_numpy().astype(float)
            this_var_data = np.ma.masked_where(this_var_data == MDI, this_var_data)

            # write out for all tests, regardless of whether set for this variable or not
            for test in QC_TESTS.keys():
                locs = flags[flags.str.contains(test)]

                # For the flagged fraction, compare against all obs of this variable
                if np.ma.count(this_var_data) == 0:
                    outfile.write(f"{var} : {test} : 0\n")
                else:
                    outfile.write(f"{var} : {test} : {locs.shape[0]/np.ma.count(this_var_data)}\n")
                outfile.write(f"{var} : {test}_counts : {locs.shape[0]}\n")

            # for total, get number of set flags (excluding fixable wind logical)
            flagged, = np.where(np.logical_and(flags != "", flags != "1"))
            if np.ma.count(this_var_data) == 0:
                outfile.write(f"{var} : All : 0\n")
            else:
                outfile.write(f"{var} : All : {flagged.shape[0]/np.ma.count(this_var_data)}\n")
            outfile.write(f"{var} : All_counts : {flagged.shape[0]}\n")

            logger.info(f"{var} - {flagged.shape[0]}")
            if diagnostics:
                print(f"{var} - {flagged.shape[0]}")
                print(f"{var} - {100*flagged.shape[0]/np.ma.count(this_var_data)}")

    return  # flag_write
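
# A minimal usage sketch (the ".flg" suffix is illustrative only):
#
#     flag_write("/path/to/USW00094728.flg", station_df, diagnostics=True)
#     # writes one "<var> : <test> : <fraction>" and one
#     # "<var> : <test>_counts : <n>" line per variable/test pair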
#************************************************************************
def write_error(station: Station, message: str, error: str = "", diagnostics: bool = False) -> None:
    """
    Write out quick failure message for station

    :param Station station: met. station
    :param str message: message to store
    :param str error: error output from stacktrace
    :param bool diagnostics: turn on diagnostic output
    """
    outfilename = os.path.join(setup.SUBDAILY_ERROR_DIR, f"{station.id:11s}.err")

    # if this file already exists, append rather than overwrite
    if os.path.exists(outfilename):
        write_type = "a"
    else:
        write_type = "w"

    with open(outfilename, write_type) as outfile:
        outfile.write(dt.datetime.strftime(dt.datetime.now(), "%Y-%m-%d %H:%M") + "\n")
        outfile.write(message + "\n")
        if error != "":
            outfile.write(error + "\n")

    return  # write_error
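
# A minimal usage sketch (the message text is hypothetical; traceback is
# not imported by this module):
#
#     write_error(station, "Datetime conversion failed",
#                 error=traceback.format_exc())  # needs `import traceback`
#     # appends a timestamped entry to <SUBDAILY_ERROR_DIR>/<station.id>.err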