Skip to content

Commit

Permalink
Remove random snake casing and rename some variables
Browse files Browse the repository at this point in the history
  • Loading branch information
mfisherlevine committed Aug 16, 2024
1 parent 99b720e commit 74ed1e4
Showing 1 changed file with 47 additions and 47 deletions.
94 changes: 47 additions & 47 deletions python/lsst/atmospec/spectroFlat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,42 @@
from scipy.ndimage import median_filter, uniform_filter, gaussian_filter


def find_flats(butler, cameraName='LATISS', filter='empty', disperser='empty', obs_type='flat'):
def findFlats(butler, cameraName='LATISS', filter='empty', disperser='empty', obsType='flat'):
registry = butler.registry
physical_filter = f'{filter}~{disperser}'
physicalFilter = f'{filter}~{disperser}'
where = (
f"instrument='{cameraName}' AND physical_filter='{physical_filter}' "
f"AND exposure.observation_type='{obs_type}'"
f"instrument='{cameraName}' AND physical_filter='{physicalFilter}' "
f"AND exposure.observation_type='{obsType}'"
)
flat_records = list(registry.queryDimensionRecords('exposure', where=where))
flatRecords = list(registry.queryDimensionRecords('exposure', where=where))

if len(flat_records) == 0:
if len(flatRecords) == 0:
raise ValueError(
'WARNING: No flats found with the selected settings: \n'
f'{cameraName=} {physical_filter=} {obs_type=}')
f'{cameraName=} {physicalFilter=} {obsType=}')

flat_dates = np.sort(np.unique([flat_.day_obs for flat_ in flat_records]))
ids_ = [flat_.id for flat_ in flat_records]
flatDates = np.sort(np.unique([r.day_obs for r in flatRecords]))
ids = [r.id for r in flatRecords]

flat_dict_ids = {}
for date_ in flat_dates:
flat_dict_ids[date_] = []
for id_ in ids_:
if str(date_) in str(id_):
flat_dict_ids[date_].append(id_)
flat_dict_ids[date_] = np.sort(flat_dict_ids[date_])
flatDictIds = {}
for date in flatDates:
flatDictIds[date] = []
for expId in ids:
if str(date) in str(expId):
flatDictIds[date].append(expId)
flatDictIds[date] = np.sort(flatDictIds[date])

return flat_dates, flat_dict_ids
return flatDates, flatDictIds


def getCertifiedFlat(butler, dataId, filter='empty', disperser='empty'):
flat_dates, flat_ids = find_flats(butler, filter=filter, disperser=disperser)
obs_day = dataId // 100_000
date_diff = int(obs_day) - flat_dates
closest_date = flat_dates[np.argmax(date_diff[date_diff <= 0])]
flatDates, flatIds = findFlats(butler, filter=filter, disperser=disperser)
dayObs = dataId // 100_000
dateDiff = int(dayObs) - flatDates
closestDate = flatDates[np.argmax(dateDiff[dateDiff <= 0])]
# load flat
flat_id = flat_ids[closest_date][-1]
certifiedFlat = butler.get('flat', instrument='LATISS', exposure=flat_id, detector=0,
flatId = flatIds[closestDate][-1]
certifiedFlat = butler.get('flat', instrument='LATISS', exposure=flatId, detector=0,
collections=['LATISS/calib', 'LATISS/raw/all'])
return certifiedFlat

Expand All @@ -56,8 +56,8 @@ def getGainDict(exposure):

def getPTCGainDict(butler):
ptc = butler.get('ptc', instrument="LATISS", detector=0, collections='u/cslage/sdf/latiss/ptc_20220927J')
PTCGainDict = ptc.gain
return PTCGainDict
ptcGainDict = ptc.gain
return ptcGainDict


def makeGainFlat(certifiedFlat, gainDict, invertGains=False):
Expand Down Expand Up @@ -88,24 +88,24 @@ def makeGainFlat(certifiedFlat, gainDict, invertGains=False):
ampNames = set(list(a.getName() for a in detector))
assert set(gainDict.keys()) == ampNames

gainDict_norm = {}
gainDictNorm = {}
mean = np.mean(list(gainDict.values()))
for amp in gainDict.keys():
gainDict_norm[amp] = gainDict[amp] / mean
gainDictNorm[amp] = gainDict[amp] / mean

for amp in detector:
bbox = amp.getBBox()
if invertGains:
flat[bbox].maskedImage.image.array[:, :] = 1. / gainDict_norm[amp.getName()]
flat[bbox].maskedImage.image.array[:, :] = 1. / gainDictNorm[amp.getName()]
else:
flat[bbox].maskedImage.image.array[:, :] = gainDict_norm[amp.getName()]
flat[bbox].maskedImage.image.array[:, :] = gainDictNorm[amp.getName()]
flat.maskedImage.mask[:] = 0x0
flat.maskedImage.variance[:] = 0.0

return flat


def makeOpticFlat(certifiedFlat, kernel='mean', window_size=40, mode='mirror', percentile=1.):
def makeOpticFlat(certifiedFlat, kernel='mean', windowSize=40, mode='mirror', percentile=1.):
logger = logging.getLogger()
opticFlat = certifiedFlat.clone()

Expand All @@ -115,10 +115,10 @@ def makeOpticFlat(certifiedFlat, kernel='mean', window_size=40, mode='mirror', p
data = np.copy(opticFlat[bbox].maskedImage.image.array[:, :])
data /= np.median(data)
if kernel == 'gauss':
logger.info(f'Pixel flat smoothing with Gaussian filter with {window_size=}.')
logger.info(f'Pixel flat smoothing with Gaussian filter with {windowSize=}.')
# 'ATTENTION: window size should be equal to Gaussian standard
# deviation (sigma)'
opticFlat[bbox].maskedImage.image.array[:, :] = gaussian_filter(data, sigma=window_size,
opticFlat[bbox].maskedImage.image.array[:, :] = gaussian_filter(data, sigma=windowSize,
mode=mode)
elif kernel == 'mean':
logger.info(
Expand All @@ -129,14 +129,14 @@ def makeOpticFlat(certifiedFlat, kernel='mean', window_size=40, mode='mirror', p
data >= np.percentile(data.flatten(), percentile)) * (
data <= np.percentile(data.flatten(), 100. - percentile)
)
interp_array = np.copy(data)
interp_array[~mask] = np.median(data)
opticFlat[bbox].maskedImage.image.array[:, :] = uniform_filter(interp_array, size=window_size,
interpArray = np.copy(data)
interpArray[~mask] = np.median(data)
opticFlat[bbox].maskedImage.image.array[:, :] = uniform_filter(interpArray, size=windowSize,
mode=mode)
elif kernel == 'median':
logger.info(f'Pixel flat smoothing with median filter with {window_size=}.')
logger.info(f'Pixel flat smoothing with median filter with {windowSize=}.')
opticFlat[bbox].maskedImage.image.array[:, :] = median_filter(data,
size=(window_size, window_size),
size=(windowSize, windowSize),
mode=mode)
else:
raise ValueError(f'Kernel must be within ["mean", "median", "gauss"]. Got {kernel=}.')
Expand All @@ -149,15 +149,15 @@ def makeOpticFlat(certifiedFlat, kernel='mean', window_size=40, mode='mirror', p
def makePixelFlat(
certifiedFlat,
kernel='mean',
window_size=40,
windowSize=40,
mode='mirror',
percentile=1.,
remove_background=True
removeBackground=True
):
logger = logging.getLogger()
logger.info(f'Window size for {kernel} smoothing = {window_size}')
logger.info(f'Window size for {kernel} smoothing = {windowSize}')
pixelFlat = certifiedFlat.clone()
opticFlat = makeOpticFlat(certifiedFlat, kernel=kernel, window_size=window_size, mode=mode,
opticFlat = makeOpticFlat(certifiedFlat, kernel=kernel, windowSize=windowSize, mode=mode,
percentile=percentile)

detector = opticFlat.getDetector()
Expand All @@ -166,7 +166,7 @@ def makePixelFlat(
ampData = pixelFlat[bbox].maskedImage.image.array[:, :]
ampData /= np.median(ampData)
ampData /= opticFlat[bbox].maskedImage.image.array[:, :]
if remove_background:
if removeBackground:
from astropy.stats import SigmaClip
from photutils.background import Background2D, MedianBackground
sigma_clip = SigmaClip(sigma=3.0)
Expand All @@ -186,16 +186,16 @@ def makeSensorFlat(
certifiedFlat,
gainDict,
kernel='mean',
window_size=40,
windowSize=40,
mode='mirror',
percentile=1.,
invertGains=False,
remove_background=True
removeBackground=True
):
logger = logging.getLogger()
logger.info(f'Window size for {kernel} smoothing = {window_size}')
pixelFlat = makePixelFlat(certifiedFlat, kernel=kernel, window_size=window_size, mode=mode,
percentile=percentile, remove_background=remove_background)
logger.info(f'Window size for {kernel} smoothing = {windowSize}')
pixelFlat = makePixelFlat(certifiedFlat, kernel=kernel, windowSize=windowSize, mode=mode,
percentile=percentile, removeBackground=removeBackground)
gainFlat = makeGainFlat(certifiedFlat, gainDict, invertGains=invertGains)
sensorFlat = pixelFlat.clone()

Expand Down

0 comments on commit 74ed1e4

Please sign in to comment.