diff --git a/src/i22_bluesky/plans/linkam.py b/src/i22_bluesky/plans/linkam.py index 4f14b66..e2070c6 100644 --- a/src/i22_bluesky/plans/linkam.py +++ b/src/i22_bluesky/plans/linkam.py @@ -1,20 +1,30 @@ +from __future__ import annotations + from typing import Annotated, Any +import bluesky.plan_stubs as bps import bluesky.preprocessors as bpp +import numpy as np from bluesky.utils import MsgGenerator +from dodal.common.coordination import group_uuid from dodal.common.maths import step_to_num from dodal.devices.linkam3 import Linkam3 from dodal.plans.data_session_metadata import attach_data_session_metadata_decorator from ophyd_async.core import StandardDetector, StandardFlyer +from ophyd_async.epics.adcore import ( + ADBaseIO, + NDAttributePv, + NDAttributePvDbrType, +) from ophyd_async.fastcs.panda import HDFPanda, StaticSeqTableTriggerLogic -from ophyd_async.plan_stubs import setup_ndstats_sum -from pydantic import validate_call - -from i22_bluesky.stubs.linkam import ( - LinkamTrajectory, - capture_linkam_segment, - stamp_temp_pv, +from ophyd_async.plan_stubs import ( + fly_and_collect, + prepare_static_seq_table_flyer_and_detectors_with_same_trigger, + setup_ndattributes, + setup_ndstats_sum, ) +from pydantic import BaseModel, Field, model_validator, validate_call + from i22_bluesky.stubs.panda import load_panda_config_for_linkam from i22_bluesky.util.default_devices import ( DETECTORS, @@ -114,3 +124,180 @@ def inner_linkam_plan(): rs_uid = yield from inner_linkam_plan() return rs_uid + + +class LinkamPathSegment(BaseModel): + stop: float = Field( + description="Target final temperature and initial temperature of next segment.", + json_schema_extra={"units": "°C"}, + ) + rate: float = Field( + description="Absolute value of temperature change rate |dT/dt|.", + json_schema_extra={"units": "°C/minute"}, + gt=0.0, + ) + step: float | None = Field( + description="Temp change to generate points on this segment from. \ + Ignored if `num` if set. May be |dT| if stop < start.", + json_schema_extra={"units": "°C"}, + default=None, + ) + num: int | None = Field( + description="Number of equally spaced points to capture along this segment. \ + Overrides `step` if set. 1 gives only start of segment.", + gt=0, + default=None, + ) + num_frames: int | None = Field( + description="Number of frames to capture at each captured point. \ + If not set, must be set for overall trajectory.", + gt=0, + default=None, + ) + exposure: float | None = Field( + description="Exposure time for detector(s) per frame. \ + If not set, must be set for overall trajectory.", + gt=0.0, + default=None, + json_schema_extra={"units": "s"}, + ) + flown: bool = Field( + description="Whether this segment should be flown (temperature controller \ + begins move, then frames are captured periodically) or stepped \ + (temperature controller moves, stops then frames are captured).", + default=True, + ) + + @model_validator(mode="after") + def check_num_or_step_set(self) -> LinkamPathSegment: + assert ( + self.num is not None or self.step is not None + ), "Must have set at least one of 'num', 'step'" + return self + + +class LinkamTrajectory(BaseModel): + start: float = Field( + description="Initial temperature of 1st segment.", + json_schema_extra={"units": "°C"}, + ) + path: list[LinkamPathSegment] = Field( + description="Ordered list of segments describing the temperature path.", + min_length=1, + ) + default_num_frames: int | None = Field( + description="Number of frames to collect if not overriden by segment. \ + Must be set if any segment does not define for itself.", + gt=0, + default=None, + ) + default_exposure: float | None = Field( + description="Exposure time for each frame if not overriden by segment. \ + Must be set if any segment does not define for itself.", + gt=0.0, + default=None, + json_schema_extra={"units": "s"}, + ) + + @model_validator(mode="after") + def check_defaults(self) -> LinkamTrajectory: + assert self.default_num_frames is not None or all( + segment.num_frames is not None for segment in self.path + ), "Number of frames not set for default and for some segment(s)!" + assert self.default_exposure is not None or all( + segment.exposure is not None for segment in self.path + ), "Exposure not set for default and for some segment(s)!" + return self + + +def capture_temp( + linkam: Linkam3, + flyer: StandardFlyer, + detectors: set[StandardDetector], + temp: float, + num_frames: int, + exposure: float, + shutter_time: float = 0.04, + stream_name: str = "primary", +): + yield from bps.mv(linkam, temp) + yield from prepare_static_seq_table_flyer_and_detectors_with_same_trigger( + flyer=flyer, + detectors=detectors, + number_of_frames=num_frames, + exposure=exposure, + shutter_time=shutter_time, + ) + yield from fly_and_collect( + stream_name=stream_name, + flyer=flyer, + detectors=detectors, + ) + + +def capture_linkam_segment( + linkam: Linkam3, + flyer: StandardFlyer, + detectors: set[StandardDetector], + start: float, + stop: float, + num: int, + rate: float, + num_frames: int, + exposure: float, + shutter_time: float = 0.04, + stream_name: str = "primary", + fly: bool = False, +) -> MsgGenerator: + # Move to start in case previous segment has misaligned step + yield from bps.mv(linkam, start) + # Set temperature ramp rate to expected for segment + yield from bps.mv(linkam.ramp_rate, rate) + + if not fly: + # Move, stop then collect at each step + for temp in np.linspace(start, stop, num): + yield from capture_temp( + linkam, + flyer, + detectors, + temp, + num_frames, + exposure, + shutter_time, + stream_name, + ) + else: + # Kick off move, capturing periodically + yield from prepare_static_seq_table_flyer_and_detectors_with_same_trigger( + flyer=flyer, + detectors=detectors, + number_of_frames=num * num_frames, + exposure=exposure, + shutter_time=shutter_time, + period=abs(stop - start / (rate / 60)), # period in s, dT/(dT/dt) + ) + linkam_group = group_uuid("linkam") + yield from bps.abs_set(linkam, stop, group=linkam_group, wait=False) + yield from fly_and_collect( + stream_name=stream_name, + flyer=flyer, + detectors=detectors, + ) + # Make sure linkam has finished + yield from bps.wait(group=linkam_group) + + +def stamp_temp_pv(linkam: Linkam3, stamped_detector: StandardDetector): + assert isinstance(driver := stamped_detector.drv, ADBaseIO) + yield from setup_ndattributes( + driver, + [ + NDAttributePv( + "Temperature", + linkam.temp, + dbrtype=NDAttributePvDbrType.DBR_FLOAT, + description="Current linkam temperature", + ) + ], + ) diff --git a/src/i22_bluesky/plans/stopflow.py b/src/i22_bluesky/plans/stopflow.py index 85839dc..d5b6c38 100644 --- a/src/i22_bluesky/plans/stopflow.py +++ b/src/i22_bluesky/plans/stopflow.py @@ -16,28 +16,30 @@ # As such the implementation is: # start acquisition -> acquire n frames -> wait for trigger -> acquire m frames # where n can be 0. - from typing import Any +import bluesky.plan_stubs as bps import bluesky.preprocessors as bpp from bluesky.protocols import Readable from bluesky.utils import MsgGenerator from dodal.plans.data_session_metadata import attach_data_session_metadata_decorator from ophyd_async.core import ( + DetectorTrigger, StandardDetector, StandardFlyer, + TriggerInfo, + in_micros, ) from ophyd_async.fastcs.panda import ( HDFPanda, + SeqTable, + SeqTableInfo, + SeqTrigger, StaticSeqTableTriggerLogic, ) from ophyd_async.plan_stubs import ensure_connected, fly_and_collect from i22_bluesky.stubs.panda import load_panda_config_for_stopflow -from i22_bluesky.stubs.stopflow import ( - prepare_seq_table_flyer_and_det, - raise_for_minimum_exposure_times, -) from i22_bluesky.util.default_devices import ( BASELINE_DEVICES, DETECTORS, @@ -194,3 +196,162 @@ def inner_stopflow_plan(): ) yield from inner_stopflow_plan() + + +def prepare_seq_table_flyer_and_det( + flyer: StandardFlyer[SeqTableInfo], + detectors: set[StandardDetector], + pre_stop_frames: int, + post_stop_frames: int, + exposure: float, + shutter_time: float, + period: float = 0.0, +) -> MsgGenerator: + """ + Setup detectors/flyer for a stop flow experiment. Create a seq table and + upload it to the panda. Arm all detectors. + + Args: + flyer: Flyer object that controls the panda + detectors: Detectors that are triggered by the panda + post_stop_frames: Number of frames to be collected after the flow + is stopped. + pre_stop_frames: Number of frames (if any) to be collected before + the flow is stopped. + exposure: Detector exposure time + shutter_time: Time period (seconds) to wait for the shutter to + open fully before beginning acquisition + period: Time period (seconds) to wait after arming the detector + before taking the first batch of frames + + Returns: + MsgGenerator: Plan + + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + deadtime = ( + max(det.controller.get_deadtime(exposure) for det in detectors) + + DEADTIME_BUFFER + ) + trigger_info = TriggerInfo( + num=(pre_stop_frames + post_stop_frames), + trigger=DetectorTrigger.constant_gate, + deadtime=deadtime, + livetime=exposure, + frame_timeout=60.0, + ) + + # Generate a seq table + table = _stopflow_seq_table( + pre_stop_frames, + post_stop_frames, + exposure, + shutter_time, + deadtime, + period, + ) + table_info = SeqTableInfo(table, repeats=1) + + # Upload the seq table and arm all detectors. + for det in detectors: + yield from bps.prepare(det, trigger_info, wait=False, group="prep") + yield from bps.prepare(flyer, table_info, wait=False, group="prep") + yield from bps.wait(group="prep") + + +def _stopflow_seq_table( + pre_stop_frames: int, + post_stop_frames: int, + exposure: float, + shutter_time: float, + deadtime: float, + period: float, +) -> SeqTable: + """Create a SeqTable based on the parameters of a stop flow measurement + + Args: + pre_stop_frames: Number of frames to take initially, before flow stops + post_stop_frames: Number of frames to take after flow stops + exposure: Exposure time of each frame (excluding deadtime) + shutter_time: Time period (seconds) to wait for the shutter + to open fully before beginning acquisition + deadtime: Dead time to leave between frames, dependant on the + instruments involved + period: Time period (seconds) to wait after arming the detector + before taking the first batch of frames + + Returns: + SeqTable: SeqTable that will result in a series of triggers + for the measurement + """ + + total_gate_time = (pre_stop_frames + post_stop_frames) * (exposure + deadtime) + pre_delay = max(period - 2 * shutter_time - total_gate_time, 0) + # Wait for pre-delay then open shutter + table = SeqTable.row( + time1=in_micros(pre_delay), + time2=in_micros(shutter_time), + outa2=True, + ) + + # Keeping shutter open, do n triggers + if pre_stop_frames > 0: + table += SeqTable.row( + repeats=pre_stop_frames, + time1=in_micros(exposure), + outa1=True, + outb1=True, + time2=in_micros(deadtime), + outa2=True, + ) + # Do m triggers after BITA=1 + if post_stop_frames > 0: + table += SeqTable.row( + trigger=SeqTrigger.BITA_1, + repeats=1, + time1=in_micros(exposure), + outa1=True, + outb1=True, + time2=in_micros(deadtime), + outa2=True, + ) + if post_stop_frames > 1: + table += SeqTable.row( + repeats=post_stop_frames - 1, + time1=in_micros(exposure), + outa1=True, + outb1=True, + time2=in_micros(deadtime), + outa2=True, + ) + # Add the shutter close + table += SeqTable.row(time2=in_micros(shutter_time)) + return table + + +def raise_for_minimum_exposure_times( + exposure: float, + detectors: set[StandardDetector], +) -> None: + minimum_exposure_times = { + "saxs": 1.0 / 250.0, + "waxs": 1.0 / 250.0, + "oav": 1.0 / 22.0, + "i0": 1.0 / 2e4, + "it": 1.0 / 2e4, + } + detectors_below_limit = { + detector + for detector in detectors + if exposure < minimum_exposure_times.get(detector.name, 0.0) + } + if len(detectors_below_limit) > 0: + raise KeyError( + f"The exposure time requested was {exposure}, but " + "the following detectors do not support going " + f"that fast: {detectors_below_limit}. Try running the plan" + "without them. " + f"See minimum exposure time table: {minimum_exposure_times}" + ) diff --git a/src/i22_bluesky/stubs/__init__.py b/src/i22_bluesky/stubs/__init__.py index e69de29..eac1970 100644 --- a/src/i22_bluesky/stubs/__init__.py +++ b/src/i22_bluesky/stubs/__init__.py @@ -0,0 +1,13 @@ +from .panda import ( + load_panda_config_for_linkam, + load_panda_config_for_stopflow, + save_panda_config_for_linkam, + save_panda_config_for_stopflow, +) + +__all__ = [ + "load_panda_config_for_linkam", + "load_panda_config_for_stopflow", + "save_panda_config_for_linkam", + "save_panda_config_for_stopflow", +] diff --git a/src/i22_bluesky/stubs/linkam.py b/src/i22_bluesky/stubs/linkam.py deleted file mode 100644 index fb252b1..0000000 --- a/src/i22_bluesky/stubs/linkam.py +++ /dev/null @@ -1,210 +0,0 @@ -from __future__ import annotations - -import bluesky.plan_stubs as bps -import numpy as np -from bluesky.utils import MsgGenerator -from dodal.common.coordination import group_uuid -from dodal.devices.linkam3 import Linkam3 -from ophyd_async.core import StandardDetector, StandardFlyer -from ophyd_async.epics.adcore import ( - ADBaseIO, - NDAttributePv, - NDAttributePvDbrType, -) -from ophyd_async.fastcs.panda import HDFPanda -from ophyd_async.plan_stubs import ( - fly_and_collect, - prepare_static_seq_table_flyer_and_detectors_with_same_trigger, - setup_ndattributes, -) -from pydantic import BaseModel, Field, model_validator - -from i22_bluesky.stubs.panda import load_panda_for_plan, save_panda_for_plan -from i22_bluesky.util.default_devices import PANDA - -LINKAM_FOLDER = "linkam" - - -class LinkamPathSegment(BaseModel): - stop: float = Field( - description="Target final temperature and initial temperature of next segment.", - json_schema_extra={"units": "°C"}, - ) - rate: float = Field( - description="Absolute value of temperature change rate |dT/dt|.", - json_schema_extra={"units": "°C/minute"}, - gt=0.0, - ) - step: float | None = Field( - description="Temp change to generate points on this segment from. \ - Ignored if `num` if set. May be |dT| if stop < start.", - json_schema_extra={"units": "°C"}, - default=None, - ) - num: int | None = Field( - description="Number of equally spaced points to capture along this segment. \ - Overrides `step` if set. 1 gives only start of segment.", - gt=0, - default=None, - ) - num_frames: int | None = Field( - description="Number of frames to capture at each captured point. \ - If not set, must be set for overall trajectory.", - gt=0, - default=None, - ) - exposure: float | None = Field( - description="Exposure time for detector(s) per frame. \ - If not set, must be set for overall trajectory.", - gt=0.0, - default=None, - json_schema_extra={"units": "s"}, - ) - flown: bool = Field( - description="Whether this segment should be flown (temperature controller \ - begins move, then frames are captured periodically) or stepped \ - (temperature controller moves, stops then frames are captured).", - default=True, - ) - - @model_validator(mode="after") - def check_num_or_step_set(self) -> LinkamPathSegment: - assert ( - self.num is not None or self.step is not None - ), "Must have set at least one of 'num', 'step'" - return self - - -class LinkamTrajectory(BaseModel): - start: float = Field( - description="Initial temperature of 1st segment.", - json_schema_extra={"units": "°C"}, - ) - path: list[LinkamPathSegment] = Field( - description="Ordered list of segments describing the temperature path.", - min_length=1, - ) - default_num_frames: int | None = Field( - description="Number of frames to collect if not overriden by segment. \ - Must be set if any segment does not define for itself.", - gt=0, - default=None, - ) - default_exposure: float | None = Field( - description="Exposure time for each frame if not overriden by segment. \ - Must be set if any segment does not define for itself.", - gt=0.0, - default=None, - json_schema_extra={"units": "s"}, - ) - - @model_validator(mode="after") - def check_defaults(self) -> LinkamTrajectory: - assert self.default_num_frames is not None or all( - segment.num_frames is not None for segment in self.path - ), "Number of frames not set for default and for some segment(s)!" - assert self.default_exposure is not None or all( - segment.exposure is not None for segment in self.path - ), "Exposure not set for default and for some segment(s)!" - return self - - -def capture_temp( - linkam: Linkam3, - flyer: StandardFlyer, - detectors: set[StandardDetector], - temp: float, - num_frames: int, - exposure: float, - shutter_time: float = 0.04, - stream_name: str = "primary", -): - yield from bps.mv(linkam, temp) - yield from prepare_static_seq_table_flyer_and_detectors_with_same_trigger( - flyer=flyer, - detectors=detectors, - number_of_frames=num_frames, - exposure=exposure, - shutter_time=shutter_time, - ) - yield from fly_and_collect( - stream_name=stream_name, - flyer=flyer, - detectors=detectors, - ) - - -def capture_linkam_segment( - linkam: Linkam3, - flyer: StandardFlyer, - detectors: set[StandardDetector], - start: float, - stop: float, - num: int, - rate: float, - num_frames: int, - exposure: float, - shutter_time: float = 0.04, - stream_name: str = "primary", - fly: bool = False, -) -> MsgGenerator: - # Move to start in case previous segment has misaligned step - yield from bps.mv(linkam, start) - # Set temperature ramp rate to expected for segment - yield from bps.mv(linkam.ramp_rate, rate) - - if not fly: - # Move, stop then collect at each step - for temp in np.linspace(start, stop, num): - yield from capture_temp( - linkam, - flyer, - detectors, - temp, - num_frames, - exposure, - shutter_time, - stream_name, - ) - else: - # Kick off move, capturing periodically - yield from prepare_static_seq_table_flyer_and_detectors_with_same_trigger( - flyer=flyer, - detectors=detectors, - number_of_frames=num * num_frames, - exposure=exposure, - shutter_time=shutter_time, - period=abs(stop - start / (rate / 60)), # period in s, dT/(dT/dt) - ) - linkam_group = group_uuid("linkam") - yield from bps.abs_set(linkam, stop, group=linkam_group, wait=False) - yield from fly_and_collect( - stream_name=stream_name, - flyer=flyer, - detectors=detectors, - ) - # Make sure linkam has finished - yield from bps.wait(group=linkam_group) - - -def stamp_temp_pv(linkam: Linkam3, stamped_detector: StandardDetector): - assert isinstance(driver := stamped_detector.drv, ADBaseIO) - yield from setup_ndattributes( - driver, - [ - NDAttributePv( - "Temperature", - linkam.temperature, - dbrtype=NDAttributePvDbrType.DBR_FLOAT, - description="Current linkam temperature", - ) - ], - ) - - -def save_panda_config_for_stopflow(panda: HDFPanda = PANDA) -> MsgGenerator: - yield from save_panda_for_plan(LINKAM_FOLDER, panda) - - -def load_panda_config_for_stopflow(panda: HDFPanda = PANDA) -> MsgGenerator: - yield from load_panda_for_plan(LINKAM_FOLDER, panda) diff --git a/src/i22_bluesky/stubs/panda.py b/src/i22_bluesky/stubs/panda.py index e41f49b..8cbde2c 100644 --- a/src/i22_bluesky/stubs/panda.py +++ b/src/i22_bluesky/stubs/panda.py @@ -7,6 +7,8 @@ from i22_bluesky.util.default_devices import PANDA SAVES_ROOT = Path(__file__).parent.parent.parent +LINKAM_FOLDER = "linkam" +STOPFLOW_FOLDER = "stopflow" def get_device_save_dir(plan_name: str) -> Path: @@ -29,3 +31,19 @@ def load_panda_for_plan(plan_name: str, panda: HDFPanda = PANDA) -> MsgGenerator panda, get_device_save_dir(plan_name), ) + + +def save_panda_config_for_stopflow(panda: HDFPanda = PANDA) -> MsgGenerator: + yield from save_panda_for_plan(LINKAM_FOLDER, panda) + + +def load_panda_config_for_stopflow(panda: HDFPanda = PANDA) -> MsgGenerator: + yield from load_panda_for_plan(LINKAM_FOLDER, panda) + + +def save_panda_config_for_linkam(panda: HDFPanda = PANDA) -> MsgGenerator: + yield from save_panda_for_plan(STOPFLOW_FOLDER, panda) + + +def load_panda_config_for_linkam(panda: HDFPanda = PANDA) -> MsgGenerator: + yield from load_panda_for_plan(STOPFLOW_FOLDER, panda) diff --git a/src/i22_bluesky/stubs/stopflow.py b/src/i22_bluesky/stubs/stopflow.py deleted file mode 100644 index 9405d72..0000000 --- a/src/i22_bluesky/stubs/stopflow.py +++ /dev/null @@ -1,190 +0,0 @@ -import bluesky.plan_stubs as bps -from bluesky.utils import MsgGenerator -from ophyd_async.core import ( - DetectorTrigger, - StandardDetector, - StandardFlyer, - TriggerInfo, - in_micros, -) -from ophyd_async.fastcs.panda import ( - HDFPanda, - SeqTable, - SeqTableInfo, - SeqTrigger, -) - -from i22_bluesky.stubs.panda import load_panda_for_plan, save_panda_for_plan -from i22_bluesky.util.default_devices import PANDA - -#: Buffer added to deadtime to handle minor discrepencies between detector -#: and panda clocks -DEADTIME_BUFFER = 20e-6 -STOPFLOW_FOLDER = "stopflow" - - -def prepare_seq_table_flyer_and_det( - flyer: StandardFlyer[SeqTableInfo], - detectors: set[StandardDetector], - pre_stop_frames: int, - post_stop_frames: int, - exposure: float, - shutter_time: float, - period: float = 0.0, -) -> MsgGenerator: - """ - Setup detectors/flyer for a stop flow experiment. Create a seq table and - upload it to the panda. Arm all detectors. - - Args: - flyer: Flyer object that controls the panda - detectors: Detectors that are triggered by the panda - post_stop_frames: Number of frames to be collected after the flow - is stopped. - pre_stop_frames: Number of frames (if any) to be collected before - the flow is stopped. - exposure: Detector exposure time - shutter_time: Time period (seconds) to wait for the shutter to - open fully before beginning acquisition - period: Time period (seconds) to wait after arming the detector - before taking the first batch of frames - - Returns: - MsgGenerator: Plan - - Yields: - Iterator[MsgGenerator]: Bluesky messages - """ - - deadtime = ( - max(det.controller.get_deadtime(exposure) for det in detectors) - + DEADTIME_BUFFER - ) - trigger_info = TriggerInfo( - num=(pre_stop_frames + post_stop_frames), - trigger=DetectorTrigger.constant_gate, - deadtime=deadtime, - livetime=exposure, - frame_timeout=60.0, - ) - - # Generate a seq table - table = _stopflow_seq_table( - pre_stop_frames, - post_stop_frames, - exposure, - shutter_time, - deadtime, - period, - ) - table_info = SeqTableInfo(table, repeats=1) - - # Upload the seq table and arm all detectors. - for det in detectors: - yield from bps.prepare(det, trigger_info, wait=False, group="prep") - yield from bps.prepare(flyer, table_info, wait=False, group="prep") - yield from bps.wait(group="prep") - - -def _stopflow_seq_table( - pre_stop_frames: int, - post_stop_frames: int, - exposure: float, - shutter_time: float, - deadtime: float, - period: float, -) -> SeqTable: - """Create a SeqTable based on the parameters of a stop flow measurement - - Args: - pre_stop_frames: Number of frames to take initially, before flow stops - post_stop_frames: Number of frames to take after flow stops - exposure: Exposure time of each frame (excluding deadtime) - shutter_time: Time period (seconds) to wait for the shutter - to open fully before beginning acquisition - deadtime: Dead time to leave between frames, dependant on the - instruments involved - period: Time period (seconds) to wait after arming the detector - before taking the first batch of frames - - Returns: - SeqTable: SeqTable that will result in a series of triggers - for the measurement - """ - - total_gate_time = (pre_stop_frames + post_stop_frames) * (exposure + deadtime) - pre_delay = max(period - 2 * shutter_time - total_gate_time, 0) - # Wait for pre-delay then open shutter - table = SeqTable.row( - time1=in_micros(pre_delay), - time2=in_micros(shutter_time), - outa2=True, - ) - - # Keeping shutter open, do n triggers - if pre_stop_frames > 0: - table += SeqTable.row( - repeats=pre_stop_frames, - time1=in_micros(exposure), - outa1=True, - outb1=True, - time2=in_micros(deadtime), - outa2=True, - ) - # Do m triggers after BITA=1 - if post_stop_frames > 0: - table += SeqTable.row( - trigger=SeqTrigger.BITA_1, - repeats=1, - time1=in_micros(exposure), - outa1=True, - outb1=True, - time2=in_micros(deadtime), - outa2=True, - ) - if post_stop_frames > 1: - table += SeqTable.row( - repeats=post_stop_frames - 1, - time1=in_micros(exposure), - outa1=True, - outb1=True, - time2=in_micros(deadtime), - outa2=True, - ) - # Add the shutter close - table += SeqTable.row(time2=in_micros(shutter_time)) - return table - - -def raise_for_minimum_exposure_times( - exposure: float, - detectors: set[StandardDetector], -) -> None: - minimum_exposure_times = { - "saxs": 1.0 / 250.0, - "waxs": 1.0 / 250.0, - "oav": 1.0 / 22.0, - "i0": 1.0 / 2e4, - "it": 1.0 / 2e4, - } - detectors_below_limit = { - detector - for detector in detectors - if exposure < minimum_exposure_times.get(detector.name, 0.0) - } - if len(detectors_below_limit) > 0: - raise KeyError( - f"The exposure time requested was {exposure}, but " - "the following detectors do not support going " - f"that fast: {detectors_below_limit}. Try running the plan" - "without them. " - f"See minimum exposure time table: {minimum_exposure_times}" - ) - - -def save_panda_config_for_linkam(panda: HDFPanda = PANDA) -> MsgGenerator: - yield from save_panda_for_plan(STOPFLOW_FOLDER, panda) - - -def load_panda_config_for_linkam(panda: HDFPanda = PANDA) -> MsgGenerator: - yield from load_panda_for_plan(STOPFLOW_FOLDER, panda)