diff --git a/src/ophyd_epics_devices/panda.py b/src/ophyd_epics_devices/panda.py index 4e3ffe6..4f75bc0 100644 --- a/src/ophyd_epics_devices/panda.py +++ b/src/ophyd_epics_devices/panda.py @@ -8,6 +8,7 @@ Optional, Sequence, Tuple, + Type, TypedDict, get_args, get_origin, @@ -25,6 +26,7 @@ SimSignalBackend, ) from ophyd.v2.epics import ( + SignalR, epics_signal_r, epics_signal_rw, epics_signal_w, @@ -79,7 +81,7 @@ class SeqBlock(Device): class PcapBlock(Device): - arm: SignalX + active: SignalR[bool] class PVIEntry(TypedDict, total=False): @@ -177,22 +179,11 @@ async def _make_block(self, name: str, num: int, block_pv: str, sim: bool = Fals entry: Optional[PVIEntry] = block_pvi.get(sig_name) if entry is None: raise Exception( - f"{self.__class__.__name__} has a {name} block containing a " - + f"{sig_name} signal which has not been retrieved by PVI." + f"{self.__class__.__name__} has a {name} block containing a/" + + f"an {sig_name} signal which has not been retrieved by PVI." ) - pvs = [entry[i] for i in frozenset(entry.keys())] # type: ignore - if len(pvs) == 1: - read_pv = write_pv = pvs[0] - else: - read_pv, write_pv = pvs - - signal_factory = self.pvi_mapping[frozenset(entry.keys())] - signal = signal_factory( - args[0] if len(args) > 0 else None, - "pva://" + read_pv, - "pva://" + write_pv, - ) + signal = self._make_signal(entry, args[0] if len(args) > 0 else None) else: backend = SimSignalBackend(args[0] if len(args) > 0 else None, block_pv) @@ -200,6 +191,14 @@ async def _make_block(self, name: str, num: int, block_pv: str, sim: bool = Fals setattr(block, sig_name, signal) + # checks for any extra pvi information not contained in this class + if block_pvi: + for attr, attr_pvi in block_pvi.items(): + if not hasattr(block, attr): + # makes any extra signals + signal = self._make_signal(attr_pvi) + setattr(block, attr, signal) + return block async def _make_untyped_block(self, block_pv: str): @@ -212,19 +211,24 @@ async def _make_untyped_block(self, block_pv: str): block_pvi = await pvi_get(block_pv, self.ctxt) for signal_name, signal_pvi in block_pvi.items(): - signal_factory = self.pvi_mapping[frozenset(signal_pvi.keys())] + signal = self._make_signal(signal_pvi) + setattr(block, signal_name, signal) - pvs = [signal_pvi[i] for i in frozenset(signal_pvi.keys())] # type: ignore - if len(pvs) == 1: - read_pv = write_pv = pvs[0] - else: - read_pv, write_pv = pvs + return block - signal = signal_factory(None, "pva://" + read_pv, "pva://" + write_pv) + def _make_signal(self, signal_pvi: PVIEntry, dtype: Optional[Type] = None): + """Make a signal. - setattr(block, signal_name, signal) + This assumes datatype is None so it can be used to create dynamic signals. + """ + operations = frozenset(signal_pvi.keys()) + pvs = [signal_pvi[i] for i in operations] # type: ignore + signal_factory = self.pvi_mapping[operations] - return block + write_pv = pvs[0] + read_pv = write_pv if len(pvs) == 1 else pvs[1] + + return signal_factory(dtype, "pva://" + read_pv, "pva://" + write_pv) def set_attribute(self, name, num, block): anno = get_type_hints(self).get(name) @@ -263,10 +267,9 @@ async def connect(self, sim=False) -> None: self.set_attribute(name, num, block) - # then check if the ones defined in this class are at least made. + # then check if the ones defined in this class are in the pvi info + # make them if there is no pvi info, i.e. sim mode. for block_name in hints.keys(): - pv = "sim://" - if pvi is not None: pvi_name = block_name @@ -280,8 +283,7 @@ async def connect(self, sim=False) -> None: "d" ], f"Expected PandA to only contain blocks, got {entry}" else: - # or, if there's no pvi info, just make the minimum blocks needed - block = await self._make_block(block_name, 1, pv, sim=sim) + block = await self._make_block(block_name, 1, "sim://", sim=sim) self.set_attribute(block_name, 1, block) self.set_name(self.name) diff --git a/tests/conftest.py b/tests/conftest.py index 4aa4f94..5f13cec 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -54,7 +54,7 @@ def pva(): universal_newlines=True, ) for macros in [ - "INCLUDE_EXTRA_BLOCK=", + "INCLUDE_EXTRA_BLOCK=,INCLUDE_EXTRA_SIGNAL=", "EXCLUDE_WIDTH=#,IOC_NAME=PANDAQSRVIB", "EXCLUDE_PCAP=#,IOC_NAME=PANDAQSRVI", ] diff --git a/tests/db/panda.db b/tests/db/panda.db index 97d4f87..3589d4f 100644 --- a/tests/db/panda.db +++ b/tests/db/panda.db @@ -28,6 +28,21 @@ $(EXCLUDE_WIDTH=) } $(EXCLUDE_WIDTH=) }) $(EXCLUDE_WIDTH=)} +record(bi, "$(IOC_NAME=PANDAQSRV):PCAP:ACTIVE") +{ + field(ZNAM, "0") + field(ONAM, "1") + field(PINI, "YES") + info(Q:group, { + "$(IOC_NAME=PANDAQSRV):PCAP:PVI": { + "pvi.active.r": { + "+channel": "NAME", + "+type": "plain" + } + } + }) +} + record(waveform, "BOOL:PLEASE") { field(NELM, 10) @@ -436,6 +451,19 @@ $(EXCLUDE_PCAP=) } $(EXCLUDE_PCAP=) } $(EXCLUDE_PCAP=) }) $(EXCLUDE_PCAP=)} + +$(INCLUDE_EXTRA_SIGNAL=#)record(ao, "$(IOC_NAME=PANDAQSRV):PCAP:ARM2") +$(INCLUDE_EXTRA_SIGNAL=#){ +$(INCLUDE_EXTRA_SIGNAL=#) info(Q:group, { +$(INCLUDE_EXTRA_SIGNAL=#) "$(IOC_NAME=PANDAQSRV):PCAP:PVI": { +$(INCLUDE_EXTRA_SIGNAL=#) "pvi.arm2.x": { +$(INCLUDE_EXTRA_SIGNAL=#) "+channel": "NAME", +$(INCLUDE_EXTRA_SIGNAL=#) "+type": "plain" +$(INCLUDE_EXTRA_SIGNAL=#) } +$(INCLUDE_EXTRA_SIGNAL=#) } +$(INCLUDE_EXTRA_SIGNAL=#) }) +$(INCLUDE_EXTRA_SIGNAL=#)} + $(EXCLUDE_PCAP=) $(EXCLUDE_PCAP=) $(EXCLUDE_PCAP=)record(stringin, "$(IOC_NAME=PANDAQSRV):PCAP:_PVI") diff --git a/tests/test_panda.py b/tests/test_panda.py index 691b6f7..c4528e4 100644 --- a/tests/test_panda.py +++ b/tests/test_panda.py @@ -62,11 +62,12 @@ async def test_panda_with_missing_blocks(pva): await panda.connect() -async def test_panda_with_extra_blocks(pva): +async def test_panda_with_extra_blocks_and_signals(pva): panda = PandA("PANDAQSRV") await panda.connect() assert panda.extra, "extra device has not been instantiated" # type: ignore + assert panda.pcap.arm2, "extra signal not instantiated" # type: ignore async def test_panda_block_missing_signals(pva):