Skip to content

Commit

Permalink
Merge pull request #31 from Keck-DataReductionPipelines/stf_parallelize
Browse files Browse the repository at this point in the history
Incorporate parallelized starfinder runs
  • Loading branch information
abhimat authored May 13, 2024
2 parents dc2ff64 + d6008bf commit 8d664b5
Showing 1 changed file with 154 additions and 63 deletions.
217 changes: 154 additions & 63 deletions kai/reduce/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@
import subprocess
import pylab as py
import pdb
from multiprocessing import Pool

def idl_process_run(batch_file):
"""
Function to help run parallel processes for IDL, used for StarFinder
Parameters
----------
batch_file : str
Path of the bath file containing the IDL command to run
"""

batch_file_log = batch_file + '.log'

cmd = 'idl < ' + batch_file + ' >& ' + batch_file_log
#os.system(cmd)
subp = subprocess.Popen(cmd, shell=True, executable="/bin/tcsh")
tmp = subp.communicate()

return

class Analysis(object):
"""
Expand All @@ -26,7 +46,8 @@ def __init__(self, epoch, rootDir='/g/lu/data/orion/', filt='kp',
combo_stf_dir = None,
epochDirSuffix=None, imgSuffix=None, stfDir=None,
useDistorted=False, cleanList='c.lis',
airopa_mode='single', stf_version=None,
airopa_mode='single', stf_version=None, stf_debug=False,
stf_parallelize=True,
instrument=instruments.default_inst):
"""
Set up Analysis object.
Expand Down Expand Up @@ -56,16 +77,24 @@ def __init__(self, epoch, rootDir='/g/lu/data/orion/', filt='kp',
(e.g.: 'v3_1')
instrument : instruments object, optional
Instrument of data. Default is `instruments.default_inst`
stf_debug : boolean, default=False
Keyword to specify if starfinder should be run in debug mode
stf_parallelize : boolean, default=True
Keyword to specify if starfinder runs on main map and sub maps
should be run in parallel.
By default, runs in parallel (all 4 run together).
"""
# Setup default parameters
self.type = 'ao'
self.corrMain = 0.8
self.corrSub = 0.6
self.corrClean = 0.7
self.corrMain = [0.8]
self.corrSub = [0.6]
self.corrClean = [0.7]
# airopa mode can be "legacy", "single", or "variable"
self.airopa_mode = airopa_mode
self.trimfake = 1
self.stfFlags = ''
self.stf_debug = stf_debug
self.stf_parallelize = stf_parallelize

self.starlist = rootDir + 'source_list/psf_central.dat'
self.labellist = rootDir+ 'source_list/label.dat'
Expand Down Expand Up @@ -252,40 +281,101 @@ def starfinderCombo(self, oldPsf=False):

os.chdir(self.dirComboStf)
if self.type == 'ao':
# Write an IDL batch file
fileIDLbatch = 'idlbatch_combo_' + self.filt
fileIDLlog = fileIDLbatch + '.log'
util.rmall([fileIDLlog, fileIDLbatch])

_batch = open(fileIDLbatch, 'w')
_batch.write("find_stf_new, ")
_batch.write("'" + self.epoch + "', ")
_batch.write("'" + self.filt + "', ")
_batch.write("corr_main=%3.1f, " % self.corrMain)
_batch.write("corr_subs=%3.1f, " % self.corrSub)
# Only send in the deblend flag if using it!
if self.deblend != None:
_batch.write("deblend=" + str(self.deblend) + ", ")
_batch.write("cooStar='" + self.cooStar + "', ")
_batch.write("suffixEpoch='" + self.suffix + "', ")
_batch.write("imgSuffix='" + self.imgSuffix + "', ")
_batch.write("starlist='" + self.starlist + "', ")
if self.airopa_mode == 'legacy':
_batch.write("/legacy, ")
if self.airopa_mode == 'variable':
_batch.write("/aoopt, ")
_batch.write("trimfake=" + str(self.trimfake) + ", ")
if not oldPsf:
_batch.write("/makePsf, ")
# Write an IDL batch file for the main map and each submap
batch_files = []
batch_file_logs = []

_batch.write("rootDir='" + self.rootDir + "'")

# Support for arbitrary starfinder flags.
_batch.write(self.stfFlags)
combos = ['main', '1', '2', '3']

for cur_combo in combos:
batch_file = 'idlbatch_main_' + self.filt
batch_file_log = batch_file + '.log'

if cur_combo != 'main':
batch_file = 'idlbatch_sm{0}_{1}'.format(cur_combo, self.filt)
batch_file_log = batch_file + '.log'

util.rmall([batch_file, batch_file_log])

batch_out = ""

combo_file_path = "{0}/mag{1}_{2}.fits".format(
self.dirCombo, self.epoch, self.filt,
)

if cur_combo != 'main':
combo_file_path = "{0}/m{1}_{2}_{3}.fits".format(
self.dirCombo, self.epoch, self.filt, cur_combo,
)

batch_out += "find_stf, "
batch_out += "'{0}', ".format(combo_file_path)

if cur_combo == 'main':
batch_out += "{0}, ".format(self.corrMain)
else:
batch_out += "{0}, ".format(self.corrSub)

batch_out += "ttStar='TTstar', gsStar='', "

# Add deblend flag if using it
if self.deblend != None:
batch_out += "deblend={0}, ".format(self.deblend)

if not oldPsf:
batch_out += "/makePsf, "

batch_out += "makeRes=1, "
batch_out += "makeStars=1, "

if self.airopa_mode == 'legacy':
batch_out += "/legacy, "

if self.airopa_mode == 'variable':
batch_out += "/aoopt, "

batch_out += "cooStar='{0}', ".format(self.cooStar)
batch_out += "starlist='{0}', ".format(self.starlist)
batch_out += "trimfake={0}, ".format(self.trimfake)

if self.stf_debug:
batch_out += "/debug, "

batch_out += "fixPsf=1, "
batch_out += "backboxFWHM=25, "
batch_out += "flat=1, "
batch_out += "subtract=1"

# Support for arbitrary starfinder flags.
if self.stfFlags != '':
batch_out += ", {0}".format(self.stfFlags)

batch_out += "\n"
batch_out += "exit\n"

# Write out the current batch file's contents
with open(batch_file, 'w') as out_file:
out_file.write(batch_out)

batch_files.append(batch_file)
batch_file_logs.append(batch_file_log)

if self.stf_parallelize:
stf_pool = Pool(processes=4)

stf_pool.map(idl_process_run, batch_files)
else:
for batch_file in batch_files:
idl_process_run(batch_file)

# Combine all log files into one combo log file
# (to preserve backward compatibility for code that reads log files)
fileIDLlog = 'idlbatch_combo_' + self.filt + '.log'
with open(fileIDLlog, 'wb') as out_file:
for log_file in batch_file_logs:
with open(log_file, 'rb') as in_file:
out_file.write(in_file.read())

_batch.write("\n")
_batch.write("exit\n")
_batch.close()
elif self.type == 'speckle':
fileIDLbatch = 'idlbatch_combo'
fileIDLlog = fileIDLbatch + '.log'
Expand All @@ -294,8 +384,8 @@ def starfinderCombo(self, oldPsf=False):
_batch = open(fileIDLbatch, 'w')
_batch.write("find_new_speck, ")
_batch.write("'" + self.epoch + "', ")
_batch.write("corr_main=%3.1f, " % self.corrMain)
_batch.write("corr_subs=%3.1f, " % self.corrSub)
_batch.write("corr_main={0}, " % self.corrMain)
_batch.write("corr_subs={0}, " % self.corrSub)
_batch.write("starlist='" + self.starlist + "', ")
if self.airopa_mode == 'legacy':
_batch.write("/legacy, ")
Expand All @@ -312,11 +402,11 @@ def starfinderCombo(self, oldPsf=False):
_batch.write("\n")
_batch.write("exit\n")
_batch.close()

cmd = 'idl < ' + fileIDLbatch + ' >& ' + fileIDLlog
#os.system(cmd)
subp = subprocess.Popen(cmd, shell=True, executable="/bin/tcsh")
tmp = subp.communicate()
cmd = 'idl < ' + fileIDLbatch + ' >& ' + fileIDLlog
#os.system(cmd)
subp = subprocess.Popen(cmd, shell=True, executable="/bin/tcsh")
tmp = subp.communicate()

# Write data_sources file
data_sources_file = open(self.dirComboStf + '/data_sources.txt', 'w')
Expand Down Expand Up @@ -547,10 +637,10 @@ def calibrateCombo(self):
else:
if self.deblend == 1:
fileMain = 'mag%s%s_%s_%3.1fd_stf.lis' % \
(self.epoch, self.imgSuffix, self.filt, self.corrMain)
(self.epoch, self.imgSuffix, self.filt, self.corrMain[0])
else:
fileMain = 'mag%s%s_%s_%3.1f_stf.lis' % \
(self.epoch, self.imgSuffix, self.filt, self.corrMain)
(self.epoch, self.imgSuffix, self.filt, self.corrMain[0])
print(cmd + fileMain)

# Now call from within python... don't bother with command line anymore.
Expand All @@ -565,14 +655,14 @@ def calibrateCombo(self):
for ss in range(self.numSubMaps):
if self.type == 'speckle':
fileSub = 'm%s_%d_%3.1f_stf.lis' % \
(self.epoch, ss+1, self.corrSub)
(self.epoch, ss+1, self.corrSub[0])
else:
if self.deblend == 1:
fileSub = 'm%s%s_%s_%d_%3.1fd_stf.lis' % \
(self.epoch, self.imgSuffix, self.filt, ss+1, self.corrSub)
(self.epoch, self.imgSuffix, self.filt, ss+1, self.corrSub[0])
else:
fileSub = 'm%s%s_%s_%d_%3.1f_stf.lis' % \
(self.epoch, self.imgSuffix, self.filt, ss+1, self.corrSub)
(self.epoch, self.imgSuffix, self.filt, ss+1, self.corrSub[0])

print(cmd + fileSub)

Expand Down Expand Up @@ -656,24 +746,24 @@ def alignCombo(self):
os.chdir(self.dirComboAln)

# Put the files in to the align*.list file
alnList1 = 'align%s%s_%3.1f.list' % (self.imgSuffix, file_ext, self.corrMain)
alnList2 = 'align%s%s_%3.1f_named.list' % (self.imgSuffix, file_ext, self.corrMain)
alnList1 = 'align%s%s_%3.1f.list' % (self.imgSuffix, file_ext, self.corrMain[0])
alnList2 = 'align%s%s_%3.1f_named.list' % (self.imgSuffix, file_ext, self.corrMain[0])


_list = open(alnList1, 'w')
if self.deblend == 1:
_list.write('../mag%s%s%s_%3.1fd_stf_cal.lis %d ref\n' %
(self.epoch, self.imgSuffix, file_ext, self.corrMain, alignType))
(self.epoch, self.imgSuffix, file_ext, self.corrMain[0], alignType))
else:
_list.write('../mag%s%s%s_%3.1f_stf_cal.lis %d ref\n' %
(self.epoch, self.imgSuffix, file_ext, self.corrMain, alignType))
(self.epoch, self.imgSuffix, file_ext, self.corrMain[0], alignType))
for ss in range(self.numSubMaps):
if self.deblend == 1:
_list.write('../m%s%s%s_%d_%3.1fd_stf_cal.lis %d\n' %
(self.epoch, self.imgSuffix, file_ext, ss+1, self.corrSub, alignType))
(self.epoch, self.imgSuffix, file_ext, ss+1, self.corrSub[0], alignType))
else:
_list.write('../m%s%s%s_%d_%3.1f_stf_cal.lis %d\n' %
(self.epoch, self.imgSuffix, file_ext, ss+1, self.corrSub, alignType))
(self.epoch, self.imgSuffix, file_ext, ss+1, self.corrSub[0], alignType))


_list.close()
Expand All @@ -682,7 +772,7 @@ def alignCombo(self):

# Make an unlabeled version
cmd = 'java -Xmx1024m align %s ' % (self.alignFlags)
cmd += '-r align%s%s_%3.1f ' % (self.imgSuffix, file_ext, self.corrMain)
cmd += '-r align%s%s_%3.1f ' % (self.imgSuffix, file_ext, self.corrMain[0])
cmd += alnList1
print(cmd)
#os.system(cmd)
Expand All @@ -700,8 +790,9 @@ def alignCombo(self):

if (self.orbitlist != None) and (self.orbitlist != ''):
cmd += '-o %s ' % self.orbitlist

cmd += '-r align%s%s_%3.1f_named ' % (self.imgSuffix, file_ext, self.corrMain)

cmd += '-r align%s%s_%3.1f_named ' % (self.imgSuffix, file_ext, self.corrMain[0])

cmd += alnList2
print(cmd)

Expand All @@ -710,33 +801,33 @@ def alignCombo(self):


align_options = 'align%s%s_%3.1f %d -e' % \
(self.imgSuffix, file_ext, self.corrMain, self.minSubMaps)
(self.imgSuffix, file_ext, self.corrMain[0], self.minSubMaps)
align_rms.run(align_options.split())

align_options = 'align%s%s_%3.1f_named %d -e' % \
(self.imgSuffix, file_ext, self.corrMain, self.minSubMaps)
(self.imgSuffix, file_ext, self.corrMain[0], self.minSubMaps)
align_rms.run(align_options.split())


# Move the resulting files to their final resting place
os.rename('align%s%s_%3.1f_rms.lis' %
(self.imgSuffix, file_ext, self.corrMain),
(self.imgSuffix, file_ext, self.corrMain[0]),
'../mag%s%s%s_rms.lis' %
(self.epoch, self.imgSuffix, file_ext))
os.rename('align%s%s_%3.1f_named_rms.lis' %
(self.imgSuffix, file_ext, self.corrMain),
(self.imgSuffix, file_ext, self.corrMain[0]),
'../mag%s%s%s_rms_named.lis' %
(self.epoch, self.imgSuffix, file_ext))

# Copy over the label.dat and orbit.dat file that was used.
shutil.copyfile(self.labellist,
'align%s%s_%3.1f_named_label_list.txt' %
(self.imgSuffix, file_ext, self.corrMain))
(self.imgSuffix, file_ext, self.corrMain[0]))

if (self.orbitlist != None) and (self.orbitlist != ''):
shutil.copyfile(self.orbitlist,
'align%s%s_%3.1f_named_orbit_list.txt' %
(self.imgSuffix, file_ext, self.corrMain))
(self.imgSuffix, file_ext, self.corrMain[0]))

# Now plot up the results
plotSuffix = self.imgSuffix + file_ext
Expand Down

0 comments on commit 8d664b5

Please sign in to comment.