diff --git a/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json b/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json index 1e9a5f425..7cbcd9c59 100644 --- a/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json +++ b/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json @@ -341,7 +341,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json b/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json index 7eb29b743..00119d3f4 100644 --- a/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json +++ b/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json @@ -347,7 +347,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/smartwings-wm25-l-z.json b/tests/data/devices/smartwings-wm25-l-z.json index ecff3bf8d..8cff074bf 100644 --- a/tests/data/devices/smartwings-wm25-l-z.json +++ b/tests/data/devices/smartwings-wm25-l-z.json @@ -341,7 +341,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/third-reality-inc-3rsb015bz.json b/tests/data/devices/third-reality-inc-3rsb015bz.json index ff7bb1a0e..25bb6af68 100644 --- a/tests/data/devices/third-reality-inc-3rsb015bz.json +++ b/tests/data/devices/third-reality-inc-3rsb015bz.json @@ -840,7 +840,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/tze200-9caxna4s-ts0301.json b/tests/data/devices/tze200-9caxna4s-ts0301.json index f5a497766..c911b87bf 100644 --- a/tests/data/devices/tze200-9caxna4s-ts0301.json +++ b/tests/data/devices/tze200-9caxna4s-ts0301.json @@ -268,7 +268,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/yooksmart-d10110.json b/tests/data/devices/yooksmart-d10110.json index b267530c7..5663acc27 100644 --- a/tests/data/devices/yooksmart-d10110.json +++ b/tests/data/devices/yooksmart-d10110.json @@ -280,7 +280,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/test_cover.py b/tests/test_cover.py index 5e7a66ea1..7b73ef906 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -26,15 +26,10 @@ from zha.application import Platform from zha.application.const import ATTR_COMMAND from zha.application.gateway import Gateway -from zha.application.platforms.cover import ( - ATTR_CURRENT_POSITION, - STATE_CLOSED, - STATE_OPEN, -) from zha.application.platforms.cover.const import ( - STATE_CLOSING, - STATE_OPENING, + ATTR_CURRENT_POSITION, CoverEntityFeature, + CoverState, ) from zha.exceptions import ZHAException @@ -123,7 +118,7 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument entity = get_entity(zha_device, platform=Platform.COVER) state = entity.state - assert state["state"] == STATE_OPEN + assert state["state"] == CoverState.OPEN assert state[ATTR_CURRENT_POSITION] == 100 # test update @@ -137,7 +132,7 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument await entity.async_update() assert cluster.read_attributes.call_count == prev_call_count + 1 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED assert entity.state[ATTR_CURRENT_POSITION] == 0 @@ -146,6 +141,13 @@ async def test_cover( ) -> None: """Test zha cover platform.""" + # Timeout for device movement following a position attribute update + DEFAULT_MOVEMENT_TIMEOUT: float = 5 + + # Upper limit for dynamic timeout + LIFT_MOVEMENT_TIMEOUT_RANGE: float = 300 + TILT_MOVEMENT_TIMEOUT_RANGE: float = 30 + zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE) cluster = zigpy_cover_device.endpoints.get(1).window_covering cluster.PLUGGED_ATTR_READS = { @@ -186,35 +188,42 @@ async def test_cover( | CoverEntityFeature.SET_TILT_POSITION ) - # test that the state has changed from unavailable to off + # set lift to 100% (closed) and test that the state has changed from unavailable to open + # the starting open tilt position overrides the closed lift state await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 100} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.OPEN - # test to see if it opens + # test that the state closes after tilting to 100% (closed) await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.CLOSED - # test that the state remains after tilting to 100% + # set lift to 0% (open) and test to see if state changes to open await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} + zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN - # test to see the state remains after tilting to 0% + # test that the state remains after tilting to 0% (open) await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN + + # test to see the state remains after tilting to 100% (closed) + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} + ) + assert entity.state["state"] == CoverState.OPEN cluster.PLUGGED_ATTR_READS = {1: 100} update_attribute_cache(cluster) await entity.async_update() await zha_gateway.async_block_till_done() - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # close from client with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): @@ -226,16 +235,26 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == WCCmds.down_close.name assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 100} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED + + # verify that a subsequent close command does not change the state to closing + await entity.async_close_cover() + assert entity.state["state"] == CoverState.CLOSED # tilt close from client with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): + # reset the tilt to 0% (open) + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + ) + assert entity.state["state"] == CoverState.OPEN + await entity.async_close_cover_tilt() await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 @@ -248,13 +267,17 @@ async def test_cover( assert cluster.request.call_args[0][3] == 100 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED + + # verify that a subsequent close command does not change the state to closing + await entity.async_close_cover_tilt() + assert entity.state["state"] == CoverState.CLOSED # open from client with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): @@ -266,13 +289,17 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == WCCmds.up_open.name assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_OPENING + assert entity.state["state"] == CoverState.OPENING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN + + # verify that a subsequent open command does not change the state to opening + await entity.async_open_cover() + assert entity.state["state"] == CoverState.OPEN # open tilt from client with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): @@ -288,17 +315,22 @@ async def test_cover( assert cluster.request.call_args[0][3] == 0 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_OPENING + assert entity.state["state"] == CoverState.OPENING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN - # set position UI + # verify that a subsequent open command does not change the state to opening + await entity.async_open_cover_tilt() + assert entity.state["state"] == CoverState.OPEN + + # test set position command, starting at 100 % / 0 ZCL (open) from previous lift test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): - await entity.async_set_cover_position(position=47) + assert entity.state["current_position"] == 100 + await entity.async_set_cover_position(position=47) # 53 when inverted for ZCL await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False @@ -307,23 +339,32 @@ async def test_cover( assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 35} ) - assert entity.state["state"] == STATE_CLOSING + assert entity.state["current_position"] == 65 + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 53} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["current_position"] == 47 + assert entity.state["state"] == CoverState.OPEN - # set tilt position UI + # verify that a subsequent go_to command does not change the state to closing/opening + await entity.async_set_cover_position(position=47) + assert entity.state["state"] == CoverState.OPEN + + # test set tilt position command, starting at 100 % / 0 ZCL (open) from previous tilt test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): - await entity.async_set_cover_tilt_position(tilt_position=47) + assert entity.state["current_tilt_position"] == 100 + await entity.async_set_cover_tilt_position( + tilt_position=47 + ) # 53 when inverted for ZCL await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False @@ -335,19 +376,167 @@ async def test_cover( assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 35} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 35} ) - assert entity.state["state"] == STATE_CLOSING + assert entity.state["current_tilt_position"] == 65 + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 53} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 53} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["current_tilt_position"] == 47 + assert entity.state["state"] == CoverState.OPEN + + # verify that a subsequent go_to command does not change the state to closing/opening + await entity.async_set_cover_tilt_position(tilt_position=47) + assert entity.state["state"] == CoverState.OPEN + + # test interrupted movement (e.g. device button press), starting from 47 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + await entity.async_set_cover_position(position=0) # 100 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x05 + assert cluster.request.call_args[0][2].command.name == "go_to_lift_percentage" + assert cluster.request.call_args[0][3] == 100 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["current_position"] == 47 + assert entity.state["state"] == CoverState.CLOSING + + # simulate a device position update to set timer to the default duration rather than dynamic + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 70} + ) + + assert entity.state["current_position"] == 30 + assert entity.state["state"] == CoverState.CLOSING + + # wait the timer duration + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPEN + + # test interrupted tilt movement (e.g. device button press), starting from 47 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + await entity.async_set_cover_tilt_position( + tilt_position=0 + ) # 100 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 100 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["current_tilt_position"] == 47 + assert entity.state["state"] == CoverState.CLOSING + + # simulate a device position update to set timer to the default duration rather than dynamic + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 70} + ) + + assert entity.state["current_tilt_position"] == 30 + assert entity.state["state"] == CoverState.CLOSING + + # wait the timer duration + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPEN + + # test device instigated movement (e.g. device button press), starting from 30 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_position"] == 30 + assert entity.state["state"] == CoverState.OPEN + + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 60} + ) + + assert entity.state["current_position"] == 40 + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPEN + + # test device instigated tilt movement (e.g. device button press), starting from 30 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_tilt_position"] == 30 + assert entity.state["state"] == CoverState.OPEN + + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 60} + ) + + assert entity.state["current_tilt_position"] == 40 + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPEN + + # test dynamic movement timeout, starting from 40 % and moving to 90 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + + await entity.async_set_cover_position(position=90) # 10 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x05 + assert cluster.request.call_args[0][2].command.name == "go_to_lift_percentage" + assert cluster.request.call_args[0][3] == 10 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration and verify status is still opening + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPENING + + # wait the remainder of the dynamic timeout and check if the movement timed out: (50% * 300 seconds) - default + await asyncio.sleep( + (50 * 0.01 * LIFT_MOVEMENT_TIMEOUT_RANGE) - DEFAULT_MOVEMENT_TIMEOUT + ) + assert entity.state["current_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + + # test dynamic tilt movement timeout, starting from 40 % and moving to 90 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_tilt_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + + await entity.async_set_cover_tilt_position( + tilt_position=90 + ) # 10 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 10 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration and verify status is still opening + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPENING + + # wait the remainder of the dynamic timeout and check if the movement timed out: (50% * 30 seconds) - default + await asyncio.sleep( + (50 * 0.01 * TILT_MOVEMENT_TIMEOUT_RANGE) - DEFAULT_MOVEMENT_TIMEOUT + ) + assert entity.state["current_tilt_position"] == 40 + assert entity.state["state"] == CoverState.OPEN # stop from client with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): @@ -390,7 +579,7 @@ async def test_cover_failures(zha_gateway: Gateway) -> None: zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # close from UI with patch( @@ -408,7 +597,7 @@ async def test_cover_failures(zha_gateway: Gateway) -> None: cluster.request.call_args[0][1] == closures.WindowCovering.ServerCommandDefs.down_close.id ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN with patch( "zigpy.zcl.Cluster.request", @@ -554,17 +743,17 @@ async def test_shade( await send_attributes_report( zha_gateway, cluster_on_off, {cluster_on_off.AttributeDefs.on_off.id: 0} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # test to see if it opens await send_attributes_report( zha_gateway, cluster_on_off, {cluster_on_off.AttributeDefs.on_off.id: 1} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN await entity.async_update() await zha_gateway.async_block_till_done() - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # close from client command fails with ( @@ -582,7 +771,7 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0000 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN with patch( "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x1, zcl_f.Status.SUCCESS]) @@ -592,11 +781,11 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0000 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client command fails await send_attributes_report(zha_gateway, cluster_level, {0: 0}) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED with ( patch( @@ -613,7 +802,7 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client succeeds with patch( @@ -624,7 +813,7 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # set position UI command fails with ( @@ -718,11 +907,11 @@ async def test_keen_vent( # test that the state has changed from unavailable to off await send_attributes_report(zha_gateway, cluster_on_off, {8: 0, 0: False, 1: 1}) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED await entity.async_update() await zha_gateway.async_block_till_done() - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client command fails p1 = patch.object(cluster_on_off, "request", side_effect=asyncio.TimeoutError) @@ -738,7 +927,7 @@ async def test_keen_vent( assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 assert cluster_level.request.call_count == 1 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client command success p1 = patch.object(cluster_on_off, "request", AsyncMock(return_value=[1, 0])) @@ -751,7 +940,7 @@ async def test_keen_vent( assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 assert cluster_level.request.call_count == 1 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN assert entity.state["current_position"] == 100 @@ -798,16 +987,10 @@ async def test_cover_state_restoration( zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device) entity = get_entity(zha_device, platform=Platform.COVER) - assert entity.state["state"] != STATE_CLOSED - assert entity.state["target_lift_position"] != 12 - assert entity.state["target_tilt_position"] != 34 + assert entity.state["state"] != CoverState.CLOSED entity.restore_external_state_attributes( - state=STATE_CLOSED, - target_lift_position=12, - target_tilt_position=34, + state=CoverState.CLOSED, ) - assert entity.state["state"] == STATE_CLOSED - assert entity.state["target_lift_position"] == 12 - assert entity.state["target_tilt_position"] == 34 + assert entity.state["state"] == CoverState.CLOSED diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 14dfe71b3..cb61f1fa6 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -3,9 +3,10 @@ from __future__ import annotations import asyncio +from collections import deque import functools import logging -from typing import TYPE_CHECKING, Any, Literal, cast +from typing import TYPE_CHECKING, Any, cast from zigpy.zcl.clusters.general import OnOff from zigpy.zcl.foundation import Status @@ -14,16 +15,16 @@ from zha.application.platforms import PlatformEntity from zha.application.platforms.cover.const import ( ATTR_CURRENT_POSITION, + ATTR_CURRENT_TILT_POSITION, ATTR_POSITION, ATTR_TILT_POSITION, - STATE_CLOSED, - STATE_CLOSING, - STATE_OPEN, - STATE_OPENING, + POSITION_CLOSED, + POSITION_OPEN, WCT, ZCL_TO_COVER_DEVICE_CLASS, CoverDeviceClass, CoverEntityFeature, + CoverState, WCAttrs, ) from zha.application.registries import PLATFORM_ENTITIES @@ -49,6 +50,16 @@ MULTI_MATCH = functools.partial(PLATFORM_ENTITIES.multipass_match, Platform.COVER) +# Some devices do not stop on the exact target percentage +POSITION_TOLERANCE: int = 1 + +# Timeout for device movement following a position attribute update +DEFAULT_MOVEMENT_TIMEOUT: float = 5 + +# Upper limit for dynamic timeout +LIFT_MOVEMENT_TIMEOUT_RANGE: float = 300 +TILT_MOVEMENT_TIMEOUT_RANGE: float = 30 + @MULTI_MATCH(cluster_handler_names=CLUSTER_HANDLER_COVER) class Cover(PlatformEntity): @@ -57,10 +68,6 @@ class Cover(PlatformEntity): PLATFORM = Platform.COVER _attr_translation_key: str = "cover" - _attr_extra_state_attribute_names: set[str] = { - "target_lift_position", - "target_tilt_position", - } def __init__( self, @@ -74,10 +81,11 @@ def __init__( super().__init__(unique_id, cluster_handlers, endpoint, device, **kwargs) cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER) assert cluster_handler + self._cover_cluster_handler: WindowCoveringClusterHandler = cast( WindowCoveringClusterHandler, cluster_handler ) - if self._cover_cluster_handler.window_covering_type: + if self._cover_cluster_handler.window_covering_type is not None: self._attr_device_class: CoverDeviceClass | None = ( ZCL_TO_COVER_DEVICE_CLASS.get( self._cover_cluster_handler.window_covering_type @@ -86,15 +94,40 @@ def __init__( self._attr_supported_features: CoverEntityFeature = ( self._determine_supported_features() ) + self._target_lift_position: int | None = None self._target_tilt_position: int | None = None - self._state: str = STATE_OPEN - self._determine_initial_state() + self._lift_update_received: bool | None = None + self._tilt_update_received: bool | None = None + self._lift_state: CoverState | None = None + self._tilt_state: CoverState | None = None + self._lift_position_history: deque[int | None] = deque( + [self.current_cover_position], maxlen=2 + ) + self._tilt_position_history: deque[int | None] = deque( + [self.current_cover_tilt_position], maxlen=2 + ) + self._loop = asyncio.get_running_loop() + self._movement_timer: asyncio.TimerHandle | None = None + + self._state: CoverState | None = CoverState.OPEN + self._determine_state(refresh=True) self._cover_cluster_handler.on_event( CLUSTER_HANDLER_ATTRIBUTE_UPDATED, self.handle_cluster_handler_attribute_updated, ) + def restore_external_state_attributes( + self, + *, + state: CoverState | None, + target_lift_position: int | None = None, + target_tilt_position: int | None = None, + ): + """Restore external state attributes.""" + self._state = state + # Target positions have been removed + @property def supported_features(self) -> CoverEntityFeature: """Return supported features.""" @@ -107,52 +140,31 @@ def state(self) -> dict[str, Any]: response.update( { ATTR_CURRENT_POSITION: self.current_cover_position, + ATTR_CURRENT_TILT_POSITION: self.current_cover_tilt_position, "state": self._state, "is_opening": self.is_opening, "is_closing": self.is_closing, "is_closed": self.is_closed, - "target_lift_position": self._target_lift_position, - "target_tilt_position": self._target_tilt_position, } ) return response - def restore_external_state_attributes( - self, - *, - state: Literal[ - "open", "opening", "closed", "closing" - ], # FIXME: why must these be expanded? - target_lift_position: int | None, - target_tilt_position: int | None, - ): - """Restore external state attributes.""" - self._state = state - self._target_lift_position = target_lift_position - self._target_tilt_position = target_tilt_position - @property def is_closed(self) -> bool | None: - """Return True if the cover is closed. - - In HA None is unknown, 0 is closed, 100 is fully open. - In ZCL 0 is fully open, 100 is fully closed. - Keep in mind the values have already been flipped to match HA - in the WindowCovering cluster handler - """ + """Return True if the cover is closed.""" if self.current_cover_position is None: return None - return self.current_cover_position == 0 + return self.current_cover_position == POSITION_CLOSED @property def is_opening(self) -> bool: """Return if the cover is opening or not.""" - return self._state == STATE_OPENING + return self._state == CoverState.OPENING @property def is_closing(self) -> bool: """Return if the cover is closing or not.""" - return self._state == STATE_CLOSING + return self._state == CoverState.CLOSING @property def current_cover_position(self) -> int | None: @@ -161,13 +173,19 @@ def current_cover_position(self) -> int | None: In HA None is unknown, 0 is closed, 100 is fully open. In ZCL 0 is fully open, 100 is fully closed. Keep in mind the values have already been flipped to match HA - in the WindowCovering cluster handler + in the WindowCovering cluster handler. """ return self._cover_cluster_handler.current_position_lift_percentage @property def current_cover_tilt_position(self) -> int | None: - """Return the current tilt position of the cover.""" + """Return the current tilt position of the cover. + + In HA None is unknown, 0 is closed, 100 is fully open. + In ZCL 0 is fully open, 100 is fully closed. + Keep in mind the values have already been flipped to match HA + in the WindowCovering cluster handler. + """ return self._cover_cluster_handler.current_position_tilt_percentage def _determine_supported_features(self) -> CoverEntityFeature: @@ -193,81 +211,234 @@ def _determine_supported_features(self) -> CoverEntityFeature: supported_features |= CoverEntityFeature.STOP_TILT return supported_features - def _determine_initial_state(self) -> None: - """Determine the initial state of the cover.""" + @staticmethod + def _determine_axis_state( + current: int | None, + target: int | None, + history: deque[int | None], + is_update: bool = False, + ): + """Determine cover axis state (lift/tilt). + + Some device update position during movement, others only after stopping. + When a target is defined the logic aims to mitigate split-brain scenarios + where a HA command is interrupted by a device button press/physical obstruction. + + The logic considers previous position to determine if the cover is moving. + """ + + if current is None: + return None + previous = history[0] if history[0] is not None else current + + if target is None and is_update and previous != current: + target = POSITION_OPEN if current > previous else POSITION_CLOSED + if ( - self._cover_cluster_handler.window_covering_type - and self._cover_cluster_handler.window_covering_type - in ( - WCT.Shutter, - WCT.Tilt_blind_tilt_only, - WCT.Tilt_blind_tilt_and_lift, + target is not None + and current != target + and (not is_update or previous != current or history[0] is None) + and ( + previous <= current < target - POSITION_TOLERANCE + or target + POSITION_TOLERANCE < current <= previous ) ): - self._determine_state( - self.current_cover_tilt_position, is_lift_update=False + # ZHA thinks the cover is moving + return CoverState.OPENING if target > current else CoverState.CLOSING + + # Return the static position + return CoverState.OPEN if current > POSITION_CLOSED else CoverState.CLOSED + + def _determine_state( + self, + is_lift_update: bool = False, + is_tilt_update: bool = False, + refresh: bool = False, + ) -> None: + """Determine the state of the cover entity. + + This considers current state of both the lift and tilt axis. + """ + if self._lift_state is None or is_lift_update or refresh: + self._lift_state = self._determine_axis_state( + self.current_cover_position, + self._target_lift_position, + self._lift_position_history, + is_lift_update, ) - if ( - self._cover_cluster_handler.window_covering_type - == WCT.Tilt_blind_tilt_and_lift - ): - state = self._state - self._determine_state(self.current_cover_position) - if state == STATE_OPEN and self._state == STATE_CLOSED: - # let the tilt state override the lift state - self._state = STATE_OPEN + if self._tilt_state is None or is_tilt_update or refresh: + self._tilt_state = self._determine_axis_state( + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_position_history, + is_tilt_update, + ) + + _LOGGER.debug( + "_determine_state: lift=(state: %s, is_update: %s, current: %s, target: %s, history: %s), tilt=(state: %s, is_update: %s, current: %s, target: %s, history: %s)", + self._lift_state, + is_lift_update, + self.current_cover_position, + self._target_lift_position, + self._lift_position_history, + self._tilt_state, + is_tilt_update, + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_position_history, + ) + + # Clear target position if the cover axis is not moving + if self._lift_state not in (CoverState.OPENING, CoverState.CLOSING): + self._track_target_lift_position(None) + if self._tilt_state not in (CoverState.OPENING, CoverState.CLOSING): + self._track_target_tilt_position(None) + + # Start a movement timeout if the cover is moving, else cancel it + if CoverState.CLOSING in ( + self._lift_state, + self._tilt_state, + ) or CoverState.OPENING in ( + self._lift_state, + self._tilt_state, + ): + self._start_movement_timer() else: - self._determine_state(self.current_cover_position) + self._cancel_movement_timer() - def _determine_state(self, position_or_tilt, is_lift_update=True) -> None: - """Determine the state of the cover. + # Keep the last movement direction if either axis is still moving + if ( + self.is_closing + and CoverState.CLOSING in (self._lift_state, self._tilt_state) + or self.is_opening + and CoverState.OPENING in (self._lift_state, self._tilt_state) + ): + return + + # An open or moving tilt state overrides a static lift state + if self._tilt_state in ( + CoverState.OPEN, + CoverState.OPENING, + CoverState.CLOSING, + ) and self._lift_state in (CoverState.CLOSED, CoverState.OPEN): + self._state = self._tilt_state + return + + # Pick lift state in preference over tilt + self._state = self._lift_state or self._tilt_state + + def _dynamic_timeout(self) -> float: + """Return a timer duration in seconds based on expected movement distance. + + This is required because some devices only report a position update after stopping. + """ + + lift_timeout = 0.0 + tilt_timeout = 0.0 + + # Calculate dynamic timeout durations if a target is defined and the device has not reported a new position + if ( + self._target_lift_position is not None + and self.current_cover_position is not None + and not self._lift_update_received + ): + lift_timeout = ( + abs(self._target_lift_position - self.current_cover_position) + * 0.01 + * LIFT_MOVEMENT_TIMEOUT_RANGE + ) + if ( + self._target_tilt_position is not None + and self.current_cover_tilt_position is not None + and not self._tilt_update_received + ): + tilt_timeout = ( + abs(self._target_tilt_position - self.current_cover_tilt_position) + * 0.01 + * TILT_MOVEMENT_TIMEOUT_RANGE + ) + + _LOGGER.debug( + "_dynamic_timeout: lift=(timeout: %s, current: %s, target: %s, update_received: %s), tilt=(timeout: %s, current: %s, target: %s, update_received: %s)", + lift_timeout, + self.current_cover_position, + self._target_lift_position, + self._lift_update_received, + tilt_timeout, + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_update_received, + ) + + # Return the longest axis movement timeout + return max(lift_timeout, tilt_timeout) + + def _start_movement_timer(self, seconds: float = 0) -> None: + """Start timer for clearing the movement state (opening/closing).""" + if self._movement_timer: + self._movement_timer.cancel() + duration = seconds or self._dynamic_timeout() or DEFAULT_MOVEMENT_TIMEOUT + if duration <= 0: + raise ZHAException(f"Invalid movement timer duration: {duration}") + _LOGGER.debug("Movement timer started with a duration of %s seconds", duration) + self._movement_timer = self._loop.call_later( + duration, self._clear_movement_state, duration + ) + + def _cancel_movement_timer(self) -> None: + """Cancel the movement timer.""" + _LOGGER.debug("Movement timer cancelled") + if self._movement_timer: + self._movement_timer.cancel() + self._movement_timer = None + + def _clear_movement_state(self, duration: float, _=None) -> None: + """Clear the movement state due to inactivity.""" + _LOGGER.debug("No movement reported for %s seconds", duration) + self._target_lift_position = None + self._target_tilt_position = None + self._determine_state(refresh=True) + self.maybe_emit_state_changed_event() + + def _track_target_lift_position(self, position: int | None): + """Track locally instigated lift target.""" + self._target_lift_position = position + if position is not None: + self._lift_update_received = False + self._lift_state = None + + def _track_target_tilt_position(self, position: int | None): + """Track locally instigated tilt target.""" + self._target_tilt_position = position + if position is not None: + self._tilt_update_received = False + self._tilt_state = None + + @staticmethod + def _ha_to_zcl_position(position: int) -> int: + """Convert the HA position to the ZCL position range. In HA None is unknown, 0 is closed, 100 is fully open. In ZCL 0 is fully open, 100 is fully closed. - Keep in mind the values have already been flipped to match HA - in the WindowCovering cluster handler """ - if is_lift_update: - target = self._target_lift_position - current = self.current_cover_position - else: - target = self._target_tilt_position - current = self.current_cover_tilt_position - - if position_or_tilt == 0: - self._state = ( - STATE_CLOSED - if is_lift_update - else STATE_OPEN - if self.current_cover_position is not None - and self.current_cover_position > 0 - else STATE_CLOSED - ) - return - if target is not None and target != current: - # we are mid transition and shouldn't update the state - return - self._state = STATE_OPEN + return 100 - position def handle_cluster_handler_attribute_updated( self, event: ClusterAttributeUpdatedEvent ) -> None: - """Handle position update from cluster handler.""" - if event.attribute_id in ( - WCAttrs.current_position_lift_percentage.id, - WCAttrs.current_position_tilt_percentage.id, - ): - value = ( - self.current_cover_position - if event.attribute_id == WCAttrs.current_position_lift_percentage.id - else self.current_cover_tilt_position - ) - self._determine_state( - value, - is_lift_update=( - event.attribute_id == WCAttrs.current_position_lift_percentage.id - ), - ) + """Handle position updates from cluster handler. + + The previous position is retained for use in state determination. + """ + _LOGGER.debug("handle_cluster_handler_attribute_updated=%s", event) + if event.attribute_id == WCAttrs.current_position_lift_percentage.id: + self._lift_position_history.append(self.current_cover_position) + self._lift_update_received = True + self._determine_state(is_lift_update=True) + if event.attribute_id == WCAttrs.current_position_tilt_percentage.id: + self._tilt_position_history.append(self.current_cover_tilt_position) + self._tilt_update_received = True + self._determine_state(is_tilt_update=True) self.maybe_emit_state_changed_event() def async_update_state(self, state): @@ -275,88 +446,116 @@ def async_update_state(self, state): _LOGGER.debug("async_update_state=%s", state) self._state = state self.maybe_emit_state_changed_event() + if state in (CoverState.OPENING, CoverState.CLOSING): + self._start_movement_timer() async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover.""" + self._track_target_lift_position(POSITION_OPEN) res = await self._cover_cluster_handler.up_open() if res[1] is not Status.SUCCESS: + self._track_target_lift_position(None) raise ZHAException(f"Failed to open cover: {res[1]}") - self.async_update_state(STATE_OPENING) + if self.current_cover_position == POSITION_OPEN: + return + self.async_update_state(CoverState.OPENING) async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover tilt.""" - # 0 is open in ZCL - res = await self._cover_cluster_handler.go_to_tilt_percentage(0) + self._track_target_tilt_position(POSITION_OPEN) + res = await self._cover_cluster_handler.go_to_tilt_percentage( + self._ha_to_zcl_position(POSITION_OPEN) + ) if res[1] is not Status.SUCCESS: + self._track_target_tilt_position(None) raise ZHAException(f"Failed to open cover tilt: {res[1]}") - self.async_update_state(STATE_OPENING) + if self.current_cover_tilt_position == POSITION_OPEN: + return + self.async_update_state(CoverState.OPENING) async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover.""" + self._track_target_lift_position(POSITION_CLOSED) res = await self._cover_cluster_handler.down_close() if res[1] is not Status.SUCCESS: + self._track_target_lift_position(None) raise ZHAException(f"Failed to close cover: {res[1]}") - self.async_update_state(STATE_CLOSING) + if self.current_cover_position == POSITION_CLOSED: + return + self.async_update_state(CoverState.CLOSING) async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover tilt.""" - # 100 is closed in ZCL - res = await self._cover_cluster_handler.go_to_tilt_percentage(100) + self._track_target_tilt_position(POSITION_CLOSED) + res = await self._cover_cluster_handler.go_to_tilt_percentage( + self._ha_to_zcl_position(POSITION_CLOSED) + ) if res[1] is not Status.SUCCESS: + self._track_target_tilt_position(None) raise ZHAException(f"Failed to close cover tilt: {res[1]}") - self.async_update_state(STATE_CLOSING) + if self.current_cover_tilt_position == POSITION_CLOSED: + return + self.async_update_state(CoverState.CLOSING) async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the cover to a specific position.""" - self._target_lift_position = kwargs[ATTR_POSITION] - assert self._target_lift_position is not None assert self.current_cover_position is not None - # the 100 - value is because we need to invert the value before giving it to ZCL + target_position = kwargs[ATTR_POSITION] + assert target_position is not None + self._track_target_lift_position(target_position) res = await self._cover_cluster_handler.go_to_lift_percentage( - 100 - self._target_lift_position + self._ha_to_zcl_position(target_position) ) if res[1] is not Status.SUCCESS: + self._track_target_lift_position(None) raise ZHAException(f"Failed to set cover position: {res[1]}") + if target_position == self.current_cover_position: + return self.async_update_state( - STATE_CLOSING - if self._target_lift_position < self.current_cover_position - else STATE_OPENING + CoverState.CLOSING + if target_position < self.current_cover_position + else CoverState.OPENING ) async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: """Move the cover tilt to a specific position.""" - self._target_tilt_position = kwargs[ATTR_TILT_POSITION] - assert self._target_tilt_position is not None assert self.current_cover_tilt_position is not None - # the 100 - value is because we need to invert the value before giving it to ZCL + target_position = kwargs[ATTR_TILT_POSITION] + assert target_position is not None + self._track_target_tilt_position(target_position) res = await self._cover_cluster_handler.go_to_tilt_percentage( - 100 - self._target_tilt_position + self._ha_to_zcl_position(target_position) ) if res[1] is not Status.SUCCESS: + self._track_target_tilt_position(None) raise ZHAException(f"Failed to set cover tilt position: {res[1]}") + if target_position == self.current_cover_tilt_position: + return self.async_update_state( - STATE_CLOSING - if self._target_tilt_position < self.current_cover_tilt_position - else STATE_OPENING + CoverState.CLOSING + if target_position < self.current_cover_tilt_position + else CoverState.OPENING ) async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument - """Stop the cover.""" + """Stop the cover. + + Upon receipt of this command the cover stops both lift and tilt movement. + """ + self._track_target_lift_position(None) + self._track_target_tilt_position(None) res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") - self._target_lift_position = self.current_cover_position - self._determine_state(self.current_cover_position) + self._determine_state(refresh=True) self.maybe_emit_state_changed_event() async def async_stop_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument - """Stop the cover tilt.""" - res = await self._cover_cluster_handler.stop() - if res[1] is not Status.SUCCESS: - raise ZHAException(f"Failed to stop cover: {res[1]}") - self._target_tilt_position = self.current_cover_tilt_position - self._determine_state(self.current_cover_tilt_position, is_lift_update=False) - self.maybe_emit_state_changed_event() + """Stop the cover tilt. + + This is handled by async_stop_cover because there is no tilt specific command for Zigbee covers. + """ + await self.async_stop_cover(**kwargs) @MULTI_MATCH( @@ -382,7 +581,7 @@ def __init__( device: Device, **kwargs, ) -> None: - """Initialize the ZHA light.""" + """Initialize the ZHA shade.""" super().__init__(unique_id, cluster_handlers, endpoint, device, **kwargs) self._on_off_cluster_handler: ClusterHandler = self.cluster_handlers[ CLUSTER_HANDLER_ON_OFF @@ -393,8 +592,7 @@ def __init__( self._is_open: bool = bool(self._on_off_cluster_handler.on_off) position = self._level_cluster_handler.current_level if position is not None: - position = max(0, min(255, position)) - position = int(position * 100 / 255) + position = self._zcl_to_ha_position(position) self._position: int | None = position self._on_off_cluster_handler.on_event( CLUSTER_HANDLER_ATTRIBUTE_UPDATED, @@ -416,7 +614,7 @@ def state(self) -> dict[str, Any]: if (closed := self.is_closed) is None: state = None else: - state = STATE_CLOSED if closed else STATE_OPEN + state = CoverState.CLOSED if closed else CoverState.OPEN response = super().state response.update( { @@ -470,8 +668,7 @@ def handle_cluster_handler_attribute_updated( def handle_cluster_handler_set_level(self, event: LevelChangeEvent) -> None: """Set the reported position.""" - value = max(0, min(255, event.level)) - self._position = int(value * 100 / 255) + self._position = self._zcl_to_ha_position(event.level) self.maybe_emit_state_changed_event() async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -496,7 +693,7 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the roller shutter to a specific position.""" new_pos = kwargs[ATTR_POSITION] res = await self._level_cluster_handler.move_to_level_with_on_off( - new_pos * 255 / 100, 1 + self._ha_to_zcl_position(new_pos), 1 ) if res[1] != Status.SUCCESS: @@ -511,6 +708,17 @@ async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unus if res[1] != Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") + @staticmethod + def _zcl_to_ha_position(level: int) -> int: + """Convert the ZCL level to the HA position range.""" + level = max(0, min(255, level)) + return round(level * 100 / 255) + + @staticmethod + def _ha_to_zcl_position(position: int) -> int: + """Convert the HA position to the ZCL level range.""" + return round(position * 255 / 100) + @MULTI_MATCH( cluster_handler_names={CLUSTER_HANDLER_LEVEL, CLUSTER_HANDLER_ON_OFF}, @@ -527,7 +735,7 @@ async def async_open_cover(self, **kwargs: Any) -> None: position = self._position or 100 await asyncio.gather( self._level_cluster_handler.move_to_level_with_on_off( - position * 255 / 100, 1 + self._ha_to_zcl_position(position), 1 ), self._on_off_cluster_handler.on(), ) diff --git a/zha/application/platforms/cover/const.py b/zha/application/platforms/cover/const.py index af6e25fd3..9e9ce34b7 100644 --- a/zha/application/platforms/cover/const.py +++ b/zha/application/platforms/cover/const.py @@ -10,10 +10,17 @@ ATTR_POSITION: Final[str] = "position" ATTR_TILT_POSITION: Final[str] = "tilt_position" -STATE_OPEN: Final = "open" -STATE_OPENING: Final = "opening" -STATE_CLOSED: Final = "closed" -STATE_CLOSING: Final = "closing" +POSITION_CLOSED: Final[int] = 0 +POSITION_OPEN: Final[int] = 100 + + +class CoverState(StrEnum): + """State of Cover entities.""" + + CLOSED = "closed" + CLOSING = "closing" + OPEN = "open" + OPENING = "opening" class CoverDeviceClass(StrEnum):