From 0653111a3ae5ace11a4f5f8f0b50f3f8e98e54d3 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Wed, 11 Sep 2024 10:18:48 -0700 Subject: [PATCH 01/19] Add emittance measurement first draft --- lcls_tools/common/devices/magnet.py | 4 +- .../measurements/emittance_measurement.py | 66 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 lcls_tools/common/measurements/emittance_measurement.py diff --git a/lcls_tools/common/devices/magnet.py b/lcls_tools/common/devices/magnet.py index 214d3fbe..c6b22755 100644 --- a/lcls_tools/common/devices/magnet.py +++ b/lcls_tools/common/devices/magnet.py @@ -328,6 +328,8 @@ def scan( self, scan_settings: List[Dict[str, float]], function: Optional[callable] = None, + *args, + **kwargs ) -> None: """ Scans magnets given a list of magnet settings (Dict[magnet_name, bdes_value]) @@ -335,7 +337,7 @@ def scan( """ for setting in scan_settings: self.set_bdes(setting) - function() if function else None + function(*args, **kwargs) if function else None def turn_off( self, diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py new file mode 100644 index 00000000..077a25c6 --- /dev/null +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -0,0 +1,66 @@ +from typing import Callable + +from lcls_tools.common.devices.reader import create_magnet, create_screen, create_wire +from lcls_tools.common.devices.magnet import MagnetCollection +from lcls_tools.common.devices.screen import Screen +from lcls_tools.common.devices.wire import Wire +from lcls_tools.common.measurements.measurement import Measurement +from meme.model import Model + +class QuadScanEmittance(Measurement): + model: Model + magnet_collection: MagnetCollection + magnet_settings: dict + rmats: list + acquire_data: Callable[..., None] + + screen: Screen + image_file_locations: list + + wire: Wire + + def __init__(self, model_beamline: str, magnet_area: str, magnet_settings: dict, + to_device_area: str, to_device_name: str): + self.model = Model(model_beamline) + self.magnet_collection = create_magnet(area=magnet_area) + self.magnet_settings = magnet_settings + if to_device_name.startswith(('YAG','OTR')): + self.screen = create_screen(area=to_device_area, name=to_device_name) + self.acquire_data = self.acquire_profmon + else: + self.wire = create_wire(area=to_device_area, name=to_device_name) + self.acquire_data = self.acquire_wire + + + def measure(self): + self.magnet_collection.scan(scan_settings=self.settings, function=self.acquire_data) + beam_sizes = self.get_beamsize(image_file = self.image_file_locations) # x_rms, y_rms, x_stdz, y_stdz + + emittance, bmag, sig, is_valid = self.compute_emit_bmag(k = [-6,-3,0], + beamsize_squared = [beam_sizes['x_rms'],beam_sizes['y_rms']], + q_len = 0.221, + rmat = self.rmats, + beta0 = 0.0001, + alpha0 = 0.0002, + get_bmag = True) + + results = { + "emittance": emittance, + "BMAG": bmag + } + + return results + + def acquire_profmon(self, magnet_name, to_device_name, num_to_capture=10): + latest_image_filepath = self.screen.save_images(num_to_capture=num_to_capture) + self.rmats.append(self.model.get_rmat(from_device=magnet_name, to_device=to_device_name)) + self.image_file_locations.append(latest_image_filepath) + + def acquire_wire(self, magnet_name, to_device_name): + pass + + def get_beamsize(self, image_file): + pass + + def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, beta0, alpha0, get_bmag): + pass \ No newline at end of file From 39d7ec31f5bc1f1cf5b86a6006272775b2da6605 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Thu, 26 Sep 2024 14:33:55 -0700 Subject: [PATCH 02/19] - Get beamsize directly from device - Initialize attributes as properties --- lcls_tools/common/devices/magnet.py | 4 +- .../measurements/emittance_measurement.py | 99 +++++++++++-------- 2 files changed, 57 insertions(+), 46 deletions(-) diff --git a/lcls_tools/common/devices/magnet.py b/lcls_tools/common/devices/magnet.py index c6b22755..214d3fbe 100644 --- a/lcls_tools/common/devices/magnet.py +++ b/lcls_tools/common/devices/magnet.py @@ -328,8 +328,6 @@ def scan( self, scan_settings: List[Dict[str, float]], function: Optional[callable] = None, - *args, - **kwargs ) -> None: """ Scans magnets given a list of magnet settings (Dict[magnet_name, bdes_value]) @@ -337,7 +335,7 @@ def scan( """ for setting in scan_settings: self.set_bdes(setting) - function(*args, **kwargs) if function else None + function() if function else None def turn_off( self, diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 077a25c6..9c15a16c 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,48 +1,65 @@ -from typing import Callable - from lcls_tools.common.devices.reader import create_magnet, create_screen, create_wire from lcls_tools.common.devices.magnet import MagnetCollection -from lcls_tools.common.devices.screen import Screen -from lcls_tools.common.devices.wire import Wire +from lcls_tools.common.devices.device import Device from lcls_tools.common.measurements.measurement import Measurement + from meme.model import Model class QuadScanEmittance(Measurement): - model: Model - magnet_collection: MagnetCollection - magnet_settings: dict + model_beamline: str + magnet_area: str + magnet_name: str + scan_values: list[float] rmats: list - acquire_data: Callable[..., None] + _model: Model = None + _magnet_collection: MagnetCollection = None + _magnet_settings: list[dict] = None - screen: Screen - image_file_locations: list + to_device_area: str + to_device_name: str + beam_sizes: list + _to_device: Device = None - wire: Wire - - def __init__(self, model_beamline: str, magnet_area: str, magnet_settings: dict, - to_device_area: str, to_device_name: str): - self.model = Model(model_beamline) - self.magnet_collection = create_magnet(area=magnet_area) - self.magnet_settings = magnet_settings - if to_device_name.startswith(('YAG','OTR')): - self.screen = create_screen(area=to_device_area, name=to_device_name) - self.acquire_data = self.acquire_profmon - else: - self.wire = create_wire(area=to_device_area, name=to_device_name) - self.acquire_data = self.acquire_wire + def __init__(self): + super().__init__() + @property + def model(self) -> Model: + if self._model is None: + self._model = Model(self.model_beamline) + return self._model + + @property + def magnet_collection(self) -> MagnetCollection: + if self._magnet_collection is None: + self._magnet_collection = create_magnet(area=self.magnet_area) + return self._magnet_collection + + @property + def magnet_settings(self) -> list[dict]: + if self._magnet_settings is None: + self._magnet_settings = [{self.magnet_name:value} for value in self.scan_values] + return self._magnet_settings + + @property + def to_device(self) -> Device: + if self._to_device is None: + if self.to_device_name.startswith('OTR','YAG'): + self.to_device = create_screen(area=self.to_device_area, name=self.to_device_name) + else: + self.to_device = create_wire(area=self.to_device_area, name=self.to_device_name) def measure(self): - self.magnet_collection.scan(scan_settings=self.settings, function=self.acquire_data) - beam_sizes = self.get_beamsize(image_file = self.image_file_locations) # x_rms, y_rms, x_stdz, y_stdz + self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) - emittance, bmag, sig, is_valid = self.compute_emit_bmag(k = [-6,-3,0], - beamsize_squared = [beam_sizes['x_rms'],beam_sizes['y_rms']], - q_len = 0.221, - rmat = self.rmats, - beta0 = 0.0001, - alpha0 = 0.0002, - get_bmag = True) + emittance, bmag, sig, is_valid = compute_emit_bmag( + k = self.scan_values, + beamsize_squared = self.beam_sizes, # [beam_sizes['x_rms'],beam_sizes['y_rms']], + q_len = 0.221, # self.to_device magnet length? + rmat = self.rmats, + beta0 = 0.0001, # ? + alpha0 = 0.0002, # ? + get_bmag = True) results = { "emittance": emittance, @@ -51,16 +68,12 @@ def measure(self): return results - def acquire_profmon(self, magnet_name, to_device_name, num_to_capture=10): - latest_image_filepath = self.screen.save_images(num_to_capture=num_to_capture) - self.rmats.append(self.model.get_rmat(from_device=magnet_name, to_device=to_device_name)) - self.image_file_locations.append(latest_image_filepath) + def measure_beamsize(self): + self.beam_sizes.append(self.to_device.get_beamsize()) + self.rmats.append(self.model.get_rmat(from_device=self.magnet_collection.name, to_device=self.to_device.name)) - def acquire_wire(self, magnet_name, to_device_name): - pass - - def get_beamsize(self, image_file): - pass +class MultiDeviceEmittance(Measurement): + pass - def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, beta0, alpha0, get_bmag): - pass \ No newline at end of file +def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, beta0, alpha0, get_bmag): + pass \ No newline at end of file From f10801abbb5b38d08ab041e2e9233f80c59e8cf4 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Thu, 10 Oct 2024 20:32:54 -0700 Subject: [PATCH 03/19] - Make fields like model and magnet_collection required - Replace device attribute with device_measurement - Replace get_beamsize() method with device_measurement.measure() method --- .../measurements/emittance_measurement.py | 74 ++++++++----------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 9c15a16c..1b304901 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,3 +1,6 @@ +import numpy as np + +from pydantic import Field from lcls_tools.common.devices.reader import create_magnet, create_screen, create_wire from lcls_tools.common.devices.magnet import MagnetCollection from lcls_tools.common.devices.device import Device @@ -6,60 +9,42 @@ from meme.model import Model class QuadScanEmittance(Measurement): - model_beamline: str - magnet_area: str + model: Model + magnet_collection: MagnetCollection magnet_name: str scan_values: list[float] - rmats: list - _model: Model = None - _magnet_collection: MagnetCollection = None - _magnet_settings: list[dict] = None - - to_device_area: str - to_device_name: str - beam_sizes: list - _to_device: Device = None + _magnet_settings: list[dict] # example: [{"QE04": -6}, {"QE04": -3}, {"QE04": 0}] + device_measurement: Measurement + _rmats: list + _beam_sizes: dict def __init__(self): super().__init__() - @property - def model(self) -> Model: - if self._model is None: - self._model = Model(self.model_beamline) - return self._model - - @property - def magnet_collection(self) -> MagnetCollection: - if self._magnet_collection is None: - self._magnet_collection = create_magnet(area=self.magnet_area) - return self._magnet_collection - @property def magnet_settings(self) -> list[dict]: if self._magnet_settings is None: self._magnet_settings = [{self.magnet_name:value} for value in self.scan_values] return self._magnet_settings - - @property - def to_device(self) -> Device: - if self._to_device is None: - if self.to_device_name.startswith('OTR','YAG'): - self.to_device = create_screen(area=self.to_device_area, name=self.to_device_name) - else: - self.to_device = create_wire(area=self.to_device_area, name=self.to_device_name) def measure(self): + self._rmats.append(self.model.get_rmat(from_device=self.magnet_name, to_device=self.device_measurement.device.name)) self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) + beamsize_squared = np.vstack((self._beam_sizes["x_rms"], self._beam_sizes["y_rms"]))**2 + magnet_length = self.magnet_collection.magnets[self.magnet_name].length + twiss = self.model.get_twiss(self.magnet_name) + twiss_betas_alphas = np.array([[twiss["beta_x"],twiss["alpha_x"]],[twiss["beta_y"],twiss["alpha_y"]]]) + - emittance, bmag, sig, is_valid = compute_emit_bmag( - k = self.scan_values, - beamsize_squared = self.beam_sizes, # [beam_sizes['x_rms'],beam_sizes['y_rms']], - q_len = 0.221, # self.to_device magnet length? - rmat = self.rmats, - beta0 = 0.0001, # ? - alpha0 = 0.0002, # ? - get_bmag = True) + emittance, bmag, _, _ = compute_emit_bmag( + k = self.scan_values, + beamsize_squared = beamsize_squared, + q_len = magnet_length, + rmat = self._rmats, + twiss_design = twiss_betas_alphas, + # thin_lens = , + # maxiter = + ) results = { "emittance": emittance, @@ -69,11 +54,16 @@ def measure(self): return results def measure_beamsize(self): - self.beam_sizes.append(self.to_device.get_beamsize()) - self.rmats.append(self.model.get_rmat(from_device=self.magnet_collection.name, to_device=self.to_device.name)) + results = self.device_measurement.measure() + if "x_rms" not in self._beam_sizes: + self._beam_sizes["x_rms"] = [] + if "y_rms" not in self._beam_sizes: + self._beam_sizes["y_rms"] = [] + self._beam_sizes["x_rms"].append(results["Sx"]) + self._beam_sizes["y_rms"].append(results["Sy"]) class MultiDeviceEmittance(Measurement): pass -def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, beta0, alpha0, get_bmag): +def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): pass \ No newline at end of file From bb18b885702bb026fa78b0e6b8519c53421dbad4 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Tue, 29 Oct 2024 11:36:23 -0700 Subject: [PATCH 04/19] Add helper function get_optics to eliminate meme dependency in emittance measurement class --- lcls_tools/common/data/model_general_calcs.py | 9 +++ .../measurements/emittance_measurement.py | 59 ++++++++++--------- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index be662e5a..694a2f12 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -1,3 +1,5 @@ +from meme.model import Model + def bmag(twiss, twiss_reference): """Calculates BMAG from imput twiss and reference twiss""" beta_a, alpha_a, beta_b, alpha_b = twiss @@ -49,3 +51,10 @@ def bdes_to_kmod(e_tot=None, effective_length=None, bdes=None, bp = ele["E_TOT"] / 1e9 / 299.792458 * 1e4 # kG m effective_length = ele["L"] return bdes / effective_length / bp # kG / m / kG m = 1/m^2 + +def get_optics(magnet: str, measurement_device: str, beamline: str): + """Get rmats and twiss for a given beamline, magnet and measurement device""" + model = Model(beamline) + rmats = model.get_rmat(from_device=magnet, to_device=measurement_device) + twiss = model.get_twiss(magnet) + return rmats, twiss diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 1b304901..883d6d0b 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,49 +1,49 @@ import numpy as np -from pydantic import Field -from lcls_tools.common.devices.reader import create_magnet, create_screen, create_wire from lcls_tools.common.devices.magnet import MagnetCollection -from lcls_tools.common.devices.device import Device from lcls_tools.common.measurements.measurement import Measurement +from lcls_tools.common.data.model_general_calcs import bdes_to_kmod, get_optics + +from typing import Optional -from meme.model import Model class QuadScanEmittance(Measurement): - model: Model + beamline: str + energy: float magnet_collection: MagnetCollection magnet_name: str scan_values: list[float] - _magnet_settings: list[dict] # example: [{"QE04": -6}, {"QE04": -3}, {"QE04": 0}] device_measurement: Measurement - _rmats: list - _beam_sizes: dict + rmats: Optional[np.ndarray] + twiss: Optional[np.ndarray] + beam_sizes: Optional[dict] def __init__(self): super().__init__() @property def magnet_settings(self) -> list[dict]: - if self._magnet_settings is None: - self._magnet_settings = [{self.magnet_name:value} for value in self.scan_values] - return self._magnet_settings + return [{self.magnet_name: value} for value in self.scan_values] def measure(self): - self._rmats.append(self.model.get_rmat(from_device=self.magnet_name, to_device=self.device_measurement.device.name)) + """Returns the emittance and BMAG + Get the rmats and twiss parameters + Perform the scan, measuring beam sizes at each scan value + Compute the emittance and BMAG using the geometric focusing strengths, + beam sizes squared, magnet length, rmats, and twiss betas and alphas""" + self.rmats, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) - beamsize_squared = np.vstack((self._beam_sizes["x_rms"], self._beam_sizes["y_rms"]))**2 + beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 magnet_length = self.magnet_collection.magnets[self.magnet_name].length - twiss = self.model.get_twiss(self.magnet_name) - twiss_betas_alphas = np.array([[twiss["beta_x"],twiss["alpha_x"]],[twiss["beta_y"],twiss["alpha_y"]]]) - - + twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], + [self.twiss["beta_y"], self.twiss["alpha_y"]]]) + kmod = bdes_to_kmod(self.energy, magnet_length, self.scan_values) emittance, bmag, _, _ = compute_emit_bmag( - k = self.scan_values, + k = kmod, beamsize_squared = beamsize_squared, q_len = magnet_length, - rmat = self._rmats, - twiss_design = twiss_betas_alphas, - # thin_lens = , - # maxiter = + rmat = self.rmats, + twiss_design = twiss_betas_alphas ) results = { @@ -54,16 +54,17 @@ def measure(self): return results def measure_beamsize(self): + """Take measurement from measurement device, store beam sizes in self.beam_sizes""" results = self.device_measurement.measure() - if "x_rms" not in self._beam_sizes: - self._beam_sizes["x_rms"] = [] - if "y_rms" not in self._beam_sizes: - self._beam_sizes["y_rms"] = [] - self._beam_sizes["x_rms"].append(results["Sx"]) - self._beam_sizes["y_rms"].append(results["Sy"]) + if "x_rms" not in self.beam_sizes: + self.beam_sizes["x_rms"] = [] + if "y_rms" not in self.beam_sizes: + self.beam_sizes["y_rms"] = [] + self.beam_sizes["x_rms"].append(results["Sx"]) + self.beam_sizes["y_rms"].append(results["Sy"]) class MultiDeviceEmittance(Measurement): pass def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): - pass \ No newline at end of file + pass From 9f3cf2aaa59c9dfe14eb155a5bb0aa9b1cf3c2a1 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Tue, 29 Oct 2024 11:46:00 -0700 Subject: [PATCH 05/19] Fix formatting and style issues --- lcls_tools/common/data/model_general_calcs.py | 2 ++ .../measurements/emittance_measurement.py | 20 ++++++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index 694a2f12..0d86bfa5 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -1,5 +1,6 @@ from meme.model import Model + def bmag(twiss, twiss_reference): """Calculates BMAG from imput twiss and reference twiss""" beta_a, alpha_a, beta_b, alpha_b = twiss @@ -52,6 +53,7 @@ def bdes_to_kmod(e_tot=None, effective_length=None, bdes=None, effective_length = ele["L"] return bdes / effective_length / bp # kG / m / kG m = 1/m^2 + def get_optics(magnet: str, measurement_device: str, beamline: str): """Get rmats and twiss for a given beamline, magnet and measurement device""" model = Model(beamline) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 883d6d0b..ac6cf4b3 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -29,21 +29,21 @@ def measure(self): """Returns the emittance and BMAG Get the rmats and twiss parameters Perform the scan, measuring beam sizes at each scan value - Compute the emittance and BMAG using the geometric focusing strengths, + Compute the emittance and BMAG using the geometric focusing strengths, beam sizes squared, magnet length, rmats, and twiss betas and alphas""" self.rmats, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 magnet_length = self.magnet_collection.magnets[self.magnet_name].length - twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], + twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], [self.twiss["beta_y"], self.twiss["alpha_y"]]]) kmod = bdes_to_kmod(self.energy, magnet_length, self.scan_values) emittance, bmag, _, _ = compute_emit_bmag( - k = kmod, - beamsize_squared = beamsize_squared, - q_len = magnet_length, - rmat = self.rmats, - twiss_design = twiss_betas_alphas + k=kmod, + beamsize_squared=beamsize_squared, + q_len=magnet_length, + rmat=self.rmats, + twiss_design=twiss_betas_alphas ) results = { @@ -52,7 +52,7 @@ def measure(self): } return results - + def measure_beamsize(self): """Take measurement from measurement device, store beam sizes in self.beam_sizes""" results = self.device_measurement.measure() @@ -62,9 +62,11 @@ def measure_beamsize(self): self.beam_sizes["y_rms"] = [] self.beam_sizes["x_rms"].append(results["Sx"]) self.beam_sizes["y_rms"].append(results["Sy"]) - + + class MultiDeviceEmittance(Measurement): pass + def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): pass From 97a81c0570fc29faa4974a7cfa05835f1ad555cc Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Tue, 29 Oct 2024 12:49:10 -0700 Subject: [PATCH 06/19] Add docstring to QuadScanEmittance class --- .../measurements/emittance_measurement.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index ac6cf4b3..8a15bcc8 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -8,6 +8,21 @@ class QuadScanEmittance(Measurement): + """Use a quad and profile monitor/wire scanner to perform an emittance measurement + ------------------------ + Arguments: + beamline: beamline where the devices are located + energy: beam energy + magnet_collection: MagnetCollection object of magnets for an area of the beamline (use create_magnet()) + magnet_name: name of magnet + scan_values: BDES values of magnet to scan over + device_measurement: Measurement object of profile monitor/wire scanner + ------------------------ + Methods: + measure: does the quad scan, getting the beam sizes at each scan value, + gets the rmats and twiss parameters, then computes and returns the emittance and BMAG + measure_beamsize: take measurement from measurement device, store beam sizes + """ beamline: str energy: float magnet_collection: MagnetCollection @@ -31,8 +46,8 @@ def measure(self): Perform the scan, measuring beam sizes at each scan value Compute the emittance and BMAG using the geometric focusing strengths, beam sizes squared, magnet length, rmats, and twiss betas and alphas""" - self.rmats, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) + self.rmats, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 magnet_length = self.magnet_collection.magnets[self.magnet_name].length twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], From c76364ef8abccaeba1f11b29219d3f0af54a48e9 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Wed, 6 Nov 2024 13:24:46 -0800 Subject: [PATCH 07/19] Add test for and fix bugs in emittance_measurement.py --- .../measurements/emittance_measurement.py | 36 ++++---- .../test_emittance_measurement.py | 90 +++++++++++++++++++ 2 files changed, 110 insertions(+), 16 deletions(-) create mode 100644 tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 8a15bcc8..b06dd9cc 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,4 +1,5 @@ import numpy as np +from pydantic import ConfigDict from lcls_tools.common.devices.magnet import MagnetCollection from lcls_tools.common.measurements.measurement import Measurement @@ -20,21 +21,23 @@ class QuadScanEmittance(Measurement): ------------------------ Methods: measure: does the quad scan, getting the beam sizes at each scan value, - gets the rmats and twiss parameters, then computes and returns the emittance and BMAG + gets the rmat and twiss parameters, then computes and returns the emittance and BMAG measure_beamsize: take measurement from measurement device, store beam sizes """ + name: str = "emittance_profile" beamline: str energy: float magnet_collection: MagnetCollection magnet_name: str + #TODO: remove magnet_length once lengths added to yaml files + magnet_length: float scan_values: list[float] device_measurement: Measurement - rmats: Optional[np.ndarray] - twiss: Optional[np.ndarray] - beam_sizes: Optional[dict] + rmat: Optional[np.ndarray] = None + twiss: Optional[np.ndarray] = None + beam_sizes: Optional[dict] = {} - def __init__(self): - super().__init__() + model_config = ConfigDict(arbitrary_types_allowed=True) @property def magnet_settings(self) -> list[dict]: @@ -42,22 +45,23 @@ def magnet_settings(self) -> list[dict]: def measure(self): """Returns the emittance and BMAG - Get the rmats and twiss parameters + Get the rmat and twiss parameters Perform the scan, measuring beam sizes at each scan value Compute the emittance and BMAG using the geometric focusing strengths, - beam sizes squared, magnet length, rmats, and twiss betas and alphas""" + beam sizes squared, magnet length, rmat, and twiss betas and alphas""" self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) - self.rmats, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) + self.rmat, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 - magnet_length = self.magnet_collection.magnets[self.magnet_name].length + #TODO: uncomment once lengths added to yaml files + # magnet_length = self.magnet_collection.magnets[self.magnet_name].length twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], [self.twiss["beta_y"], self.twiss["alpha_y"]]]) - kmod = bdes_to_kmod(self.energy, magnet_length, self.scan_values) + kmod = bdes_to_kmod(self.energy, self.magnet_length, np.array(self.scan_values)) emittance, bmag, _, _ = compute_emit_bmag( k=kmod, beamsize_squared=beamsize_squared, - q_len=magnet_length, - rmat=self.rmats, + q_len=self.magnet_length, + rmat=self.rmat, twiss_design=twiss_betas_alphas ) @@ -75,13 +79,13 @@ def measure_beamsize(self): self.beam_sizes["x_rms"] = [] if "y_rms" not in self.beam_sizes: self.beam_sizes["y_rms"] = [] - self.beam_sizes["x_rms"].append(results["Sx"]) - self.beam_sizes["y_rms"].append(results["Sy"]) + self.beam_sizes["x_rms"].append(np.mean(results["Sx"])) + self.beam_sizes["y_rms"].append(np.mean(results["Sy"])) class MultiDeviceEmittance(Measurement): pass - +#TODO: delete and import actual compute_emit_bmag def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): pass diff --git a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py new file mode 100644 index 00000000..88fafdbb --- /dev/null +++ b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py @@ -0,0 +1,90 @@ +from unittest import TestCase +from unittest.mock import patch, Mock +import numpy as np + +from lcls_tools.common.devices.reader import create_magnet +from lcls_tools.common.measurements.emittance_measurement import QuadScanEmittance +from lcls_tools.common.measurements.screen_profile import ScreenBeamProfileMeasurement + +class EmittanceMeasurementTest(TestCase): + def setUp(self) -> None: + self.options = [ + "TRIM", + "PERTURB", + "BCON_TO_BDES", + "SAVE_BDES", + "LOAD_BDES", + "UNDO_BDES", + "DAC_ZERO", + "CALIB", + "STDZ", + "RESET", + "TURN_OFF", + "TURN_ON", + "DEGAUSS", + ] + self.ctrl_options_patch = patch("epics.PV.get_ctrlvars", new_callable=Mock) + self.mock_ctrl_options = self.ctrl_options_patch.start() + self.mock_ctrl_options.return_value = {"enum_strs": tuple(self.options)} + self.magnet_collection = create_magnet(area="GUNB") + return super().setUp() + + @patch("lcls_tools.common.measurements.emittance_measurement.get_optics") + @patch("lcls_tools.common.measurements.emittance_measurement.compute_emit_bmag") + @patch( + "lcls_tools.common.devices.magnet.Magnet.is_bact_settled", + new_callable=Mock, + ) + @patch("epics.PV.put", new_callable=Mock) + @patch("lcls_tools.common.devices.magnet.Magnet.trim", new_callable=Mock) + def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emit_bmag, mock_get_optics): + """Test quad scan emittance measurement""" + rmat_mock = np.array([ + [2.52667046e+00, 4.21817732e+00, 0.00000000e+00, 0.00000000e+00, 1.68300181e-24, -1.03180226e-24], + [3.67866188e-01, 1.00991619e+00, 0.00000000e+00, 0.00000000e+00, 3.83090380e-25, -6.94484939e-25], + [0.00000000e+00, 0.00000000e+00, -5.16660571e-01, 4.13618612e+00, 0.00000000e+00, 0.00000000e+00], + [0.00000000e+00, 0.00000000e+00, -3.65446389e-01, 9.90116596e-01, 0.00000000e+00, 0.00000000e+00], + [3.50100414e-25, -2.58633175e-25, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 5.98659826e-05], + [-1.07994072e-24, -1.25510334e-25, 0.00000000e+00, 0.00000000e+00, -4.92137957e-08, 1.00000000e+00] + ]) + twiss_dtype = np.dtype([('s', 'float32'), ('z', 'float32'),('length', 'float32'), + ('p0c', 'float32'), ('alpha_x', 'float32'), ('beta_x', 'float32'), + ('eta_x', 'float32'), ('etap_x', 'float32'), ('psi_x', 'float32'), + ('alpha_y', 'float32'), ('beta_y', 'float32'), ('eta_y', 'float32'), + ('etap_y', 'float32'), ('psi_y', 'float32')]) + twiss_mock = np.array([(11.977644, 2027.7198, 0.054, 1.3499904e+08, 3.9888208, 5.5319443, + -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], + dtype=twiss_dtype) + mock_get_optics.return_value = (rmat_mock, twiss_mock) + mock_compute_emit_bmag.return_value = (1.5e-9, 1.5, None, None) + mock_bact_settle.return_value = True + beamline = "SC_DIAG0" + energy = 3.0e9 + magnet_name = "CQ01B" + magnet_length = 1.0 + scan_values = [-6.0, -3.0, 0.0] + profmon_measurement = Mock(spec=ScreenBeamProfileMeasurement) + profmon_measurement.device = Mock() + profmon_measurement.device.name = "YAG01B" + profmon_measurement.measure.return_value = result = { + "Cx": [0.0], + "Cy": [0.0], + "Sx": [50.0], + "Sy": [50.0], + "bb_penalty": [0.0], + "total_intensity": [1.0e6], + } + quad_scan = QuadScanEmittance(beamline=beamline, energy=energy, magnet_collection=self.magnet_collection, + magnet_name=magnet_name, magnet_length=magnet_length, scan_values=scan_values, + device_measurement=profmon_measurement) + result_dict = quad_scan.measure() + assert result_dict["emittance"] == mock_compute_emit_bmag.return_value[0] + assert result_dict["BMAG"] == mock_compute_emit_bmag.return_value[1] + mock_compute_emit_bmag.assert_called_once() + mock_get_optics.assert_called_once() + number_scan_values = len(scan_values) + self.assertEqual(mock_put.call_count, number_scan_values) + self.assertEqual(mock_trim.call_count, number_scan_values) + self.assertEqual(mock_bact_settle.call_count, number_scan_values) + assert profmon_measurement.measure.call_count == number_scan_values + From 862f2282aba24347297ac78a88984ae83d857e96 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Wed, 6 Nov 2024 13:30:55 -0800 Subject: [PATCH 08/19] Fix formatting and style issues --- .../measurements/emittance_measurement.py | 7 ++- .../test_emittance_measurement.py | 58 +++++++++---------- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index b06dd9cc..d60e2b60 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -29,7 +29,7 @@ class QuadScanEmittance(Measurement): energy: float magnet_collection: MagnetCollection magnet_name: str - #TODO: remove magnet_length once lengths added to yaml files + # TODO: remove magnet_length once lengths added to yaml files magnet_length: float scan_values: list[float] device_measurement: Measurement @@ -52,7 +52,7 @@ def measure(self): self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) self.rmat, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 - #TODO: uncomment once lengths added to yaml files + # TODO: uncomment once lengths added to yaml files # magnet_length = self.magnet_collection.magnets[self.magnet_name].length twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], [self.twiss["beta_y"], self.twiss["alpha_y"]]]) @@ -86,6 +86,7 @@ def measure_beamsize(self): class MultiDeviceEmittance(Measurement): pass -#TODO: delete and import actual compute_emit_bmag + +# TODO: delete and import actual compute_emit_bmag def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): pass diff --git a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py index 88fafdbb..b29ee762 100644 --- a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py +++ b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py @@ -6,28 +6,29 @@ from lcls_tools.common.measurements.emittance_measurement import QuadScanEmittance from lcls_tools.common.measurements.screen_profile import ScreenBeamProfileMeasurement + class EmittanceMeasurementTest(TestCase): def setUp(self) -> None: - self.options = [ - "TRIM", - "PERTURB", - "BCON_TO_BDES", - "SAVE_BDES", - "LOAD_BDES", - "UNDO_BDES", - "DAC_ZERO", - "CALIB", - "STDZ", - "RESET", - "TURN_OFF", - "TURN_ON", - "DEGAUSS", - ] - self.ctrl_options_patch = patch("epics.PV.get_ctrlvars", new_callable=Mock) - self.mock_ctrl_options = self.ctrl_options_patch.start() - self.mock_ctrl_options.return_value = {"enum_strs": tuple(self.options)} - self.magnet_collection = create_magnet(area="GUNB") - return super().setUp() + self.options = [ + "TRIM", + "PERTURB", + "BCON_TO_BDES", + "SAVE_BDES", + "LOAD_BDES", + "UNDO_BDES", + "DAC_ZERO", + "CALIB", + "STDZ", + "RESET", + "TURN_OFF", + "TURN_ON", + "DEGAUSS", + ] + self.ctrl_options_patch = patch("epics.PV.get_ctrlvars", new_callable=Mock) + self.mock_ctrl_options = self.ctrl_options_patch.start() + self.mock_ctrl_options.return_value = {"enum_strs": tuple(self.options)} + self.magnet_collection = create_magnet(area="GUNB") + return super().setUp() @patch("lcls_tools.common.measurements.emittance_measurement.get_optics") @patch("lcls_tools.common.measurements.emittance_measurement.compute_emit_bmag") @@ -47,14 +48,14 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi [3.50100414e-25, -2.58633175e-25, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 5.98659826e-05], [-1.07994072e-24, -1.25510334e-25, 0.00000000e+00, 0.00000000e+00, -4.92137957e-08, 1.00000000e+00] ]) - twiss_dtype = np.dtype([('s', 'float32'), ('z', 'float32'),('length', 'float32'), - ('p0c', 'float32'), ('alpha_x', 'float32'), ('beta_x', 'float32'), - ('eta_x', 'float32'), ('etap_x', 'float32'), ('psi_x', 'float32'), - ('alpha_y', 'float32'), ('beta_y', 'float32'), ('eta_y', 'float32'), - ('etap_y', 'float32'), ('psi_y', 'float32')]) + twiss_dtype = np.dtype([('s', 'float32'), ('z', 'float32'), ('length', 'float32'), + ('p0c', 'float32'), ('alpha_x', 'float32'), ('beta_x', 'float32'), + ('eta_x', 'float32'), ('etap_x', 'float32'), ('psi_x', 'float32'), + ('alpha_y', 'float32'), ('beta_y', 'float32'), ('eta_y', 'float32'), + ('etap_y', 'float32'), ('psi_y', 'float32')]) twiss_mock = np.array([(11.977644, 2027.7198, 0.054, 1.3499904e+08, 3.9888208, 5.5319443, - -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], - dtype=twiss_dtype) + -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], + dtype=twiss_dtype) mock_get_optics.return_value = (rmat_mock, twiss_mock) mock_compute_emit_bmag.return_value = (1.5e-9, 1.5, None, None) mock_bact_settle.return_value = True @@ -66,7 +67,7 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi profmon_measurement = Mock(spec=ScreenBeamProfileMeasurement) profmon_measurement.device = Mock() profmon_measurement.device.name = "YAG01B" - profmon_measurement.measure.return_value = result = { + profmon_measurement.measure.return_value = { "Cx": [0.0], "Cy": [0.0], "Sx": [50.0], @@ -87,4 +88,3 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi self.assertEqual(mock_trim.call_count, number_scan_values) self.assertEqual(mock_bact_settle.call_count, number_scan_values) assert profmon_measurement.measure.call_count == number_scan_values - From d3213d263d5a8dd2d54dbbfcadf25df69c2a61d5 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Wed, 6 Nov 2024 13:36:47 -0800 Subject: [PATCH 09/19] Fix under-indent --- .../common/measurements/test_emittance_measurement.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py index b29ee762..c90a21aa 100644 --- a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py +++ b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py @@ -54,8 +54,8 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi ('alpha_y', 'float32'), ('beta_y', 'float32'), ('eta_y', 'float32'), ('etap_y', 'float32'), ('psi_y', 'float32')]) twiss_mock = np.array([(11.977644, 2027.7198, 0.054, 1.3499904e+08, 3.9888208, 5.5319443, - -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], - dtype=twiss_dtype) + -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], + dtype=twiss_dtype) mock_get_optics.return_value = (rmat_mock, twiss_mock) mock_compute_emit_bmag.return_value = (1.5e-9, 1.5, None, None) mock_bact_settle.return_value = True From 95bece5aec3ebde222f67ad53190f86400cb398d Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Wed, 6 Nov 2024 13:41:33 -0800 Subject: [PATCH 10/19] Fix overindent --- .../common/measurements/test_emittance_measurement.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py index c90a21aa..358c7558 100644 --- a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py +++ b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py @@ -53,9 +53,11 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi ('eta_x', 'float32'), ('etap_x', 'float32'), ('psi_x', 'float32'), ('alpha_y', 'float32'), ('beta_y', 'float32'), ('eta_y', 'float32'), ('etap_y', 'float32'), ('psi_y', 'float32')]) - twiss_mock = np.array([(11.977644, 2027.7198, 0.054, 1.3499904e+08, 3.9888208, 5.5319443, - -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], - dtype=twiss_dtype) + twiss_mock = np.array( + [(11.977644, 2027.7198, 0.054, 1.3499904e+08, 3.9888208, 5.5319443, + -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], + dtype=twiss_dtype + ) mock_get_optics.return_value = (rmat_mock, twiss_mock) mock_compute_emit_bmag.return_value = (1.5e-9, 1.5, None, None) mock_bact_settle.return_value = True From 97bd05af85a4af8d3a60fabf2cf3315b055d3a1b Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Thu, 7 Nov 2024 12:00:16 -0800 Subject: [PATCH 11/19] Small fixes and enhancements - Add x_rms and y_rms to output of emittance measure method - Raise NotImplementedError for MultiDeviceEmittance measure method and compute_emit_bmag - Shorten mock values in test_emittance_measurement --- .../measurements/emittance_measurement.py | 17 ++++++++++------ .../test_emittance_measurement.py | 20 ++++++++++--------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index d60e2b60..97218b9c 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -24,7 +24,7 @@ class QuadScanEmittance(Measurement): gets the rmat and twiss parameters, then computes and returns the emittance and BMAG measure_beamsize: take measurement from measurement device, store beam sizes """ - name: str = "emittance_profile" + name: str = "quad_scan_emittance" beamline: str energy: float magnet_collection: MagnetCollection @@ -44,8 +44,8 @@ def magnet_settings(self) -> list[dict]: return [{self.magnet_name: value} for value in self.scan_values] def measure(self): - """Returns the emittance and BMAG - Get the rmat and twiss parameters + """Returns the emittance, BMAG, x_rms and y_rms + Get the rmat, twiss parameters, and measured beam sizes Perform the scan, measuring beam sizes at each scan value Compute the emittance and BMAG using the geometric focusing strengths, beam sizes squared, magnet length, rmat, and twiss betas and alphas""" @@ -67,7 +67,9 @@ def measure(self): results = { "emittance": emittance, - "BMAG": bmag + "BMAG": bmag, + "x_rms": self.beam_sizes["x_rms"], + "y_rms": self.beam_sizes["y_rms"] } return results @@ -84,9 +86,12 @@ def measure_beamsize(self): class MultiDeviceEmittance(Measurement): - pass + name: str = "multi_device_emittance" + + def measure(self): + raise NotImplementedError("Multi-device emittance not yet implemented") # TODO: delete and import actual compute_emit_bmag def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): - pass + raise NotImplementedError("compute_emit_bmag to be implemented elsewhere") diff --git a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py index 358c7558..0e389b48 100644 --- a/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py +++ b/tests/unit_tests/lcls_tools/common/measurements/test_emittance_measurement.py @@ -41,12 +41,12 @@ def setUp(self) -> None: def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emit_bmag, mock_get_optics): """Test quad scan emittance measurement""" rmat_mock = np.array([ - [2.52667046e+00, 4.21817732e+00, 0.00000000e+00, 0.00000000e+00, 1.68300181e-24, -1.03180226e-24], - [3.67866188e-01, 1.00991619e+00, 0.00000000e+00, 0.00000000e+00, 3.83090380e-25, -6.94484939e-25], - [0.00000000e+00, 0.00000000e+00, -5.16660571e-01, 4.13618612e+00, 0.00000000e+00, 0.00000000e+00], - [0.00000000e+00, 0.00000000e+00, -3.65446389e-01, 9.90116596e-01, 0.00000000e+00, 0.00000000e+00], - [3.50100414e-25, -2.58633175e-25, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 5.98659826e-05], - [-1.07994072e-24, -1.25510334e-25, 0.00000000e+00, 0.00000000e+00, -4.92137957e-08, 1.00000000e+00] + [2.5, 4.2, 0.0, 0.0, 1.6, -1.0], + [3.6, 1.0, 0.0, 0.0, 3.8, -6.9], + [0.0, 0.0, -5.1, 4.1, 0.0, 0.0], + [0.0, 0.0, -3.6, 9.9, 0.0, 0.0], + [3.5, -2.5, 0.0, 0.0, 1.0, 5.9], + [-1.0, -1.2, 0.0, 0.0, -4.9, 1.0] ]) twiss_dtype = np.dtype([('s', 'float32'), ('z', 'float32'), ('length', 'float32'), ('p0c', 'float32'), ('alpha_x', 'float32'), ('beta_x', 'float32'), @@ -54,8 +54,8 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi ('alpha_y', 'float32'), ('beta_y', 'float32'), ('eta_y', 'float32'), ('etap_y', 'float32'), ('psi_y', 'float32')]) twiss_mock = np.array( - [(11.977644, 2027.7198, 0.054, 1.3499904e+08, 3.9888208, 5.5319443, - -6.172034e-18, 1.2438233e-17, 5.954487, 0.01185468, 5.314966, 0., 0., 3.5827537)], + [(11.9, 2027.7, 0.05, 1.3, 3.9, 5.5, + -6.1, 1.2, 5.9, 0.01, 5.3, 0., 0., 3.5)], dtype=twiss_dtype ) mock_get_optics.return_value = (rmat_mock, twiss_mock) @@ -66,6 +66,7 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi magnet_name = "CQ01B" magnet_length = 1.0 scan_values = [-6.0, -3.0, 0.0] + number_scan_values = len(scan_values) profmon_measurement = Mock(spec=ScreenBeamProfileMeasurement) profmon_measurement.device = Mock() profmon_measurement.device.name = "YAG01B" @@ -83,9 +84,10 @@ def test_quad_scan(self, mock_trim, mock_put, mock_bact_settle, mock_compute_emi result_dict = quad_scan.measure() assert result_dict["emittance"] == mock_compute_emit_bmag.return_value[0] assert result_dict["BMAG"] == mock_compute_emit_bmag.return_value[1] + assert result_dict["x_rms"] == [profmon_measurement.measure.return_value["Sx"]] * number_scan_values + assert result_dict["y_rms"] == [profmon_measurement.measure.return_value["Sy"]] * number_scan_values mock_compute_emit_bmag.assert_called_once() mock_get_optics.assert_called_once() - number_scan_values = len(scan_values) self.assertEqual(mock_put.call_count, number_scan_values) self.assertEqual(mock_trim.call_count, number_scan_values) self.assertEqual(mock_bact_settle.call_count, number_scan_values) From 2d58ca8228c1e2d1fd49b0d386c0cb8b862ed8ae Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Thu, 7 Nov 2024 17:35:26 -0800 Subject: [PATCH 12/19] Fix parts of the calculation - Get rmats at 'mid' of from_device (the magnet) - Get twiss at measurement_device (the screen) - Use just x rmat and y rmat for compute_emit_bmag --- lcls_tools/common/data/model_general_calcs.py | 4 ++-- lcls_tools/common/measurements/emittance_measurement.py | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index 0d86bfa5..1c87430d 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -57,6 +57,6 @@ def bdes_to_kmod(e_tot=None, effective_length=None, bdes=None, def get_optics(magnet: str, measurement_device: str, beamline: str): """Get rmats and twiss for a given beamline, magnet and measurement device""" model = Model(beamline) - rmats = model.get_rmat(from_device=magnet, to_device=measurement_device) - twiss = model.get_twiss(magnet) + rmats = model.get_rmat(from_device=magnet, to_device=measurement_device, from_device_pos='mid') + twiss = model.get_twiss(measurement_device) return rmats, twiss diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 97218b9c..459fb4de 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -16,6 +16,7 @@ class QuadScanEmittance(Measurement): energy: beam energy magnet_collection: MagnetCollection object of magnets for an area of the beamline (use create_magnet()) magnet_name: name of magnet + magnet_length: length of magnet scan_values: BDES values of magnet to scan over device_measurement: Measurement object of profile monitor/wire scanner ------------------------ @@ -24,7 +25,6 @@ class QuadScanEmittance(Measurement): gets the rmat and twiss parameters, then computes and returns the emittance and BMAG measure_beamsize: take measurement from measurement device, store beam sizes """ - name: str = "quad_scan_emittance" beamline: str energy: float magnet_collection: MagnetCollection @@ -33,10 +33,12 @@ class QuadScanEmittance(Measurement): magnet_length: float scan_values: list[float] device_measurement: Measurement + rmat: Optional[np.ndarray] = None twiss: Optional[np.ndarray] = None beam_sizes: Optional[dict] = {} + name: str = "quad_scan_emittance" model_config = ConfigDict(arbitrary_types_allowed=True) @property @@ -54,6 +56,7 @@ def measure(self): beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 # TODO: uncomment once lengths added to yaml files # magnet_length = self.magnet_collection.magnets[self.magnet_name].length + rmat = np.array([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) # x rmat and y rmat twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], [self.twiss["beta_y"], self.twiss["alpha_y"]]]) kmod = bdes_to_kmod(self.energy, self.magnet_length, np.array(self.scan_values)) @@ -61,7 +64,7 @@ def measure(self): k=kmod, beamsize_squared=beamsize_squared, q_len=self.magnet_length, - rmat=self.rmat, + rmat=rmat, twiss_design=twiss_betas_alphas ) From d1d6a49a1e79fded7037709b599338315c194580 Mon Sep 17 00:00:00 2001 From: Shamin Chowdhury Date: Thu, 7 Nov 2024 17:37:30 -0800 Subject: [PATCH 13/19] Fix formatting and style issues --- lcls_tools/common/measurements/emittance_measurement.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 459fb4de..b9abd690 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -33,7 +33,7 @@ class QuadScanEmittance(Measurement): magnet_length: float scan_values: list[float] device_measurement: Measurement - + rmat: Optional[np.ndarray] = None twiss: Optional[np.ndarray] = None beam_sizes: Optional[dict] = {} @@ -56,7 +56,7 @@ def measure(self): beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 # TODO: uncomment once lengths added to yaml files # magnet_length = self.magnet_collection.magnets[self.magnet_name].length - rmat = np.array([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) # x rmat and y rmat + rmat = np.array([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) # x rmat and y rmat twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], [self.twiss["beta_y"], self.twiss["alpha_y"]]]) kmod = bdes_to_kmod(self.energy, self.magnet_length, np.array(self.scan_values)) From 179d84297eaed55339cbe39ff921b3450d617f65 Mon Sep 17 00:00:00 2001 From: Ryan Roussel Date: Tue, 12 Nov 2024 08:57:57 -0600 Subject: [PATCH 14/19] add scan method to device class + magnet --- lcls_tools/common/devices/device.py | 13 ++++++++++++- lcls_tools/common/devices/magnet.py | 9 +++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lcls_tools/common/devices/device.py b/lcls_tools/common/devices/device.py index 3727713e..b1f38af1 100644 --- a/lcls_tools/common/devices/device.py +++ b/lcls_tools/common/devices/device.py @@ -1,5 +1,5 @@ from pydantic import BaseModel, SerializeAsAny, ConfigDict, field_validator -from typing import List, Union, Callable, Dict +from typing import List, Union, Callable, Dict, Optional from epics import PV @@ -216,6 +216,17 @@ def remove_callback_from_pv( if index: pv_obj.remove_callback(index) + def scan( + self, + scan_settings: List[float], + function: Optional[callable] = None, + ) -> None: + """ + Scans device parameters and calls the provided function after each setting + is achieved. + """ + raise NotImplementedError + class DeviceCollection(BaseModel): devices: Dict[str, SerializeAsAny[Device]] = None diff --git a/lcls_tools/common/devices/magnet.py b/lcls_tools/common/devices/magnet.py index 214d3fbe..e47c5b1c 100644 --- a/lcls_tools/common/devices/magnet.py +++ b/lcls_tools/common/devices/magnet.py @@ -253,6 +253,15 @@ def degauss(self): """Degauss magnet""" self.controls_information.PVs.ctrl.put(self.ctrl_options["DEGAUSS"]) + def scan( + self, + scan_settings: List[float], + function: Optional[callable] = None, + ) -> None: + for setting in scan_settings: + self.bctrl = setting + function() if function else None + class MagnetCollection(DeviceCollection): devices: Dict[str, SerializeAsAny[Magnet]] = Field(alias="magnets") From 886e133f280a3dbc6076295bcd9849b82b1a8071 Mon Sep 17 00:00:00 2001 From: Ryan Roussel Date: Tue, 12 Nov 2024 09:10:27 -0600 Subject: [PATCH 15/19] update general calcs and emittance class --- lcls_tools/common/data/model_general_calcs.py | 16 ++++-- .../measurements/emittance_measurement.py | 55 ++++++++++++------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index 1c87430d..d3378d2e 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -1,5 +1,8 @@ from meme.model import Model +from lcls_tools.common.devices.magnet import Magnet +from lcls_tools.common.measurements.measurement import Measurement + def bmag(twiss, twiss_reference): """Calculates BMAG from imput twiss and reference twiss""" @@ -54,9 +57,14 @@ def bdes_to_kmod(e_tot=None, effective_length=None, bdes=None, return bdes / effective_length / bp # kG / m / kG m = 1/m^2 -def get_optics(magnet: str, measurement_device: str, beamline: str): +def get_optics(magnet: Magnet, measurement: Measurement): """Get rmats and twiss for a given beamline, magnet and measurement device""" - model = Model(beamline) - rmats = model.get_rmat(from_device=magnet, to_device=measurement_device, from_device_pos='mid') - twiss = model.get_twiss(measurement_device) + # TODO: get optics from arbitrary devices (potentially in different beam lines) + model = Model(magnet.metadata.area) + rmats = model.get_rmat( + from_device=magnet.name, + to_device=measurement.device.name, + from_device_pos='mid' + ) + twiss = model.get_twiss(measurement.device.name) return rmats, twiss diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index b9abd690..3742f995 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,7 +1,7 @@ import numpy as np from pydantic import ConfigDict -from lcls_tools.common.devices.magnet import MagnetCollection +from lcls_tools.common.devices.magnet import Magnet from lcls_tools.common.measurements.measurement import Measurement from lcls_tools.common.data.model_general_calcs import bdes_to_kmod, get_optics @@ -27,12 +27,9 @@ class QuadScanEmittance(Measurement): """ beamline: str energy: float - magnet_collection: MagnetCollection - magnet_name: str - # TODO: remove magnet_length once lengths added to yaml files - magnet_length: float scan_values: list[float] - device_measurement: Measurement + magnet: Magnet + measurement_device: Measurement rmat: Optional[np.ndarray] = None twiss: Optional[np.ndarray] = None @@ -41,31 +38,48 @@ class QuadScanEmittance(Measurement): name: str = "quad_scan_emittance" model_config = ConfigDict(arbitrary_types_allowed=True) - @property - def magnet_settings(self) -> list[dict]: - return [{self.magnet_name: value} for value in self.scan_values] - def measure(self): """Returns the emittance, BMAG, x_rms and y_rms Get the rmat, twiss parameters, and measured beam sizes Perform the scan, measuring beam sizes at each scan value Compute the emittance and BMAG using the geometric focusing strengths, beam sizes squared, magnet length, rmat, and twiss betas and alphas""" - self.magnet_collection.scan(scan_settings=self.magnet_settings, function=self.measure_beamsize) - self.rmat, self.twiss = get_optics(self.magnet_name, self.device_measurement.device.name, self.beamline) - beamsize_squared = np.vstack((self.beam_sizes["x_rms"], self.beam_sizes["y_rms"]))**2 - # TODO: uncomment once lengths added to yaml files - # magnet_length = self.magnet_collection.magnets[self.magnet_name].length - rmat = np.array([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) # x rmat and y rmat + + # scan magnet strength and measure beamsize + self.magnet.scan( + scan_settings=self.scan_values, + function=self.measure_beamsize + ) + + # get transport matrix and design twiss values from meme + # TODO: get settings from arbitrary methods (ie. not meme) + self.rmat, self.twiss = get_optics( + self.magnet_name, + self.device_measurement.device.name, + ) + beamsize_squared = np.vstack(( + self.beam_sizes["x_rms"], self.beam_sizes["y_rms"] + )) ** 2 + + magnet_length = self.magnet.metadata.length + if magnet_length is None: + raise ValueError("magnet length needs to be specified for magnet " + f"{self.magnet.name} to be used in emittance measurement") + + # organize data into arrays for use in `compute_emit_bmag` + rmat = np.array([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], [self.twiss["beta_y"], self.twiss["alpha_y"]]]) - kmod = bdes_to_kmod(self.energy, self.magnet_length, np.array(self.scan_values)) + kmod = bdes_to_kmod(self.energy, magnet_length, np.array(self.scan_values)) + + # compute emittance and bmag emittance, bmag, _, _ = compute_emit_bmag( k=kmod, beamsize_squared=beamsize_squared, - q_len=self.magnet_length, + q_len=magnet_length, rmat=rmat, - twiss_design=twiss_betas_alphas + twiss_design=twiss_betas_alphas, + thin_lens=True ) results = { @@ -96,5 +110,6 @@ def measure(self): # TODO: delete and import actual compute_emit_bmag -def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, maxiter): +def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, + maxiter): raise NotImplementedError("compute_emit_bmag to be implemented elsewhere") From ff7ecca9f832a0e9bfd7678ab93bb18b99051e71 Mon Sep 17 00:00:00 2001 From: Ryan Roussel Date: Tue, 12 Nov 2024 09:29:14 -0600 Subject: [PATCH 16/19] change emit/bmag calc + formatting --- lcls_tools/common/data/model_general_calcs.py | 9 +++++---- .../common/measurements/emittance_measurement.py | 11 +++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index 703b2425..2cc9a996 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -1,10 +1,10 @@ -from meme.model import Model +import numpy as np +from meme.model import Model from lcls_tools.common.devices.magnet import Magnet from lcls_tools.common.measurements.measurement import Measurement - def bmag(twiss, twiss_reference): """Calculates BMAG from imput twiss and reference twiss""" beta_a, alpha_a, beta_b, alpha_b = twiss @@ -70,6 +70,7 @@ def get_optics(magnet: Magnet, measurement: Measurement): twiss = model.get_twiss(measurement.device.name) return rmats, twiss + def propagate_twiss(twiss_init: np.ndarray, rmat: np.ndarray): """ Propagates twiss parameters downstream given a transport rmat. @@ -107,9 +108,9 @@ def twiss_transport_mat_from_rmat(rmat: np.ndarray): c, s, cp, sp = rmat[..., 0, 0], rmat[..., 0, 1], rmat[..., 1, 0], rmat[..., 1, 1] result = np.stack( ( - np.stack((c**2, -2 * c * s, s**2), axis=-1), + np.stack((c ** 2, -2 * c * s, s ** 2), axis=-1), np.stack((-c * cp, c * sp + cp * s, -s * sp), axis=-1), - np.stack((cp**2, -2 * cp * sp, sp**2), axis=-1), + np.stack((cp ** 2, -2 * cp * sp, sp ** 2), axis=-1), ), axis=-2, ) # result shape (batchshape, 3, 3) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 3742f995..3cced92d 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,6 +1,7 @@ import numpy as np from pydantic import ConfigDict +from lcls_tools.common.data.emittance import compute_emit_bmag from lcls_tools.common.devices.magnet import Magnet from lcls_tools.common.measurements.measurement import Measurement from lcls_tools.common.data.model_general_calcs import bdes_to_kmod, get_optics @@ -79,7 +80,6 @@ def measure(self): q_len=magnet_length, rmat=rmat, twiss_design=twiss_betas_alphas, - thin_lens=True ) results = { @@ -92,7 +92,8 @@ def measure(self): return results def measure_beamsize(self): - """Take measurement from measurement device, store beam sizes in self.beam_sizes""" + """Take measurement from measurement device, + store beam sizes in self.beam_sizes""" results = self.device_measurement.measure() if "x_rms" not in self.beam_sizes: self.beam_sizes["x_rms"] = [] @@ -107,9 +108,3 @@ class MultiDeviceEmittance(Measurement): def measure(self): raise NotImplementedError("Multi-device emittance not yet implemented") - - -# TODO: delete and import actual compute_emit_bmag -def compute_emit_bmag(self, k, beamsize_squared, q_len, rmat, twiss_design, thin_lens, - maxiter): - raise NotImplementedError("compute_emit_bmag to be implemented elsewhere") From ee135bce911e6c04dbd40f534583188c151b1fd6 Mon Sep 17 00:00:00 2001 From: Ryan Roussel Date: Tue, 12 Nov 2024 09:35:47 -0600 Subject: [PATCH 17/19] change get optics output to dict --- lcls_tools/common/data/model_general_calcs.py | 6 ++++-- lcls_tools/common/measurements/emittance_measurement.py | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index 2cc9a996..b0d1ae0c 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -1,3 +1,5 @@ +from typing import Dict + import numpy as np from meme.model import Model @@ -58,7 +60,7 @@ def bdes_to_kmod(e_tot=None, effective_length=None, bdes=None, return bdes / effective_length / bp # kG / m / kG m = 1/m^2 -def get_optics(magnet: Magnet, measurement: Measurement): +def get_optics(magnet: Magnet, measurement: Measurement) -> Dict: """Get rmats and twiss for a given beamline, magnet and measurement device""" # TODO: get optics from arbitrary devices (potentially in different beam lines) model = Model(magnet.metadata.area) @@ -68,7 +70,7 @@ def get_optics(magnet: Magnet, measurement: Measurement): from_device_pos='mid' ) twiss = model.get_twiss(measurement.device.name) - return rmats, twiss + return {"rmats": rmats,"design_twiss": twiss} def propagate_twiss(twiss_init: np.ndarray, rmat: np.ndarray): diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 3cced92d..804cdf28 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -54,10 +54,12 @@ def measure(self): # get transport matrix and design twiss values from meme # TODO: get settings from arbitrary methods (ie. not meme) - self.rmat, self.twiss = get_optics( + optics = get_optics( self.magnet_name, self.device_measurement.device.name, ) + self.rmat = optics["rmat"] + self.twiss = optics["design_twiss"] beamsize_squared = np.vstack(( self.beam_sizes["x_rms"], self.beam_sizes["y_rms"] )) ** 2 From 46331c90915cadbfc586c28a2c396a8098352fe1 Mon Sep 17 00:00:00 2001 From: Ryan Roussel Date: Tue, 12 Nov 2024 09:46:26 -0600 Subject: [PATCH 18/19] formatting --- lcls_tools/common/data/model_general_calcs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lcls_tools/common/data/model_general_calcs.py b/lcls_tools/common/data/model_general_calcs.py index b0d1ae0c..8cb9b344 100644 --- a/lcls_tools/common/data/model_general_calcs.py +++ b/lcls_tools/common/data/model_general_calcs.py @@ -70,7 +70,7 @@ def get_optics(magnet: Magnet, measurement: Measurement) -> Dict: from_device_pos='mid' ) twiss = model.get_twiss(measurement.device.name) - return {"rmats": rmats,"design_twiss": twiss} + return {"rmats": rmats, "design_twiss": twiss} def propagate_twiss(twiss_init: np.ndarray, rmat: np.ndarray): From 665449112245f6bbdb0ecac6cdcc6acf488ceece Mon Sep 17 00:00:00 2001 From: Ryan Roussel Date: Wed, 13 Nov 2024 10:12:25 -0600 Subject: [PATCH 19/19] allow rmats/design twiss to be specified explicitly --- .../measurements/emittance_measurement.py | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/lcls_tools/common/measurements/emittance_measurement.py b/lcls_tools/common/measurements/emittance_measurement.py index 804cdf28..0f8a9f55 100644 --- a/lcls_tools/common/measurements/emittance_measurement.py +++ b/lcls_tools/common/measurements/emittance_measurement.py @@ -1,12 +1,13 @@ +from typing import Optional + import numpy as np -from pydantic import ConfigDict +from numpy import ndarray +from pydantic import ConfigDict, PositiveInt, field_validator from lcls_tools.common.data.emittance import compute_emit_bmag +from lcls_tools.common.data.model_general_calcs import bdes_to_kmod, get_optics from lcls_tools.common.devices.magnet import Magnet from lcls_tools.common.measurements.measurement import Measurement -from lcls_tools.common.data.model_general_calcs import bdes_to_kmod, get_optics - -from typing import Optional class QuadScanEmittance(Measurement): @@ -30,15 +31,20 @@ class QuadScanEmittance(Measurement): energy: float scan_values: list[float] magnet: Magnet - measurement_device: Measurement + beamsize_measurement: Measurement + n_measurement_shots: PositiveInt = 1 - rmat: Optional[np.ndarray] = None - twiss: Optional[np.ndarray] = None + rmat: Optional[ndarray] = None # 4 x 4 beam transport matrix + design_twiss: Optional[dict] = None # design twiss values beam_sizes: Optional[dict] = {} name: str = "quad_scan_emittance" model_config = ConfigDict(arbitrary_types_allowed=True) + @field_validator("rmat") + def validate_rmat(cls, v, info): + assert v.shape == (4, 4) + def measure(self): """Returns the emittance, BMAG, x_rms and y_rms Get the rmat, twiss parameters, and measured beam sizes @@ -54,12 +60,15 @@ def measure(self): # get transport matrix and design twiss values from meme # TODO: get settings from arbitrary methods (ie. not meme) - optics = get_optics( - self.magnet_name, - self.device_measurement.device.name, - ) - self.rmat = optics["rmat"] - self.twiss = optics["design_twiss"] + if self.rmat is None and self.twiss is None: + optics = get_optics( + self.magnet_name, + self.device_measurement.device.name, + ) + + self.rmat = optics["rmat"] + self.design_twiss = optics["design_twiss"] + beamsize_squared = np.vstack(( self.beam_sizes["x_rms"], self.beam_sizes["y_rms"] )) ** 2 @@ -70,9 +79,11 @@ def measure(self): f"{self.magnet.name} to be used in emittance measurement") # organize data into arrays for use in `compute_emit_bmag` - rmat = np.array([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) - twiss_betas_alphas = np.array([[self.twiss["beta_x"], self.twiss["alpha_x"]], - [self.twiss["beta_y"], self.twiss["alpha_y"]]]) + rmat = np.stack([self.rmat[0:2, 0:2], self.rmat[2:4, 2:4]]) + twiss_betas_alphas = np.array( + [[self.design_twiss["beta_x"], self.design_twiss["alpha_x"]], + [self.design_twiss["beta_y"], self.design_twiss["alpha_y"]]] + ) kmod = bdes_to_kmod(self.energy, magnet_length, np.array(self.scan_values)) # compute emittance and bmag @@ -96,7 +107,7 @@ def measure(self): def measure_beamsize(self): """Take measurement from measurement device, store beam sizes in self.beam_sizes""" - results = self.device_measurement.measure() + results = self.beamsize_measurement.measure(self.n_measurement_shots) if "x_rms" not in self.beam_sizes: self.beam_sizes["x_rms"] = [] if "y_rms" not in self.beam_sizes: