#!/usr/bin/env python2.7
###############################################################################
# Program : wah_extract_local.py
# Author : Peter Uhe, based on original scripts by Neil Massey
# Date : 09/09/16
# Purpose : Extract the requested fields from the w@h zip files in a
#           specified folder and write them to separate netCDF files
###############################################################################
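#
# Example invocation (illustrative only; the output path and the field values
# below are hypothetical, reusing the batch path from the -i help text):
#
#   python2.7 wah_extract_local.py \
#       -i /gpfs/projects/cpdn/storage/boinc/upload/batch_440/successful/ \
#       -o /data/wah_extracted \
#       -f "[['ga.pd',3236,[-10,60,5,45],'mean',173,373,24,'mean','']]"
#
###############################################################################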
import sys, os
import ast
import tempfile, shutil
import glob
import argparse
import traceback
from wah_extract_functions import extract_local, process_netcdf, check_files_exist, get_filename, compress_netcdf
###############################################################################
if __name__ == "__main__":
    parser = argparse.ArgumentParser('Batch Extract Script:')
    in_dir_help = 'Input directory e.g. /gpfs/projects/cpdn/storage/boinc/upload/batch_440/successful/'
    parser.add_argument('-i', '--in_dir', required=True, help=in_dir_help)
    out_dir_help = 'Base of output directory for extracted files'
    parser.add_argument('-o', '--out_dir', required=True, help=out_dir_help)
    fields_help = 'List of fields to extract; each field has the format:'
    fields_help += '\n : [file_stream,stash_code,[region],process,valid_min,valid_max,time_freq,cell_method,vert_lev]'
    fields_help += '\n : where file_stream = ga.pd|ga.pe|ma.pc'
    fields_help += '\n : stash_code = stash_section*1000 + stash_item'
    fields_help += '\n : [region] = [lon_NW,lat_NW,lon_SW,lat_SW]'
    fields_help += '\n : process = time post-processing: min|max|mean|sum|all'
    fields_help += '\n : time_freq = input variable data frequency in hours (e.g. 24=daily, 720=monthly)'
    fields_help += '\n : cell_method = input variable time cell method: minimum|maximum|mean|inst'
    fields_help += '\n : vert_lev = (optional) name of the vertical level variable in the netCDF file'
    parser.add_argument('-f', '--fields', required=True, help=fields_help)
    # Add argument for selecting a single year
    parser.add_argument('-y', '--year', type=int, default=0, help='Year to extract: specify a particular year, or 0 to extract all years')
    parser.add_argument('-s', '--start_zip', type=int, default=1, help='First zip to extract')
    parser.add_argument('-e', '--end_zip', type=int, default=12, help='Last zip to extract')
    parser.add_argument('--structure', default='std', help='Directory structure [std|startdate-dir]')
    parser.add_argument('--output-freq', default='month', help='Output frequency of model zip/data files [month|year]')
    # Get arguments
    args = parser.parse_args()
    fields = args.fields
    output_dir = args.out_dir
    in_dir = args.in_dir
    year_to_extract = args.year
    start_zip = args.start_zip
    end_zip = args.end_zip
    if args.structure != 'std' and args.structure != 'startdate-dir':
        raise Exception('Error, --structure argument must be either std or startdate-dir')
    # Split the field list up
    field_list = ast.literal_eval(fields)
    for field in field_list:
        if len(field) != 9:
            print "Error! Fields argument not formatted correctly"
            print field
            print fields_help
            sys.exit(1)
    # Get all workunit folders within the batch folder
    # Either extract a single specified year or extract all years
    YearCode = int(year_to_extract)
    if YearCode == 0:
        taskdirs = glob.glob(in_dir + '*')
    else:
        YearString = '_' + str(YearCode) + '*'
        taskdirs = glob.glob(in_dir + '*' + YearString)
    print 'Year to extract:', YearCode
    print 'Fields:', field_list
    print 'Number of tasks:', len(taskdirs)
    # Create a temporary directory under the output directory
    # temp_dir = tempfile.mkdtemp(dir=os.environ['HOME'])
    tmp_dir = os.path.join(output_dir, 'tmp')
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)
    temp_dir = tempfile.mkdtemp(dir=tmp_dir)
    print 'Created temporary dir:', temp_dir
    temp_nc = os.path.join(temp_dir, 'tmp.nc')
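    # For each task: skip it if the extracted outputs already exist, unzip the
    # requested zip files into the temporary directory, concatenate the
    # per-zip netCDF files for each field into a single output file, then
    # delete the extracted inputs and compress the result.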
    try:
        # Loop over tasks
        for u in taskdirs:
            if not os.path.isdir(u):
                # Assume that each task is in its own directory
                continue
            print u
            # Check if the output files already exist and skip the task if they do
            if check_files_exist(u, field_list, output_dir, start_zip, end_zip, args.structure, args.output_freq):
                print 'Files exist, skipping'
                continue
            # Extract zip files into the temporary directory
            all_netcdfs = extract_local(u, field_list, output_dir, temp_dir, start_zip, end_zip)
            if not all_netcdfs:
                print 'Extract failed for task:', os.path.basename(u)
                continue
            # Process fields into single netCDF files
            for field in field_list:
                out_file = get_filename(u, field, output_dir, start_zip, end_zip, structure=args.structure, zip_freq=args.output_freq)
                print out_file
                # netCDF files for this field's file stream (e.g. 'ga.pe')
                netcdfs = all_netcdfs[field[0]]
                if not netcdfs:
                    print 'Error, no files for requested file stream:', field[0]
                    continue
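                # Concatenate the files into the temporary file temp_nc: the
                # first file creates it, and subsequent files are appended to
                # it by process_netcdf.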
                for i, nc_in_file in enumerate(netcdfs):
                    append = (i != 0)
                    out_netcdf = process_netcdf(nc_in_file, temp_nc, field, append, zip_freq=args.output_freq)
                    if not out_netcdf:
                        break
                # Successfully created file:
                if out_netcdf:
                    # First make the output directory
                    out_dir = os.path.dirname(out_file)
                    if not os.path.exists(out_dir):
                        os.makedirs(out_dir)
                    # Move the temp file to its final name
                    shutil.move(temp_nc, out_file)
                    print os.path.basename(out_file)
                    # Compress the output netcdf file
                    compress_netcdf(out_file)
            # Remove extracted netcdf files to stop the temp directory getting too big
            for nc_list in all_netcdfs.itervalues():
                for fname in nc_list:
                    os.remove(fname)
    except Exception as e:
        print 'Error extracting netcdf files:', e
        traceback.print_exc()
    finally:
        # Remove the temporary directories
        shutil.rmtree(temp_dir, ignore_errors=True)
        shutil.rmtree(tmp_dir, ignore_errors=True)