Skip to content

Commit

Permalink
Added exhaustive testing of automatic signals including cases where a…
Browse files Browse the repository at this point in the history
… proxy and datatype and unspecified at initialization
  • Loading branch information
burkeds committed Aug 19, 2024
1 parent 969db94 commit 6d41773
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 71 deletions.
20 changes: 12 additions & 8 deletions src/ophyd_async/tango/signal/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,15 @@ def tango_signal_x(

# --------------------------------------------------------------------
def tango_signal_auto(
datatype: Type[T], full_trl: str, device_proxy: Optional[DeviceProxy] = None
datatype: Type[T],
trl: str,
device_proxy: Optional[DeviceProxy] = None,
timeout: float = DEFAULT_TIMEOUT,
name: str = "",
) -> Union[SignalW, SignalX, SignalR, SignalRW]:
device_trl, tr_name = full_trl.rsplit("/", 1)
device_trl, tr_name = trl.rsplit("/", 1)
syn_proxy = SyncDeviceProxy(device_trl)
backend = _make_backend(datatype, full_trl, full_trl, device_proxy)
backend = _make_backend(datatype, trl, trl, device_proxy)

if tr_name not in syn_proxy.get_attribute_list():
if tr_name not in syn_proxy.get_command_list():
Expand All @@ -151,18 +155,18 @@ def tango_signal_auto(
if tr_name in syn_proxy.get_attribute_list():
config = syn_proxy.get_attribute_config(tr_name)
if config.writable in [AttrWriteType.READ_WRITE, AttrWriteType.READ_WITH_WRITE]:
return SignalRW(backend)
return SignalRW(backend, timeout=timeout, name=name)
elif config.writable == AttrWriteType.READ:
return SignalR(backend)
return SignalR(backend, timeout=timeout, name=name)
else:
return SignalW(backend)
return SignalW(backend, timeout=timeout, name=name)

if tr_name in syn_proxy.get_command_list():
config = syn_proxy.get_command_config(tr_name)
if config.in_type == CmdArgType.DevVoid:
return SignalX(backend)
return SignalX(backend, timeout=timeout, name=name)
elif config.out_type != CmdArgType.DevVoid:
return SignalRW(backend)
return SignalRW(backend, timeout=timeout, name=name)

if tr_name in device_proxy.get_pipe_list():
raise NotImplementedError("Pipes are not supported")
256 changes: 193 additions & 63 deletions tests/tango/test_tango_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from bluesky.protocols import Reading
from test_base_device import TestDevice

from ophyd_async.core import SignalBackend, SignalR, SignalRW, SignalW, SignalX, T
from ophyd_async.core import SignalBackend, SignalRW, SignalX, T
from ophyd_async.tango._backend import TangoTransport
from ophyd_async.tango.signal import (
tango_signal_auto,
Expand Down Expand Up @@ -561,10 +561,48 @@ async def test_tango_signal_rw(
assert_close(location["readback"], put_value)


# --------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
"pv, tango_type, d_format, py_type, initial_value, put_value",
COMMANDS_SET,
ids=[x[0] for x in COMMANDS_SET],
)
async def test_tango_signal_x(
echo_device: str,
pv: str,
tango_type: str,
d_format: AttrDataFormat,
py_type: Type[T],
initial_value: T,
put_value: T,
):
source = echo_device + "/" + pv
timeout = 0.1
signal = tango_signal_auto(
datatype=py_type,
trl=source,
device_proxy=None,
name="test_signal",
timeout=timeout,
)
await signal.connect()
assert signal
reading = await signal.read()
assert reading["test_signal"]["value"] is None

await signal.set(put_value, wait=True, timeout=0.1)
reading = await signal.read()
value = reading["test_signal"]["value"]
if isinstance(value, np.ndarray):
value = value.tolist()
assert_close(value, put_value)


# --------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize("use_proxy", [True, False])
async def test_tango_signal_x(tango_test_device: str, use_proxy: bool):
async def test_tango_signal_x_none(tango_test_device: str, use_proxy: bool):
proxy = await DeviceProxy(tango_test_device) if use_proxy else None
timeout = 0.1
signal = tango_signal_x(
Expand All @@ -581,75 +619,167 @@ async def test_tango_signal_x(tango_test_device: str, use_proxy: bool):

# --------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize("use_proxy", [True, False])
@pytest.mark.parametrize(
"attr_type",
"pv, tango_type, d_format, py_type, initial_value, put_value, use_dtype, use_proxy",
[
("justvalue", int),
("writeonly", int),
("readonly", int),
("clear", None),
("echo", str),
("nonexistant", int),
(
pv,
tango_type,
d_format,
py_type,
initial_value,
put_value,
use_dtype,
use_proxy,
)
for (
pv,
tango_type,
d_format,
py_type,
initial_value,
put_value,
) in ATTRIBUTES_SET
for use_dtype in [True, False]
for use_proxy in [True, False]
],
ids=[
f"{x[0]}_{use_dtype}_{use_proxy}"
for x in ATTRIBUTES_SET
for use_dtype in [True, False]
for use_proxy in [True, False]
],
)
async def test_tango_signal_auto(
tango_test_device: str, attr_type: tuple[str, Type[T]], use_proxy: bool
async def test_tango_signal_auto_attrs(
echo_device: str,
pv: str,
tango_type: str,
d_format: AttrDataFormat,
py_type: Type[T],
initial_value: T,
put_value: T,
use_dtype: bool,
use_proxy: bool,
):
proxy = await DeviceProxy(tango_test_device) if use_proxy else None
attr, py_type = attr_type
await prepare_device(echo_device, pv, initial_value)
source = echo_device + "/" + pv
proxy = await DeviceProxy(echo_device) if use_proxy else None
timeout = 0.1

async def _test_signal(dtype, proxy):
signal = tango_signal_auto(
datatype=dtype,
trl=source,
device_proxy=proxy,
timeout=timeout,
name="test_signal",
)
assert type(signal) is SignalRW
await signal.connect()
reading = await signal.read()
value = reading["test_signal"]["value"]
if isinstance(value, np.ndarray):
value = value.tolist()
assert_close(value, initial_value)

await signal.set(put_value, wait=True, timeout=timeout)
reading = await signal.read()
value = reading["test_signal"]["value"]
if isinstance(value, np.ndarray):
value = value.tolist()
assert_close(value, put_value)

dtype = py_type if use_dtype else None
await _test_signal(dtype, proxy)


# --------------------------------------------------------------------


@pytest.mark.asyncio
@pytest.mark.parametrize(
"pv, tango_type, d_format, py_type, initial_value, put_value, use_dtype, use_proxy",
[
(
pv,
tango_type,
d_format,
py_type,
initial_value,
put_value,
use_dtype,
use_proxy,
)
for (
pv,
tango_type,
d_format,
py_type,
initial_value,
put_value,
) in COMMANDS_SET
for use_dtype in [True, False]
for use_proxy in [True, False]
],
ids=[
f"{x[0]}_{use_dtype}_{use_proxy}"
for x in COMMANDS_SET
for use_dtype in [True, False]
for use_proxy in [True, False]
],
)
async def test_tango_signal_auto_cmds(
echo_device: str,
pv: str,
tango_type: str,
d_format: AttrDataFormat,
py_type: Type[T],
initial_value: T,
put_value: T,
use_dtype: bool,
use_proxy: bool,
):
source = echo_device + "/" + pv
timeout = 0.1
if attr == "nonexistant":
with pytest.raises(RuntimeError) as exc_info:
signal = tango_signal_auto(
datatype=py_type,
full_trl=tango_test_device + "/" + attr,
device_proxy=proxy,
)
await signal.connect()
assert "Cannot find" in str(exc_info.value)
return

async def _test_signal(dtype, proxy):
signal = tango_signal_auto(
datatype=dtype,
trl=source,
device_proxy=proxy,
name="test_signal",
timeout=timeout,
)
# Ophyd SignalX does not support types
assert type(signal) is SignalRW
await signal.connect()
assert signal
reading = await signal.read()
assert reading["test_signal"]["value"] is None

await signal.set(put_value, wait=True, timeout=0.1)
reading = await signal.read()
value = reading["test_signal"]["value"]
if isinstance(value, np.ndarray):
value = value.tolist()
assert_close(value, put_value)

proxy = await DeviceProxy(echo_device) if use_proxy else None
dtype = py_type if use_dtype else None
await _test_signal(dtype, proxy)


# --------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize("use_proxy", [True, False])
async def test_tango_signal_auto_cmds_none(tango_test_device: str, use_proxy: bool):
proxy = await DeviceProxy(tango_test_device) if use_proxy else None
signal = tango_signal_auto(
datatype=py_type,
full_trl=tango_test_device + "/" + attr,
datatype=None,
trl=tango_test_device + "/" + "clear",
device_proxy=proxy,
)
assert type(signal) is SignalX
await signal.connect()

if isinstance(signal, SignalRW):
datatype = signal._backend.datatype
if datatype is int:
new_value = choice([1, 2, 3, 4, 5])
elif datatype is float:
new_value = choice([1.1, 2.2, 3.3, 4.4, 5.5])
elif datatype is str:
new_value = choice(["aaa", "bbb", "ccc"])
elif datatype is np.ndarray:
new_value = np.array([choice([1, 2, 3, 4, 5]), choice([1, 2, 3, 4, 5])])
else:
new_value = None
await signal.set(new_value, timeout=timeout)
location = await signal.locate()
assert_close(location["setpoint"], new_value)
assert_close(location["readback"], new_value)
return

if isinstance(signal, SignalR):
reading = await signal.read()
assert reading[""]["value"] == 7
return

if isinstance(signal, SignalW):
status = signal.set(1, timeout=timeout)
await status
assert status.done is True and status.success is True
return

if isinstance(signal, SignalX):
status = signal.trigger()
await status
assert status.done is True and status.success is True
return

assert False
assert signal
await signal.trigger(wait=True)

0 comments on commit 6d41773

Please sign in to comment.