Skip to content

Commit

Permalink
Add support for lights
Browse files Browse the repository at this point in the history
  • Loading branch information
albireox committed Aug 31, 2023
1 parent 43a3f6b commit d8c0226
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 126 deletions.
26 changes: 12 additions & 14 deletions python/lvmecp/actor/commands/lights.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

import click

from lvmecp.lights import CODE_TO_LIGHT

from . import parser


Expand All @@ -24,12 +22,12 @@


@parser.command()
@click.argument("LIGHT", type=str, required=False)
@click.argument(
"ACTION",
type=click.Choice(["on", "off", "switch"], case_sensitive=False),
type=click.Choice(["on", "off", "toggle", "status"], case_sensitive=False),
required=False,
)
@click.argument("LIGHT", type=str, required=False)
async def lights(
command: ECPCommand,
light: str | None = None,
Expand All @@ -39,23 +37,23 @@ async def lights(

plc = command.actor.plc

if light is None:
return command.finish(lights=(await plc.lights.get_all()))
if light is None or action == "status":
await plc.lights.notify_status(wait=True, command=command)
return command.finish()

try:
code = plc.lights.get_code(light)
full = CODE_TO_LIGHT[code]
plc.lights.get_code(light)
except ValueError:
return command.fail(f"Unknown light {light}.")

if action == "on":
await plc.lights.set(light, action=True)
await plc.lights.on(light)
elif action == "off":
await plc.lights.set(light, action=False)
elif action == "switch":
await plc.lights.set(light, action=None)
await plc.lights.off(light)
elif action == "toggle":
await plc.lights.toggle(light)

await asyncio.sleep(0.1)

status = await plc.lights.get_light_status(code)
return command.finish(lights={full: status})
await plc.lights.notify_status(wait=True, command=command)
return command.finish()
2 changes: 1 addition & 1 deletion python/lvmecp/actor/commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def status(command: ECPCommand):

command.info(registers=(await plc.read_all_registers()))

modules = [plc.dome, plc.safety]
modules = [plc.dome, plc.safety, plc.lights]
for module in modules:
await module.notify_status(wait=True, command=command)

Expand Down
5 changes: 1 addition & 4 deletions python/lvmecp/etc/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
"type": "object",
"properties": {
"lights": {
"type": "object",
"patternProperties": {
"[a-z]+": { "type": "boolean" }
}
"type": "string"
},
"registers": {
"type": "object",
Expand Down
141 changes: 35 additions & 106 deletions python/lvmecp/lights.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@

from __future__ import annotations

from typing import TYPE_CHECKING
import asyncio

from lvmecp import log
from lvmecp.maskbits import LightStatus
from lvmecp.tools import loop_coro


if TYPE_CHECKING:
from .plc import PLC
from lvmecp.module import PLCModule


__all__ = ["LightsController", "CODE_TO_LIGHT", "CODE_TO_FLAG"]
Expand All @@ -40,35 +37,23 @@
}


class LightsController:
class LightsController(PLCModule):
"""Controller for the light settings."""

def __init__(self, plc: PLC):
self.plc = plc
self.client = plc.modbus.client

self.status = LightStatus(0)

self.__update_loop_task = loop_coro(self.update, 60)

def __del__(self):
self.__update_loop_task.cancel()
flag = LightStatus

async def update(self):
"""Refreshes the lights status."""
async def _update_internal(self):
"""Update status."""

lights = await self.plc.modbus.read_group("lights")
light_registers = await self.plc.modbus.read_group("lights")

for light, value in lights.items():
code = light.split("_")[0]
flag = CODE_TO_FLAG[code]
active_bits = self.flag(0)
for key in light_registers:
if "status" in key and light_registers[key] is True:
code = key.split("_")[0]
active_bits |= CODE_TO_FLAG[code]

if value is True:
self.status |= flag
else:
self.status &= ~flag

return self.status
return active_bits

def get_code(self, light: str):
"""Returns the short-form code for a light. Case-insensitive.
Expand Down Expand Up @@ -116,8 +101,8 @@ def get_flag(self, light: str):
----------
light
The light for which the `.LightStatus` a flag is requested. It can
be specified in short form (e.g., `tr`), using underscores
(`telescope_red`), or spaces (`telescope red`). The light name
be specified in short form (e.g., ``tr``), using underscores
(``telescope_red``), or spaces (``telescope red``). The light name
is case-insensitive.
"""
Expand All @@ -126,91 +111,35 @@ def get_flag(self, light: str):

return CODE_TO_FLAG[code]

async def get_light_status(self, light: str, update: bool = True) -> bool:
"""Returns the status of a light.
async def toggle(self, light: str):
"""Switches a light."""

Parameters
----------
light
The light for which to get the status.
update
Whether to update the status of all lights before returning
the status. If `False`, the last cached status will be used.
code = self.get_code(light)

Returns
-------
status
`True` if the light is on, `False` otherwise.
log.debug(f"Toggling light {code}.")
await self.modbus[f"{code}_new"].set(True)

Raises
------
ValueError
If the light is unknown.
await asyncio.sleep(0.5)
await self.update()

"""
async def on(self, light: str):
"""Turns on a light."""

if update:
await self.update()
await self.update()

flag = self.get_flag(light)
if self.status & flag:
return

return bool(self.status & flag)

async def get_all(self, update: bool = True) -> dict[str, bool]:
"""Returns a dictionary with the status of all the lights.
Parameters
----------
update
Whether to update the status of all lights before returning.
If `False`, the last cached status will be used.
Returns
-------
status
A dictionary of light name to status (`True` for on, `False` for off).
"""

if update:
await self.update()

status = {}
for code, light in CODE_TO_LIGHT.items():
status[light] = bool(self.status & CODE_TO_FLAG[code])

return status

async def set(self, light: str, action: bool | None = None):
"""Turns a light on/off.
Updates the internal status dictionary.
Parameters
----------
light
The light to command.
action
`True` to turn on the light, `False` for off. If `None`, the light
status will be switched. Doesn't do anything if the light is
already at the desired state.
"""

current = await self.get_light_status(light, update=True)
await self.toggle(light)

code_new = self.get_code(light) + "_new"
async def off(self, light: str):
"""Turns off a light."""

if action is None:
action = not current
await self.update()

if current == action:
flag = self.get_flag(light)
if not (self.status & flag):
return

await self.plc.modbus[code_new].set(action)

flag = self.get_flag(light)
if action is True:
self.status |= flag
else:
self.status &= ~flag
await self.toggle(light)
7 changes: 6 additions & 1 deletion python/lvmecp/plc.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ def __init__(self, config: dict, actor: ECPActor | None = None):
notifier=create_actor_notifier(actor, "safety_status"),
)

self.lights = LightsController(self)
self.lights = LightsController(
"lights",
self,
interval=1,
notifier=create_actor_notifier(actor, "lights"),
)

async def read_all_registers(self):
"""Reads all the connected registers and returns a dictionary."""
Expand Down

0 comments on commit d8c0226

Please sign in to comment.