From e69eb6fd76e7c142a819a6bdf7fcc853b1a8da61 Mon Sep 17 00:00:00 2001 From: Rose Yemelyanova Date: Mon, 7 Aug 2023 16:21:55 +0100 Subject: [PATCH] Fix segmentation fault caused by not dereferencing p4p context --- pyproject.toml | 2 +- src/ophyd_epics_devices/panda.py | 35 ++- src/ophyd_epics_devices/zebra.py | 423 ------------------------------- tests/test_panda.py | 4 +- tests/test_zebra.py | 114 --------- 5 files changed, 29 insertions(+), 549 deletions(-) delete mode 100644 src/ophyd_epics_devices/zebra.py delete mode 100644 tests/test_zebra.py diff --git a/pyproject.toml b/pyproject.toml index 1f261c8..ba0bfa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ ] description = "Cross-institution EPICS Devices for Ophyd" dependencies = [ - "ophyd @ git+https://github.com/bluesky/ophyd.git@sim-backend-numpy-typing", + "ophyd[ca] @ git+https://github.com/bluesky/ophyd.git", "bluesky", "event-model", "p4p", diff --git a/src/ophyd_epics_devices/panda.py b/src/ophyd_epics_devices/panda.py index 9e1c87a..1cd3262 100644 --- a/src/ophyd_epics_devices/panda.py +++ b/src/ophyd_epics_devices/panda.py @@ -1,3 +1,4 @@ +import atexit import re from enum import Enum from typing import ( @@ -31,8 +32,6 @@ ) from p4p.client.thread import Context -ctxt = Context("pva") - class PulseBlock(Device): delay: SignalRW[float] @@ -98,8 +97,7 @@ def block_name_number(block_name: str) -> Tuple[str, int]: return name, int(num or 1) -# in sim mode, this should be called with a timeout of 0.0. -async def pvi_get(pv: str, timeout: float = 5.0) -> Dict[str, PVIEntry]: +async def pvi_get(pv: str, ctxt: Context, timeout: float = 5.0) -> Dict[str, PVIEntry]: pv_info: Dict[str, Dict[str, str]] = {} try: pv_info = ctxt.get(pv, timeout=timeout).get("pvi").todict() @@ -115,13 +113,14 @@ async def pvi_get(pv: str, timeout: float = 5.0) -> Dict[str, PVIEntry]: class PandA(Device): + _ctxt: Optional[Context] = None + pulse: DeviceVector[PulseBlock] seq: DeviceVector[SeqBlock] pcap: PcapBlock def __init__(self, prefix: str, name: str = "") -> None: self._init_prefix = prefix - self.pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = { frozenset({"r", "w"}): lambda dtype, rpv, wpv: epics_signal_rw( dtype, rpv, wpv @@ -132,6 +131,20 @@ def __init__(self, prefix: str, name: str = "") -> None: frozenset({"x"}): lambda dtype, rpv, wpv: epics_signal_x(wpv), } + @property + def ctxt(self) -> Context: + if PandA._ctxt is None: + PandA._ctxt = Context("pva", nt=False) + + @atexit.register + def _del_ctxt(): + # If we don't do this we get messages like this on close: + # Error in sys.excepthook: + # Original exception was: + PandA._ctxt = None + + return PandA._ctxt + def verify_block(self, name: str, num: int): """Given a block name and number, return information about a block.""" anno = get_type_hints(self).get(name) @@ -156,7 +169,7 @@ async def _make_block(self, name: str, num: int, block_pv: str, sim: bool = Fals block = self.verify_block(name, num) field_annos = get_type_hints(block) - block_pvi = await pvi_get(block_pv) if not sim else None + block_pvi = await pvi_get(block_pv, self.ctxt) if not sim else None # finds which fields this class actually has, e.g. delay, width... for sig_name, sig_type in field_annos.items(): @@ -201,7 +214,7 @@ async def _make_untyped_block(self, block_pv: str): included dynamically anyway. """ block = Device() - block_pvi = await pvi_get(block_pv) + block_pvi = await pvi_get(block_pv, self.ctxt) for signal_name, signal_pvi in block_pvi.items(): signal_factory = self.pvi_mapping[frozenset(signal_pvi.keys())] @@ -236,8 +249,12 @@ async def connect(self, sim=False) -> None: If there's no pvi information, that's because we're in sim mode. In that case, makes all required blocks. """ - pvi = await pvi_get(self._init_prefix + ":PVI") if not sim else None - hints = get_type_hints(self) + pvi = await pvi_get(self._init_prefix + ":PVI", self.ctxt) if not sim else None + hints = { + attr_name: attr_type + for attr_name, attr_type in get_type_hints(self).items() + if not attr_name.startswith("_") + } # create all the blocks pvi says it should have, if pvi: diff --git a/src/ophyd_epics_devices/zebra.py b/src/ophyd_epics_devices/zebra.py deleted file mode 100644 index c193f3e..0000000 --- a/src/ophyd_epics_devices/zebra.py +++ /dev/null @@ -1,423 +0,0 @@ -import asyncio -from enum import Enum - -import numpy as np -import numpy.typing as npt -from bluesky import RunEngine -from ophyd.v2.core import Device, DeviceCollector, DeviceVector, SignalR, SignalRW -from ophyd.v2.epics import epics_signal_r, epics_signal_rw -from typing_extensions import TypeAlias - - -class Bool(Enum): - no = "No" - yes = "Yes" - - -class GatePulseSelect(Enum): - time = "Time" - position = "Position" - external = "External" - - -class ArmSelect(Enum): - soft = "Soft" - external = "External" - - -class Direction(Enum): - positive = "Positive" - negative = "Negative" - - -class EncoderType(Enum): - enc1 = "Enc1" - enc2 = "Enc2" - enc3 = "Enc3" - enc4 = "Enc4" - enc1_4av = "Enc1-4Av" - - -class TimeUnits(Enum): - ms = "ms" - s = "s" - s10 = "10s" - - -class UpdateRate(Enum): - passive = "Passive" - event = "Event" - io_intr = "I/O Intr" - s10 = "10 second" - s5 = "5 second" - s2 = "2 second" - s1 = "1 second" - s0_5 = ".5 second" - s0_2 = ".2 second" - s0_1 = ".1 second" - - -CapturePvType: TypeAlias = DeviceVector[SignalRW[Bool]] -ArrayOutPvType: TypeAlias = DeviceVector[SignalRW[npt.NDArray[np.float64]]] - - -####################################################################################### - - -class PcSetupCapture(Device): - def __init__(self, prefix: str, name="") -> None: - self.enc: CapturePvType = DeviceVector( - { - idx + 1: epics_signal_rw(Bool, f"{prefix}:B{key}") - for idx, key in enumerate(range(4)) - } - ) - self.sys = DeviceVector( - { - idx + 1: epics_signal_rw(Bool, f"{prefix}:B{key}") - for idx, key in enumerate(range(4, 6)) - } - ) - self.div = DeviceVector( - { - idx + 1: epics_signal_rw(Bool, f"{prefix}:B{key}") - for idx, key in enumerate(range(6, 10)) - } - ) - - -class PcSetup(Device): - def __init__(self, prefix: str) -> None: - self.capture = PcSetupCapture(f"{prefix}:PC_BIT_CAP") - self.posn_trig = epics_signal_rw(EncoderType, f"{prefix}:PC_ENC") - self.posn_dir = epics_signal_rw(Direction, f"{prefix}:PC_DIR") - self.time_units = epics_signal_rw(TimeUnits, f"{prefix}:PC_TSPRE") - - async def reset(self): - await self.time_units.set(TimeUnits.ms) - - -class PcArm(Device): - def __init__(self, prefix: str) -> None: - self.trig_source = epics_signal_rw(ArmSelect, f"{prefix}:PC_ARM_SEL") - self.arm = epics_signal_rw(float, f"{prefix}:PC_ARM") - self.disarm = epics_signal_rw(float, f"{prefix}:PC_DISARM") - self.arm_status = epics_signal_rw(float, f"{prefix}:PC_ARM_OUT") - - async def reset(self): - await self.disarm.set(1) - await self.trig_source.set(ArmSelect.soft) - - -class PcGate(Device): - def __init__(self, prefix: str) -> None: - self.trig_source = epics_signal_rw(GatePulseSelect, f"{prefix}:PC_GATE_SEL") - self.gate_start = epics_signal_rw(float, f"{prefix}:PC_GATE_START") - self.gate_width = epics_signal_rw(float, f"{prefix}:PC_GATE_WID") - self.num_gates = epics_signal_rw(float, f"{prefix}:PC_GATE_NGATE") - # self.gate_input = epics_signal_rw(int, "PC_GATE_INP") - self.gate_step = epics_signal_rw(float, f"{prefix}:PC_GATE_STEP") - self.gate_status = epics_signal_rw(float, f"{prefix}:PC_GATE_OUT") - - async def reset(self): - await self.trig_source.set(GatePulseSelect.position) - - -class PcPulse(Device): - def __init__(self, prefix: str) -> None: - self.trig_source = epics_signal_rw(GatePulseSelect, f"{prefix}:PC_PULSE_SEL") - self.pulse_start = epics_signal_rw(float, f"{prefix}:PC_PULSE_START") - self.pulse_width = epics_signal_rw(float, f"{prefix}:PC_PULSE_WID") - self.pulse_step = epics_signal_rw(float, f"{prefix}:PC_PULSE_STEP") - self.capt_delay = epics_signal_rw(float, f"{prefix}:PC_PULSE_DLY") - # self.pulse_input = epics_signal_rw(int, "PC_PULSE_INP") - self.max_pulses = epics_signal_rw(float, f"{prefix}:PC_PULSE_MAX") - self.pulse_status = epics_signal_rw(float, f"{prefix}:PC_PULSE_OUT") - - async def reset(self): - await self.trig_source.set(GatePulseSelect.time) - await self.pulse_start.set(0.0) - - -class ArrayOuts(Device): - def __init__(self, prefix: str) -> None: - def make_pv(suffix: str): - # return epics_signal_rw(npt.NDArray[np.float64], f"{prefix}:PC_{suffix}") - return epics_signal_rw(npt.NDArray[np.float64], f"{prefix}:PC_{suffix}") - - self.enc: ArrayOutPvType = DeviceVector( - { - idx + 1: make_pv(name) - for idx, name in enumerate(["ENC1", "ENC2", "ENC3", "ENC4"]) - } - ) - self.sys: ArrayOutPvType = DeviceVector( - {idx + 1: make_pv(name) for idx, name in enumerate(["SYS1", "SYS2"])} - ) - self.div: ArrayOutPvType = DeviceVector( - { - idx + 1: make_pv(name) - for idx, name in enumerate(["DIV1", "DIV2", "DIV3", "DIV4"]) - } - ) - - -class PcDownload(Device): - def __init__(self, prefix: str) -> None: - self.array_outputs = ArrayOuts(prefix) - self.captured = epics_signal_r(float, f"{prefix}:PC_NUM_CAP") - self.downloaded = epics_signal_r(float, f"{prefix}:PC_NUM_DOWN") - self.in_progress = epics_signal_r(float, f"{prefix}:ARRAY_ACQ") - self.update_rate = epics_signal_rw(UpdateRate, f"{prefix}:ARRAY_UPDATE.SCAN") - - -class PositionCompare(Device): - def __init__(self, prefix: str) -> None: - self.setup = PcSetup(prefix) - self.arm = PcArm(prefix) - self.gate = PcGate(prefix) - self.pulse = PcPulse(prefix) - self.download = PcDownload(prefix) - - async def reset(self): - await self.setup.reset() - await self.arm.reset() - await self.gate.reset() - await self.pulse.reset() - - -####################################################################################### - - -class Input(Device): - """Designed to represent the 'INP' fields in GATE tab for example.""" - - def __init__(self, prefix: str): - self.input = epics_signal_rw(float, f"{prefix}") - self.source = epics_signal_rw(str, f"{prefix}:STR") - self.status = epics_signal_rw(float, f"{prefix}:STA") - - -class LogicGatePanelInput(Input): - def __init__(self, prefix: str, number: int): - self.use = epics_signal_rw(Bool, f"{prefix}_ENA:B{number-1}") - self.invert = epics_signal_rw(Bool, f"{prefix}_INV:B{number-1}") - super().__init__(f"{prefix}_INP{number}") - - -class LogicGatePanel(Device): - def __init__(self, prefix: str): - self.inp = DeviceVector( - {channel: LogicGatePanelInput(prefix, channel) for channel in range(1, 5)} - ) - - -####################################################################################### - - -class Gate(Device): - def __init__(self, prefix: str, number: int): - self.inp1 = Input(f"{prefix}:GATE{number}_INP1") - self.inp1_trigger = epics_signal_rw(Bool, f"{prefix}:POLARITY:B{number-1}") - self.inp2 = Input(f"{prefix}:GATE{number}_INP2") - self.inp2_trigger = epics_signal_rw(Bool, f"{prefix}:POLARITY:B{number+3}") - self.out = epics_signal_r(float, f"{prefix}:GATE{number}_OUT") - - -####################################################################################### - - -class Div(Device): - triggers = {1: "8", 2: "9", 3: "A", 4: "B"} - - def __init__(self, prefix: str, number: int): - self.input = Input(f"{prefix}:DIV{number}_INP") - self.trigger = epics_signal_rw( - Bool, f"{prefix}:POLARITY:B{self.triggers[number]}" - ) - self.divisor = epics_signal_rw(float, f"{prefix}:DIV{number}_DIV") - self.first_pulse = epics_signal_rw(Bool, f"{prefix}:DIV_FIRST:B{number-1}") - self.outd = epics_signal_r(float, f"{prefix}:DIV{number}_OUTD") - self.outn = epics_signal_r(float, f"{prefix}:DIV{number}_OUTN") - - -####################################################################################### - - -class Pulse(Device): - triggers = {1: "C", 2: "D", 3: "E", 4: "F"} - - def __init__(self, prefix: str, number: int): - self.input = Input(f"{prefix}:PULSE{number}_INP") - self.trigger = epics_signal_rw( - Bool, f"{prefix}:POLARITY:B{self.triggers[number]}" - ) - self.delay_before = epics_signal_rw(float, f"{prefix}:PULSE{number}_DLY") - self.pulse_width = epics_signal_rw(float, f"{prefix}:PULSE{number}_WID") - self.time_units = epics_signal_rw(TimeUnits, f"{prefix}:PULSE{number}_PRE") - - self.trig_while_active = epics_signal_r( - int, f"{prefix}:SYS_STATERR.B{number-1}" - ) - self.output_pulse = epics_signal_r(float, f"{prefix}:PULSE{number}_OUT") - - -####################################################################################### - - -class EachMotor(Device): - def __init__(self, prefix: str, number: int): - self.title = epics_signal_r(str, f"{prefix}:M{number}") - self.description = epics_signal_r(str, f"{prefix}:M{number}:DESC") - self.motor_current_pos = epics_signal_r(float, f"{prefix}:M{number}:RBV") - self.set_zebra_pos = epics_signal_rw(float, f"{prefix}:POS{number}_SET") - self.copy_motor_pos_to_zebra = epics_signal_rw( - int, f"{prefix}:M{number}:SETPOS.PROC" - ) - - -class Quad(Device): - def __init__(self, prefix: str): - self.step = Input(f"{prefix}:QUAD_STEP") - self.dir = Input(f"{prefix}:QUAD_DIR") - - self.outa = epics_signal_r(float, f"{prefix}:QUAD_OUTA") - self.outb = epics_signal_r(float, f"{prefix}:QUAD_OUTB") - - -class Enc(Device): - def __init__(self, prefix: str): - self.pos: DeviceVector[EachMotor] = DeviceVector( - {number: EachMotor(prefix, number) for number in range(1, 5)} - ) - self.quad = Quad(prefix) - - -####################################################################################### - - -class SysFrontPanelOutputs(Device): - def __init__(self, prefix: str) -> None: - self.out_ttl: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_TTL") for channel in range(1, 5)} - ) - self.out_nim: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_NIM") for channel in [1, 2, 4]} - ) - self.out_lvds: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_LVDS") for channel in [1, 2, 3]} - ) - self.out_oc: DeviceVector[Input] = DeviceVector({3: Input(f"{prefix}:OUT3_OC")}) - self.out_pecl: DeviceVector[Input] = DeviceVector( - {4: Input(f"{prefix}:OUT4_PECL")} - ) - - -class SysRearPanelOutputs(Device): - def __init__(self, prefix: str) -> None: - self.out_enca: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_ENCA") for channel in range(5, 9)} - ) - self.out_encb: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_ENCB") for channel in range(5, 9)} - ) - self.out_encz: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_ENCZ") for channel in range(5, 9)} - ) - self.out_conn: DeviceVector[Input] = DeviceVector( - {channel: Input(f"{prefix}:OUT{channel}_CONN") for channel in range(5, 9)} - ) - - -class SysWriteRegsToFileFlash(Device): - def __init__(self, prefix: str): - self.file = epics_signal_rw(str, f"{prefix}:CONFIG_FILE") - self.store_to_file = epics_signal_rw(int, f"{prefix}:CONFIG_WRITE.PROC") - self.restore_from_file = epics_signal_rw(int, f"{prefix}:CONFIG_READ.PROC") - self.status = epics_signal_r(str, f"{prefix}:CONFIG_STATUS") - self.store_to_flash = epics_signal_rw(int, f"{prefix}:STORE.PROC") - self.restore_from_flash = epics_signal_rw(int, f"{prefix}:RESTORE.PROC") - - -class Sys(Device): - def __init__(self, prefix: str): - self.front_panel_outputs = SysFrontPanelOutputs(prefix) - self.rear_panel_outputs = SysRearPanelOutputs(prefix) - self.write_regs_to_file_or_flash = SysWriteRegsToFileFlash(prefix) - - self.version = epics_signal_r(float, f"{prefix}:SYS_VER") - self.initial_poll_done = epics_signal_r(Bool, f"{prefix}:INITIAL_POLL_DONE") - - async def reset(self): - return None - - -####################################################################################### - - -class SoftIn(Device): - def __init__(self, prefix: str): - self.input: DeviceVector[epics_signal_rw[Bool]] = DeviceVector( - { - number: epics_signal_rw(Bool, f"{prefix}:B{number-1}") - for number in range(1, 5) - } - ) - - -class Zebra(Device): - def __init__(self, prefix: str): - """ - Designed to pair well with the epics EDM screens for zebras. - """ - self.pc = PositionCompare(prefix) - - self.and_gates: DeviceVector[LogicGatePanel] = DeviceVector( - { - channel: LogicGatePanel(f"{prefix}:AND{channel}") - for channel in range(1, 5) - } - ) - self.or_gates: DeviceVector[LogicGatePanel] = DeviceVector( - { - channel: LogicGatePanel(f"{prefix}:OR{channel}") - for channel in range(1, 5) - } - ) - self.gate: DeviceVector[Gate] = DeviceVector( - {number: Gate(prefix, number) for number in range(1, 5)} - ) - self.div: DeviceVector[Div] = DeviceVector( - {number: Div(prefix, number) for number in range(1, 5)} - ) - self.pulse: DeviceVector[Pulse] = DeviceVector( - {number: Pulse(prefix, number) for number in range(1, 5)} - ) - self.enc = Enc(prefix) - self.sys = Sys(prefix) - - self.soft_in = SoftIn(f"{prefix}:SOFT_IN") - self.block_state = epics_signal_rw(int, f"{prefix}:SYS_RESET.PROC") - - async def reset(self): - await self.pc.reset() - await self.sys.reset() - - -# RE = RunEngine() - - -# async def somefunc(): -# async with DeviceCollector(): -# # I think I'd like to do and_screen.and[1].inp[1]... -# # so let's start with inp[1]... - -# # and_gates[1].inp[1] - -# # want to do, inp[1].use for example... -# setup_cap = Zebra("BL03S-EA-ZEBRA-01") -# return setup_cap - - -# zebra = asyncio.run(somefunc()) -# print("aha") diff --git a/tests/test_panda.py b/tests/test_panda.py index 694193b..691b6f7 100644 --- a/tests/test_panda.py +++ b/tests/test_panda.py @@ -15,12 +15,12 @@ async def sim_panda(): yield sim_panda -def test_panda_names_correct(pva, sim_panda: PandA): +def test_panda_names_correct(sim_panda: PandA): assert sim_panda.seq[1].name == "sim_panda-seq-1" assert sim_panda.pulse[1].name == "sim_panda-pulse-1" -async def test_panda_children_connected(pva, sim_panda: PandA): +async def test_panda_children_connected(sim_panda: PandA): # try to set and retrieve from simulated values... table = SeqTable( repeats=np.array([1, 1, 1, 32]).astype(np.uint16), diff --git a/tests/test_zebra.py b/tests/test_zebra.py deleted file mode 100644 index f666598..0000000 --- a/tests/test_zebra.py +++ /dev/null @@ -1,114 +0,0 @@ -from typing import Any - -import pytest -from ophyd.v2.core import DeviceCollector - -from ophyd_epics_devices.zebra import ( - ArmSelect, - Bool, - Direction, - EncoderType, - GatePulseSelect, - TimeUnits, - Zebra, -) - -# Long enough for multiple asyncio event loop cycles to run so -# all the tasks have a chance to run -A_WHILE = 0.001 - - -@pytest.fixture -async def sim_zebra(): - async with DeviceCollector(sim=True): - sim_zebra = Zebra("BLxxI-MO-TABLE-01") - # Signals connected here - - assert sim_zebra.name == "sim_zebra" - await sim_zebra.reset() - - yield sim_zebra - - -@pytest.mark.parametrize( - "value", - [(Direction.positive), (Direction.negative)], -) -async def test_setting_direction(value: Any, sim_zebra: Zebra) -> None: - await sim_zebra.pc.setup.posn_dir.set(value) - assert await sim_zebra.pc.setup.posn_dir.get_value() == value - - -@pytest.mark.parametrize( - "value", - [TimeUnits.ms, TimeUnits.s, TimeUnits.s10], -) -async def test_setting_time_units(value: Any, sim_zebra: Zebra) -> None: - await sim_zebra.pc.setup.time_units.set(value) - assert await sim_zebra.pc.setup.time_units.get_value() == value - - -@pytest.mark.parametrize( - "value", - [GatePulseSelect.time, GatePulseSelect.external, GatePulseSelect.position], -) -async def test_setting_pulse_and_gate_selection(value: Any, sim_zebra: Zebra) -> None: - await sim_zebra.pc.pulse.trig_source.set(value) - assert await sim_zebra.pc.pulse.trig_source.get_value() == value - - await sim_zebra.pc.gate.trig_source.set(value) - assert await sim_zebra.pc.gate.trig_source.get_value() == value - - -@pytest.mark.parametrize( - "value", - [ArmSelect.external, ArmSelect.soft], -) -async def test_setting_arm_selection(value: Any, sim_zebra: Zebra) -> None: - await sim_zebra.pc.arm.trig_source.set(value) - assert await sim_zebra.pc.arm.trig_source.get_value() == value - - -@pytest.mark.parametrize( - "value", - [EncoderType.enc1, EncoderType.enc2, EncoderType.enc3, EncoderType.enc4], -) -async def test_setting_gate_trigger(value: Any, sim_zebra: Zebra) -> None: - await sim_zebra.pc.gate.trig_source.set(value) - assert await sim_zebra.pc.gate.trig_source.get_value() == value - - -@pytest.mark.parametrize( - "value", - [Bool.yes, Bool.no], -) -async def test_setting_capture_pvs(value: Any, sim_zebra: Zebra) -> None: - await sim_zebra.pc.setup.capture.div[1].set(value) - assert await sim_zebra.pc.setup.capture.div[1].get_value() == value - - await sim_zebra.pc.setup.capture.div[2].set(value) - assert await sim_zebra.pc.setup.capture.div[2].get_value() == value - - await sim_zebra.pc.setup.capture.div[3].set(value) - assert await sim_zebra.pc.setup.capture.div[3].get_value() == value - - await sim_zebra.pc.setup.capture.div[4].set(value) - assert await sim_zebra.pc.setup.capture.div[4].get_value() == value - - await sim_zebra.pc.setup.capture.sys[1].set(value) - assert await sim_zebra.pc.setup.capture.sys[1].get_value() == value - - await sim_zebra.pc.setup.capture.sys[2].set(value) - assert await sim_zebra.pc.setup.capture.sys[2].get_value() == value - - await sim_zebra.pc.setup.capture.enc[1].set(value) - assert await sim_zebra.pc.setup.capture.enc[1].get_value() == value - - await sim_zebra.pc.setup.capture.enc[2].set(value) - assert await sim_zebra.pc.setup.capture.enc[2].get_value() == value - - await sim_zebra.pc.setup.capture.enc[3].set(value) - assert await sim_zebra.pc.setup.capture.enc[3].get_value() == value - - await sim_zebra.pc.setup.capture.enc[4].set(value) - assert await sim_zebra.pc.setup.capture.enc[4].get_value() == value