From da9475342332cce20b285c46f560c69a4e198e69 Mon Sep 17 00:00:00 2001 From: Mathieu Doucet <matd10@yahoo.com> Date: Mon, 6 Jan 2025 12:40:48 -0500 Subject: [PATCH] ruff it --- reduction/lr_reduction/event_reduction.py | 574 +++++++++++++--------- reduction/lr_reduction/workflow.py | 178 +++---- 2 files changed, 431 insertions(+), 321 deletions(-) diff --git a/reduction/lr_reduction/event_reduction.py b/reduction/lr_reduction/event_reduction.py index 354e4c7..ed69866 100644 --- a/reduction/lr_reduction/event_reduction.py +++ b/reduction/lr_reduction/event_reduction.py @@ -1,6 +1,7 @@ """ - Event based reduction for the Liquids Reflectometer +Event based reduction for the Liquids Reflectometer """ + import datetime import json import os @@ -17,29 +18,30 @@ NEUTRON_MASS = 1.675e-27 # kg # Attenuators: PV name and thickness in cm -CD_ATTENUATORS = [['BL4B:Actuator:50MRb', 0.0058], - ['BL4B:Actuator:100MRb', 0.0122], - ['BL4B:Actuator:200MRb', 0.0244], # Uncalibrated - ['BL4B:Actuator:400MRb', 0.0488], # Uncalibrated - ] +CD_ATTENUATORS = [ + ["BL4B:Actuator:50MRb", 0.0058], + ["BL4B:Actuator:100MRb", 0.0122], + ["BL4B:Actuator:200MRb", 0.0244], # Uncalibrated + ["BL4B:Actuator:400MRb", 0.0488], # Uncalibrated +] def get_wl_range(ws): """ - Determine TOF range from the data + Determine TOF range from the data - :param ws: (Mantid workspace) workspace to work with + :param ws: (Mantid workspace) workspace to work with - :return: (list) [min, max] wavelength range + :return: (list) [min, max] wavelength range """ run_object = ws.getRun() - wl = run_object.getProperty('LambdaRequest').value[0] - chopper_speed = run_object.getProperty('SpeedRequest1').value[0] + wl = run_object.getProperty("LambdaRequest").value[0] + chopper_speed = run_object.getProperty("SpeedRequest1").value[0] # Cut the edges by using a width of 2.6 A - wl_min = (wl - 1.3 * 60.0 / chopper_speed) - wl_max = (wl + 1.3 * 60.0 / chopper_speed) + wl_min = wl - 1.3 * 60.0 / chopper_speed + wl_max = wl + 1.3 * 60.0 / chopper_speed return [wl_min, wl_max] @@ -61,11 +63,11 @@ def get_q_binning(q_min=0.001, q_max=0.15, q_step=-0.02): An array of Q values based on the specified binning. """ if q_step > 0: - n_steps = int((q_max-q_min)/q_step) + n_steps = int((q_max - q_min) / q_step) return q_min + np.asarray([q_step * i for i in range(n_steps)]) else: - _step = 1.0+np.abs(q_step) - n_steps = int(np.log(q_max/q_min)/np.log(_step)) + _step = 1.0 + np.abs(q_step) + n_steps = int(np.log(q_max / q_min) / np.log(_step)) return q_min * np.asarray([_step**i for i in range(n_steps)]) @@ -94,40 +96,40 @@ def get_attenuation_info(ws): def read_settings(ws): """ - Read settings file and return values for the given timestamp + Read settings file and return values for the given timestamp - :param ws: Mantid workspace - :return: (dict) dictionary with settings + :param ws: Mantid workspace + :return: (dict) dictionary with settings """ settings = dict() package_dir, _ = os.path.split(__file__) - t = ws.getRun()['start_time'].value.split('T')[0] + t = ws.getRun()["start_time"].value.split("T")[0] timestamp = datetime.date.fromisoformat(t) - with open(os.path.join(package_dir, 'settings.json'), 'r') as fd: + with open(os.path.join(package_dir, "settings.json"), "r") as fd: data = json.load(fd) for key in data.keys(): chosen_value = None delta_time = None for item in data[key]: - valid_from = datetime.date.fromisoformat(item['from']) + valid_from = datetime.date.fromisoformat(item["from"]) delta = valid_from - timestamp if delta_time is None or (delta.total_seconds() < 0 and delta > delta_time): delta_time = delta - chosen_value = item['value'] + chosen_value = item["value"] settings[key] = chosen_value return settings def process_attenuation(ws, thickness=0): """ - Correct for absorption by assigning weight to each neutron event + Correct for absorption by assigning weight to each neutron event - :param ws: (Mantid workspace) workspace to correct - :param thickness: (float) attenuator thickness in cm (default is 0). + :param ws: (Mantid workspace) workspace to correct + :param thickness: (float) attenuator thickness in cm (default is 0). - :return: (Mantid workspace) corrected workspace + :return: (Mantid workspace) corrected workspace """ settings = read_settings(ws) if "source-det-distance" in settings: @@ -138,19 +140,22 @@ def process_attenuation(ws, thickness=0): constant = 1e-4 * NEUTRON_MASS * SDD / PLANCK_CONSTANT package_dir, _ = os.path.split(__file__) - mu_abs = np.loadtxt(os.path.join(package_dir, 'Cd-abs-factors.txt')).T + mu_abs = np.loadtxt(os.path.join(package_dir, "Cd-abs-factors.txt")).T wl_model = mu_abs[0] # Turn model into a histogram wl_step = wl_model[-1] - wl_model[-2] final_wl = wl_model[-1] + wl_step - wl_model = np.append(wl_model, [final_wl,]) + wl_model = np.append( + wl_model, + [ + final_wl, + ], + ) mu_model = mu_abs[1] tof_model = constant * wl_model - transmission = 1/np.exp(-mu_model * thickness) - transmission_ws = api.CreateWorkspace(OutputWorkspace='transmission', - DataX=tof_model, DataY=transmission, - UnitX='TOF', NSpec=1) + transmission = 1 / np.exp(-mu_model * thickness) + transmission_ws = api.CreateWorkspace(OutputWorkspace="transmission", DataX=tof_model, DataY=transmission, UnitX="TOF", NSpec=1) ws = api.Multiply(ws, transmission_ws, OutputWorkspace=str(ws)) return ws @@ -158,47 +163,50 @@ def process_attenuation(ws, thickness=0): def get_dead_time_correction(ws, template_data): """ - Compute dead time correction to be applied to the reflectivity curve. - The method will also try to load the error events from each of the - data files to ensure that we properly estimate the dead time correction. + Compute dead time correction to be applied to the reflectivity curve. + The method will also try to load the error events from each of the + data files to ensure that we properly estimate the dead time correction. - :param ws: (Mantid worksapce) workspace with raw data to compute correction for - :param template_data: (reduction_template_reader.ReductionParameters) - reduction parameters + :param ws: (Mantid worksapce) workspace with raw data to compute correction for + :param template_data: (reduction_template_reader.ReductionParameters) + reduction parameters - :return: (Mantid workspace) workspace with dead time correction to apply + :return: (Mantid workspace) workspace with dead time correction to apply """ tof_min = ws.getTofMin() tof_max = ws.getTofMax() run_number = ws.getRun().getProperty("run_number").value error_ws = api.LoadErrorEventsNexus("REF_L_%s" % run_number) - corr_ws = mantid_algorithm_exec(DeadTimeCorrection.SingleReadoutDeadTimeCorrection, - InputWorkspace=ws, - InputErrorEventsWorkspace=error_ws, - Paralyzable=template_data.paralyzable, - DeadTime=template_data.dead_time_value, - TOFStep=template_data.dead_time_tof_step, - TOFRange=[tof_min, tof_max], - OutputWorkspace="corr") + corr_ws = mantid_algorithm_exec( + DeadTimeCorrection.SingleReadoutDeadTimeCorrection, + InputWorkspace=ws, + InputErrorEventsWorkspace=error_ws, + Paralyzable=template_data.paralyzable, + DeadTime=template_data.dead_time_value, + TOFStep=template_data.dead_time_tof_step, + TOFRange=[tof_min, tof_max], + OutputWorkspace="corr", + ) corr_ws = api.Rebin(corr_ws, [tof_min, 10, tof_max]) return corr_ws + def apply_dead_time_correction(ws, template_data): """ - Apply dead time correction, and ensure that it is done only once - per workspace. + Apply dead time correction, and ensure that it is done only once + per workspace. - :param ws: (Mantid workspace) workspace with raw data to compute correction for - :param template_data: (reduction_template_reader.ReductionParameters) - reduction parameters + :param ws: (Mantid workspace) workspace with raw data to compute correction for + :param template_data: (reduction_template_reader.ReductionParameters) + reduction parameters - :return: (Mantid workspace) workspace with dead time correction applied + :return: (Mantid workspace) workspace with dead time correction applied """ - if 'dead_time_applied' not in ws.getRun(): + if "dead_time_applied" not in ws.getRun(): corr_ws = get_dead_time_correction(ws, template_data) ws = api.Multiply(ws, corr_ws, OutputWorkspace=str(ws)) - api.AddSampleLog(Workspace=ws, LogName="dead_time_applied", LogText='1', LogType="Number") + api.AddSampleLog(Workspace=ws, LogName="dead_time_applied", LogText="1", LogType="Number") return ws @@ -244,15 +252,30 @@ class EventReflectivity(object): DEFAULT_4B_SAMPLE_DET_DISTANCE = 1.83 DEFAULT_4B_SOURCE_DET_DISTANCE = 15.75 - def __init__(self, scattering_workspace, direct_workspace, - signal_peak, signal_bck, norm_peak, norm_bck, - specular_pixel, signal_low_res, norm_low_res, - q_min=None, q_step=-0.02, q_max=None, - tof_range=None, theta=1.0, instrument=None, - functional_background=False, dead_time=False, - paralyzable=True, dead_time_value=4.2, - dead_time_tof_step=100, use_emission_time=True): - + def __init__( + self, + scattering_workspace, + direct_workspace, + signal_peak, + signal_bck, + norm_peak, + norm_bck, + specular_pixel, + signal_low_res, + norm_low_res, + q_min=None, + q_step=-0.02, + q_max=None, + tof_range=None, + theta=1.0, + instrument=None, + functional_background=False, + dead_time=False, + paralyzable=True, + dead_time_value=4.2, + dead_time_tof_step=100, + use_emission_time=True, + ): if instrument in [self.INSTRUMENT_4A, self.INSTRUMENT_4B]: self.instrument = instrument else: @@ -285,13 +308,13 @@ def __init__(self, scattering_workspace, direct_workspace, # Process workspaces if self.tof_range is not None: - self._ws_sc = api.CropWorkspace(InputWorkspace=scattering_workspace, - XMin=tof_range[0], XMax=tof_range[1], - OutputWorkspace='_'+str(scattering_workspace)) + self._ws_sc = api.CropWorkspace( + InputWorkspace=scattering_workspace, XMin=tof_range[0], XMax=tof_range[1], OutputWorkspace="_" + str(scattering_workspace) + ) if direct_workspace is not None: - self._ws_db = api.CropWorkspace(InputWorkspace=direct_workspace, - XMin=tof_range[0], XMax=tof_range[1], - OutputWorkspace='_'+str(direct_workspace)) + self._ws_db = api.CropWorkspace( + InputWorkspace=direct_workspace, XMin=tof_range[0], XMax=tof_range[1], OutputWorkspace="_" + str(direct_workspace) + ) else: self._ws_db = None else: @@ -303,7 +326,7 @@ def __init__(self, scattering_workspace, direct_workspace, def extract_meta_data(self): """ - Extract meta data from the data file. + Extract meta data from the loaded data file. """ # Get instrument parameters settings = read_settings(self._ws_sc) @@ -324,12 +347,12 @@ def extract_meta_data(self): if self.tof_range is None: self.wl_range = get_wl_range(self._ws_sc) else: - self.wl_range = [self.tof_range[0] / self.constant, self.tof_range[1] / self.constant] + self.wl_range = [self.tof_range[0] / self.constant, self.tof_range[1] / self.constant] # q_min and q_max are the boundaries for the final Q binning # We also hold on to the true Q range covered by the measurement - self.q_min_meas = 4.0*np.pi/self.wl_range[1] * np.fabs(np.sin(self.theta)) - self.q_max_meas = 4.0*np.pi/self.wl_range[0] * np.fabs(np.sin(self.theta)) + self.q_min_meas = 4.0 * np.pi / self.wl_range[1] * np.fabs(np.sin(self.theta)) + self.q_max_meas = 4.0 * np.pi / self.wl_range[0] * np.fabs(np.sin(self.theta)) if self.q_min is None: self.q_min = self.q_min_meas @@ -341,29 +364,29 @@ def extract_meta_data(self): # Catch options that can be turned off if self.signal_low_res is None: - self.signal_low_res = [1, self.n_x-1] + self.signal_low_res = [1, self.n_x - 1] if self.norm_low_res is None: - self.norm_low_res = [1, self.n_x-1] + self.norm_low_res = [1, self.n_x - 1] def extract_meta_data_4A(self): """ - 4A-specific meta data + 4A-specific meta data """ run_object = self._ws_sc.getRun() - self.det_distance = run_object['SampleDetDis'].getStatistics().mean - source_sample_distance = run_object['ModeratorSamDis'].getStatistics().mean - if run_object['SampleDetDis'].units not in ['m', 'meter']: + self.det_distance = run_object["SampleDetDis"].getStatistics().mean + source_sample_distance = run_object["ModeratorSamDis"].getStatistics().mean + if run_object["SampleDetDis"].units not in ["m", "meter"]: self.det_distance /= 1000.0 - if run_object['ModeratorSamDis'].units not in ['m', 'meter']: + if run_object["ModeratorSamDis"].units not in ["m", "meter"]: source_sample_distance /= 1000.0 self.source_detector_distance = source_sample_distance + self.det_distance def extract_meta_data_4B(self): """ - 4B-specific meta data + 4B-specific meta data - Distance from source to sample was 13.63 meters prior to the source - to detector distance being determined with Bragg edges to be 15.75 m. + Distance from source to sample was 13.63 meters prior to the source + to detector distance being determined with Bragg edges to be 15.75 m. """ settings = read_settings(self._ws_sc) @@ -390,6 +413,11 @@ def extract_meta_data_4B(self): self.source_detector_distance = self.DEFAULT_4B_SOURCE_DET_DISTANCE def __repr__(self): + """ + Generate a string representation of the reduction settings. + + :return: (str) string representation of the reduction settings + """ output = "Reduction settings:\n" output += " sample-det: %s\n" % self.det_distance output += " source-det: %s\n" % self.source_detector_distance @@ -403,12 +431,14 @@ def __repr__(self): def to_dict(self): """ - Returns meta-data to be used/stored. + Returns meta-data to be used/stored. + + :return: (dict) dictionary with meta-data """ if self._ws_sc.getRun().hasProperty("start_time"): start_time = self._ws_sc.getRun().getProperty("start_time").value else: - start_time = 'live' + start_time = "live" experiment = self._ws_sc.getRun().getProperty("experiment_identifier").value run_number = self._ws_sc.getRun().getProperty("run_number").value sequence_number = int(self._ws_sc.getRun().getProperty("sequence_number").value[0]) @@ -421,25 +451,42 @@ def to_dict(self): norm_run = 0 dq0 = 0 - return dict(wl_min=self.wl_range[0], wl_max=self.wl_range[1], - q_min=self.q_min_meas, q_max=self.q_max_meas, theta=self.theta, - start_time=start_time, experiment=experiment, run_number=run_number, - run_title=run_title, norm_run=norm_run, time=time.ctime(), - dq0=dq0, dq_over_q=self.dq_over_q, sequence_number=sequence_number, - sequence_id=sequence_id, q_summing=self.q_summing) - - def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, - clean=False, normalize=True): + return dict( + wl_min=self.wl_range[0], + wl_max=self.wl_range[1], + q_min=self.q_min_meas, + q_max=self.q_max_meas, + theta=self.theta, + start_time=start_time, + experiment=experiment, + run_number=run_number, + run_title=run_title, + norm_run=norm_run, + time=time.ctime(), + dq0=dq0, + dq_over_q=self.dq_over_q, + sequence_number=sequence_number, + sequence_id=sequence_id, + q_summing=self.q_summing, + ) + + def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, clean=False, normalize=True): """ - Compute specular reflectivity. + Compute specular reflectivity. + + For constant-Q binning, it's preferred to use tof_weighted=True. + + :param q_summing: turns on constant-Q binning + :param tof_weighted: if True, binning will be done by weighting each event to the DB distribution + :param bck_in_q: if True, the background will be estimated in Q space using the constant-Q binning approach + :param clean: if True, and Q summing is True, then leading artifact will be removed + :param normalize: if True, and tof_weighted is False, normalization will be skipped - For constant-Q binning, it's preferred to use tof_weighted=True. + :return: A tuple containing: - :param q_summing: turns on constant-Q binning - :param tof_weighted: if True, binning will be done by weighting each event to the DB distribution - :param bck_in_q: if True, the background will be estimated in Q space using the constant-Q binning approach - :param clean: if True, and Q summing is True, then leading artifact will be removed - :param normalize: if True, and tof_weighted is False, normalization will be skipped + - q_bins: The Q bin boundaries + - refl: The reflectivity values + - d_refl: The uncertainties in the reflectivity values """ if tof_weighted: self.specular_weighted(q_summing=q_summing, bck_in_q=bck_in_q) @@ -447,7 +494,7 @@ def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, self.specular_unweighted(q_summing=q_summing, normalize=normalize) # Remove leading zeros - r = np.trim_zeros(self.refl, 'f') + r = np.trim_zeros(self.refl, "f") trim = len(self.refl) - len(r) self.refl = self.refl[trim:] self.d_refl = self.d_refl[trim:] @@ -471,15 +518,30 @@ def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, def specular_unweighted(self, q_summing=False, normalize=True): """ - Simple specular reflectivity calculation. This is the same approach as the - original LR reduction, which sums up pixels without constant-Q binning. - The original approach bins in TOF, then rebins the final results after - transformation to Q. This approach bins directly to Q. + Simple specular reflectivity calculation. This is the same approach as the + original LR reduction, which sums up pixels without constant-Q binning. + The original approach bins in TOF, then rebins the final results after + transformation to Q. This approach bins directly to Q. + + :param q_summing: (bool, optional) If True, sum the data in Q-space (default is False). + :param normalize: (bool, optional) If True, normalize the reflectivity by the direct + beam (default is True). + + :return: A tuple containing: + + - q_bins: The Q bin boundaries + - refl: The reflectivity values + - d_refl: The uncertainties in the reflectivity values """ # Scattering data - refl, d_refl = self._reflectivity(self._ws_sc, peak_position=self.specular_pixel, - peak=self.signal_peak, low_res=self.signal_low_res, - theta=self.theta, q_summing=q_summing) + refl, d_refl = self._reflectivity( + self._ws_sc, + peak_position=self.specular_pixel, + peak=self.signal_peak, + low_res=self.signal_low_res, + theta=self.theta, + q_summing=q_summing, + ) # Remove background if self.signal_bck is not None: @@ -493,19 +555,21 @@ def specular_unweighted(self, q_summing=False, normalize=True): # we can bin the DB according to the same transform instead of binning and dividing in TOF. # This is mathematically equivalent and convenient in terms of abstraction for later # use for the constant-Q calculation elsewhere in the code. - norm, d_norm = self._reflectivity(self._ws_db, peak_position=0, - peak=self.norm_peak, low_res=self.norm_low_res, - theta=self.theta, q_summing=False) + norm, d_norm = self._reflectivity( + self._ws_db, peak_position=0, peak=self.norm_peak, low_res=self.norm_low_res, theta=self.theta, q_summing=False + ) # Direct beam background could be added here. The effect will be negligible. if self.norm_bck is not None: norm_bck, d_norm_bck = self.norm_bck_subtraction() norm -= norm_bck d_norm = np.sqrt(d_norm**2 + d_norm_bck**2) - db_bins = norm>0 + db_bins = norm > 0 - refl[db_bins] = refl[db_bins]/norm[db_bins] - d_refl[db_bins] = np.sqrt(d_refl[db_bins]**2 / norm[db_bins]**2 + refl[db_bins]**2 * d_norm[db_bins]**2 / norm[db_bins]**4) + refl[db_bins] = refl[db_bins] / norm[db_bins] + d_refl[db_bins] = np.sqrt( + d_refl[db_bins] ** 2 / norm[db_bins] ** 2 + refl[db_bins] ** 2 * d_norm[db_bins] ** 2 / norm[db_bins] ** 4 + ) # Hold on to normalization to be able to diagnose issues later self.norm = norm[db_bins] @@ -523,25 +587,40 @@ def specular_unweighted(self, q_summing=False, normalize=True): def specular_weighted(self, q_summing=True, bck_in_q=False): """ - Compute reflectivity by weighting each event by flux. - This allows for summing in Q and to estimate the background in either Q - or pixels next to the peak. + Compute reflectivity by weighting each event by flux. + This allows for summing in Q and to estimate the background in either Q + or pixels next to the peak. + + :param q_summing: (bool, optional) If True, sum the data in Q-space (default is False). + :param bck_in_q: (bool, optional) If True, subtract background along Q lines (default is False). + + :return: A tuple containing: + + - q_bins: The Q bin boundaries + - refl: The reflectivity values + - d_refl: The uncertainties in the reflectivity values """ # Event weights for normalization db_charge = self._ws_db.getRun().getProtonCharge() wl_events, wl_weights = self._get_events(self._ws_db, self.norm_peak, self.norm_low_res) wl_dist, wl_bins = np.histogram(wl_events, bins=100, weights=wl_weights) _bin_width = wl_bins[1:] - wl_bins[:-1] - wl_dist = wl_dist/db_charge/_bin_width - wl_middle = [(wl_bins[i+1]+wl_bins[i])/2.0 for i in range(len(wl_bins)-1)] - - refl, d_refl = self._reflectivity(self._ws_sc, peak_position=self.specular_pixel, - peak=self.signal_peak, low_res=self.signal_low_res, - theta=self.theta, q_summing=q_summing, wl_dist=wl_dist, wl_bins=wl_middle) + wl_dist = wl_dist / db_charge / _bin_width + wl_middle = [(wl_bins[i + 1] + wl_bins[i]) / 2.0 for i in range(len(wl_bins) - 1)] + + refl, d_refl = self._reflectivity( + self._ws_sc, + peak_position=self.specular_pixel, + peak=self.signal_peak, + low_res=self.signal_low_res, + theta=self.theta, + q_summing=q_summing, + wl_dist=wl_dist, + wl_bins=wl_middle, + ) if self.signal_bck is not None: - refl_bck, d_refl_bck = self.bck_subtraction(wl_dist=wl_dist, wl_bins=wl_middle, - q_summing=bck_in_q) + refl_bck, d_refl_bck = self.bck_subtraction(wl_dist=wl_dist, wl_bins=wl_middle, q_summing=bck_in_q) refl -= refl_bck d_refl = np.sqrt(d_refl**2 + d_refl_bck**2) @@ -551,28 +630,34 @@ def specular_weighted(self, q_summing=True, bck_in_q=False): def _roi_integration(self, ws, peak, low_res, q_bins=None, wl_dist=None, wl_bins=None, q_summing=False): """ - Integrate a region of interest and normalize by the number of included pixels. + Integrate a region of interest and normalize by the number of included pixels. - The options are the same as for the reflectivity calculation. - If wl_dist and wl_bins are supplied, the events will be weighted by flux. - If q_summing is True, the angle of each neutron will be recalculated according to - their position on the detector and place in the proper Q bin. + The options are the same as for the reflectivity calculation. + If wl_dist and wl_bins are supplied, the events will be weighted by flux. + If q_summing is True, the angle of each neutron will be recalculated according to + their position on the detector and place in the proper Q bin. """ q_bins = self.q_bins if q_bins is None else q_bins - refl_bck, d_refl_bck = self._reflectivity(ws, peak_position=0, q_bins=q_bins, - peak=peak, low_res=low_res, - theta=self.theta, q_summing=q_summing, - wl_dist=wl_dist, wl_bins=wl_bins) - - _pixel_area = (peak[1]-peak[0]+1.0) + refl_bck, d_refl_bck = self._reflectivity( + ws, + peak_position=0, + q_bins=q_bins, + peak=peak, + low_res=low_res, + theta=self.theta, + q_summing=q_summing, + wl_dist=wl_dist, + wl_bins=wl_bins, + ) + + _pixel_area = peak[1] - peak[0] + 1.0 refl_bck /= _pixel_area d_refl_bck /= _pixel_area return refl_bck, d_refl_bck - def bck_subtraction(self, normalize_to_single_pixel=False, q_bins=None, wl_dist=None, wl_bins=None, - q_summing=False): + def bck_subtraction(self, normalize_to_single_pixel=False, q_bins=None, wl_dist=None, wl_bins=None, q_summing=False): """ - Higher-level call for background subtraction. Hides the ranges needed to define the ROI. + Higher-level call for background subtraction. Hides the ranges needed to define the ROI. """ # Sanity check if len(self.signal_bck) == 2 and self.use_functional_bck: @@ -582,63 +667,87 @@ def bck_subtraction(self, normalize_to_single_pixel=False, q_bins=None, wl_dist= self.use_functional_bck = False if self.use_functional_bck: - return background.functional_background(self._ws_sc, self, self.signal_peak, - self.signal_bck, self.signal_low_res, - normalize_to_single_pixel=normalize_to_single_pixel, - q_bins=q_bins, wl_dist=wl_dist, wl_bins=wl_bins, - q_summing=q_summing) + return background.functional_background( + self._ws_sc, + self, + self.signal_peak, + self.signal_bck, + self.signal_low_res, + normalize_to_single_pixel=normalize_to_single_pixel, + q_bins=q_bins, + wl_dist=wl_dist, + wl_bins=wl_bins, + q_summing=q_summing, + ) else: - return background.side_background(self._ws_sc, self, self.signal_peak, self.signal_bck, - self.signal_low_res, - normalize_to_single_pixel=normalize_to_single_pixel, - q_bins=q_bins, wl_dist=wl_dist, wl_bins=wl_bins, - q_summing=q_summing) + return background.side_background( + self._ws_sc, + self, + self.signal_peak, + self.signal_bck, + self.signal_low_res, + normalize_to_single_pixel=normalize_to_single_pixel, + q_bins=q_bins, + wl_dist=wl_dist, + wl_bins=wl_bins, + q_summing=q_summing, + ) def norm_bck_subtraction(self): """ - Higher-level call for background subtraction for the normalization run. + Higher-level call for background subtraction for the normalization run. """ - return background.side_background(self._ws_db, self, self.norm_peak, self.norm_bck, - self.norm_low_res, normalize_to_single_pixel=False) - - def slice(self, x_min=0.002, x_max=0.004, x_bins=None, z_bins=None, # noqa A003 - refl=None, d_refl=None, normalize=False): + return background.side_background( + self._ws_db, self, self.norm_peak, self.norm_bck, self.norm_low_res, normalize_to_single_pixel=False + ) + + def slice( + self, + x_min=0.002, + x_max=0.004, + x_bins=None, + z_bins=None, # noqa A003 + refl=None, + d_refl=None, + normalize=False, + ): """ - Retrieve a slice from the off-specular data. + Retrieve a slice from the off-specular data. """ x_bins = self._offspec_x_bins if x_bins is None else x_bins z_bins = self._offspec_z_bins if z_bins is None else z_bins refl = self._offspec_refl if refl is None else refl d_refl = self._offspec_d_refl if d_refl is None else d_refl - i_min = len(x_bins[x_bins<x_min]) - i_max = len(x_bins[x_bins<x_max]) + i_min = len(x_bins[x_bins < x_min]) + i_max = len(x_bins[x_bins < x_max]) _spec = np.sum(refl[i_min:i_max], axis=0) - _d_spec = np.sum( (d_refl[i_min:i_max])**2, axis=0) + _d_spec = np.sum((d_refl[i_min:i_max]) ** 2, axis=0) _d_spec = np.sqrt(_d_spec) if normalize: - _spec /= (i_max-i_min) - _d_spec /= (i_max-i_min) + _spec /= i_max - i_min + _d_spec /= i_max - i_min return z_bins, _spec, _d_spec - def _reflectivity(self, ws, peak_position, peak, low_res, theta, q_bins=None, q_summing=False, - wl_dist=None, wl_bins=None, sum_pixels=True): + def _reflectivity( + self, ws, peak_position, peak, low_res, theta, q_bins=None, q_summing=False, wl_dist=None, wl_bins=None, sum_pixels=True + ): """ - Assumes that the input workspace is normalized by proton charge. + Assumes that the input workspace is normalized by proton charge. """ charge = ws.getRun().getProtonCharge() _q_bins = self.q_bins if q_bins is None else q_bins - shape = len(_q_bins)-1 if sum_pixels else ((peak[1]-peak[0]+1), len(_q_bins)-1) + shape = len(_q_bins) - 1 if sum_pixels else ((peak[1] - peak[0] + 1), len(_q_bins) - 1) refl = np.zeros(shape) d_refl_sq = np.zeros(shape) counts = np.zeros(shape) _pixel_width = self.pixel_width if q_summing else 0.0 - for i in range(low_res[0], int(low_res[1]+1)): - for j in range(peak[0], int(peak[1]+1)): + for i in range(low_res[0], int(low_res[1] + 1)): + for j in range(peak[0], int(peak[1] + 1)): if self.instrument == self.INSTRUMENT_4A: pixel = j * self.n_y + i else: @@ -660,14 +769,14 @@ def _reflectivity(self, ws, peak_position, peak, low_res, theta, q_bins=None, q_ delta_theta_f = np.arctan(x_distance / self.det_distance) / 2.0 # Sign will depend on reflect up or down - ths_value = ws.getRun()['ths'].value[-1] + ths_value = ws.getRun()["ths"].value[-1] delta_theta_f *= np.sign(ths_value) - qz=4.0*np.pi/wl_list * np.sin(theta + delta_theta_f - d_theta) + qz = 4.0 * np.pi / wl_list * np.sin(theta + delta_theta_f - d_theta) qz = np.fabs(qz) if wl_dist is not None and wl_bins is not None: - wl_weights = 1.0/np.interp(wl_list, wl_bins, wl_dist, np.inf, np.inf) + wl_weights = 1.0 / np.interp(wl_list, wl_bins, wl_dist, np.inf, np.inf) hist_weights = wl_weights * qz / wl_list hist_weights *= event_weights _counts, _ = np.histogram(qz, bins=_q_bins, weights=hist_weights) @@ -676,14 +785,14 @@ def _reflectivity(self, ws, peak_position, peak, low_res, theta, q_bins=None, q_ refl += _counts counts += _norm else: - refl[j-peak[0]] += _counts - counts[j-peak[0]] += _norm + refl[j - peak[0]] += _counts + counts[j - peak[0]] += _norm else: _counts, _ = np.histogram(qz, bins=_q_bins, weights=event_weights) if sum_pixels: refl += _counts else: - refl[j-peak[0]] += _counts + refl[j - peak[0]] += _counts # The following is for information purposes if q_summing: @@ -692,9 +801,9 @@ def _reflectivity(self, ws, peak_position, peak, low_res, theta, q_bins=None, q_ delta_theta_f0 = np.arctan(x0 / self.det_distance) / 2.0 delta_theta_f1 = np.arctan(x1 / self.det_distance) / 2.0 - qz_max = 4.0*np.pi/self.tof_range[1]*self.constant * np.fabs(np.sin(theta + delta_theta_f0)) - qz_min = 4.0*np.pi/self.tof_range[1]*self.constant * np.fabs(np.sin(theta + delta_theta_f1)) - mid_point = (qz_max + qz_min)/2.0 + qz_max = 4.0 * np.pi / self.tof_range[1] * self.constant * np.fabs(np.sin(theta + delta_theta_f0)) + qz_min = 4.0 * np.pi / self.tof_range[1] * self.constant * np.fabs(np.sin(theta + delta_theta_f1)) + mid_point = (qz_max + qz_min) / 2.0 print("Qz range: ", qz_min, mid_point, qz_max) self.summing_threshold = mid_point @@ -705,7 +814,7 @@ def _reflectivity(self, ws, peak_position, peak, low_res, theta, q_bins=None, q_ if not sum_pixels: bin_size = np.tile(bin_size, [counts.shape[0], 1]) - d_refl_sq[non_zero] = refl[non_zero] / np.sqrt(counts[non_zero]) / charge / bin_size[non_zero] + d_refl_sq[non_zero] = refl[non_zero] / np.sqrt(counts[non_zero]) / charge / bin_size[non_zero] refl[non_zero] = refl[non_zero] / charge / bin_size[non_zero] else: d_refl_sq = np.sqrt(np.fabs(refl)) / charge @@ -715,13 +824,13 @@ def _reflectivity(self, ws, peak_position, peak, low_res, theta, q_bins=None, q_ def _get_events(self, ws, peak, low_res): """ - Return an array of wavelengths for a given workspace. + Return an array of wavelengths for a given workspace. """ wl_events = np.asarray([]) wl_weights = np.asarray([]) - for i in range(low_res[0], int(low_res[1]+1)): - for j in range(peak[0], int(peak[1]+1)): + for i in range(low_res[0], int(low_res[1] + 1)): + for j in range(peak[0], int(peak[1] + 1)): if self.instrument == self.INSTRUMENT_4A: pixel = j * self.n_y + i else: @@ -737,17 +846,16 @@ def _get_events(self, ws, peak, low_res): wl_weights = np.concatenate((wl_weights, weights)) return wl_events, wl_weights - def off_specular(self, x_axis=None, x_min=-0.015, x_max=0.015, x_npts=50, - z_min=None, z_max=None, z_npts=-120, bck_in_q=None): + def off_specular(self, x_axis=None, x_min=-0.015, x_max=0.015, x_npts=50, z_min=None, z_max=None, z_npts=-120, bck_in_q=None): """ - Compute off-specular - :param x_axis: Axis selection - :param x_min: Min value on x-axis - :param x_max: Max value on x-axis - :param x_npts: Number of points in x (negative will produce a log scale) - :param z_min: Min value on z-axis (if none, default Qz will be used) - :param z_max: Max value on z-axis (if none, default Qz will be used) - :param z_npts: Number of points in z (negative will produce a log scale) + Compute off-specular + :param x_axis: Axis selection + :param x_min: Min value on x-axis + :param x_max: Max value on x-axis + :param x_npts: Number of points in x (negative will produce a log scale) + :param z_min: Min value on z-axis (if none, default Qz will be used) + :param z_max: Max value on z-axis (if none, default Qz will be used) + :param z_npts: Number of points in z (negative will produce a log scale) """ # Z axis binning qz_bins = self.q_bins @@ -765,23 +873,23 @@ def off_specular(self, x_axis=None, x_min=-0.015, x_max=0.015, x_npts=50, wl_events, wl_weights = self._get_events(self._ws_db, self.norm_peak, self.norm_low_res) wl_dist, wl_bins = np.histogram(wl_events, bins=100, weights=wl_weights) - wl_middle = [(wl_bins[i+1]+wl_bins[i])/2.0 for i in range(len(wl_bins)-1)] + wl_middle = [(wl_bins[i + 1] + wl_bins[i]) / 2.0 for i in range(len(wl_bins) - 1)] - _refl, _d_refl = self._off_specular(self._ws_sc, wl_dist, wl_middle, qx_bins, qz_bins, - self.specular_pixel, self.theta, x_axis=x_axis) + _refl, _d_refl = self._off_specular( + self._ws_sc, wl_dist, wl_middle, qx_bins, qz_bins, self.specular_pixel, self.theta, x_axis=x_axis + ) db_charge = self._ws_db.getRun().getProtonCharge() - _refl *= db_charge * (wl_bins[1]-wl_bins[0]) - _d_refl *= db_charge * (wl_bins[1]-wl_bins[0]) + _refl *= db_charge * (wl_bins[1] - wl_bins[0]) + _d_refl *= db_charge * (wl_bins[1] - wl_bins[0]) # Background if self.signal_bck: if bck_in_q is None: print("Not implemented") else: - _, refl_bck, d_refl_bck = self.slice(bck_in_q[0], bck_in_q[1], - x_bins=qx_bins, z_bins=qz_bins, - refl=_refl, d_refl=_d_refl, - normalize=True) + _, refl_bck, d_refl_bck = self.slice( + bck_in_q[0], bck_in_q[1], x_bins=qx_bins, z_bins=qz_bins, refl=_refl, d_refl=_d_refl, normalize=True + ) _refl -= refl_bck _d_refl = np.sqrt(_d_refl**2 + d_refl_bck**2) @@ -794,12 +902,12 @@ def off_specular(self, x_axis=None, x_min=-0.015, x_max=0.015, x_npts=50, def _off_specular(self, ws, wl_dist, wl_bins, x_bins, z_bins, peak_position, theta, x_axis=None): charge = ws.getRun().getProtonCharge() - refl = np.zeros([len(x_bins)-1, len(z_bins)-1]) - counts = np.zeros([len(x_bins)-1, len(z_bins)-1]) + refl = np.zeros([len(x_bins) - 1, len(z_bins) - 1]) + counts = np.zeros([len(x_bins) - 1, len(z_bins) - 1]) for j in range(0, self.n_x): wl_list = np.asarray([]) - for i in range(self.signal_low_res[0], int(self.signal_low_res[1]+1)): + for i in range(self.signal_low_res[0], int(self.signal_low_res[1] + 1)): if self.instrument == self.INSTRUMENT_4A: pixel = j * self.n_y + i else: @@ -809,12 +917,12 @@ def _off_specular(self, ws, wl_dist, wl_bins, x_bins, z_bins, peak_position, the wl_list = np.concatenate((wl_events, wl_list)) k = 2.0 * np.pi / wl_list - wl_weights = 1.0/np.interp(wl_list, wl_bins, wl_dist, np.inf, np.inf) + wl_weights = 1.0 / np.interp(wl_list, wl_bins, wl_dist, np.inf, np.inf) - x_distance = float(j-peak_position) * self.pixel_width + x_distance = float(j - peak_position) * self.pixel_width delta_theta_f = np.arctan(x_distance / self.det_distance) # Sign will depend on reflect up or down - ths_value = ws.getRun()['ths'].value[-1] + ths_value = ws.getRun()["ths"].value[-1] delta_theta_f *= np.sign(ths_value) theta_f = theta + delta_theta_f @@ -827,13 +935,13 @@ def _off_specular(self, ws, wl_dist, wl_bins, x_bins, z_bins, peak_position, the _x = qx _z = qz if x_axis == EventReflectivity.DELTA_KZ_VS_QZ: - _x = (ki_z - kf_z) + _x = ki_z - kf_z elif x_axis == EventReflectivity.KZI_VS_KZF: _x = ki_z _z = kf_z elif x_axis == EventReflectivity.THETAF_VS_WL: _x = wl_list - _z = np.ones(len(wl_list))*theta_f + _z = np.ones(len(wl_list)) * theta_f histo_weigths = wl_weights * _z / wl_list _counts, _, _ = np.histogram2d(_x, _z, bins=[x_bins, z_bins], weights=histo_weigths) @@ -842,16 +950,16 @@ def _off_specular(self, ws, wl_dist, wl_bins, x_bins, z_bins, peak_position, the counts += _counts bin_size = z_bins[1] - z_bins[0] - d_refl_sq = refl / np.sqrt(counts) / charge / bin_size + d_refl_sq = refl / np.sqrt(counts) / charge / bin_size refl /= charge * bin_size return refl, d_refl_sq def emission_time_correction(self, ws, tofs): """ - Coorect TOF for emission time delay in the moderator - :param ws: Mantid workspace - :param tofs: list of TOF values + Coorect TOF for emission time delay in the moderator + :param ws: Mantid workspace + :param tofs: list of TOF values """ mt_run = ws.getRun() use_emission_delay = False @@ -867,7 +975,7 @@ def emission_time_correction(self, ws, tofs): def gravity_correction(self, ws, wl_list): """ - Gravity correction for each event + Gravity correction for each event """ # Xi reference would be the position of xi if the si slit were to be positioned # at the sample. The distance from the sample to si is then xi_reference - xi. @@ -878,7 +986,7 @@ def gravity_correction(self, ws, wl_list): # Distance between the s1 and the sample s1_sample_distance = 1485 if ws.getInstrument().hasParameter("s1-sample-distance"): - s1_sample_distance = ws.getInstrument().getNumberParameter("s1-sample-distance")[0]*1000 + s1_sample_distance = ws.getInstrument().getNumberParameter("s1-sample-distance")[0] * 1000 xi = 310 if ws.getInstrument().hasParameter("BL4B:Mot:xi.RBV"): @@ -888,48 +996,48 @@ def gravity_correction(self, ws, wl_list): slit_distance = s1_sample_distance - sample_si_distance # Angle of the incident beam on a horizontal sample - theta_in=-4.0 + theta_in = -4.0 # Calculation from the ILL paper. This works for inclined beams. # Calculated theta is the angle on the sample - g = 9.8067 # m/s^2 - h = 6.6260715e-34 # Js=kg m^2/s - mn = 1.67492749804e-27 # kg + g = 9.8067 # m/s^2 + h = 6.6260715e-34 # Js=kg m^2/s + mn = 1.67492749804e-27 # kg - v = h/(mn*wl_list*1e-10) - k = g/(2*v**2) + v = h / (mn * wl_list * 1e-10) + k = g / (2 * v**2) # Define the sample position as x=0, y=0. increasing x is towards moderator - xs=0 + xs = 0 # positions of slits x1 = sample_si_distance / 1000 x2 = (sample_si_distance + slit_distance) / 1000 - #height of slits determined by incident theta, y=0 is the sample height - y1=x1*np.tan(theta_in*np.pi/180) - y2=x2*np.tan(theta_in*np.pi/180) + # height of slits determined by incident theta, y=0 is the sample height + y1 = x1 * np.tan(theta_in * np.pi / 180) + y2 = x2 * np.tan(theta_in * np.pi / 180) # This is the location of the top of the parabola - x0=(y1-y2+k*(x1**2-x2**2))/(2*k*(x1-x2)) + x0 = (y1 - y2 + k * (x1**2 - x2**2)) / (2 * k * (x1 - x2)) # Angle is arctan(dy/dx) at sample - theta_sample = np.arctan(2*k*(x0-xs)) * 180/np.pi + theta_sample = np.arctan(2 * k * (x0 - xs)) * 180 / np.pi - return (theta_sample-theta_in) * np.pi / 180.0 + return (theta_sample - theta_in) * np.pi / 180.0 def compute_resolution(ws, default_dq=0.027, theta=None, q_summing=False): """ - Compute the Q resolution from the meta data. - :param theta: scattering angle in radians - :param q_summing: if True, the pixel size will be used for the resolution + Compute the Q resolution from the meta data. + :param theta: scattering angle in radians + :param q_summing: if True, the pixel size will be used for the resolution """ settings = read_settings(ws) if theta is None: - theta = abs(ws.getRun().getProperty("ths").value[0]) * np.pi / 180. + theta = abs(ws.getRun().getProperty("ths").value[0]) * np.pi / 180.0 if q_summing: # Q resolution is reported as FWHM, so here we consider this to be diff --git a/reduction/lr_reduction/workflow.py b/reduction/lr_reduction/workflow.py index d51e143..449c8f2 100644 --- a/reduction/lr_reduction/workflow.py +++ b/reduction/lr_reduction/workflow.py @@ -1,6 +1,7 @@ """ - Autoreduction process for the Liquids Reflectometer +Autoreduction process for the Liquids Reflectometer """ + import json import os @@ -10,21 +11,21 @@ from . import event_reduction, output, reduction_template_reader, template -def reduce(ws, template_file, output_dir, average_overlap=False, - theta_offset: float | None = 0, - q_summing=False, bck_in_q=False, is_live=False): +def reduce( + ws, template_file, output_dir, average_overlap=False, theta_offset: float | None = 0, q_summing=False, bck_in_q=False, is_live=False +): """ - Function called by reduce_REFL.py, which lives in /SNS/REF_L/shared/autoreduce - and is called by the automated reduction workflow. + Function called by reduce_REFL.py, which lives in /SNS/REF_L/shared/autoreduce + and is called by the automated reduction workflow. - If average_overlap is used, overlapping points will be averaged, otherwise they - will be left in the final data file. + If average_overlap is used, overlapping points will be averaged, otherwise they + will be left in the final data file. - :param average_overlap: if True, the overlapping points will be averaged - :param q_summing: if True, constant-Q binning will be used - :param bck_in_q: if True, and constant-Q binning is used, the background will be estimated - along constant-Q lines rather than along TOF/pixel boundaries. - :param theta_offset: Theta offset to apply. If None, the template value will be used. + :param average_overlap: if True, the overlapping points will be averaged + :param q_summing: if True, constant-Q binning will be used + :param bck_in_q: if True, and constant-Q binning is used, the background will be estimated + along constant-Q lines rather than along TOF/pixel boundaries. + :param theta_offset: Theta offset to apply. If None, the template value will be used. """ # Get the sequence number sequence_number = 1 @@ -42,12 +43,9 @@ def reduce(ws, template_file, output_dir, average_overlap=False, template_data.angle_offset = theta_offset # Call the reduction using the template - qz_mid, refl, d_refl, meta_data = template.process_from_template_ws(ws, template_data, - q_summing=q_summing, - tof_weighted=bck_in_q, - clean=q_summing, - bck_in_q=bck_in_q, - info=True) + qz_mid, refl, d_refl, meta_data = template.process_from_template_ws( + ws, template_data, q_summing=q_summing, tof_weighted=bck_in_q, clean=q_summing, bck_in_q=bck_in_q, info=True + ) # Save partial results coll = output.RunCollection() @@ -55,16 +53,15 @@ def reduce(ws, template_file, output_dir, average_overlap=False, # If this is live data, put it in a separate file to avoid conflict with auto-reduction if is_live: - reduced_file = os.path.join(output_dir, 'REFL_live_partial.txt') + reduced_file = os.path.join(output_dir, "REFL_live_partial.txt") else: - reduced_file = os.path.join(output_dir, 'REFL_%s_%s_%s_partial.txt' % (meta_data['sequence_id'], - meta_data['sequence_number'], - meta_data['run_number'])) + reduced_file = os.path.join( + output_dir, "REFL_%s_%s_%s_partial.txt" % (meta_data["sequence_id"], meta_data["sequence_number"], meta_data["run_number"]) + ) coll.save_ascii(reduced_file, meta_as_json=True) # Assemble partial results into a single R(q) - seq_list, run_list = assemble_results(meta_data['sequence_id'], output_dir, - average_overlap, is_live=is_live) + seq_list, run_list = assemble_results(meta_data["sequence_id"], output_dir, average_overlap, is_live=is_live) # Save template. This will not happen if the template_file input was # template data, which the template processing allows. @@ -79,7 +76,7 @@ def reduce(ws, template_file, output_dir, average_overlap=False, def assemble_results(first_run, output_dir, average_overlap=False, is_live=False): """ - Find related runs and assemble them in one R(q) data set + Find related runs and assemble them in one R(q) data set """ # Keep track of sequence IDs and run numbers so we can make a new template seq_list = [] @@ -88,8 +85,8 @@ def assemble_results(first_run, output_dir, average_overlap=False, is_live=False file_list = sorted(os.listdir(output_dir)) for item in file_list: - if item.startswith("REFL_%s" % first_run) and item.endswith('partial.txt'): - toks = item.split('_') + if item.startswith("REFL_%s" % first_run) and item.endswith("partial.txt"): + toks = item.split("_") if not len(toks) == 5 or int(toks[2]) == 0: continue seq_list.append(int(toks[2])) @@ -97,14 +94,14 @@ def assemble_results(first_run, output_dir, average_overlap=False, is_live=False # Read the partial data and add to a collection _, _, _, _, _meta = output.read_file(os.path.join(output_dir, item)) - if is_live or not _meta['start_time'] == "live": + if is_live or not _meta["start_time"] == "live": coll.add_from_file(os.path.join(output_dir, item)) - #elif item == "REFL_live_partial.txt": + # elif item == "REFL_live_partial.txt": # coll.add_from_file(os.path.join(output_dir, item)) - output_file_name = 'REFL_%s_combined_data_auto.txt' % first_run + output_file_name = "REFL_%s_combined_data_auto.txt" % first_run if is_live: - output_file_name = 'REFL_%s_live_estimate.txt' % first_run + output_file_name = "REFL_%s_live_estimate.txt" % first_run coll.save_ascii(os.path.join(output_dir, output_file_name)) return seq_list, run_list @@ -112,8 +109,8 @@ def assemble_results(first_run, output_dir, average_overlap=False, is_live=False def write_template(seq_list, run_list, template_file, output_dir): """ - Read the appropriate entry in a template file and save an updated - copy with the updated run number. + Read the appropriate entry in a template file and save an updated + copy with the updated run number. """ with open(template_file, "r") as fd: xml_str = fd.read() @@ -122,22 +119,22 @@ def write_template(seq_list, run_list, template_file, output_dir): new_data_sets = [] for i in range(len(seq_list)): if len(data_sets) >= seq_list[i]: - data_sets[seq_list[i]-1].data_files = [run_list[i]] - new_data_sets.append(data_sets[seq_list[i]-1]) + data_sets[seq_list[i] - 1].data_files = [run_list[i]] + new_data_sets.append(data_sets[seq_list[i] - 1]) else: print("Too few entries [%s] in template for sequence number %s" % (len(data_sets), seq_list[i])) # Save the template that was used xml_str = reduction_template_reader.to_xml(new_data_sets) - with open(os.path.join(output_dir, 'REF_L_%s_auto_template.xml' % run_list[0]), 'w') as fd: + with open(os.path.join(output_dir, "REF_L_%s_auto_template.xml" % run_list[0]), "w") as fd: fd.write(xml_str) def offset_from_first_run( - ws, - template_file :str, - output_dir: str, - ): + ws, + template_file: str, + output_dir: str, +): """ Find a theta offset from the first peak. Used when sample is misaligned. @@ -153,7 +150,7 @@ def offset_from_first_run( sequence_id = ws.getRun().getProperty("sequence_id").value[0] # Theta value that we are aiming for - ths_value = ws.getRun()['ths'].value[-1] + ths_value = ws.getRun()["ths"].value[-1] # Read template so we can load the direct beam run template_data = template.read_template(template_file, sequence_number) @@ -163,31 +160,30 @@ def offset_from_first_run( ws_db = mtd_api.LoadEventNexus("REF_L_%s" % template_data.norm_file) # Look for parameters that might have been determined earlier for this measurement - options_file = os.path.join(output_dir, 'REFL_%s_options.json' % sequence_id) + options_file = os.path.join(output_dir, "REFL_%s_options.json" % sequence_id) if sequence_number > 1 and os.path.isfile(options_file): - with open(options_file, 'r') as fd: + with open(options_file, "r") as fd: options = json.load(fd) - return options['theta_offset'] + return options["theta_offset"] else: # Fit direct beam position - x_min=template_data.norm_peak_range[0] - x_max=template_data.norm_peak_range[1] + x_min = template_data.norm_peak_range[0] + x_max = template_data.norm_peak_range[1] _, _x, _y = peak_finding.process_data(ws_db, summed=True, tof_step=200) peak_center = np.argmax(_y) - db_center, db_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, - center=peak_center, - sigma=1.) - print(" DB center: %g\t Width: %g from [%g %g]" % (db_center, db_width, - template_data.norm_peak_range[0], - template_data.norm_peak_range[1])) + db_center, db_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center, sigma=1.0) + print( + " DB center: %g\t Width: %g from [%g %g]" + % (db_center, db_width, template_data.norm_peak_range[0], template_data.norm_peak_range[1]) + ) # Fit the reflected beam position - x_min=template_data.data_peak_range[0] - x_max=template_data.data_peak_range[1] + x_min = template_data.data_peak_range[0] + x_max = template_data.data_peak_range[1] _, _x, _y = peak_finding.process_data(ws, summed=True, tof_step=200) peak_center = np.argmax(_y[x_min:x_max]) + x_min - sc_center, sc_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center, sigma=3.) + sc_center, sc_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center, sigma=3.0) pixel_offset = sc_center - peak_center print(" SC center: %g\t Width: %g" % (sc_center, sc_width)) @@ -195,36 +191,36 @@ def offset_from_first_run( sample_det_distance = settings["sample-det-distance"] pixel_width = settings["pixel-width"] / 1000.0 - theta = np.arctan((sc_center-db_center) * pixel_width / sample_det_distance) / 2.0 * 180 / np.pi + theta = np.arctan((sc_center - db_center) * pixel_width / sample_det_distance) / 2.0 * 180 / np.pi theta_offset = theta - ths_value # If this is the first angle, keep the value for later - options = dict(theta_offset = theta_offset, - pixel_offset = pixel_offset) - with open(options_file, 'w') as fp: + options = dict(theta_offset=theta_offset, pixel_offset=pixel_offset) + with open(options_file, "w") as fp: json.dump(options, fp) return theta_offset def reduce_explorer(ws, ws_db, theta_pv=None, center_pixel=145, db_center_pixel=145, peak_width=10): - """ - """ + """ """ from . import peak_finding if theta_pv is None: - if 'BL4B:CS:ExpPl:OperatingMode' in ws.getRun() \ - and ws.getRun().getProperty('BL4B:CS:ExpPl:OperatingMode').value[0] == 'Free Liquid': - theta_pv = 'thi' + if ( + "BL4B:CS:ExpPl:OperatingMode" in ws.getRun() + and ws.getRun().getProperty("BL4B:CS:ExpPl:OperatingMode").value[0] == "Free Liquid" + ): + theta_pv = "thi" else: - theta_pv = 'ths' + theta_pv = "ths" print("\nProcessing: %s" % ws.getRunNumber()) # Theta value that we are aiming for theta_value = np.fabs(ws.getRun()[theta_pv].value[0]) # Load normalization run - tthd_value = ws.getRun()['tthd'].value[0] + tthd_value = ws.getRun()["tthd"].value[0] # Fit direct beam position x_min = center_pixel - 25 @@ -235,8 +231,8 @@ def reduce_explorer(ws, ws_db, theta_pv=None, center_pixel=145, db_center_pixel= print(" DB center: %g\t Width: %g" % (db_center, db_width)) # Fit the reflected beam position - x_min=db_center_pixel-peak_width - x_max=db_center_pixel+peak_width + x_min = db_center_pixel - peak_width + x_max = db_center_pixel + peak_width tof, _x, _y = peak_finding.process_data(ws, summed=True, tof_step=200) peak_center = np.argmax(_y) sc_center, sc_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center) @@ -244,41 +240,47 @@ def reduce_explorer(ws, ws_db, theta_pv=None, center_pixel=145, db_center_pixel= pixel_width = float(ws.getInstrument().getNumberParameter("pixel-width")[0]) / 1000.0 sample_det_distance = event_reduction.EventReflectivity.DEFAULT_4B_SAMPLE_DET_DISTANCE - twotheta = np.arctan((db_center-sc_center)*pixel_width / sample_det_distance) / 2.0 * 180 / np.pi + twotheta = np.arctan((db_center - sc_center) * pixel_width / sample_det_distance) / 2.0 * 180 / np.pi # Store the tthd of the direct beam and account for the fact that it may be # different from our reflected beam for this calibration data. # This will allow us to be compatible with both fixed and moving detector arm. - tthd_db = ws_db.getRun()['tthd'].value[0] + tthd_db = ws_db.getRun()["tthd"].value[0] twotheta = twotheta + tthd_value - tthd_db print(" Theta = %g Two-theta = %g" % (theta_value, twotheta)) # Perform the reduction width_mult = 2.5 - peak = [np.rint(sc_center - width_mult*sc_width).astype(int), np.rint(sc_center + width_mult*sc_width).astype(int)] - norm_peak = [np.rint(db_center - width_mult*db_width).astype(int), np.rint(db_center + width_mult*db_width).astype(int)] - peak_bck = [peak[0]-3, peak[1]+3] - norm_bck = [norm_peak[0]-3, norm_peak[1]+3] + peak = [np.rint(sc_center - width_mult * sc_width).astype(int), np.rint(sc_center + width_mult * sc_width).astype(int)] + norm_peak = [np.rint(db_center - width_mult * db_width).astype(int), np.rint(db_center + width_mult * db_width).astype(int)] + peak_bck = [peak[0] - 3, peak[1] + 3] + norm_bck = [norm_peak[0] - 3, norm_peak[1] + 3] tof_min = ws.getTofMin() tof_max = ws.getTofMax() - theta = theta_value * np.pi / 180. + theta = theta_value * np.pi / 180.0 - #TODO: dead time correction should be applied here - event_refl = event_reduction.EventReflectivity(ws, ws_db, - signal_peak=peak, signal_bck=peak_bck, - norm_peak=norm_peak, norm_bck=norm_bck, - specular_pixel=sc_center.value, - signal_low_res=[65,180], norm_low_res=[65,180], - q_min=None, q_max=None, - tof_range = [tof_min, tof_max], - theta=theta) + # TODO: dead time correction should be applied here + event_refl = event_reduction.EventReflectivity( + ws, + ws_db, + signal_peak=peak, + signal_bck=peak_bck, + norm_peak=norm_peak, + norm_bck=norm_bck, + specular_pixel=sc_center.value, + signal_low_res=[65, 180], + norm_low_res=[65, 180], + q_min=None, + q_max=None, + tof_range=[tof_min, tof_max], + theta=theta, + ) # R(Q) - qz, refl, d_refl = event_refl.specular(q_summing=False, tof_weighted=False, - bck_in_q=False, clean=False, normalize=True) - qz_mid = (qz[:-1] + qz[1:])/2.0 + qz, refl, d_refl = event_refl.specular(q_summing=False, tof_weighted=False, bck_in_q=False, clean=False, normalize=True) + qz_mid = (qz[:-1] + qz[1:]) / 2.0 return qz_mid, refl, d_refl