diff --git a/README.md b/README.md index 3bd518d..248d2b3 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,9 @@ globals: # kasa will monitor the current state of the device every # poll interval, in seconds. You can override on a per device poll_interval: 11 + # if devices support metering (aka emeter), use this poll + # interval to publish it. You can override on a per device + # emeter_poll_interval: 600 locations: # coffee maker. To turn it on, use mqtt publish # topic: /coffee_maker/switch payload: on @@ -36,10 +39,11 @@ locations: kitchen lights: host: 192.168.1.22 topic: /kitchen/light_switch - # example where we indicate a specific poll interval + # example where we indicate a specific poll intervals pantry: alias: storage poll_interval: 120 + emeter_poll_interval: 1800 ``` Devices are connected via **host** or discovered by Kasa via **alias**. There are more attributes @@ -76,10 +80,16 @@ $ MQTT=192.168.1.250 && \ 2021-01-30T21:43:03-0500 : 0 : /toaster/switch : on ``` +**NOTE on Metering**: If metering is supported by device and `emeter_poll_interval` was provided, it will be published via topics that end with "emeter": + +``` +$ mosquitto_sub -h $MQTT -t '/+/switch/emeter' +``` + In order to damper endless on/off cycles, this implementation sets an [async throttle](https://pypi.org/project/asyncio-throttle/) for each device. If there is a need to tweak that, the attributes are located in -[kasa_wrapper.py](https://github.com/flavio-fernandes/mqtt2kasa/blob/main/mqtt2kasa/kasa_wrapper.py#L26-L27). +[kasa_wrapper.py](https://github.com/flavio-fernandes/mqtt2kasa/blob/60e37a8e527a04eee54853d42366de314c10cefe/mqtt2kasa/kasa_wrapper.py#L30-L31). **NOTE:** Use python 3.7 or newer, as this project requires a somewhat recent implementation of [asyncio](https://realpython.com/async-io-python/). diff --git a/data/config.yaml b/data/config.yaml index 2e19d0f..af71b7b 100644 --- a/data/config.yaml +++ b/data/config.yaml @@ -33,10 +33,12 @@ locations: kitchen lights: host: 192.168.1.22 topic: /kitchen/light_switch - # example where we indicate a specific poll interval + # example where we indicate a specific poll interval. + # Also, adding a task to publish emeter info at provided interval pantry: alias: storage poll_interval: 120 + emeter_poll_interval: 600 keep_alives: # this is a very optional thing but can be useful. It will monitor a # specific topic to determine if a device should be on or off. The diff --git a/data/config.yaml.vagrant b/data/config.yaml.vagrant index 089a547..ffe315f 100644 --- a/data/config.yaml.vagrant +++ b/data/config.yaml.vagrant @@ -9,6 +9,7 @@ knobs: globals: # test with slow polls to make log less confusing poll_interval: 3600 + # emeter_poll_interval: 0 # topic_format: /kasa/device/{} locations: foo: @@ -18,8 +19,11 @@ locations: bar: # host: 192.168.123.202 alias: Mock HS105 thing2 + # this device does not have emeter capabilities + emeter_poll_interval: 888 lar: topic: /lar/switch host: 192.168.123.203 # alias: Mock HS110 thing3 - + # this device has emeter capabilities + emeter_poll_interval: 888 diff --git a/mqtt2kasa/config.py b/mqtt2kasa/config.py index e8aa1aa..28aff8b 100755 --- a/mqtt2kasa/config.py +++ b/mqtt2kasa/config.py @@ -105,6 +105,18 @@ def poll_interval(self, location_name): cfg_globals.get("poll_interval") or const.KASA_DEFAULT_POLL_INTERVAL ) + def emeter_poll_interval(self, location_name): + locations = self._get_info().locations + if isinstance(locations, collections.abc.Mapping): + location_attributes = locations.get(location_name, {}) + if location_attributes.get("emeter_poll_interval"): + return float(location_attributes["emeter_poll_interval"]) + cfg_globals = self._get_info().cfg_globals + return float( + cfg_globals.get("emeter_poll_interval") + or const.KASA_DEFAULT_EMETER_POLL_INTERVAL + ) + @property def locations(self): return self._get_info().locations diff --git a/mqtt2kasa/const.py b/mqtt2kasa/const.py index 01af11c..b1ef9de 100755 --- a/mqtt2kasa/const.py +++ b/mqtt2kasa/const.py @@ -5,4 +5,5 @@ MQTT_DEFAULT_BROKER_IP = "192.168.10.238" MQTT_DEFAULT_RECONNECT_INTERVAL = 13 # [seconds] KASA_DEFAULT_POLL_INTERVAL = 10 # [seconds] +KASA_DEFAULT_EMETER_POLL_INTERVAL = 0 # [seconds] 0 == disabled KEEP_ALIVE_DEFAULT_TASK_INTERVAL = 1.5 # [seconds] diff --git a/mqtt2kasa/events.py b/mqtt2kasa/events.py old mode 100644 new mode 100755 index 95ecf50..2d7cf4c --- a/mqtt2kasa/events.py +++ b/mqtt2kasa/events.py @@ -38,3 +38,9 @@ class KasaStateEvent(BaseEvent): def __init__(self, **attrs): expected_attrs = "name", "state" super().__init__(expected_attrs, attrs) + + +class KasaEmeterEvent(BaseEvent): + def __init__(self, **attrs): + expected_attrs = "name", "emeter_status" + super().__init__(expected_attrs, attrs) diff --git a/mqtt2kasa/kasa_wrapper.py b/mqtt2kasa/kasa_wrapper.py old mode 100644 new mode 100755 index f9670e8..ff7096e --- a/mqtt2kasa/kasa_wrapper.py +++ b/mqtt2kasa/kasa_wrapper.py @@ -4,12 +4,12 @@ from typing import Optional from asyncio_throttle import Throttler -from kasa import Discover +from kasa import Discover, EmeterStatus from kasa.smartdevice import SmartDevice, SmartDeviceException from mqtt2kasa import log from mqtt2kasa.config import Cfg -from mqtt2kasa.events import KasaStateEvent +from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent logger = log.getLogger() @@ -26,6 +26,7 @@ def __init__(self, name: str, topic: str, config: dict): self.host = config.get("host") self.alias = config.get("alias") self.poll_interval = Cfg().poll_interval(name) + self.emeter_poll_interval = Cfg().emeter_poll_interval(name) self.recv_q = asyncio.Queue(maxsize=4) self.throttler = Throttler(rate_limit=4, period=60) self.curr_state = None @@ -99,6 +100,26 @@ async def turn_off(self): except SmartDeviceException as e: logger.error(f"{self.host} unable to turn_off: {e}") + @property + async def has_emeter(self) -> Optional[bool]: + try: + device = await self._get_device() + await device.update() + return device.has_emeter + except SmartDeviceException as e: + logger.error(f"{self.host} unable to get has_emeter: {e}") + # implicit return None + + @property + async def emeter_realtime(self) -> Optional[EmeterStatus]: + try: + device = await self._get_device() + await device.update() + return device.emeter_realtime + except SmartDeviceException as e: + logger.error(f"{self.host} unable to fetch emeter: {e}") + # implicit return None + @classmethod def state_from_name(cls, is_on: Optional[str]) -> bool: return is_on == cls.STATE_ON @@ -168,13 +189,40 @@ async def handle_kasa_poller(kasa: Kasa, main_events_q: asyncio.Queue): ) ) kasa.curr_state = new_state - await asyncio.sleep(kasa.poll_interval) + await _sleep_with_jitter(kasa.poll_interval) + + +async def handle_kasa_emeter_poller(kasa: Kasa, main_events_q: asyncio.Queue): + fails = 0 + while True: + # chatty + # logger.debug(f"Polling {kasa.name} emeter now. Interval is {kasa.emeter_poll_interval} seconds") + if await kasa.has_emeter == False: + logger.info(f"{kasa.name} has no emeter. no emeter polling is needed") + break + + emeter_status = await kasa.emeter_realtime + if emeter_status is None: + fails += 1 + logger.error( + f"Polling {kasa.name} emeter ({kasa.host}) failed {fails} times" + ) + else: + fails = 0 + await main_events_q.put( + KasaEmeterEvent(name=kasa.name, emeter_status=str(emeter_status)) + ) + await _sleep_with_jitter(kasa.emeter_poll_interval) + + +async def _sleep_with_jitter(interval): + await asyncio.sleep(interval) - # In order to avoid all processes sleeping and waking up at the same time, - # add a little jitter. Pick a value between 0 and 1.2 seconds - jitter = random.randint(99, 1201) - jitterSleep = float(jitter) / 1000 - await asyncio.sleep(jitterSleep) + # In order to avoid all processes sleeping and waking up at the same time, + # add a little jitter. Pick a value between 0 and 1.2 seconds + jitter = random.randint(99, 1201) + jitterSleep = float(jitter) / 1000 + await asyncio.sleep(jitterSleep) async def handle_kasa_requests(kasa: Kasa): diff --git a/mqtt2kasa/keep_alive.py b/mqtt2kasa/keep_alive.py old mode 100644 new mode 100755 diff --git a/mqtt2kasa/main.py b/mqtt2kasa/main.py index 7c25758..abaacc1 100755 --- a/mqtt2kasa/main.py +++ b/mqtt2kasa/main.py @@ -3,12 +3,17 @@ import collections from contextlib import AsyncExitStack -from asyncio_mqtt import Client, MqttError +from aiomqtt import Client, MqttError from mqtt2kasa import log from mqtt2kasa.config import Cfg -from mqtt2kasa.events import KasaStateEvent, MqttMsgEvent -from mqtt2kasa.kasa_wrapper import Kasa, handle_kasa_poller, handle_kasa_requests +from mqtt2kasa.events import KasaStateEvent, KasaEmeterEvent, MqttMsgEvent +from mqtt2kasa.kasa_wrapper import ( + Kasa, + handle_kasa_poller, + handle_kasa_emeter_poller, + handle_kasa_requests, +) from mqtt2kasa.keep_alive import ( KeepAlive, handle_keep_alives, @@ -45,6 +50,24 @@ async def handle_main_event_kasa( await mqtt_send_q.put(MqttMsgEvent(topic=kasa.topic, payload=payload)) +async def handle_emeter_event_kasa( + kasa_emeter: KasaEmeterEvent, run_state: RunState, mqtt_send_q: asyncio.Queue +): + kasa = run_state.kasas.get(kasa_emeter.name) + if not kasa: + logger.warning( + f"Unable to find device with name {kasa_emeter.name}. Ignoring kasa emeter event" + ) + return + topic = f"{kasa.topic}/emeter" + payload = kasa_emeter.emeter_status + logger.info( + f"Kasa emeter event requesting mqtt for {kasa_emeter.name} to publish" + f" {topic} as {payload}" + ) + await mqtt_send_q.put(MqttMsgEvent(topic=topic, payload=payload)) + + async def handle_main_event_mqtt( mqtt_msg: MqttMsgEvent, run_state: RunState, mqtt_send_q: asyncio.Queue ): @@ -95,6 +118,7 @@ async def handle_main_events( ): handlers = { "KasaStateEvent": handle_main_event_kasa, + "KasaEmeterEvent": handle_emeter_event_kasa, "MqttMsgEvent": handle_main_event_mqtt, } while True: @@ -123,7 +147,8 @@ async def cancel_tasks(tasks): async def main_loop(): global stop_gracefully - # https://pypi.org/project/asyncio-mqtt/ + # used to be: https://pypi.org/project/asyncio-mqtt/ + # https://pypi.org/project/aiomqtt/ logger.debug("Starting main event processing loop") cfg = Cfg() mqtt_broker_ip = cfg.mqtt_host @@ -171,6 +196,10 @@ async def main_loop(): for kasa in run_state.kasas.values(): tasks.add(asyncio.create_task(handle_kasa_poller(kasa, main_events_q))) tasks.add(asyncio.create_task(handle_kasa_requests(kasa))) + if kasa.emeter_poll_interval: + tasks.add( + asyncio.create_task(handle_kasa_emeter_poller(kasa, main_events_q)) + ) for name, config in cfg.keep_alives.items(): if name not in run_state.kasas: diff --git a/mqtt2kasa/mqtt.py b/mqtt2kasa/mqtt.py old mode 100644 new mode 100755 diff --git a/mqtt2kasa/tests/basic_test.sh.vagrant b/mqtt2kasa/tests/basic_test.sh.vagrant index c7e9b54..24bf3a5 100755 --- a/mqtt2kasa/tests/basic_test.sh.vagrant +++ b/mqtt2kasa/tests/basic_test.sh.vagrant @@ -30,6 +30,12 @@ grep --quiet -E 'Discovered 192\.168\.123\.202 .*thing2' ${TMP_OUTPUT} || \ grep --quiet -E 'Discovered 192\.168\.123\.203 .* thing3' ${TMP_OUTPUT} || \ { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +echo TEST: EMeter +grep --quiet -E 'bar has no emeter' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'emeter event requesting mqtt for lar to publish /lar/switch/emeter' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } + echo TEST: Check on/off mosquitto_pub -h ${MQTT_BROKER} -t /foo -m "ofF" get_log_lines diff --git a/requirements.txt b/requirements.txt index c8d8e3b..c7c3d78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -asyncio-mqtt +aiomqtt asyncio-throttle paho-mqtt python-kasa