Skip to content

Commit

Permalink
allow fitting of multiple peaks and add new system test
Browse files Browse the repository at this point in the history
  • Loading branch information
jess-farmer committed May 4, 2023
1 parent 6c33460 commit 66d0f1b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def PyExec(self):
self._calculate_final_energy(E1_peak_fits_back, EVSGlobals.BACKSCATTERING_RANGE, self._shared_parameter_fit_type)

# calibrate L1 for backscattering detectors based on the averaged E1 value and calibrated theta values
self._calculate_final_flight_path(E1_peak_fits_back[0], EVSGlobals.BACKSCATTERING_RANGE)
self._calculate_final_flight_path(E1_peak_fits_back, EVSGlobals.BACKSCATTERING_RANGE, self._shared_parameter_fit_type)

# calibrate L1 for frontscattering detectors based on the averaged E1 value and calibrated theta values
E1_fit_front = self._current_workspace + '_E1_front'
Expand All @@ -153,7 +153,7 @@ def PyExec(self):
self._invalid_detectors.add_invalid_detectors(invalid_detectors)

E1_peak_fits_front = mtd[self._current_workspace + '_E1_front_Peak_Parameters'].getNames()
self._calculate_final_flight_path(E1_peak_fits_front[0], EVSGlobals.FRONTSCATTERING_RANGE)
self._calculate_final_flight_path(E1_peak_fits_front, EVSGlobals.FRONTSCATTERING_RANGE, self._shared_parameter_fit_type)

# make the fitted parameters for this iteration the input to the next iteration.
table_group.append(self._current_workspace)
Expand Down Expand Up @@ -245,33 +245,61 @@ def _calculate_incident_flight_path(self, table_name, spec_list):

DeleteWorkspace(L0_param_table)

def _calculate_final_flight_path(self, peak_table, spec_list):
def _calculate_final_flight_path(self, peak_table, spec_list, fit_type):
"""
Calculate the final flight path using the values for energy.
This also uses the old value for L1 loaded from the parameter file.
@param spec_list - spectrum range to calculate t0 for.
"""
if fit_type == 'Both':
indv_peak_table = peak_table[0]
glob_peak_table = peak_table[1]
E1 = EVSMiscFunctions.read_table_column(self._current_workspace, 'E1', spec_list)
E1 *= EVSGlobals.MEV_CONVERSION
global_E1 = EVSMiscFunctions.read_table_column(self._current_workspace, 'global_E1', spec_list)
global_E1 *= EVSGlobals.MEV_CONVERSION
elif fit_type == 'Shared':
glob_peak_table = peak_table[0]
global_E1 = EVSMiscFunctions.read_table_column(self._current_workspace, 'global_E1', spec_list)
global_E1 *= EVSGlobals.MEV_CONVERSION
else:
indv_peak_table = peak_table[0]
E1 = EVSMiscFunctions.read_table_column(self._current_workspace, 'E1', spec_list)
E1 *= EVSGlobals.MEV_CONVERSION

E1 = EVSMiscFunctions.read_table_column(self._current_workspace, 'E1', spec_list)
t0 = EVSMiscFunctions.read_table_column(self._current_workspace, 't0', spec_list)
t0_error = EVSMiscFunctions.read_table_column(self._current_workspace, 't0_Err', spec_list)
L0 = EVSMiscFunctions.read_table_column(self._current_workspace, 'L0', spec_list)
theta = EVSMiscFunctions.read_table_column(self._current_workspace, 'theta', spec_list)
r_theta = EVSMiscFunctions.calculate_r_theta(self._sample_mass, theta)

peak_centres = self._invalid_detectors.filter_peak_centres_for_invalid_detectors(spec_list, peak_table)
if fit_type != 'Shared':

peak_centres = self._invalid_detectors.filter_peak_centres_for_invalid_detectors(spec_list, indv_peak_table)

delta_t = (peak_centres - t0) / 1e+6
delta_t_error = t0_error / 1e+6
v1 = np.sqrt( 2 * E1 /scipy.constants.m_n)
L1 = v1 * delta_t - L0 * r_theta
L1_error = v1 * delta_t_error

delta_t = (peak_centres - t0) / 1e+6
delta_t_error = t0_error / 1e+6
self._set_table_column(self._current_workspace, 'L1', L1, spec_list)
self._set_table_column(self._current_workspace, 'L1_Err', L1_error, spec_list)

E1 *= EVSGlobals.MEV_CONVERSION
v1 = np.sqrt( 2 * E1 /scipy.constants.m_n)
L1 = v1 * delta_t - L0 * r_theta
L1_error = v1 * delta_t_error
if fit_type != 'Individual':
peak_centre = EVSMiscFunctions.read_fitting_result_table_column(glob_peak_table, 'f1.LorentzPos', [0])
peak_centre_list = np.empty(spec_list[1] + 1 -spec_list[0])
peak_centre_list.fill(float(peak_centre))

self._set_table_column(self._current_workspace, 'L1', L1, spec_list)
self._set_table_column(self._current_workspace, 'L1_Err', L1_error, spec_list)
delta_t = (peak_centre_list - t0) / 1e+6
delta_t_error = t0_error / 1e+6
global_v1 = np.sqrt( 2 * global_E1 /scipy.constants.m_n)
global_L1 = global_v1 * delta_t - L0 * r_theta
global_L1_error = global_v1 * delta_t_error

self._set_table_column(self._current_workspace, 'global_L1', global_L1, spec_list)
self._set_table_column(self._current_workspace, 'global_L1_Err', global_L1_error, spec_list)

def _calculate_scattering_angle(self, table_name, spec_list):
"""
Expand Down Expand Up @@ -383,7 +411,7 @@ def _calculate_final_energy(self, peak_table, spec_list, fit_type):

self._set_table_column(self._current_workspace, 'global_E1', global_E1)
self._set_table_column(self._current_workspace, 'global_E1_Err', global_E1_error)
else:
else:
mean_E1 = np.empty(spec_range)
E1_error = np.empty(spec_range)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,12 @@ def _fit_peaks(self):
"""

estimated_peak_positions_all_peaks = self._estimate_peak_positions()
num_estimated_peaks, num_spectra = estimated_peak_positions_all_peaks.shape
self._output_parameter_tables = []
self._peak_fit_workspaces = []
if self._shared_parameter_fit_type != "Shared":
num_estimated_peaks, num_spectra = estimated_peak_positions_all_peaks.shape

self._prog_reporter = Progress(self, 0.0, 1.0, num_estimated_peaks*num_spectra)

self._output_parameter_tables = []
self._peak_fit_workspaces = []
for peak_index, estimated_peak_positions in enumerate(estimated_peak_positions_all_peaks):

self._peak_fit_workspaces_by_spec = []
Expand All @@ -579,18 +578,21 @@ def _fit_peaks(self):
GroupWorkspaces(self._output_parameter_tables, OutputWorkspace=self._output_workspace_name + '_Peak_Parameters')

if self._shared_parameter_fit_type != "Individual":
estimated_peak_position = np.mean(estimated_peak_positions_all_peaks)
output_parameter_table_name = self._output_workspace_name + '_Shared_Peak_Parameters'
output_parameter_table_headers = self._create_parameter_table_and_output_headers(output_parameter_table_name)
for peak_index, estimated_peak_position in enumerate(np.mean(estimated_peak_positions_all_peaks,1)):

self._fit_shared_peak(self._spec_list[0], estimated_peak_position, output_parameter_table_name,
output_parameter_table_headers)
output_parameter_table_name = self._output_workspace_name + '_Shared_Peak_%d_Parameters' % peak_index
output_parameter_table_headers = self._create_parameter_table_and_output_headers(output_parameter_table_name)

fit_workspace_name = self._fit_shared_peak(peak_index, self._spec_list[0], estimated_peak_position, output_parameter_table_name,
output_parameter_table_headers)

if self._shared_parameter_fit_type == 'Both':
self._output_parameter_tables.append(output_parameter_table_name)
self._peak_fit_workspaces.append(fit_workspace_name)

if self._shared_parameter_fit_type == 'Both':
GroupWorkspaces(self._output_parameter_tables, OutputWorkspace=self._output_workspace_name + '_Peak_Parameters')
else:
GroupWorkspaces(output_parameter_table_name, OutputWorkspace=self._output_workspace_name + '_Peak_Parameters')
GroupWorkspaces(self._output_parameter_tables, OutputWorkspace=self._output_workspace_name + '_Peak_Parameters')

def _fit_peak(self, peak_index, spec_index, peak_position, output_parameter_table_name,
output_parameter_table_headers):
Expand All @@ -614,8 +616,8 @@ def _fit_peak(self, peak_index, spec_index, peak_position, output_parameter_tabl

return fit_workspace_name

def _fit_shared_peak(self, spec_range, peak_position, output_parameter_table_name, output_parameter_table_headers):
peak_params = self._find_peaks_and_output_params(spec_range, peak_position)
def _fit_shared_peak(self, peak_index, spec_range, peak_position, output_parameter_table_name, output_parameter_table_headers):
peak_params = self._find_peaks_and_output_params(spec_range, peak_position, peak_index)
fit_func = self._build_function_string(peak_params)
start_str = 'composite=MultiDomainFunction, NumDeriv=1;'
validSpecs = mtd[self._sample]
Expand All @@ -630,7 +632,7 @@ def _fit_shared_peak(self, spec_range, peak_position, output_parameter_table_nam
ties = ','.join(f'f{i}.f1.{p}=f0.f1.{p}' for p in self._func_param_names.values() for i in range(1,n_valid_specs))
func_string = composite_func + f';ties=({ties})'
xmin, xmax = self._find_fit_x_window(peak_params)
fit_output_name = '__' + self._output_workspace_name + '_Peak_0'
fit_output_name = '__' + self._output_workspace_name + '_Peak_%d' % peak_index

# create new workspace for each spectra
x = validSpecs.readX(0)
Expand All @@ -655,13 +657,17 @@ def _fit_shared_peak(self, spec_range, peak_position, output_parameter_table_nam

self._output_fit_params_to_table_ws(0, fit_params, output_parameter_table_name,
output_headers)

fit_workspace_name = fws.name()
self._del_fit_workspaces(ncm, fit_params, fws)

def _find_peaks_and_output_params(self, spec_number, peak_position, peak_index=None, spec_index=None):
if spec_index is not None and peak_index is not None:
return fit_workspace_name

def _find_peaks_and_output_params(self, spec_number, peak_position, peak_index, spec_index=None):
if spec_index is not None:
peak_table_name = '__' + self._sample + '_peaks_table_%d_%d' % (peak_index, spec_index)
else:
peak_table_name = '__' + self._sample + '_peak_table'
peak_table_name = '__' + self._sample + '_peak_table_%d' % (peak_index)
find_peak_params = self._get_find_peak_parameters(spec_number, [peak_position])
FindPeaks(InputWorkspace=self._sample, WorkspaceIndex=spec_index, PeaksList=peak_table_name, **find_peak_params)
if mtd[peak_table_name].rowCount() == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,27 @@ def test_copper_with_individual_and_global_fit(self, load_file_mock):
detector_specific_r_tols = {"L1": {116: 0.65, 170: 0.75}}
detector_specific_r_tols["L1"].update({k: TestConstants.INVALID_DETECTOR for k in [138, 141, 146, 147, 156, 158, 160, 163, 164,
165, 167, 168, 169, 170, 182, 191, 192]})
self._assert_parameters_match_expected(params_table, detector_specific_r_tols)
self._assert_parameters_match_expected(params_table, detector_specific_r_tols, "Both")
self._assert_E1_parameters_match_expected(params_table, 3e-3, "Both")

@patch('unpackaged.vesuvio_calibration.calibration_scripts.calibrate_vesuvio_fit.EVSCalibrationFit._load_file')
def test_copper_with_global_fit(self, load_file_mock):
self._setup_copper_test()
self._output_workspace = "copper_analysis_test"

load_file_mock.side_effect = self._load_file_side_effect

self._create_evs_calibration_alg()
self._alg.setProperty("SharedParameterFitType", "Shared")
params_table = self._execute_evs_calibration_analysis()

# Specify detectors tolerances set by user, then update with those to mask as invalid.
detector_specific_r_tols = {"L1": {116: 0.65, 170: 0.75}}
detector_specific_r_tols["L1"].update({k: TestConstants.INVALID_DETECTOR for k in [138, 141, 146, 147, 156, 158, 160, 163, 164,
165, 167, 168, 169, 170, 182, 191, 192]})
self._assert_parameters_match_expected(params_table, detector_specific_r_tols, "Shared")
self._assert_E1_parameters_match_expected(params_table, 3e-3, "Shared")

def _assert_theta_parameters_match_expected(self, params_table, rel_tolerance):
thetas = params_table.column('theta')
actual_thetas = self._calibrated_params['theta']
Expand Down Expand Up @@ -246,10 +264,12 @@ def _assert_E1_parameters_match_expected(self, params_table, rel_tolerance, fit_
global_E1 = params_table.column('global_E1')[0]
self.assertAlmostEqual(global_E1, EVSGlobals.ENERGY_ESTIMATE, delta=EVSGlobals.ENERGY_ESTIMATE*rel_tolerance)

def _assert_parameters_match_expected(self, params_table, tolerances=None):
def _assert_parameters_match_expected(self, params_table, tolerances=None, fit_type="Individual"):
rel_tol_theta, rel_tol_L1 = self._extract_tolerances(deepcopy(tolerances))
theta_errors = self._assert_theta_parameters_match_expected(params_table, rel_tol_theta)
L1_errors = self._assert_L1_parameters_match_expected(params_table, rel_tol_L1)
L1_errors = None
if fit_type != 'Shared':
L1_errors = self._assert_L1_parameters_match_expected(params_table, rel_tol_L1)

if theta_errors or L1_errors:
raise AssertionError(f"Theta: {theta_errors})\n L1: {L1_errors}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,29 +660,32 @@ def test_fit_peaks_individual(self, group_workspaces_mock):
group_workspaces_mock.assert_called_once_with(['output_ws_name_Peak_0_Parameters', 'output_ws_name_Peak_1_Parameters'],
OutputWorkspace='output_ws_name_Peak_Parameters')

@patch('calibration_scripts.calibrate_vesuvio_fit.EVSCalibrationFit._fit_shared_peak')
@patch('calibration_scripts.calibrate_vesuvio_fit.GroupWorkspaces')
def test_fit_peaks_shared(self, group_workspaces_mock, fit_shared_peak_mock):
def test_fit_peaks_shared(self, group_workspaces_mock):
alg = EVSCalibrationFit()
alg._estimate_peak_positions = MagicMock(return_value=np.asarray([[5, 10, 15], [2.5, 7.5, 10.5]]))
alg._create_parameter_table_and_output_headers = MagicMock(return_value=['a', 'b', 'c'])
alg._fit_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}_{b}')
alg._fit_shared_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}')
alg._output_workspace_name = 'output_ws_name'
alg._shared_parameter_fit_type = 'Shared'
alg._spec_list = [3,4,5]
alg._fit_peaks()
alg._create_parameter_table_and_output_headers.assert_has_calls([call('output_ws_name_Shared_Peak_Parameters')])
group_workspaces_mock.assert_called_once_with('output_ws_name_Shared_Peak_Parameters',
self.assertEqual(['fit_ws_0', 'fit_ws_1'],
alg._peak_fit_workspaces)
alg._create_parameter_table_and_output_headers.assert_has_calls([call('output_ws_name_Shared_Peak_0_Parameters'),
call('output_ws_name_Shared_Peak_1_Parameters')])
group_workspaces_mock.assert_called_once_with(['output_ws_name_Shared_Peak_0_Parameters','output_ws_name_Shared_Peak_1_Parameters'],
OutputWorkspace='output_ws_name_Peak_Parameters')
fit_shared_peak_mock.assert_called_once_with(3, np.mean(alg._estimate_peak_positions()), 'output_ws_name_Shared_Peak_Parameters', ['a', 'b', 'c'])
alg._fit_shared_peak.assert_has_calls([call(0, 3, 10, 'output_ws_name_Shared_Peak_0_Parameters', ['a', 'b', 'c']),
call(1, 3, 41/6, 'output_ws_name_Shared_Peak_1_Parameters', ['a', 'b', 'c'])])

@patch('calibration_scripts.calibrate_vesuvio_fit.EVSCalibrationFit._fit_shared_peak')
@patch('calibration_scripts.calibrate_vesuvio_fit.GroupWorkspaces')
def test_fit_peaks_both(self, group_workspaces_mock, fit_shared_peak_mock):
def test_fit_peaks_both(self, group_workspaces_mock):
alg = EVSCalibrationFit()
alg._estimate_peak_positions = MagicMock(return_value=np.asarray([[5, 10, 15], [2.5, 7.5, 10.5]]))
alg._create_parameter_table_and_output_headers = MagicMock(return_value=['a', 'b', 'c'])
alg._fit_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}_{b}')
alg._fit_shared_peak = MagicMock(side_effect=lambda a, b, c, d, e: f'fit_ws_{a}')
alg._output_workspace_name = 'output_ws_name'
alg._shared_parameter_fit_type = 'Both'
alg._spec_list = [3,4,5]
Expand All @@ -695,11 +698,13 @@ def test_fit_peaks_both(self, group_workspaces_mock, fit_shared_peak_mock):
call(1, 0, 2.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']),
call(1, 1, 7.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c']),
call(1, 2, 10.5, 'output_ws_name_Peak_1_Parameters', ['a', 'b', 'c'])])
self.assertEqual([['fit_ws_0_0', 'fit_ws_0_1', 'fit_ws_0_2'], ['fit_ws_1_0', 'fit_ws_1_1', 'fit_ws_1_2']],
self.assertEqual([['fit_ws_0_0', 'fit_ws_0_1', 'fit_ws_0_2'], ['fit_ws_1_0', 'fit_ws_1_1', 'fit_ws_1_2'], 'fit_ws_0', 'fit_ws_1'],
alg._peak_fit_workspaces)
group_workspaces_mock.assert_called_once_with(['output_ws_name_Peak_0_Parameters', 'output_ws_name_Peak_1_Parameters',
'output_ws_name_Shared_Peak_Parameters'], OutputWorkspace='output_ws_name_Peak_Parameters')
fit_shared_peak_mock.assert_called_once_with(3, np.mean(alg._estimate_peak_positions()), 'output_ws_name_Shared_Peak_Parameters', ['a', 'b', 'c'])
'output_ws_name_Shared_Peak_0_Parameters','output_ws_name_Shared_Peak_1_Parameters'],
OutputWorkspace='output_ws_name_Peak_Parameters')
alg._fit_shared_peak.assert_has_calls([call(0, 3, 10, 'output_ws_name_Shared_Peak_0_Parameters', ['a', 'b', 'c']),
call(1, 3, 41/6, 'output_ws_name_Shared_Peak_1_Parameters', ['a', 'b', 'c'])])

@staticmethod
def _setup_alg_mocks_fit_peak():
Expand Down

0 comments on commit 66d0f1b

Please sign in to comment.