Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to fit individually, globally, or both #63

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from mantid.simpleapi import CreateEmptyTableWorkspace, DeleteWorkspace, ReplaceSpecialValues, GroupWorkspaces, mtd,\
ConvertTableToMatrixWorkspace, ConjoinWorkspaces, Transpose, PlotPeakByLogValue,RenameWorkspace
from calibration_scripts.calibrate_vesuvio_helper_functions import EVSGlobals, EVSMiscFunctions,\
InvalidDetectors
InvalidDetectors, FitTypes

import os
import sys
Expand Down Expand Up @@ -53,7 +53,7 @@ def PyInit(self):
self.declareProperty('Iterations', 2, validator=IntBoundedValidator(lower=1),
doc="Number of iterations to perform. Default is 2.")

shared_fit_type_validator = StringListValidator(["Individual", "Shared", "Both"])
shared_fit_type_validator = StringListValidator([member.value for member in FitTypes])
self.declareProperty('SharedParameterFitType', "Individual", doc='Calculate shared parameters using an individual and/or'
'global fit.', validator=shared_fit_type_validator)

Expand Down Expand Up @@ -132,28 +132,28 @@ def PyExec(self):
self._run_calibration_fit(Samples=self._samples, Function='Voigt', Mode='SingleDifference',
SpectrumRange=EVSGlobals.BACKSCATTERING_RANGE, InstrumentParameterWorkspace=self._param_table,
Mass=self._sample_mass, OutputWorkspace=E1_fit_back, CreateOutput=self._create_output,
PeakType='Recoil', SharedParameterFitType=self._shared_parameter_fit_type,
PeakType='Recoil', SharedParameterFitType=self._shared_parameter_fit_type.value,
InvalidDetectors=self._invalid_detectors.get_all_invalid_detectors())
self._invalid_detectors.add_invalid_detectors(invalid_detectors)

E1_peak_fits_back = mtd[self._current_workspace + '_E1_back_Peak_Parameters'].getNames()
self._calculate_final_energy(E1_peak_fits_back, EVSGlobals.BACKSCATTERING_RANGE, self._shared_parameter_fit_type != "Individual")
self._calculate_final_energy(E1_peak_fits_back, EVSGlobals.BACKSCATTERING_RANGE, self._shared_parameter_fit_type)

# calibrate L1 for backscattering detectors based on the averaged E1 value and calibrated theta values
self._calculate_final_flight_path(E1_peak_fits_back[0], EVSGlobals.BACKSCATTERING_RANGE)
self._calculate_final_flight_path(E1_peak_fits_back, EVSGlobals.BACKSCATTERING_RANGE, self._shared_parameter_fit_type)

# calibrate L1 for frontscattering detectors based on the averaged E1 value and calibrated theta values
E1_fit_front = self._current_workspace + '_E1_front'
invalid_detectors += \
self._run_calibration_fit(Samples=self._samples, Function='Voigt', Mode='SingleDifference',
SpectrumRange=EVSGlobals.FRONTSCATTERING_RANGE, InstrumentParameterWorkspace=self._param_table,
Mass=self._sample_mass, OutputWorkspace=E1_fit_front, CreateOutput=self._create_output,
PeakType='Recoil', SharedParameterFitType=self._shared_parameter_fit_type,
PeakType='Recoil', SharedParameterFitType=self._shared_parameter_fit_type.value,
InvalidDetectors=self._invalid_detectors.get_all_invalid_detectors())
self._invalid_detectors.add_invalid_detectors(invalid_detectors)

E1_peak_fits_front = mtd[self._current_workspace + '_E1_front_Peak_Parameters'].getNames()
self._calculate_final_flight_path(E1_peak_fits_front[0], EVSGlobals.FRONTSCATTERING_RANGE)
self._calculate_final_flight_path(E1_peak_fits_front, EVSGlobals.FRONTSCATTERING_RANGE, self._shared_parameter_fit_type)

# make the fitted parameters for this iteration the input to the next iteration.
table_group.append(self._current_workspace)
Expand Down Expand Up @@ -245,33 +245,75 @@ def _calculate_incident_flight_path(self, table_name, spec_list):

DeleteWorkspace(L0_param_table)

def _calculate_final_flight_path(self, peak_table, spec_list):
def _calculate_final_flight_path(self, peak_table, spec_list, fit_type):
"""
Calculate the final flight path using the values for energy.
This also uses the old value for L1 loaded from the parameter file.

@param peak_table - names of tables containing fitted parameters
@param spec_list - spectrum range to calculate t0 for.
jess-farmer marked this conversation as resolved.
Show resolved Hide resolved
@param fit_type - type of fitting, one of 'Individual', 'Shared', or 'Both'
"""
if fit_type is FitTypes.BOTH:
params = self._read_calibration_params_from_table(spec_list, E1_col_names=['E1', 'global_E1'])
self._calculate_L1_from_table(spec_list, peak_table[0], params, 'L1')
self._calculate_L1_from_table(spec_list, peak_table[1], params, 'global_L1')
elif fit_type is FitTypes.SHARED:
params = self._read_calibration_params_from_table(spec_list, E1_col_names=['global_E1'])
self._calculate_L1_from_table(spec_list, peak_table[0], params, 'global_L1')
else:
params = self._read_calibration_params_from_table(spec_list, E1_col_names=['E1'])
self._calculate_L1_from_table(spec_list, peak_table[0], params, 'L1')

E1 = EVSMiscFunctions.read_table_column(self._current_workspace, 'E1', spec_list)
t0 = EVSMiscFunctions.read_table_column(self._current_workspace, 't0', spec_list)
t0_error = EVSMiscFunctions.read_table_column(self._current_workspace, 't0_Err', spec_list)
L0 = EVSMiscFunctions.read_table_column(self._current_workspace, 'L0', spec_list)
theta = EVSMiscFunctions.read_table_column(self._current_workspace, 'theta', spec_list)
r_theta = EVSMiscFunctions.calculate_r_theta(self._sample_mass, theta)

peak_centres = self._invalid_detectors.filter_peak_centres_for_invalid_detectors(spec_list, peak_table)
def _read_calibration_params_from_table(self, spec_list, E1_col_names=[], L1_col_names=[]):
"""
Retrieve calibration parameters and return dict of param values

delta_t = (peak_centres - t0) / 1e+6
delta_t_error = t0_error / 1e+6
@param spec_list - spectrum range to retreive param values for
@param E1_col_names - E1 columns to read from
@param L1_col_names - L1 columns to read from
"""
param_dict = {}
for col_name in E1_col_names:
if col_name == 'E1':
param_dict['E1'] = EVSMiscFunctions.read_table_column(self._current_workspace, col_name, spec_list) * EVSGlobals.MEV_CONVERSION
if col_name == 'global_E1':
param_dict['global_E1'] = EVSMiscFunctions.read_table_column(self._current_workspace, col_name, spec_list) * EVSGlobals.MEV_CONVERSION
for col_name in L1_col_names:
if col_name == 'L1':
param_dict['L1'] = EVSMiscFunctions.read_table_column(self._param_table, col_name, spec_list)
param_dict['t0'] = EVSMiscFunctions.read_table_column(self._current_workspace, 't0', spec_list)
param_dict['t0_error'] = EVSMiscFunctions.read_table_column(self._current_workspace, 't0_Err', spec_list)
param_dict['L0'] = EVSMiscFunctions.read_table_column(self._current_workspace, 'L0', spec_list)
param_dict['theta'] = EVSMiscFunctions.read_table_column(self._current_workspace, 'theta', spec_list)
param_dict['r_theta'] = EVSMiscFunctions.calculate_r_theta(self._sample_mass, param_dict['theta'])

return param_dict

def _calculate_L1_from_table(self, spec_list, peak_table, params, L1_col_name):
"""
Calculate L1 from fitted calibration parameters

E1 *= EVSGlobals.MEV_CONVERSION
v1 = np.sqrt( 2 * E1 /scipy.constants.m_n)
L1 = v1 * delta_t - L0 * r_theta
@param spec_list - spectrum range to calculate L1 for
@param peak_table - name of table to calculate L1 for
@param params - dict of params required for calculation
@param L1_col_name - either 'L1' or 'global_L1'
"""
if L1_col_name == 'L1':
peak_centres = self._invalid_detectors.filter_peak_centres_for_invalid_detectors(spec_list, peak_table)
elif L1_col_name == 'global_L1':
peak_centre = EVSMiscFunctions.read_fitting_result_table_column(peak_table, 'f1.LorentzPos', [0])
peak_centres = np.empty(spec_list[1] + 1 - spec_list[0])
peak_centres.fill(float(peak_centre))

delta_t = (peak_centres - params['t0']) / 1e+6
delta_t_error = params['t0_error'] / 1e+6
v1 = np.sqrt( 2 * params[L1_col_name.strip('L1') + 'E1'] /scipy.constants.m_n)
L1 = v1 * delta_t - params['L0'] * params['r_theta']
L1_error = v1 * delta_t_error

self._set_table_column(self._current_workspace, 'L1', L1, spec_list)
self._set_table_column(self._current_workspace, 'L1_Err', L1_error, spec_list)
self._set_table_column(self._current_workspace, L1_col_name, L1, spec_list)
self._set_table_column(self._current_workspace, f'{L1_col_name}_Err', L1_error, spec_list)

def _calculate_scattering_angle(self, table_name, spec_list):
"""
Expand Down Expand Up @@ -315,66 +357,70 @@ def _calculate_scattering_angle(self, table_name, spec_list):
self._set_table_column(self._current_workspace, 'theta', masked_theta, spec_list)
self._set_table_column(self._current_workspace, 'theta_Err', theta_error, spec_list)

def _calculate_final_energy(self, peak_table, spec_list, calculate_global):
def _calculate_final_energy(self, peak_table, spec_list, fit_type):
jess-farmer marked this conversation as resolved.
Show resolved Hide resolved
"""
Calculate the final energy using the fitted peak centres of a run.

@param table_name - name of table containing fitted parameters for the peak centres
@param peak_table - list of tables containing fitted parameters for the peak centres
@param spec_list - spectrum range to calculate t0 for.
@param fit_type - type of fitting, one of 'Individual', 'Shared', or 'Both'
"""

spec_range = EVSGlobals.DETECTOR_RANGE[1] + 1 - EVSGlobals.DETECTOR_RANGE[0]
mean_E1 = np.empty(spec_range)
E1_error = np.empty(spec_range)
global_E1 = np.empty(spec_range)
global_E1_error = np.empty(spec_range)

if not self._E1_value_and_error:
t0 = EVSMiscFunctions.read_table_column(self._current_workspace, 't0', spec_list)
L0 = EVSMiscFunctions.read_table_column(self._current_workspace, 'L0', spec_list)

L1 = EVSMiscFunctions.read_table_column(self._param_table, 'L1', spec_list)
theta = EVSMiscFunctions.read_table_column(self._current_workspace, 'theta', spec_list)
r_theta = EVSMiscFunctions.calculate_r_theta(self._sample_mass, theta)

peak_centres = self._invalid_detectors.filter_peak_centres_for_invalid_detectors(spec_list, peak_table[0])

delta_t = (peak_centres - t0) / 1e+6
v1 = (L0 * r_theta + L1) / delta_t

E1 = 0.5 * scipy.constants.m_n * v1 ** 2
E1 /= EVSGlobals.MEV_CONVERSION
params = self._read_calibration_params_from_table(spec_list, L1_col_names=['L1'])
if fit_type is FitTypes.BOTH:
self._calculate_E1(spec_list, spec_range, peak_table[0], params, 'E1')
self._calculate_E1(spec_list, spec_range, peak_table[1], params, 'global_E1')
elif fit_type is FitTypes.SHARED:
self._calculate_E1(spec_list, spec_range, peak_table[0], params, 'global_E1')
else:
self._calculate_E1(spec_list, spec_range, peak_table[0], params, 'E1')
else:
mean_E1 = np.empty(spec_range)
E1_error = np.empty(spec_range)
mean_E1_val = self._E1_value_and_error[0]
E1_error_val = self._E1_value_and_error[1]

mean_E1_val = np.nanmean(E1)
E1_error_val = np.nanstd(E1)
mean_E1.fill(mean_E1_val)
E1_error.fill(E1_error_val)

else:
mean_E1_val = self._E1_value_and_error[0]
E1_error_val = self._E1_value_and_error[1]
self._set_table_column(self._current_workspace, 'E1', mean_E1)
self._set_table_column(self._current_workspace, 'E1_Err', E1_error)

mean_E1.fill(mean_E1_val)
E1_error.fill(E1_error_val)
def _calculate_E1(self, spec_list, spec_range, peak_table, params, E1_col_name):
"""
Calculate E1 value from fitted parameters

self._set_table_column(self._current_workspace, 'E1', mean_E1)
self._set_table_column(self._current_workspace, 'E1_Err', E1_error)
@param spec_list - spectrum range to calculate E1 for
@param spec_range - detector range
@param peak_table - name of table containing fitted parameters for the peak centres
@param params - dict of params needed to calculate E1
@param E1_col_name - either 'E1' or 'global_E1'
"""
if E1_col_name == 'E1':
peak_centres = self._invalid_detectors.filter_peak_centres_for_invalid_detectors(spec_list, peak_table)
elif E1_col_name == 'global_E1':
peak_centre = EVSMiscFunctions.read_fitting_result_table_column(peak_table, 'f1.LorentzPos', [0])
peak_centres = [float(peak_centre)] * len(params['L0'])

if calculate_global: # This fn will need updating for the only global option
peak_centre = EVSMiscFunctions.read_fitting_result_table_column(peak_table[1], 'f1.LorentzPos', [0])
peak_centre = [peak_centre] * len(peak_centres)
E1_col = np.empty(spec_range)
E1_col_error = np.empty(spec_range)

delta_t = (peak_centre - t0) / 1e+6
v1 = (L0 * r_theta + L1) / delta_t
E1 = 0.5 * scipy.constants.m_n * v1 ** 2
E1 /= EVSGlobals.MEV_CONVERSION
delta_t = (peak_centres - params['t0']) / 1e+6
v1 = (params['L0'] * params['r_theta'] + params['L1']) / delta_t
E1 = 0.5 * scipy.constants.m_n * v1 ** 2
E1 /= EVSGlobals.MEV_CONVERSION

global_E1_val = np.nanmean(E1)
global_E1_error_val = np.nanstd(E1)
E1_val = np.nanmean(E1)
E1_error_val = np.nanstd(E1)

global_E1.fill(global_E1_val)
global_E1_error.fill(global_E1_error_val)
E1_col.fill(E1_val)
E1_col_error.fill(E1_error_val)

self._set_table_column(self._current_workspace, 'global_E1', global_E1)
self._set_table_column(self._current_workspace, 'global_E1_Err', global_E1_error)
self._set_table_column(self._current_workspace, E1_col_name, E1_col)
self._set_table_column(self._current_workspace, f'{E1_col_name}_Err', E1_col_error)

def _setup(self):
"""
Expand All @@ -387,7 +433,7 @@ def _setup(self):
self._d_spacings = self.getProperty("DSpacings").value.tolist()
self._E1_value_and_error = self.getProperty("E1FixedValueAndError").value.tolist()
self._invalid_detectors = InvalidDetectors(self.getProperty("InvalidDetectors").value.tolist())
self._shared_parameter_fit_type = self.getProperty("SharedParameterFitType").value
self._shared_parameter_fit_type = FitTypes(self.getProperty("SharedParameterFitType").value)
self._calc_L0 = self.getProperty("CalculateL0").value
self._make_IP_file = self.getProperty("CreateIPFile").value
self._output_workspace_name = self.getPropertyValue("OutputWorkspace")
Expand Down
Loading