diff --git a/reduction/lr_reduction/event_reduction.py b/reduction/lr_reduction/event_reduction.py index 354e4c7..ed69866 100644 --- a/reduction/lr_reduction/event_reduction.py +++ b/reduction/lr_reduction/event_reduction.py @@ -1,6 +1,7 @@ """ - Event based reduction for the Liquids Reflectometer +Event based reduction for the Liquids Reflectometer """ + import datetime import json import os @@ -17,29 +18,30 @@ NEUTRON_MASS = 1.675e-27 # kg # Attenuators: PV name and thickness in cm -CD_ATTENUATORS = [['BL4B:Actuator:50MRb', 0.0058], - ['BL4B:Actuator:100MRb', 0.0122], - ['BL4B:Actuator:200MRb', 0.0244], # Uncalibrated - ['BL4B:Actuator:400MRb', 0.0488], # Uncalibrated - ] +CD_ATTENUATORS = [ + ["BL4B:Actuator:50MRb", 0.0058], + ["BL4B:Actuator:100MRb", 0.0122], + ["BL4B:Actuator:200MRb", 0.0244], # Uncalibrated + ["BL4B:Actuator:400MRb", 0.0488], # Uncalibrated +] def get_wl_range(ws): """ - Determine TOF range from the data + Determine wavelength range from the data - :param ws: (Mantid workspace) workspace to work with + :param ws: (Mantid workspace) workspace to work with - :return: (list) [min, max] wavelength range + :return: (list) [min, max] wavelength range """ run_object = ws.getRun() - wl = run_object.getProperty('LambdaRequest').value[0] - chopper_speed = run_object.getProperty('SpeedRequest1').value[0] + wl = run_object.getProperty("LambdaRequest").value[0] + chopper_speed = run_object.getProperty("SpeedRequest1").value[0] # Cut the edges by using a width of 2.6 A - wl_min = (wl - 1.3 * 60.0 / chopper_speed) - wl_max = (wl + 1.3 * 60.0 / chopper_speed) + wl_min = wl - 1.3 * 60.0 / chopper_speed + wl_max = wl + 1.3 * 60.0 / chopper_speed return [wl_min, wl_max] @@ -61,11 +63,11 @@ def get_q_binning(q_min=0.001, q_max=0.15, q_step=-0.02): An array of Q values based on the specified binning.
""" if q_step > 0: - n_steps = int((q_max-q_min)/q_step) + n_steps = int((q_max - q_min) / q_step) return q_min + np.asarray([q_step * i for i in range(n_steps)]) else: - _step = 1.0+np.abs(q_step) - n_steps = int(np.log(q_max/q_min)/np.log(_step)) + _step = 1.0 + np.abs(q_step) + n_steps = int(np.log(q_max / q_min) / np.log(_step)) return q_min * np.asarray([_step**i for i in range(n_steps)]) @@ -94,40 +96,40 @@ def get_attenuation_info(ws): def read_settings(ws): """ - Read settings file and return values for the given timestamp + Read settings file and return values for the given timestamp - :param ws: Mantid workspace - :return: (dict) dictionary with settings + :param ws: Mantid workspace + :return: (dict) dictionary with settings """ settings = dict() package_dir, _ = os.path.split(__file__) - t = ws.getRun()['start_time'].value.split('T')[0] + t = ws.getRun()["start_time"].value.split("T")[0] timestamp = datetime.date.fromisoformat(t) - with open(os.path.join(package_dir, 'settings.json'), 'r') as fd: + with open(os.path.join(package_dir, "settings.json"), "r") as fd: data = json.load(fd) for key in data.keys(): chosen_value = None delta_time = None for item in data[key]: - valid_from = datetime.date.fromisoformat(item['from']) + valid_from = datetime.date.fromisoformat(item["from"]) delta = valid_from - timestamp if delta_time is None or (delta.total_seconds() < 0 and delta > delta_time): delta_time = delta - chosen_value = item['value'] + chosen_value = item["value"] settings[key] = chosen_value return settings def process_attenuation(ws, thickness=0): """ - Correct for absorption by assigning weight to each neutron event + Correct for absorption by assigning weight to each neutron event - :param ws: (Mantid workspace) workspace to correct - :param thickness: (float) attenuator thickness in cm (default is 0). + :param ws: (Mantid workspace) workspace to correct + :param thickness: (float) attenuator thickness in cm (default is 0). - :return: (Mantid workspace) corrected workspace + :return: (Mantid workspace) corrected workspace """ settings = read_settings(ws) if "source-det-distance" in settings: @@ -138,19 +140,22 @@ def process_attenuation(ws, thickness=0): constant = 1e-4 * NEUTRON_MASS * SDD / PLANCK_CONSTANT package_dir, _ = os.path.split(__file__) - mu_abs = np.loadtxt(os.path.join(package_dir, 'Cd-abs-factors.txt')).T + mu_abs = np.loadtxt(os.path.join(package_dir, "Cd-abs-factors.txt")).T wl_model = mu_abs[0] # Turn model into a histogram wl_step = wl_model[-1] - wl_model[-2] final_wl = wl_model[-1] + wl_step - wl_model = np.append(wl_model, [final_wl,]) + wl_model = np.append( + wl_model, + [ + final_wl, + ], + ) mu_model = mu_abs[1] tof_model = constant * wl_model - transmission = 1/np.exp(-mu_model * thickness) - transmission_ws = api.CreateWorkspace(OutputWorkspace='transmission', - DataX=tof_model, DataY=transmission, - UnitX='TOF', NSpec=1) + transmission = 1 / np.exp(-mu_model * thickness) + transmission_ws = api.CreateWorkspace(OutputWorkspace="transmission", DataX=tof_model, DataY=transmission, UnitX="TOF", NSpec=1) ws = api.Multiply(ws, transmission_ws, OutputWorkspace=str(ws)) return ws @@ -158,47 +163,50 @@ def process_attenuation(ws, thickness=0): def get_dead_time_correction(ws, template_data): """ - Compute dead time correction to be applied to the reflectivity curve. - The method will also try to load the error events from each of the - data files to ensure that we properly estimate the dead time correction. 
+ Compute dead time correction to be applied to the reflectivity curve. + The method will also try to load the error events from each of the + data files to ensure that we properly estimate the dead time correction. - :param ws: (Mantid worksapce) workspace with raw data to compute correction for - :param template_data: (reduction_template_reader.ReductionParameters) - reduction parameters + :param ws: (Mantid workspace) workspace with raw data to compute correction for + :param template_data: (reduction_template_reader.ReductionParameters) + reduction parameters - :return: (Mantid workspace) workspace with dead time correction to apply + :return: (Mantid workspace) workspace with dead time correction to apply """ tof_min = ws.getTofMin() tof_max = ws.getTofMax() run_number = ws.getRun().getProperty("run_number").value error_ws = api.LoadErrorEventsNexus("REF_L_%s" % run_number) - corr_ws = mantid_algorithm_exec(DeadTimeCorrection.SingleReadoutDeadTimeCorrection, - InputWorkspace=ws, - InputErrorEventsWorkspace=error_ws, - Paralyzable=template_data.paralyzable, - DeadTime=template_data.dead_time_value, - TOFStep=template_data.dead_time_tof_step, - TOFRange=[tof_min, tof_max], - OutputWorkspace="corr") + corr_ws = mantid_algorithm_exec( + DeadTimeCorrection.SingleReadoutDeadTimeCorrection, + InputWorkspace=ws, + InputErrorEventsWorkspace=error_ws, + Paralyzable=template_data.paralyzable, + DeadTime=template_data.dead_time_value, + TOFStep=template_data.dead_time_tof_step, + TOFRange=[tof_min, tof_max], + OutputWorkspace="corr", + ) corr_ws = api.Rebin(corr_ws, [tof_min, 10, tof_max]) return corr_ws + def apply_dead_time_correction(ws, template_data): """ - Apply dead time correction, and ensure that it is done only once - per workspace. + Apply dead time correction, and ensure that it is done only once + per workspace.
- :param ws: (Mantid workspace) workspace with raw data to compute correction for - :param template_data: (reduction_template_reader.ReductionParameters) - reduction parameters + :param ws: (Mantid workspace) workspace with raw data to compute correction for + :param template_data: (reduction_template_reader.ReductionParameters) + reduction parameters - :return: (Mantid workspace) workspace with dead time correction applied + :return: (Mantid workspace) workspace with dead time correction applied """ - if 'dead_time_applied' not in ws.getRun(): + if "dead_time_applied" not in ws.getRun(): corr_ws = get_dead_time_correction(ws, template_data) ws = api.Multiply(ws, corr_ws, OutputWorkspace=str(ws)) - api.AddSampleLog(Workspace=ws, LogName="dead_time_applied", LogText='1', LogType="Number") + api.AddSampleLog(Workspace=ws, LogName="dead_time_applied", LogText="1", LogType="Number") return ws @@ -244,15 +252,30 @@ class EventReflectivity(object): DEFAULT_4B_SAMPLE_DET_DISTANCE = 1.83 DEFAULT_4B_SOURCE_DET_DISTANCE = 15.75 - def __init__(self, scattering_workspace, direct_workspace, - signal_peak, signal_bck, norm_peak, norm_bck, - specular_pixel, signal_low_res, norm_low_res, - q_min=None, q_step=-0.02, q_max=None, - tof_range=None, theta=1.0, instrument=None, - functional_background=False, dead_time=False, - paralyzable=True, dead_time_value=4.2, - dead_time_tof_step=100, use_emission_time=True): - + def __init__( + self, + scattering_workspace, + direct_workspace, + signal_peak, + signal_bck, + norm_peak, + norm_bck, + specular_pixel, + signal_low_res, + norm_low_res, + q_min=None, + q_step=-0.02, + q_max=None, + tof_range=None, + theta=1.0, + instrument=None, + functional_background=False, + dead_time=False, + paralyzable=True, + dead_time_value=4.2, + dead_time_tof_step=100, + use_emission_time=True, + ): if instrument in [self.INSTRUMENT_4A, self.INSTRUMENT_4B]: self.instrument = instrument else: @@ -285,13 +308,13 @@ def __init__(self, scattering_workspace, direct_workspace, # Process workspaces if self.tof_range is not None: - self._ws_sc = api.CropWorkspace(InputWorkspace=scattering_workspace, - XMin=tof_range[0], XMax=tof_range[1], - OutputWorkspace='_'+str(scattering_workspace)) + self._ws_sc = api.CropWorkspace( + InputWorkspace=scattering_workspace, XMin=tof_range[0], XMax=tof_range[1], OutputWorkspace="_" + str(scattering_workspace) + ) if direct_workspace is not None: - self._ws_db = api.CropWorkspace(InputWorkspace=direct_workspace, - XMin=tof_range[0], XMax=tof_range[1], - OutputWorkspace='_'+str(direct_workspace)) + self._ws_db = api.CropWorkspace( + InputWorkspace=direct_workspace, XMin=tof_range[0], XMax=tof_range[1], OutputWorkspace="_" + str(direct_workspace) + ) else: self._ws_db = None else: @@ -303,7 +326,7 @@ def __init__(self, scattering_workspace, direct_workspace, def extract_meta_data(self): """ - Extract meta data from the data file. + Extract meta data from the loaded data file. 
""" # Get instrument parameters settings = read_settings(self._ws_sc) @@ -324,12 +347,12 @@ def extract_meta_data(self): if self.tof_range is None: self.wl_range = get_wl_range(self._ws_sc) else: - self.wl_range = [self.tof_range[0] / self.constant, self.tof_range[1] / self.constant] + self.wl_range = [self.tof_range[0] / self.constant, self.tof_range[1] / self.constant] # q_min and q_max are the boundaries for the final Q binning # We also hold on to the true Q range covered by the measurement - self.q_min_meas = 4.0*np.pi/self.wl_range[1] * np.fabs(np.sin(self.theta)) - self.q_max_meas = 4.0*np.pi/self.wl_range[0] * np.fabs(np.sin(self.theta)) + self.q_min_meas = 4.0 * np.pi / self.wl_range[1] * np.fabs(np.sin(self.theta)) + self.q_max_meas = 4.0 * np.pi / self.wl_range[0] * np.fabs(np.sin(self.theta)) if self.q_min is None: self.q_min = self.q_min_meas @@ -341,29 +364,29 @@ def extract_meta_data(self): # Catch options that can be turned off if self.signal_low_res is None: - self.signal_low_res = [1, self.n_x-1] + self.signal_low_res = [1, self.n_x - 1] if self.norm_low_res is None: - self.norm_low_res = [1, self.n_x-1] + self.norm_low_res = [1, self.n_x - 1] def extract_meta_data_4A(self): """ - 4A-specific meta data + 4A-specific meta data """ run_object = self._ws_sc.getRun() - self.det_distance = run_object['SampleDetDis'].getStatistics().mean - source_sample_distance = run_object['ModeratorSamDis'].getStatistics().mean - if run_object['SampleDetDis'].units not in ['m', 'meter']: + self.det_distance = run_object["SampleDetDis"].getStatistics().mean + source_sample_distance = run_object["ModeratorSamDis"].getStatistics().mean + if run_object["SampleDetDis"].units not in ["m", "meter"]: self.det_distance /= 1000.0 - if run_object['ModeratorSamDis'].units not in ['m', 'meter']: + if run_object["ModeratorSamDis"].units not in ["m", "meter"]: source_sample_distance /= 1000.0 self.source_detector_distance = source_sample_distance + self.det_distance def extract_meta_data_4B(self): """ - 4B-specific meta data + 4B-specific meta data - Distance from source to sample was 13.63 meters prior to the source - to detector distance being determined with Bragg edges to be 15.75 m. + Distance from source to sample was 13.63 meters prior to the source + to detector distance being determined with Bragg edges to be 15.75 m. """ settings = read_settings(self._ws_sc) @@ -390,6 +413,11 @@ def extract_meta_data_4B(self): self.source_detector_distance = self.DEFAULT_4B_SOURCE_DET_DISTANCE def __repr__(self): + """ + Generate a string representation of the reduction settings. + + :return: (str) string representation of the reduction settings + """ output = "Reduction settings:\n" output += " sample-det: %s\n" % self.det_distance output += " source-det: %s\n" % self.source_detector_distance @@ -403,12 +431,14 @@ def __repr__(self): def to_dict(self): """ - Returns meta-data to be used/stored. + Returns meta-data to be used/stored. 
+ + :return: (dict) dictionary with meta-data """ if self._ws_sc.getRun().hasProperty("start_time"): start_time = self._ws_sc.getRun().getProperty("start_time").value else: - start_time = 'live' + start_time = "live" experiment = self._ws_sc.getRun().getProperty("experiment_identifier").value run_number = self._ws_sc.getRun().getProperty("run_number").value sequence_number = int(self._ws_sc.getRun().getProperty("sequence_number").value[0]) @@ -421,25 +451,42 @@ def to_dict(self): norm_run = 0 dq0 = 0 - return dict(wl_min=self.wl_range[0], wl_max=self.wl_range[1], - q_min=self.q_min_meas, q_max=self.q_max_meas, theta=self.theta, - start_time=start_time, experiment=experiment, run_number=run_number, - run_title=run_title, norm_run=norm_run, time=time.ctime(), - dq0=dq0, dq_over_q=self.dq_over_q, sequence_number=sequence_number, - sequence_id=sequence_id, q_summing=self.q_summing) - - def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, - clean=False, normalize=True): + return dict( + wl_min=self.wl_range[0], + wl_max=self.wl_range[1], + q_min=self.q_min_meas, + q_max=self.q_max_meas, + theta=self.theta, + start_time=start_time, + experiment=experiment, + run_number=run_number, + run_title=run_title, + norm_run=norm_run, + time=time.ctime(), + dq0=dq0, + dq_over_q=self.dq_over_q, + sequence_number=sequence_number, + sequence_id=sequence_id, + q_summing=self.q_summing, + ) + + def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, clean=False, normalize=True): """ - Compute specular reflectivity. + Compute specular reflectivity. + + For constant-Q binning, it's preferred to use tof_weighted=True. + + :param q_summing: turns on constant-Q binning + :param tof_weighted: if True, binning will be done by weighting each event to the DB distribution + :param bck_in_q: if True, the background will be estimated in Q space using the constant-Q binning approach + :param clean: if True, and Q summing is True, the leading artifact will be removed + :param normalize: if True, and tof_weighted is False, the reflectivity will be normalized by the direct beam - For constant-Q binning, it's preferred to use tof_weighted=True. + :return: A tuple containing: - :param q_summing: turns on constant-Q binning - :param tof_weighted: if True, binning will be done by weighting each event to the DB distribution - :param bck_in_q: if True, the background will be estimated in Q space using the constant-Q binning approach - :param clean: if True, and Q summing is True, then leading artifact will be removed - :param normalize: if True, and tof_weighted is False, normalization will be skipped + - q_bins: The Q bin boundaries + - refl: The reflectivity values + - d_refl: The uncertainties in the reflectivity values """ if tof_weighted: self.specular_weighted(q_summing=q_summing, bck_in_q=bck_in_q) @@ -447,7 +494,7 @@ def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, self.specular_unweighted(q_summing=q_summing, normalize=normalize) # Remove leading zeros - r = np.trim_zeros(self.refl, 'f') + r = np.trim_zeros(self.refl, "f") trim = len(self.refl) - len(r) self.refl = self.refl[trim:] self.d_refl = self.d_refl[trim:] @@ -471,15 +518,30 @@ def specular(self, q_summing=False, tof_weighted=False, bck_in_q=False, def specular_unweighted(self, q_summing=False, normalize=True): """ - Simple specular reflectivity calculation. This is the same approach as the - original LR reduction, which sums up pixels without constant-Q binning.
- The original approach bins in TOF, then rebins the final results after - transformation to Q. This approach bins directly to Q. + Simple specular reflectivity calculation. This is the same approach as the + original LR reduction, which sums up pixels without constant-Q binning. + The original approach bins in TOF, then rebins the final results after + transformation to Q. This approach bins directly to Q. + + :param q_summing: (bool, optional) If True, sum the data in Q-space (default is False). + :param normalize: (bool, optional) If True, normalize the reflectivity by the direct + beam (default is True). + + :return: A tuple containing: + + - q_bins: The Q bin boundaries + - refl: The reflectivity values + - d_refl: The uncertainties in the reflectivity values """ # Scattering data - refl, d_refl = self._reflectivity(self._ws_sc, peak_position=self.specular_pixel, - peak=self.signal_peak, low_res=self.signal_low_res, - theta=self.theta, q_summing=q_summing) + refl, d_refl = self._reflectivity( + self._ws_sc, + peak_position=self.specular_pixel, + peak=self.signal_peak, + low_res=self.signal_low_res, + theta=self.theta, + q_summing=q_summing, + ) # Remove background if self.signal_bck is not None: @@ -493,19 +555,21 @@ def specular_unweighted(self, q_summing=False, normalize=True): # we can bin the DB according to the same transform instead of binning and dividing in TOF. # This is mathematically equivalent and convenient in terms of abstraction for later # use for the constant-Q calculation elsewhere in the code. - norm, d_norm = self._reflectivity(self._ws_db, peak_position=0, - peak=self.norm_peak, low_res=self.norm_low_res, - theta=self.theta, q_summing=False) + norm, d_norm = self._reflectivity( + self._ws_db, peak_position=0, peak=self.norm_peak, low_res=self.norm_low_res, theta=self.theta, q_summing=False + ) # Direct beam background could be added here. The effect will be negligible. if self.norm_bck is not None: norm_bck, d_norm_bck = self.norm_bck_subtraction() norm -= norm_bck d_norm = np.sqrt(d_norm**2 + d_norm_bck**2) - db_bins = norm>0 - refl[db_bins] = refl[db_bins]/norm[db_bins] - d_refl[db_bins] = np.sqrt(d_refl[db_bins]**2 / norm[db_bins]**2 + refl[db_bins]**2 * d_norm[db_bins]**2 / norm[db_bins]**4) + db_bins = norm > 0 + refl[db_bins] = refl[db_bins] / norm[db_bins] + d_refl[db_bins] = np.sqrt( + d_refl[db_bins] ** 2 / norm[db_bins] ** 2 + refl[db_bins] ** 2 * d_norm[db_bins] ** 2 / norm[db_bins] ** 4 + ) # Hold on to normalization to be able to diagnose issues later self.norm = norm[db_bins] @@ -523,25 +587,40 @@ def specular_unweighted(self, q_summing=False, normalize=True): def specular_weighted(self, q_summing=True, bck_in_q=False): """ - Compute reflectivity by weighting each event by flux. - This allows for summing in Q and to estimate the background in either Q - or pixels next to the peak. + Compute reflectivity by weighting each event by flux. + This allows for summing in Q and to estimate the background in either Q + or pixels next to the peak. + + :param q_summing: (bool, optional) If True, sum the data in Q-space (default is True). + :param bck_in_q: (bool, optional) If True, subtract background along Q lines (default is False).
+ + :return: A tuple containing: + + - q_bins: The Q bin boundaries + - refl: The reflectivity values + - d_refl: The uncertainties in the reflectivity values """ # Event weights for normalization db_charge = self._ws_db.getRun().getProtonCharge() wl_events, wl_weights = self._get_events(self._ws_db, self.norm_peak, self.norm_low_res) wl_dist, wl_bins = np.histogram(wl_events, bins=100, weights=wl_weights) _bin_width = wl_bins[1:] - wl_bins[:-1] - wl_dist = wl_dist/db_charge/_bin_width - wl_middle = [(wl_bins[i+1]+wl_bins[i])/2.0 for i in range(len(wl_bins)-1)] - - refl, d_refl = self._reflectivity(self._ws_sc, peak_position=self.specular_pixel, - peak=self.signal_peak, low_res=self.signal_low_res, - theta=self.theta, q_summing=q_summing, wl_dist=wl_dist, wl_bins=wl_middle) + wl_dist = wl_dist / db_charge / _bin_width + wl_middle = [(wl_bins[i + 1] + wl_bins[i]) / 2.0 for i in range(len(wl_bins) - 1)] + + refl, d_refl = self._reflectivity( + self._ws_sc, + peak_position=self.specular_pixel, + peak=self.signal_peak, + low_res=self.signal_low_res, + theta=self.theta, + q_summing=q_summing, + wl_dist=wl_dist, + wl_bins=wl_middle, + ) if self.signal_bck is not None: - refl_bck, d_refl_bck = self.bck_subtraction(wl_dist=wl_dist, wl_bins=wl_middle, - q_summing=bck_in_q) + refl_bck, d_refl_bck = self.bck_subtraction(wl_dist=wl_dist, wl_bins=wl_middle, q_summing=bck_in_q) refl -= refl_bck d_refl = np.sqrt(d_refl**2 + d_refl_bck**2) @@ -551,28 +630,34 @@ def specular_weighted(self, q_summing=True, bck_in_q=False): def _roi_integration(self, ws, peak, low_res, q_bins=None, wl_dist=None, wl_bins=None, q_summing=False): """ - Integrate a region of interest and normalize by the number of included pixels. + Integrate a region of interest and normalize by the number of included pixels. - The options are the same as for the reflectivity calculation. - If wl_dist and wl_bins are supplied, the events will be weighted by flux. - If q_summing is True, the angle of each neutron will be recalculated according to - their position on the detector and place in the proper Q bin. + The options are the same as for the reflectivity calculation. + If wl_dist and wl_bins are supplied, the events will be weighted by flux. + If q_summing is True, the angle of each neutron will be recalculated according to + its position on the detector and placed in the proper Q bin. """ q_bins = self.q_bins if q_bins is None else q_bins - refl_bck, d_refl_bck = self._reflectivity(ws, peak_position=0, q_bins=q_bins, - peak=peak, low_res=low_res, - theta=self.theta, q_summing=q_summing, - wl_dist=wl_dist, wl_bins=wl_bins) - - _pixel_area = (peak[1]-peak[0]+1.0) + refl_bck, d_refl_bck = self._reflectivity( + ws, + peak_position=0, + q_bins=q_bins, + peak=peak, + low_res=low_res, + theta=self.theta, + q_summing=q_summing, + wl_dist=wl_dist, + wl_bins=wl_bins, + ) + + _pixel_area = peak[1] - peak[0] + 1.0 refl_bck /= _pixel_area d_refl_bck /= _pixel_area return refl_bck, d_refl_bck - def bck_subtraction(self, normalize_to_single_pixel=False, q_bins=None, wl_dist=None, wl_bins=None, - q_summing=False): + def bck_subtraction(self, normalize_to_single_pixel=False, q_bins=None, wl_dist=None, wl_bins=None, q_summing=False): """ - Higher-level call for background subtraction. Hides the ranges needed to define the ROI. + Higher-level call for background subtraction. Hides the ranges needed to define the ROI.
""" # Sanity check if len(self.signal_bck) == 2 and self.use_functional_bck: @@ -582,63 +667,87 @@ def bck_subtraction(self, normalize_to_single_pixel=False, q_bins=None, wl_dist= self.use_functional_bck = False if self.use_functional_bck: - return background.functional_background(self._ws_sc, self, self.signal_peak, - self.signal_bck, self.signal_low_res, - normalize_to_single_pixel=normalize_to_single_pixel, - q_bins=q_bins, wl_dist=wl_dist, wl_bins=wl_bins, - q_summing=q_summing) + return background.functional_background( + self._ws_sc, + self, + self.signal_peak, + self.signal_bck, + self.signal_low_res, + normalize_to_single_pixel=normalize_to_single_pixel, + q_bins=q_bins, + wl_dist=wl_dist, + wl_bins=wl_bins, + q_summing=q_summing, + ) else: - return background.side_background(self._ws_sc, self, self.signal_peak, self.signal_bck, - self.signal_low_res, - normalize_to_single_pixel=normalize_to_single_pixel, - q_bins=q_bins, wl_dist=wl_dist, wl_bins=wl_bins, - q_summing=q_summing) + return background.side_background( + self._ws_sc, + self, + self.signal_peak, + self.signal_bck, + self.signal_low_res, + normalize_to_single_pixel=normalize_to_single_pixel, + q_bins=q_bins, + wl_dist=wl_dist, + wl_bins=wl_bins, + q_summing=q_summing, + ) def norm_bck_subtraction(self): """ - Higher-level call for background subtraction for the normalization run. + Higher-level call for background subtraction for the normalization run. """ - return background.side_background(self._ws_db, self, self.norm_peak, self.norm_bck, - self.norm_low_res, normalize_to_single_pixel=False) - - def slice(self, x_min=0.002, x_max=0.004, x_bins=None, z_bins=None, # noqa A003 - refl=None, d_refl=None, normalize=False): + return background.side_background( + self._ws_db, self, self.norm_peak, self.norm_bck, self.norm_low_res, normalize_to_single_pixel=False + ) + + def slice( + self, + x_min=0.002, + x_max=0.004, + x_bins=None, + z_bins=None, # noqa A003 + refl=None, + d_refl=None, + normalize=False, + ): """ - Retrieve a slice from the off-specular data. + Retrieve a slice from the off-specular data. """ x_bins = self._offspec_x_bins if x_bins is None else x_bins z_bins = self._offspec_z_bins if z_bins is None else z_bins refl = self._offspec_refl if refl is None else refl d_refl = self._offspec_d_refl if d_refl is None else d_refl - i_min = len(x_bins[x_bins= seq_list[i]: - data_sets[seq_list[i]-1].data_files = [run_list[i]] - new_data_sets.append(data_sets[seq_list[i]-1]) + data_sets[seq_list[i] - 1].data_files = [run_list[i]] + new_data_sets.append(data_sets[seq_list[i] - 1]) else: print("Too few entries [%s] in template for sequence number %s" % (len(data_sets), seq_list[i])) # Save the template that was used xml_str = reduction_template_reader.to_xml(new_data_sets) - with open(os.path.join(output_dir, 'REF_L_%s_auto_template.xml' % run_list[0]), 'w') as fd: + with open(os.path.join(output_dir, "REF_L_%s_auto_template.xml" % run_list[0]), "w") as fd: fd.write(xml_str) def offset_from_first_run( - ws, - template_file :str, - output_dir: str, - ): + ws, + template_file: str, + output_dir: str, +): """ Find a theta offset from the first peak. Used when sample is misaligned. 
@@ -153,7 +150,7 @@ def offset_from_first_run( sequence_id = ws.getRun().getProperty("sequence_id").value[0] # Theta value that we are aiming for - ths_value = ws.getRun()['ths'].value[-1] + ths_value = ws.getRun()["ths"].value[-1] # Read template so we can load the direct beam run template_data = template.read_template(template_file, sequence_number) @@ -163,31 +160,30 @@ def offset_from_first_run( ws_db = mtd_api.LoadEventNexus("REF_L_%s" % template_data.norm_file) # Look for parameters that might have been determined earlier for this measurement - options_file = os.path.join(output_dir, 'REFL_%s_options.json' % sequence_id) + options_file = os.path.join(output_dir, "REFL_%s_options.json" % sequence_id) if sequence_number > 1 and os.path.isfile(options_file): - with open(options_file, 'r') as fd: + with open(options_file, "r") as fd: options = json.load(fd) - return options['theta_offset'] + return options["theta_offset"] else: # Fit direct beam position - x_min=template_data.norm_peak_range[0] - x_max=template_data.norm_peak_range[1] + x_min = template_data.norm_peak_range[0] + x_max = template_data.norm_peak_range[1] _, _x, _y = peak_finding.process_data(ws_db, summed=True, tof_step=200) peak_center = np.argmax(_y) - db_center, db_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, - center=peak_center, - sigma=1.) - print(" DB center: %g\t Width: %g from [%g %g]" % (db_center, db_width, - template_data.norm_peak_range[0], - template_data.norm_peak_range[1])) + db_center, db_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center, sigma=1.0) + print( + " DB center: %g\t Width: %g from [%g %g]" + % (db_center, db_width, template_data.norm_peak_range[0], template_data.norm_peak_range[1]) + ) # Fit the reflected beam position - x_min=template_data.data_peak_range[0] - x_max=template_data.data_peak_range[1] + x_min = template_data.data_peak_range[0] + x_max = template_data.data_peak_range[1] _, _x, _y = peak_finding.process_data(ws, summed=True, tof_step=200) peak_center = np.argmax(_y[x_min:x_max]) + x_min - sc_center, sc_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center, sigma=3.) + sc_center, sc_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center, sigma=3.0) pixel_offset = sc_center - peak_center print(" SC center: %g\t Width: %g" % (sc_center, sc_width)) @@ -195,36 +191,36 @@ def offset_from_first_run( sample_det_distance = settings["sample-det-distance"] pixel_width = settings["pixel-width"] / 1000.0 - theta = np.arctan((sc_center-db_center) * pixel_width / sample_det_distance) / 2.0 * 180 / np.pi + theta = np.arctan((sc_center - db_center) * pixel_width / sample_det_distance) / 2.0 * 180 / np.pi theta_offset = theta - ths_value # If this is the first angle, keep the value for later - options = dict(theta_offset = theta_offset, - pixel_offset = pixel_offset) - with open(options_file, 'w') as fp: + options = dict(theta_offset=theta_offset, pixel_offset=pixel_offset) + with open(options_file, "w") as fp: json.dump(options, fp) return theta_offset def reduce_explorer(ws, ws_db, theta_pv=None, center_pixel=145, db_center_pixel=145, peak_width=10): - """ - """ + """ """ from . 
import peak_finding if theta_pv is None: - if 'BL4B:CS:ExpPl:OperatingMode' in ws.getRun() \ - and ws.getRun().getProperty('BL4B:CS:ExpPl:OperatingMode').value[0] == 'Free Liquid': - theta_pv = 'thi' + if ( + "BL4B:CS:ExpPl:OperatingMode" in ws.getRun() + and ws.getRun().getProperty("BL4B:CS:ExpPl:OperatingMode").value[0] == "Free Liquid" + ): + theta_pv = "thi" else: - theta_pv = 'ths' + theta_pv = "ths" print("\nProcessing: %s" % ws.getRunNumber()) # Theta value that we are aiming for theta_value = np.fabs(ws.getRun()[theta_pv].value[0]) # Load normalization run - tthd_value = ws.getRun()['tthd'].value[0] + tthd_value = ws.getRun()["tthd"].value[0] # Fit direct beam position x_min = center_pixel - 25 @@ -235,8 +231,8 @@ def reduce_explorer(ws, ws_db, theta_pv=None, center_pixel=145, db_center_pixel= print(" DB center: %g\t Width: %g" % (db_center, db_width)) # Fit the reflected beam position - x_min=db_center_pixel-peak_width - x_max=db_center_pixel+peak_width + x_min = db_center_pixel - peak_width + x_max = db_center_pixel + peak_width tof, _x, _y = peak_finding.process_data(ws, summed=True, tof_step=200) peak_center = np.argmax(_y) sc_center, sc_width, _ = peak_finding.fit_signal_flat_bck(_x, _y, x_min=x_min, x_max=x_max, center=peak_center) @@ -244,41 +240,47 @@ def reduce_explorer(ws, ws_db, theta_pv=None, center_pixel=145, db_center_pixel= pixel_width = float(ws.getInstrument().getNumberParameter("pixel-width")[0]) / 1000.0 sample_det_distance = event_reduction.EventReflectivity.DEFAULT_4B_SAMPLE_DET_DISTANCE - twotheta = np.arctan((db_center-sc_center)*pixel_width / sample_det_distance) / 2.0 * 180 / np.pi + twotheta = np.arctan((db_center - sc_center) * pixel_width / sample_det_distance) / 2.0 * 180 / np.pi # Store the tthd of the direct beam and account for the fact that it may be # different from our reflected beam for this calibration data. # This will allow us to be compatible with both fixed and moving detector arm. - tthd_db = ws_db.getRun()['tthd'].value[0] + tthd_db = ws_db.getRun()["tthd"].value[0] twotheta = twotheta + tthd_value - tthd_db print(" Theta = %g Two-theta = %g" % (theta_value, twotheta)) # Perform the reduction width_mult = 2.5 - peak = [np.rint(sc_center - width_mult*sc_width).astype(int), np.rint(sc_center + width_mult*sc_width).astype(int)] - norm_peak = [np.rint(db_center - width_mult*db_width).astype(int), np.rint(db_center + width_mult*db_width).astype(int)] - peak_bck = [peak[0]-3, peak[1]+3] - norm_bck = [norm_peak[0]-3, norm_peak[1]+3] + peak = [np.rint(sc_center - width_mult * sc_width).astype(int), np.rint(sc_center + width_mult * sc_width).astype(int)] + norm_peak = [np.rint(db_center - width_mult * db_width).astype(int), np.rint(db_center + width_mult * db_width).astype(int)] + peak_bck = [peak[0] - 3, peak[1] + 3] + norm_bck = [norm_peak[0] - 3, norm_peak[1] + 3] tof_min = ws.getTofMin() tof_max = ws.getTofMax() - theta = theta_value * np.pi / 180. 
+ theta = theta_value * np.pi / 180.0 - #TODO: dead time correction should be applied here - event_refl = event_reduction.EventReflectivity(ws, ws_db, - signal_peak=peak, signal_bck=peak_bck, - norm_peak=norm_peak, norm_bck=norm_bck, - specular_pixel=sc_center.value, - signal_low_res=[65,180], norm_low_res=[65,180], - q_min=None, q_max=None, - tof_range = [tof_min, tof_max], - theta=theta) + # TODO: dead time correction should be applied here + event_refl = event_reduction.EventReflectivity( + ws, + ws_db, + signal_peak=peak, + signal_bck=peak_bck, + norm_peak=norm_peak, + norm_bck=norm_bck, + specular_pixel=sc_center.value, + signal_low_res=[65, 180], + norm_low_res=[65, 180], + q_min=None, + q_max=None, + tof_range=[tof_min, tof_max], + theta=theta, + ) # R(Q) - qz, refl, d_refl = event_refl.specular(q_summing=False, tof_weighted=False, - bck_in_q=False, clean=False, normalize=True) - qz_mid = (qz[:-1] + qz[1:])/2.0 + qz, refl, d_refl = event_refl.specular(q_summing=False, tof_weighted=False, bck_in_q=False, clean=False, normalize=True) + qz_mid = (qz[:-1] + qz[1:]) / 2.0 return qz_mid, refl, d_refl
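To close, a usage sketch of the reformatted EventReflectivity API, mirroring the call pattern of reduce_explorer above. The run numbers, pixel ranges and angle are hypothetical placeholders, and the sketch assumes the package is importable as lr_reduction:

import numpy as np
import mantid.simpleapi as api
from lr_reduction import event_reduction

# Hypothetical reflected-beam and direct-beam runs
ws = api.LoadEventNexus("REF_L_200000")
ws_db = api.LoadEventNexus("REF_L_200001")

event_refl = event_reduction.EventReflectivity(
    ws,
    ws_db,
    signal_peak=[140, 150],      # placeholder pixel ranges
    signal_bck=[137, 153],
    norm_peak=[140, 150],
    norm_bck=[137, 153],
    specular_pixel=145.0,
    signal_low_res=[65, 180],
    norm_low_res=[65, 180],
    q_min=None,
    q_max=None,
    tof_range=[ws.getTofMin(), ws.getTofMax()],
    theta=0.6 * np.pi / 180.0,   # placeholder angle, in radians
)

# R(Q), binned without constant-Q summing, as in reduce_explorer
qz, refl, d_refl = event_refl.specular(q_summing=False, tof_weighted=False, bck_in_q=False, clean=False, normalize=True)
qz_mid = (qz[:-1] + qz[1:]) / 2.0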