-
Notifications
You must be signed in to change notification settings - Fork 4
/
wah_extract_local.py
executable file
·131 lines (113 loc) · 4.95 KB
/
wah_extract_local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python2.7
###############################################################################
# Program : wah_extract_local.py
# Author : Peter Uhe, based on original scripts by Neil Massey
# Date : 09/09/16
# Purpose : Script to specify the folder containing w@h zip files and extract the data of
# requested fields into separate netCDF files
###############################################################################
import argparse
import ast
import glob
import os
import shutil
import sys
import tempfile
import traceback

from wah_extract_functions import extract_local,process_netcdf,check_files_exist,get_filename
###############################################################################
if __name__ == "__main__":
urls_file = ""
fields = ""
output_dir = ""
parser=argparse.ArgumentParser('Batch Extract Script:')
in_dir_help='Input directory e.g. /gpfs/projects/cpdn/storage/boinc/upload/batch_440/successful/'
parser.add_argument('-i','--in_dir',required=True,help=in_dir_help)
out_dir_help='Base of output directory for extracted files'
parser.add_argument('-o','--out_dir',required=True,help=out_dir_help)
fields_help='List of fields to extract: fields has the format:'
fields_help+='\n : [file_stream,stash_code,[region],process,valid_min,valid_max,time_freq,cell_method,vert_lev]'
fields_help+='\n : where file_stream = ga.pd|ga.pe|ma.pc'
fields_help+='\n : stash_code = stash_section*1000 + stash_item'
fields_help+='\n : [region] = [lon_NW,lat_NW,lon_SW,lat_SW]'
fields_help+='\n : process = time post_processing: min|max|mean|sum|all'
fields_help+='\n : time_freq = input variable data frequency in hours (e.g. 24=daily, 720=monthly)'
fields_help+='\n : cell_method = input variable time cell method: minimum,maximum,mean'
fields_help+='\n : vert_lev = (optional) input variable name of vertical level in netcdf file'
parser.add_argument('-f','--fields',required=True,help=fields_help)
parser.add_argument('-s','--start_zip',type=int,default=1,help='First zip to extract')
parser.add_argument('-e','--end_zip',type=int,default=12,help='Last zip to extract')
parser.add_argument('--structure',default='std',help='Directory structure [std|startdate-dir]')
parser.add_argument('--output-freq',default='month',help='Output frequency of model zip/data files [monthly|yearly]')
# Get arguments
args = parser.parse_args()
fields=args.fields
output_dir=args.out_dir
in_dir=args.in_dir
start_zip=args.start_zip
end_zip=args.end_zip
if args.structure!='std' and args.structure!='startdate-dir':
raise Exception('Error, --structure argument must be either std or startdate-dir')
# split the field list up
field_list = ast.literal_eval(fields)
for field in field_list:
if len(field) != 9:
print "Error! Fields argument not formatted correctly"
print field
print fields_help
exit()
# Get all workunit folders within batch folder
taskdirs= glob.glob(in_dir+'/*')
print 'fields',field_list
print 'Number of tasks:',len(taskdirs)
# create a temporary directory in home directory
temp_dir = tempfile.mkdtemp(dir=os.environ['HOME'])
temp_nc = os.path.join(temp_dir,'tmp.nc')
try:
# Loop over tasks
for u in list(taskdirs):
if not os.path.isdir(u):
# Assume that each task is in it's own directory
continue
print u
# Check if files already exist and skip if the are
if check_files_exist(u, field_list,output_dir,start_zip,end_zip,args.structure,args.output_freq):
print 'Files exist, skipping'
continue
# Extract zip files into temporary directory
all_netcdfs=extract_local(u, field_list, output_dir, temp_dir,start_zip,end_zip)
if not all_netcdfs:
print 'Extract failed for task: ',os.path.basename(u)
continue
# Process fields into single netcdf files
for field in field_list:
out_file = get_filename(u, field,output_dir,start_zip,end_zip,structure=args.structure,zip_freq=args.output_freq)
print out_file
netcdfs=all_netcdfs[field[0]] # List of netcdf files for stream in field (e.g. 'ga.pe')
if not netcdfs:
print 'Error, no files for requested file stream:',field[0]
continue
for i,nc_in_file in enumerate(netcdfs):
if i==0:
append=False
else:
append=True
out_netcdf=process_netcdf(nc_in_file,temp_nc,field,append,zip_freq=args.output_freq)
if not out_netcdf:
break
# Successfully created file:
if out_netcdf:
# First make the directory
out_dir=os.path.dirname(out_file)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
# Rename temp file to out_netcdf
shutil.move(temp_nc,out_file)
print os.path.basename(out_file)
# Remove netcdf files to stop temp directory getting too big
for nc_list in all_netcdfs.itervalues():
for fname in nc_list:
os.remove(fname)
except Exception,e:
print 'Error extracting netcdf files',e
# raise
finally:
# remove the temporary directory
shutil.rmtree(temp_dir)