From f7281ad2d7b96c705eea7ab67ad6ef65075aadd9 Mon Sep 17 00:00:00 2001 From: Jack <46714706+jeverley@users.noreply.github.com> Date: Fri, 10 Jan 2025 21:39:18 +0000 Subject: [PATCH 01/15] Improvements to Cover PlatformEntity status methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This aims to address issues in correctly representing the current cover status.  - Fixes issue where the movement state instigated by 'go to' commands is not cleared upon completion  - Implements movement timeout and previous sate comparison to determine device instigated changes  - Removed target position entity attributes (we don't need to restore these)  - Fix tests for tilt axis --- tests/test_cover.py | 42 ++- zha/application/platforms/cover/__init__.py | 355 ++++++++++++++------ zha/application/platforms/cover/const.py | 11 +- 3 files changed, 282 insertions(+), 126 deletions(-) diff --git a/tests/test_cover.py b/tests/test_cover.py index 5e7a66ea1..d292f0938 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -186,27 +186,34 @@ async def test_cover( | CoverEntityFeature.SET_TILT_POSITION ) - # test that the state has changed from unavailable to off + # set lift to 100% (closed) and test that the state has changed from unavailable to open + # the starting open tilt position overrides the closed lift state await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 100} ) + assert entity.state["state"] == STATE_OPEN + + # test that the state closes after tilting to 100% (closed) + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} + ) assert entity.state["state"] == STATE_CLOSED - # test to see if it opens + # set lift to 0% (open) and test to see if state changes to open await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) assert entity.state["state"] == STATE_OPEN - # test that the state remains after tilting to 100% + # test that the state remains after tilting to 0% (open) await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} ) assert entity.state["state"] == STATE_OPEN - # test to see the state remains after tilting to 0% + # test to see the state remains after tilting to 100% (closed) await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} ) assert entity.state["state"] == STATE_OPEN @@ -296,9 +303,9 @@ async def test_cover( assert entity.state["state"] == STATE_OPEN - # set position UI + # test set position command, starting at 100 % / 0 ZCL (open) from previous lift test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): - await entity.async_set_cover_position(position=47) + await entity.async_set_cover_position(position=47) # 53 when inverted for ZCL await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False @@ -307,23 +314,28 @@ async def test_cover( assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True + assert entity.state["current_position"] == 100 assert entity.state["state"] == STATE_CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 35} ) + assert entity.state["current_position"] == 65 assert entity.state["state"] == STATE_CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 53} ) + assert entity.state["current_position"] == 47 assert entity.state["state"] == STATE_OPEN - # set tilt position UI + # test set tilt position command, starting at 100 % / 0 ZCL (open) from previous tilt test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): - await entity.async_set_cover_tilt_position(tilt_position=47) + await entity.async_set_cover_tilt_position( + tilt_position=47 + ) # 53 when inverted for ZCL await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False @@ -338,13 +350,13 @@ async def test_cover( assert entity.state["state"] == STATE_CLOSING await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 35} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 35} ) assert entity.state["state"] == STATE_CLOSING await send_attributes_report( - zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 53} + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 53} ) assert entity.state["state"] == STATE_OPEN @@ -799,15 +811,9 @@ async def test_cover_state_restoration( entity = get_entity(zha_device, platform=Platform.COVER) assert entity.state["state"] != STATE_CLOSED - assert entity.state["target_lift_position"] != 12 - assert entity.state["target_tilt_position"] != 34 entity.restore_external_state_attributes( state=STATE_CLOSED, - target_lift_position=12, - target_tilt_position=34, ) assert entity.state["state"] == STATE_CLOSED - assert entity.state["target_lift_position"] == 12 - assert entity.state["target_tilt_position"] == 34 diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 14dfe71b3..8a5282e38 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +from collections import deque import functools import logging from typing import TYPE_CHECKING, Any, Literal, cast @@ -16,6 +17,8 @@ ATTR_CURRENT_POSITION, ATTR_POSITION, ATTR_TILT_POSITION, + POSITION_CLOSED, + POSITION_OPEN, STATE_CLOSED, STATE_CLOSING, STATE_OPEN, @@ -49,6 +52,16 @@ MULTI_MATCH = functools.partial(PLATFORM_ENTITIES.multipass_match, Platform.COVER) +# Some devices do not stop on the exact target percentage +POSITION_TOLERANCE: int = 1 + +# Timeout for device movement following a position attribute update +DEFAULT_MOVEMENT_TIMEOUT: float = 5 + +# Upper limit for dynamic timeout +LIFT_MOVEMENT_TIMEOUT_RANGE: float = 300 +TILT_MOVEMENT_TIMEOUT_RANGE: float = 30 + @MULTI_MATCH(cluster_handler_names=CLUSTER_HANDLER_COVER) class Cover(PlatformEntity): @@ -57,10 +70,6 @@ class Cover(PlatformEntity): PLATFORM = Platform.COVER _attr_translation_key: str = "cover" - _attr_extra_state_attribute_names: set[str] = { - "target_lift_position", - "target_tilt_position", - } def __init__( self, @@ -74,6 +83,7 @@ def __init__( super().__init__(unique_id, cluster_handlers, endpoint, device, **kwargs) cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER) assert cluster_handler + self._cover_cluster_handler: WindowCoveringClusterHandler = cast( WindowCoveringClusterHandler, cluster_handler ) @@ -86,15 +96,41 @@ def __init__( self._attr_supported_features: CoverEntityFeature = ( self._determine_supported_features() ) + self._target_lift_position: int | None = None self._target_tilt_position: int | None = None + self._lift_update_reported: bool | None = None + self._tilt_update_reported: bool | None = None + + self._lift_position_history: deque[int | None] = deque( + (self.current_cover_position,), maxlen=2 + ) + self._tilt_position_history: deque[int | None] = deque( + (self.current_cover_tilt_position,), maxlen=2 + ) + self._loop = asyncio.get_running_loop() + self._movement_timer: asyncio.TimerHandle | None = None + self._state: str = STATE_OPEN - self._determine_initial_state() + self._determine_state() self._cover_cluster_handler.on_event( CLUSTER_HANDLER_ATTRIBUTE_UPDATED, self.handle_cluster_handler_attribute_updated, ) + def restore_external_state_attributes( + self, + *, + state: Literal[ + "open", "opening", "closed", "closing" + ], # FIXME: why must these be expanded? + target_lift_position: int | None = None, + target_tilt_position: int | None = None, + ): + """Restore external state attributes.""" + self._state = state + # Target positions have been removed + @property def supported_features(self) -> CoverEntityFeature: """Return supported features.""" @@ -111,38 +147,16 @@ def state(self) -> dict[str, Any]: "is_opening": self.is_opening, "is_closing": self.is_closing, "is_closed": self.is_closed, - "target_lift_position": self._target_lift_position, - "target_tilt_position": self._target_tilt_position, } ) return response - def restore_external_state_attributes( - self, - *, - state: Literal[ - "open", "opening", "closed", "closing" - ], # FIXME: why must these be expanded? - target_lift_position: int | None, - target_tilt_position: int | None, - ): - """Restore external state attributes.""" - self._state = state - self._target_lift_position = target_lift_position - self._target_tilt_position = target_tilt_position - @property def is_closed(self) -> bool | None: - """Return True if the cover is closed. - - In HA None is unknown, 0 is closed, 100 is fully open. - In ZCL 0 is fully open, 100 is fully closed. - Keep in mind the values have already been flipped to match HA - in the WindowCovering cluster handler - """ + """Return True if the cover is closed.""" if self.current_cover_position is None: return None - return self.current_cover_position == 0 + return self.current_cover_position == POSITION_CLOSED @property def is_opening(self) -> bool: @@ -193,81 +207,196 @@ def _determine_supported_features(self) -> CoverEntityFeature: supported_features |= CoverEntityFeature.STOP_TILT return supported_features - def _determine_initial_state(self) -> None: - """Determine the initial state of the cover.""" + @staticmethod + def _determine_axis_state( + current: int | None, + target: int | None, + history: deque[int | None], + is_update: bool = False, + ): + """Determine cover axis state (lift/tilt). + + Some device update position during movement, others only after stopping. + When a target is defined the logic aims to mitigate split-brain scenarios + where a HA command is interrupted by a device button press/physical obstruction. + + The logic considers previous position to determine if the cover is moving, + if the position has not changed between two device updates it is not moving. + """ + + if current is None: + return None + previous = history[0] if history[0] is not None else current + + if target is None and is_update and previous != current: + target = POSITION_OPEN if current > previous else POSITION_CLOSED + if ( - self._cover_cluster_handler.window_covering_type - and self._cover_cluster_handler.window_covering_type - in ( - WCT.Shutter, - WCT.Tilt_blind_tilt_only, - WCT.Tilt_blind_tilt_and_lift, + target is not None + and current != target + and (not is_update or previous != current or history[0] is None) + and ( + previous <= current < target - POSITION_TOLERANCE + or target + POSITION_TOLERANCE < current <= previous ) ): - self._determine_state( - self.current_cover_tilt_position, is_lift_update=False - ) - if ( - self._cover_cluster_handler.window_covering_type - == WCT.Tilt_blind_tilt_and_lift - ): - state = self._state - self._determine_state(self.current_cover_position) - if state == STATE_OPEN and self._state == STATE_CLOSED: - # let the tilt state override the lift state - self._state = STATE_OPEN + # ZHA thinks the cover is moving + return STATE_OPENING if target > current else STATE_CLOSING + + # Return the static position + return STATE_OPEN if current > POSITION_CLOSED else STATE_CLOSED + + def _determine_state( + self, is_lift_update: bool | None = False, is_tilt_update: bool | None = False + ) -> None: + """Determine the state of the cover entity.""" + + # Determine state for lift and tilt axis + lift_state = self._determine_axis_state( + self.current_cover_position, + self._target_lift_position, + self._lift_position_history, + is_lift_update, + ) + tilt_state = self._determine_axis_state( + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_position_history, + is_tilt_update, + ) + + _LOGGER.debug( + "_determine_state: lift=(state: %s, attr_update: %s, current: %s, target: %s, history: %s), tilt=(state: %s, attr_update: %s, current: %s, target: %s, history: %s)", + lift_state, + is_lift_update, + self.current_cover_position, + self._target_lift_position, + self._lift_position_history, + tilt_state, + is_tilt_update, + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_position_history, + ) + + # Clear target position if the cover axis is not moving + if lift_state not in (STATE_OPENING, STATE_CLOSING): + self._target_lift_position = None + if tilt_state not in (STATE_OPENING, STATE_CLOSING): + self._target_tilt_position = None + + # Start a movement timeout if the cover is moving, else cancel it + if STATE_CLOSING in (lift_state, tilt_state) or STATE_OPENING in ( + lift_state, + tilt_state, + ): + self._start_movement_timer() else: - self._determine_state(self.current_cover_position) + self._cancel_movement_timer() - def _determine_state(self, position_or_tilt, is_lift_update=True) -> None: - """Determine the state of the cover. + # Keep the last movement direction if either axis is still moving + if ( + self.is_closing + and STATE_CLOSING in (lift_state, tilt_state) + or self.is_opening + and STATE_OPENING in (lift_state, tilt_state) + ): + return + + # An open tilt state overrides a closed lift state + if tilt_state == STATE_OPEN and lift_state == STATE_CLOSED: + self._state = STATE_OPEN + return + + # Pick lift state in preference over tilt + self._state = lift_state or tilt_state + + def _dynamic_timer_value(self) -> float: + """Return a timer duration in seconds based on expected movement distance. + + This is required because some devices only report position updates after stopping. + """ + + lift_timeout = 0.0 + tilt_timeout = 0.0 + + # Dynamic lift timeout if a local target is defined and the device has not reported a new position + if ( + self._target_lift_position is not None + and self.current_cover_position is not None + and not self._lift_update_reported + ): + lift_timeout = ( + abs(self._target_lift_position - self.current_cover_position) + * 0.01 + * LIFT_MOVEMENT_TIMEOUT_RANGE + ) + + # Dynamic tilt timeout if a local target is defined and the device has not reported a new position + if ( + self._target_tilt_position is not None + and self.current_cover_tilt_position is not None + and not self._tilt_update_reported + ): + tilt_timeout = ( + abs(self._target_tilt_position - self.current_cover_tilt_position) + * 0.01 + * TILT_MOVEMENT_TIMEOUT_RANGE + ) + + # Return the max axis timeout or default timeout if the max is 0 + return max(lift_timeout, tilt_timeout) or DEFAULT_MOVEMENT_TIMEOUT + + def _start_movement_timer(self, seconds: float = 0) -> None: + """Start timer for clearing the current movement state.""" + if self._movement_timer: + self._movement_timer.cancel() + duration = seconds or self._dynamic_timer_value() + if duration <= 0: + raise ZHAException(f"Invalid movement timer duration: {duration}") + self._movement_timer = self._loop.call_later( + duration, self._clear_movement_state, duration + ) + + def _cancel_movement_timer(self) -> None: + """Cancel the current movement timer.""" + if self._movement_timer: + self._movement_timer.cancel() + self._movement_timer = None + + def _clear_movement_state(self, duration: float, _=None) -> None: + """Clear the moving state of the cover due to inactivity.""" + _LOGGER.debug("No movement reported for %s seconds", duration) + self._target_lift_position = None + self._target_tilt_position = None + self._determine_state() + self.maybe_emit_state_changed_event() + + @staticmethod + def _invert_position_for_zcl(position: int) -> int: + """Convert the HA position to the ZCL position range. In HA None is unknown, 0 is closed, 100 is fully open. In ZCL 0 is fully open, 100 is fully closed. - Keep in mind the values have already been flipped to match HA - in the WindowCovering cluster handler """ - if is_lift_update: - target = self._target_lift_position - current = self.current_cover_position - else: - target = self._target_tilt_position - current = self.current_cover_tilt_position - - if position_or_tilt == 0: - self._state = ( - STATE_CLOSED - if is_lift_update - else STATE_OPEN - if self.current_cover_position is not None - and self.current_cover_position > 0 - else STATE_CLOSED - ) - return - if target is not None and target != current: - # we are mid transition and shouldn't update the state - return - self._state = STATE_OPEN + return 100 - position def handle_cluster_handler_attribute_updated( self, event: ClusterAttributeUpdatedEvent ) -> None: - """Handle position update from cluster handler.""" - if event.attribute_id in ( - WCAttrs.current_position_lift_percentage.id, - WCAttrs.current_position_tilt_percentage.id, - ): - value = ( - self.current_cover_position - if event.attribute_id == WCAttrs.current_position_lift_percentage.id - else self.current_cover_tilt_position - ) - self._determine_state( - value, - is_lift_update=( - event.attribute_id == WCAttrs.current_position_lift_percentage.id - ), - ) + """Handle position updates from cluster handler. + + The previous position is retained for use in state determination. + """ + _LOGGER.debug("handle_cluster_handler_attribute_updated=%s", event) + if event.attribute_id == WCAttrs.current_position_lift_percentage.id: + self._lift_position_history.append(self.current_cover_position) + self._lift_update_reported = True + self._determine_state(is_lift_update=True) + if event.attribute_id == WCAttrs.current_position_tilt_percentage.id: + self._tilt_position_history.append(self.current_cover_tilt_position) + self._tilt_update_reported = True + self._determine_state(is_tilt_update=True) self.maybe_emit_state_changed_event() def async_update_state(self, state): @@ -275,34 +404,48 @@ def async_update_state(self, state): _LOGGER.debug("async_update_state=%s", state) self._state = state self.maybe_emit_state_changed_event() + if state in (STATE_OPENING, STATE_CLOSING): + self._start_movement_timer() async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover.""" + self._target_lift_position = POSITION_OPEN res = await self._cover_cluster_handler.up_open() if res[1] is not Status.SUCCESS: + self._target_lift_position = None raise ZHAException(f"Failed to open cover: {res[1]}") self.async_update_state(STATE_OPENING) async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover tilt.""" - # 0 is open in ZCL - res = await self._cover_cluster_handler.go_to_tilt_percentage(0) + self._target_tilt_position = POSITION_OPEN + res = await self._cover_cluster_handler.go_to_tilt_percentage( + self._invert_position_for_zcl(POSITION_OPEN) + ) if res[1] is not Status.SUCCESS: + self._target_tilt_position = None raise ZHAException(f"Failed to open cover tilt: {res[1]}") self.async_update_state(STATE_OPENING) async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover.""" + self._target_lift_position = POSITION_CLOSED + self._lift_update_reported = False res = await self._cover_cluster_handler.down_close() if res[1] is not Status.SUCCESS: + self._target_lift_position = None raise ZHAException(f"Failed to close cover: {res[1]}") self.async_update_state(STATE_CLOSING) async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover tilt.""" - # 100 is closed in ZCL - res = await self._cover_cluster_handler.go_to_tilt_percentage(100) + self._target_tilt_position = POSITION_CLOSED + self._tilt_update_reported = False + res = await self._cover_cluster_handler.go_to_tilt_percentage( + self._invert_position_for_zcl(POSITION_CLOSED) + ) if res[1] is not Status.SUCCESS: + self._target_tilt_position = None raise ZHAException(f"Failed to close cover tilt: {res[1]}") self.async_update_state(STATE_CLOSING) @@ -311,11 +454,12 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: self._target_lift_position = kwargs[ATTR_POSITION] assert self._target_lift_position is not None assert self.current_cover_position is not None - # the 100 - value is because we need to invert the value before giving it to ZCL + self._lift_update_reported = False res = await self._cover_cluster_handler.go_to_lift_percentage( - 100 - self._target_lift_position + self._invert_position_for_zcl(self._target_lift_position) ) if res[1] is not Status.SUCCESS: + self._target_lift_position = None raise ZHAException(f"Failed to set cover position: {res[1]}") self.async_update_state( STATE_CLOSING @@ -328,11 +472,12 @@ async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: self._target_tilt_position = kwargs[ATTR_TILT_POSITION] assert self._target_tilt_position is not None assert self.current_cover_tilt_position is not None - # the 100 - value is because we need to invert the value before giving it to ZCL + self._tilt_update_reported = False res = await self._cover_cluster_handler.go_to_tilt_percentage( - 100 - self._target_tilt_position + self._invert_position_for_zcl(self._target_tilt_position) ) if res[1] is not Status.SUCCESS: + self._target_tilt_position = None raise ZHAException(f"Failed to set cover tilt position: {res[1]}") self.async_update_state( STATE_CLOSING @@ -342,20 +487,22 @@ async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Stop the cover.""" + self._target_lift_position = None + self._lift_update_reported = False res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") - self._target_lift_position = self.current_cover_position - self._determine_state(self.current_cover_position) + self._determine_state() self.maybe_emit_state_changed_event() async def async_stop_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Stop the cover tilt.""" + self._target_tilt_position = None + self._tilt_update_reported = False res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") - self._target_tilt_position = self.current_cover_tilt_position - self._determine_state(self.current_cover_tilt_position, is_lift_update=False) + self._determine_state() self.maybe_emit_state_changed_event() @@ -382,7 +529,7 @@ def __init__( device: Device, **kwargs, ) -> None: - """Initialize the ZHA light.""" + """Initialize the ZHA shade.""" super().__init__(unique_id, cluster_handlers, endpoint, device, **kwargs) self._on_off_cluster_handler: ClusterHandler = self.cluster_handlers[ CLUSTER_HANDLER_ON_OFF diff --git a/zha/application/platforms/cover/const.py b/zha/application/platforms/cover/const.py index af6e25fd3..e4ff11163 100644 --- a/zha/application/platforms/cover/const.py +++ b/zha/application/platforms/cover/const.py @@ -10,10 +10,13 @@ ATTR_POSITION: Final[str] = "position" ATTR_TILT_POSITION: Final[str] = "tilt_position" -STATE_OPEN: Final = "open" -STATE_OPENING: Final = "opening" -STATE_CLOSED: Final = "closed" -STATE_CLOSING: Final = "closing" +POSITION_CLOSED: Final[int] = 0 +POSITION_OPEN: Final[int] = 100 + +STATE_OPEN: Final[str] = "open" +STATE_OPENING: Final[str] = "opening" +STATE_CLOSED: Final[str] = "closed" +STATE_CLOSING: Final[str] = "closing" class CoverDeviceClass(StrEnum): From c1085f8fedb3078ad0ec06fedeb8740924f4b773 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Sun, 9 Feb 2025 16:08:57 +0000 Subject: [PATCH 02/15] Test coverage for timeout and device instigated movement --- tests/test_cover.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/test_cover.py b/tests/test_cover.py index d292f0938..19084e7c5 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -361,6 +361,49 @@ async def test_cover( assert entity.state["state"] == STATE_OPEN + # test interrupted movement (e.g. device button press), starting from 47 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + await entity.async_set_cover_position(position=0) # 100 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x05 + assert cluster.request.call_args[0][2].command.name == "go_to_lift_percentage" + assert cluster.request.call_args[0][3] == 100 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["current_position"] == 47 + assert entity.state["state"] == STATE_CLOSING + + # simulate a device position update to set timer to the default duration rather than dynamic + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 70} + ) + + assert entity.state["current_position"] == 30 + assert entity.state["state"] == STATE_CLOSING + + # wait the timer duration + await asyncio.sleep(5) + + assert entity.state["current_position"] == 30 + assert entity.state["state"] == STATE_OPEN + + # test device instigated movement (e.g. device button press), starting from 30 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 60} + ) + + assert entity.state["current_position"] == 40 + assert entity.state["state"] == STATE_OPENING + + # wait the default timer duration + await asyncio.sleep(5) + + assert entity.state["current_position"] == 40 + assert entity.state["state"] == STATE_OPEN + # stop from client with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): await entity.async_stop_cover() From a96d448608c8a8b6260019f4d9c5cf9f3d0cb907 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Sun, 9 Feb 2025 20:35:22 +0000 Subject: [PATCH 03/15] Use common functions for instigating target tracking --- zha/application/platforms/cover/__init__.py | 58 ++++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 8a5282e38..3c80b4ae0 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -372,6 +372,16 @@ def _clear_movement_state(self, duration: float, _=None) -> None: self._determine_state() self.maybe_emit_state_changed_event() + def _track_target_lift_position(self, position: int | None): + """Track locally instigated lift movement.""" + self._target_lift_position = position + self._lift_update_reported = False + + def _track_target_tilt_position(self, position: int | None): + """Track locally instigated tilt movement.""" + self._target_tilt_position = position + self._tilt_update_reported = False + @staticmethod def _invert_position_for_zcl(position: int) -> int: """Convert the HA position to the ZCL position range. @@ -409,86 +419,83 @@ def async_update_state(self, state): async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover.""" - self._target_lift_position = POSITION_OPEN + self._track_target_lift_position(POSITION_OPEN) res = await self._cover_cluster_handler.up_open() if res[1] is not Status.SUCCESS: - self._target_lift_position = None + self._track_target_lift_position(None) raise ZHAException(f"Failed to open cover: {res[1]}") self.async_update_state(STATE_OPENING) async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover tilt.""" - self._target_tilt_position = POSITION_OPEN + self._track_target_tilt_position(POSITION_OPEN) res = await self._cover_cluster_handler.go_to_tilt_percentage( self._invert_position_for_zcl(POSITION_OPEN) ) if res[1] is not Status.SUCCESS: - self._target_tilt_position = None + self._track_target_tilt_position(None) raise ZHAException(f"Failed to open cover tilt: {res[1]}") self.async_update_state(STATE_OPENING) async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover.""" - self._target_lift_position = POSITION_CLOSED - self._lift_update_reported = False + self._track_target_lift_position(POSITION_CLOSED) res = await self._cover_cluster_handler.down_close() if res[1] is not Status.SUCCESS: - self._target_lift_position = None + self._track_target_lift_position(None) raise ZHAException(f"Failed to close cover: {res[1]}") self.async_update_state(STATE_CLOSING) async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover tilt.""" - self._target_tilt_position = POSITION_CLOSED - self._tilt_update_reported = False + self._track_target_tilt_position(POSITION_CLOSED) res = await self._cover_cluster_handler.go_to_tilt_percentage( self._invert_position_for_zcl(POSITION_CLOSED) ) if res[1] is not Status.SUCCESS: - self._target_tilt_position = None + self._track_target_tilt_position(None) raise ZHAException(f"Failed to close cover tilt: {res[1]}") self.async_update_state(STATE_CLOSING) async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the cover to a specific position.""" - self._target_lift_position = kwargs[ATTR_POSITION] - assert self._target_lift_position is not None assert self.current_cover_position is not None - self._lift_update_reported = False + target_position = kwargs[ATTR_POSITION] + assert target_position is not None + self._track_target_lift_position(target_position) res = await self._cover_cluster_handler.go_to_lift_percentage( - self._invert_position_for_zcl(self._target_lift_position) + self._invert_position_for_zcl(target_position) ) if res[1] is not Status.SUCCESS: - self._target_lift_position = None + self._track_target_lift_position(None) raise ZHAException(f"Failed to set cover position: {res[1]}") self.async_update_state( STATE_CLOSING - if self._target_lift_position < self.current_cover_position + if target_position < self.current_cover_position else STATE_OPENING ) async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: """Move the cover tilt to a specific position.""" - self._target_tilt_position = kwargs[ATTR_TILT_POSITION] - assert self._target_tilt_position is not None assert self.current_cover_tilt_position is not None - self._tilt_update_reported = False + target_position = kwargs[ATTR_TILT_POSITION] + assert target_position is not None + self._track_target_tilt_position(target_position) res = await self._cover_cluster_handler.go_to_tilt_percentage( - self._invert_position_for_zcl(self._target_tilt_position) + self._invert_position_for_zcl(target_position) ) if res[1] is not Status.SUCCESS: - self._target_tilt_position = None + self._track_target_tilt_position(None) raise ZHAException(f"Failed to set cover tilt position: {res[1]}") self.async_update_state( STATE_CLOSING - if self._target_tilt_position < self.current_cover_tilt_position + if target_position < self.current_cover_tilt_position else STATE_OPENING ) async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Stop the cover.""" - self._target_lift_position = None - self._lift_update_reported = False + self._track_target_lift_position(None) res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") @@ -497,8 +504,7 @@ async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unus async def async_stop_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Stop the cover tilt.""" - self._target_tilt_position = None - self._tilt_update_reported = False + self._track_target_tilt_position(None) res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") From fb994ff0a5107f45384f059996ae6cb5ec66731f Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Sun, 9 Feb 2025 21:09:25 +0000 Subject: [PATCH 04/15] Calculate axis states only if their position/target has changed More efficient on devices that frequently report position updates. --- zha/application/platforms/cover/__init__.py | 68 +++++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 3c80b4ae0..1e892b6f6 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -101,7 +101,8 @@ def __init__( self._target_tilt_position: int | None = None self._lift_update_reported: bool | None = None self._tilt_update_reported: bool | None = None - + self._lift_state: str | None = None + self._tilt_state: str | None = None self._lift_position_history: deque[int | None] = deque( (self.current_cover_position,), maxlen=2 ) @@ -112,7 +113,7 @@ def __init__( self._movement_timer: asyncio.TimerHandle | None = None self._state: str = STATE_OPEN - self._determine_state() + self._determine_state(refresh=True) self._cover_cluster_handler.on_event( CLUSTER_HANDLER_ATTRIBUTE_UPDATED, self.handle_cluster_handler_attribute_updated, @@ -247,32 +248,43 @@ def _determine_axis_state( return STATE_OPEN if current > POSITION_CLOSED else STATE_CLOSED def _determine_state( - self, is_lift_update: bool | None = False, is_tilt_update: bool | None = False + self, + is_lift_update: bool = False, + is_tilt_update: bool = False, + refresh: bool = False, ) -> None: """Determine the state of the cover entity.""" # Determine state for lift and tilt axis - lift_state = self._determine_axis_state( - self.current_cover_position, - self._target_lift_position, - self._lift_position_history, - is_lift_update, + self._lift_state = ( + self._determine_axis_state( + self.current_cover_position, + self._target_lift_position, + self._lift_position_history, + is_lift_update, + ) + if is_lift_update or refresh or self._lift_state is None + else self._lift_state ) - tilt_state = self._determine_axis_state( - self.current_cover_tilt_position, - self._target_tilt_position, - self._tilt_position_history, - is_tilt_update, + self._tilt_state = ( + self._determine_axis_state( + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_position_history, + is_tilt_update, + ) + if is_tilt_update or refresh or self._tilt_state is None + else self._tilt_state ) _LOGGER.debug( "_determine_state: lift=(state: %s, attr_update: %s, current: %s, target: %s, history: %s), tilt=(state: %s, attr_update: %s, current: %s, target: %s, history: %s)", - lift_state, + self._lift_state, is_lift_update, self.current_cover_position, self._target_lift_position, self._lift_position_history, - tilt_state, + self._tilt_state, is_tilt_update, self.current_cover_tilt_position, self._target_tilt_position, @@ -280,15 +292,15 @@ def _determine_state( ) # Clear target position if the cover axis is not moving - if lift_state not in (STATE_OPENING, STATE_CLOSING): + if self._lift_state not in (STATE_OPENING, STATE_CLOSING): self._target_lift_position = None - if tilt_state not in (STATE_OPENING, STATE_CLOSING): + if self._tilt_state not in (STATE_OPENING, STATE_CLOSING): self._target_tilt_position = None # Start a movement timeout if the cover is moving, else cancel it - if STATE_CLOSING in (lift_state, tilt_state) or STATE_OPENING in ( - lift_state, - tilt_state, + if STATE_CLOSING in (self._lift_state, self._tilt_state) or STATE_OPENING in ( + self._lift_state, + self._tilt_state, ): self._start_movement_timer() else: @@ -297,19 +309,19 @@ def _determine_state( # Keep the last movement direction if either axis is still moving if ( self.is_closing - and STATE_CLOSING in (lift_state, tilt_state) + and STATE_CLOSING in (self._lift_state, self._tilt_state) or self.is_opening - and STATE_OPENING in (lift_state, tilt_state) + and STATE_OPENING in (self._lift_state, self._tilt_state) ): return # An open tilt state overrides a closed lift state - if tilt_state == STATE_OPEN and lift_state == STATE_CLOSED: + if self._tilt_state == STATE_OPEN and self._lift_state == STATE_CLOSED: self._state = STATE_OPEN return # Pick lift state in preference over tilt - self._state = lift_state or tilt_state + self._state = self._lift_state or self._tilt_state def _dynamic_timer_value(self) -> float: """Return a timer duration in seconds based on expected movement distance. @@ -369,18 +381,20 @@ def _clear_movement_state(self, duration: float, _=None) -> None: _LOGGER.debug("No movement reported for %s seconds", duration) self._target_lift_position = None self._target_tilt_position = None - self._determine_state() + self._determine_state(refresh=True) self.maybe_emit_state_changed_event() def _track_target_lift_position(self, position: int | None): """Track locally instigated lift movement.""" self._target_lift_position = position self._lift_update_reported = False + self._lift_state = None def _track_target_tilt_position(self, position: int | None): """Track locally instigated tilt movement.""" self._target_tilt_position = position self._tilt_update_reported = False + self._tilt_state = None @staticmethod def _invert_position_for_zcl(position: int) -> int: @@ -499,7 +513,7 @@ async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unus res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") - self._determine_state() + self._determine_state(refresh=True) self.maybe_emit_state_changed_event() async def async_stop_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -508,7 +522,7 @@ async def async_stop_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") - self._determine_state() + self._determine_state(refresh=True) self.maybe_emit_state_changed_event() From f2683cbb13eb8445f2260f362ef27b79fa928089 Mon Sep 17 00:00:00 2001 From: Jack <46714706+jeverley@users.noreply.github.com> Date: Mon, 10 Feb 2025 10:16:04 +0000 Subject: [PATCH 05/15] Improve debug logging outputs --- zha/application/platforms/cover/__init__.py | 54 +++++++++++++-------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 1e892b6f6..99fb712b5 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -99,8 +99,8 @@ def __init__( self._target_lift_position: int | None = None self._target_tilt_position: int | None = None - self._lift_update_reported: bool | None = None - self._tilt_update_reported: bool | None = None + self._lift_update_received: bool | None = None + self._tilt_update_received: bool | None = None self._lift_state: str | None = None self._tilt_state: str | None = None self._lift_position_history: deque[int | None] = deque( @@ -278,7 +278,7 @@ def _determine_state( ) _LOGGER.debug( - "_determine_state: lift=(state: %s, attr_update: %s, current: %s, target: %s, history: %s), tilt=(state: %s, attr_update: %s, current: %s, target: %s, history: %s)", + "_determine_state: lift=(state: %s, is_update: %s, current: %s, target: %s, history: %s), tilt=(state: %s, is_update: %s, current: %s, target: %s, history: %s)", self._lift_state, is_lift_update, self.current_cover_position, @@ -293,9 +293,9 @@ def _determine_state( # Clear target position if the cover axis is not moving if self._lift_state not in (STATE_OPENING, STATE_CLOSING): - self._target_lift_position = None + self._track_target_lift_position(None) if self._tilt_state not in (STATE_OPENING, STATE_CLOSING): - self._target_tilt_position = None + self._track_target_tilt_position(None) # Start a movement timeout if the cover is moving, else cancel it if STATE_CLOSING in (self._lift_state, self._tilt_state) or STATE_OPENING in ( @@ -323,7 +323,7 @@ def _determine_state( # Pick lift state in preference over tilt self._state = self._lift_state or self._tilt_state - def _dynamic_timer_value(self) -> float: + def _dynamic_timeout(self) -> float: """Return a timer duration in seconds based on expected movement distance. This is required because some devices only report position updates after stopping. @@ -332,23 +332,21 @@ def _dynamic_timer_value(self) -> float: lift_timeout = 0.0 tilt_timeout = 0.0 - # Dynamic lift timeout if a local target is defined and the device has not reported a new position + # Calculate dynamic timeout durations if a target is defined and the device has not reported a new position if ( self._target_lift_position is not None and self.current_cover_position is not None - and not self._lift_update_reported + and not self._lift_update_received ): lift_timeout = ( abs(self._target_lift_position - self.current_cover_position) * 0.01 * LIFT_MOVEMENT_TIMEOUT_RANGE ) - - # Dynamic tilt timeout if a local target is defined and the device has not reported a new position if ( self._target_tilt_position is not None and self.current_cover_tilt_position is not None - and not self._tilt_update_reported + and not self._tilt_update_received ): tilt_timeout = ( abs(self._target_tilt_position - self.current_cover_tilt_position) @@ -356,22 +354,36 @@ def _dynamic_timer_value(self) -> float: * TILT_MOVEMENT_TIMEOUT_RANGE ) - # Return the max axis timeout or default timeout if the max is 0 - return max(lift_timeout, tilt_timeout) or DEFAULT_MOVEMENT_TIMEOUT + _LOGGER.debug( + "_dynamic_timeout: lift=(timeout: %s, current: %s, target: %s, update_received: %s), tilt=(timeout: %s, current: %s, target: %s, update_received: %s)", + lift_timeout, + self.current_cover_position, + self._target_lift_position, + self._lift_update_received, + tilt_timeout, + self.current_cover_tilt_position, + self._target_tilt_position, + self._tilt_update_received, + ) + + # Return the longest axis movement timeout + return max(lift_timeout, tilt_timeout) def _start_movement_timer(self, seconds: float = 0) -> None: """Start timer for clearing the current movement state.""" if self._movement_timer: self._movement_timer.cancel() - duration = seconds or self._dynamic_timer_value() + duration = seconds or self._dynamic_timeout() or DEFAULT_MOVEMENT_TIMEOUT if duration <= 0: raise ZHAException(f"Invalid movement timer duration: {duration}") + _LOGGER.debug("Movement timer started with a duration of %s seconds", duration) self._movement_timer = self._loop.call_later( duration, self._clear_movement_state, duration ) def _cancel_movement_timer(self) -> None: """Cancel the current movement timer.""" + _LOGGER.debug("Movement timer cancelled") if self._movement_timer: self._movement_timer.cancel() self._movement_timer = None @@ -387,14 +399,16 @@ def _clear_movement_state(self, duration: float, _=None) -> None: def _track_target_lift_position(self, position: int | None): """Track locally instigated lift movement.""" self._target_lift_position = position - self._lift_update_reported = False - self._lift_state = None + if position is not None: + self._lift_update_received = False + self._lift_state = None def _track_target_tilt_position(self, position: int | None): """Track locally instigated tilt movement.""" self._target_tilt_position = position - self._tilt_update_reported = False - self._tilt_state = None + if position is not None: + self._tilt_update_received = False + self._tilt_state = None @staticmethod def _invert_position_for_zcl(position: int) -> int: @@ -415,11 +429,11 @@ def handle_cluster_handler_attribute_updated( _LOGGER.debug("handle_cluster_handler_attribute_updated=%s", event) if event.attribute_id == WCAttrs.current_position_lift_percentage.id: self._lift_position_history.append(self.current_cover_position) - self._lift_update_reported = True + self._lift_update_received = True self._determine_state(is_lift_update=True) if event.attribute_id == WCAttrs.current_position_tilt_percentage.id: self._tilt_position_history.append(self.current_cover_tilt_position) - self._tilt_update_reported = True + self._tilt_update_received = True self._determine_state(is_tilt_update=True) self.maybe_emit_state_changed_event() From 0a4138ba216e431b3215b277653eecd9494e0231 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Mon, 10 Feb 2025 10:47:21 +0000 Subject: [PATCH 06/15] Use CoverState StrEnum in const for states rather than literals --- tests/test_cover.py | 100 ++++++++++---------- zha/application/platforms/cover/__init__.py | 65 ++++++------- zha/application/platforms/cover/const.py | 12 ++- 3 files changed, 90 insertions(+), 87 deletions(-) diff --git a/tests/test_cover.py b/tests/test_cover.py index 19084e7c5..07ef838ed 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -26,15 +26,10 @@ from zha.application import Platform from zha.application.const import ATTR_COMMAND from zha.application.gateway import Gateway -from zha.application.platforms.cover import ( - ATTR_CURRENT_POSITION, - STATE_CLOSED, - STATE_OPEN, -) from zha.application.platforms.cover.const import ( - STATE_CLOSING, - STATE_OPENING, + ATTR_CURRENT_POSITION, CoverEntityFeature, + CoverState, ) from zha.exceptions import ZHAException @@ -123,7 +118,7 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument entity = get_entity(zha_device, platform=Platform.COVER) state = entity.state - assert state["state"] == STATE_OPEN + assert state["state"] == CoverState.OPEN assert state[ATTR_CURRENT_POSITION] == 100 # test update @@ -137,7 +132,7 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument await entity.async_update() assert cluster.read_attributes.call_count == prev_call_count + 1 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED assert entity.state[ATTR_CURRENT_POSITION] == 0 @@ -191,37 +186,37 @@ async def test_cover( await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 100} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test that the state closes after tilting to 100% (closed) await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # set lift to 0% (open) and test to see if state changes to open await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test that the state remains after tilting to 0% (open) await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test to see the state remains after tilting to 100% (closed) await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN cluster.PLUGGED_ATTR_READS = {1: 100} update_attribute_cache(cluster) await entity.async_update() await zha_gateway.async_block_till_done() - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # close from client with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): @@ -233,13 +228,13 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == WCCmds.down_close.name assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 100} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # tilt close from client with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): @@ -255,13 +250,13 @@ async def test_cover( assert cluster.request.call_args[0][3] == 100 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): @@ -273,13 +268,13 @@ async def test_cover( assert cluster.request.call_args[0][2].command.name == WCCmds.up_open.name assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_OPENING + assert entity.state["state"] == CoverState.OPENING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # open tilt from client with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): @@ -295,13 +290,13 @@ async def test_cover( assert cluster.request.call_args[0][3] == 0 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_OPENING + assert entity.state["state"] == CoverState.OPENING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test set position command, starting at 100 % / 0 ZCL (open) from previous lift test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): @@ -315,21 +310,21 @@ async def test_cover( assert cluster.request.call_args[1]["expect_reply"] is True assert entity.state["current_position"] == 100 - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 35} ) assert entity.state["current_position"] == 65 - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 53} ) assert entity.state["current_position"] == 47 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test set tilt position command, starting at 100 % / 0 ZCL (open) from previous tilt test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): @@ -347,19 +342,19 @@ async def test_cover( assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 35} ) - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 53} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test interrupted movement (e.g. device button press), starting from 47 % with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): @@ -373,7 +368,7 @@ async def test_cover( assert cluster.request.call_args[1]["expect_reply"] is True assert entity.state["current_position"] == 47 - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING # simulate a device position update to set timer to the default duration rather than dynamic await send_attributes_report( @@ -381,28 +376,31 @@ async def test_cover( ) assert entity.state["current_position"] == 30 - assert entity.state["state"] == STATE_CLOSING + assert entity.state["state"] == CoverState.CLOSING # wait the timer duration await asyncio.sleep(5) assert entity.state["current_position"] == 30 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # test device instigated movement (e.g. device button press), starting from 30 % with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_position"] == 30 + assert entity.state["state"] == CoverState.OPEN + await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 60} ) assert entity.state["current_position"] == 40 - assert entity.state["state"] == STATE_OPENING + assert entity.state["state"] == CoverState.OPENING # wait the default timer duration await asyncio.sleep(5) assert entity.state["current_position"] == 40 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # stop from client with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): @@ -445,7 +443,7 @@ async def test_cover_failures(zha_gateway: Gateway) -> None: zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 0} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # close from UI with patch( @@ -463,7 +461,7 @@ async def test_cover_failures(zha_gateway: Gateway) -> None: cluster.request.call_args[0][1] == closures.WindowCovering.ServerCommandDefs.down_close.id ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN with patch( "zigpy.zcl.Cluster.request", @@ -609,17 +607,17 @@ async def test_shade( await send_attributes_report( zha_gateway, cluster_on_off, {cluster_on_off.AttributeDefs.on_off.id: 0} ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # test to see if it opens await send_attributes_report( zha_gateway, cluster_on_off, {cluster_on_off.AttributeDefs.on_off.id: 1} ) - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN await entity.async_update() await zha_gateway.async_block_till_done() - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # close from client command fails with ( @@ -637,7 +635,7 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0000 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN with patch( "zigpy.zcl.Cluster.request", AsyncMock(return_value=[0x1, zcl_f.Status.SUCCESS]) @@ -647,11 +645,11 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0000 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client command fails await send_attributes_report(zha_gateway, cluster_level, {0: 0}) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED with ( patch( @@ -668,7 +666,7 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client succeeds with patch( @@ -679,7 +677,7 @@ async def test_shade( assert cluster_on_off.request.call_count == 1 assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN # set position UI command fails with ( @@ -773,11 +771,11 @@ async def test_keen_vent( # test that the state has changed from unavailable to off await send_attributes_report(zha_gateway, cluster_on_off, {8: 0, 0: False, 1: 1}) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED await entity.async_update() await zha_gateway.async_block_till_done() - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client command fails p1 = patch.object(cluster_on_off, "request", side_effect=asyncio.TimeoutError) @@ -793,7 +791,7 @@ async def test_keen_vent( assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 assert cluster_level.request.call_count == 1 - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED # open from client command success p1 = patch.object(cluster_on_off, "request", AsyncMock(return_value=[1, 0])) @@ -806,7 +804,7 @@ async def test_keen_vent( assert cluster_on_off.request.call_args[0][0] is False assert cluster_on_off.request.call_args[0][1] == 0x0001 assert cluster_level.request.call_count == 1 - assert entity.state["state"] == STATE_OPEN + assert entity.state["state"] == CoverState.OPEN assert entity.state["current_position"] == 100 @@ -853,10 +851,10 @@ async def test_cover_state_restoration( zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device) entity = get_entity(zha_device, platform=Platform.COVER) - assert entity.state["state"] != STATE_CLOSED + assert entity.state["state"] != CoverState.CLOSED entity.restore_external_state_attributes( - state=STATE_CLOSED, + state=CoverState.CLOSED, ) - assert entity.state["state"] == STATE_CLOSED + assert entity.state["state"] == CoverState.CLOSED diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 99fb712b5..75add3196 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -6,7 +6,7 @@ from collections import deque import functools import logging -from typing import TYPE_CHECKING, Any, Literal, cast +from typing import TYPE_CHECKING, Any, cast from zigpy.zcl.clusters.general import OnOff from zigpy.zcl.foundation import Status @@ -19,14 +19,11 @@ ATTR_TILT_POSITION, POSITION_CLOSED, POSITION_OPEN, - STATE_CLOSED, - STATE_CLOSING, - STATE_OPEN, - STATE_OPENING, WCT, ZCL_TO_COVER_DEVICE_CLASS, CoverDeviceClass, CoverEntityFeature, + CoverState, WCAttrs, ) from zha.application.registries import PLATFORM_ENTITIES @@ -101,8 +98,8 @@ def __init__( self._target_tilt_position: int | None = None self._lift_update_received: bool | None = None self._tilt_update_received: bool | None = None - self._lift_state: str | None = None - self._tilt_state: str | None = None + self._lift_state: CoverState | None = None + self._tilt_state: CoverState | None = None self._lift_position_history: deque[int | None] = deque( (self.current_cover_position,), maxlen=2 ) @@ -112,7 +109,7 @@ def __init__( self._loop = asyncio.get_running_loop() self._movement_timer: asyncio.TimerHandle | None = None - self._state: str = STATE_OPEN + self._state: CoverState | None = CoverState.OPEN self._determine_state(refresh=True) self._cover_cluster_handler.on_event( CLUSTER_HANDLER_ATTRIBUTE_UPDATED, @@ -122,9 +119,7 @@ def __init__( def restore_external_state_attributes( self, *, - state: Literal[ - "open", "opening", "closed", "closing" - ], # FIXME: why must these be expanded? + state: CoverState | None, target_lift_position: int | None = None, target_tilt_position: int | None = None, ): @@ -162,12 +157,12 @@ def is_closed(self) -> bool | None: @property def is_opening(self) -> bool: """Return if the cover is opening or not.""" - return self._state == STATE_OPENING + return self._state == CoverState.OPENING @property def is_closing(self) -> bool: """Return if the cover is closing or not.""" - return self._state == STATE_CLOSING + return self._state == CoverState.CLOSING @property def current_cover_position(self) -> int | None: @@ -242,10 +237,10 @@ def _determine_axis_state( ) ): # ZHA thinks the cover is moving - return STATE_OPENING if target > current else STATE_CLOSING + return CoverState.OPENING if target > current else CoverState.CLOSING # Return the static position - return STATE_OPEN if current > POSITION_CLOSED else STATE_CLOSED + return CoverState.OPEN if current > POSITION_CLOSED else CoverState.CLOSED def _determine_state( self, @@ -292,13 +287,16 @@ def _determine_state( ) # Clear target position if the cover axis is not moving - if self._lift_state not in (STATE_OPENING, STATE_CLOSING): + if self._lift_state not in (CoverState.OPENING, CoverState.CLOSING): self._track_target_lift_position(None) - if self._tilt_state not in (STATE_OPENING, STATE_CLOSING): + if self._tilt_state not in (CoverState.OPENING, CoverState.CLOSING): self._track_target_tilt_position(None) # Start a movement timeout if the cover is moving, else cancel it - if STATE_CLOSING in (self._lift_state, self._tilt_state) or STATE_OPENING in ( + if CoverState.CLOSING in ( + self._lift_state, + self._tilt_state, + ) or CoverState.OPENING in ( self._lift_state, self._tilt_state, ): @@ -309,15 +307,18 @@ def _determine_state( # Keep the last movement direction if either axis is still moving if ( self.is_closing - and STATE_CLOSING in (self._lift_state, self._tilt_state) + and CoverState.CLOSING in (self._lift_state, self._tilt_state) or self.is_opening - and STATE_OPENING in (self._lift_state, self._tilt_state) + and CoverState.OPENING in (self._lift_state, self._tilt_state) ): return # An open tilt state overrides a closed lift state - if self._tilt_state == STATE_OPEN and self._lift_state == STATE_CLOSED: - self._state = STATE_OPEN + if ( + self._tilt_state == CoverState.OPEN + and self._lift_state == CoverState.CLOSED + ): + self._state = CoverState.OPEN return # Pick lift state in preference over tilt @@ -442,7 +443,7 @@ def async_update_state(self, state): _LOGGER.debug("async_update_state=%s", state) self._state = state self.maybe_emit_state_changed_event() - if state in (STATE_OPENING, STATE_CLOSING): + if state in (CoverState.OPENING, CoverState.CLOSING): self._start_movement_timer() async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -452,7 +453,7 @@ async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unus if res[1] is not Status.SUCCESS: self._track_target_lift_position(None) raise ZHAException(f"Failed to open cover: {res[1]}") - self.async_update_state(STATE_OPENING) + self.async_update_state(CoverState.OPENING) async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Open the cover tilt.""" @@ -463,7 +464,7 @@ async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) raise ZHAException(f"Failed to open cover tilt: {res[1]}") - self.async_update_state(STATE_OPENING) + self.async_update_state(CoverState.OPENING) async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover.""" @@ -472,7 +473,7 @@ async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unu if res[1] is not Status.SUCCESS: self._track_target_lift_position(None) raise ZHAException(f"Failed to close cover: {res[1]}") - self.async_update_state(STATE_CLOSING) + self.async_update_state(CoverState.CLOSING) async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument """Close the cover tilt.""" @@ -483,7 +484,7 @@ async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disabl if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) raise ZHAException(f"Failed to close cover tilt: {res[1]}") - self.async_update_state(STATE_CLOSING) + self.async_update_state(CoverState.CLOSING) async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the cover to a specific position.""" @@ -498,9 +499,9 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: self._track_target_lift_position(None) raise ZHAException(f"Failed to set cover position: {res[1]}") self.async_update_state( - STATE_CLOSING + CoverState.CLOSING if target_position < self.current_cover_position - else STATE_OPENING + else CoverState.OPENING ) async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: @@ -516,9 +517,9 @@ async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: self._track_target_tilt_position(None) raise ZHAException(f"Failed to set cover tilt position: {res[1]}") self.async_update_state( - STATE_CLOSING + CoverState.CLOSING if target_position < self.current_cover_tilt_position - else STATE_OPENING + else CoverState.OPENING ) async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -597,7 +598,7 @@ def state(self) -> dict[str, Any]: if (closed := self.is_closed) is None: state = None else: - state = STATE_CLOSED if closed else STATE_OPEN + state = CoverState.CLOSED if closed else CoverState.OPEN response = super().state response.update( { diff --git a/zha/application/platforms/cover/const.py b/zha/application/platforms/cover/const.py index e4ff11163..9e9ce34b7 100644 --- a/zha/application/platforms/cover/const.py +++ b/zha/application/platforms/cover/const.py @@ -13,10 +13,14 @@ POSITION_CLOSED: Final[int] = 0 POSITION_OPEN: Final[int] = 100 -STATE_OPEN: Final[str] = "open" -STATE_OPENING: Final[str] = "opening" -STATE_CLOSED: Final[str] = "closed" -STATE_CLOSING: Final[str] = "closing" + +class CoverState(StrEnum): + """State of Cover entities.""" + + CLOSED = "closed" + CLOSING = "closing" + OPEN = "open" + OPENING = "opening" class CoverDeviceClass(StrEnum): From 6d6e7fa0e6ac772f582cf3b51d038b2fab527c99 Mon Sep 17 00:00:00 2001 From: Jack <46714706+jeverley@users.noreply.github.com> Date: Mon, 10 Feb 2025 11:18:24 +0000 Subject: [PATCH 07/15] Readability improvement for _determine_state method --- zha/application/platforms/cover/__init__.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 75add3196..66df68ffa 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -248,29 +248,24 @@ def _determine_state( is_tilt_update: bool = False, refresh: bool = False, ) -> None: - """Determine the state of the cover entity.""" + """Determine the state of the cover entity. - # Determine state for lift and tilt axis - self._lift_state = ( - self._determine_axis_state( + This considers current state of both the lift and tilt axis. + """ + if self._lift_state is None or is_lift_update or refresh: + self._lift_state = self._determine_axis_state( self.current_cover_position, self._target_lift_position, self._lift_position_history, is_lift_update, ) - if is_lift_update or refresh or self._lift_state is None - else self._lift_state - ) - self._tilt_state = ( - self._determine_axis_state( + if self._tilt_state is None or is_tilt_update or refresh: + self._tilt_state = self._determine_axis_state( self.current_cover_tilt_position, self._target_tilt_position, self._tilt_position_history, is_tilt_update, ) - if is_tilt_update or refresh or self._tilt_state is None - else self._tilt_state - ) _LOGGER.debug( "_determine_state: lift=(state: %s, is_update: %s, current: %s, target: %s, history: %s), tilt=(state: %s, is_update: %s, current: %s, target: %s, history: %s)", From 9992d3a2dc49dc8d00b744a57cb03e004b90a4e4 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Thu, 13 Feb 2025 13:57:09 +0000 Subject: [PATCH 08/15] Simplify stop_cover methods --- zha/application/platforms/cover/__init__.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 66df68ffa..9314b75e2 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -518,8 +518,12 @@ async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: ) async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument - """Stop the cover.""" + """Stop the cover. + + Upon receipt of this command the cover stops both lift and tilt movement. + """ self._track_target_lift_position(None) + self._track_target_tilt_position(None) res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") @@ -527,13 +531,11 @@ async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unus self.maybe_emit_state_changed_event() async def async_stop_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument - """Stop the cover tilt.""" - self._track_target_tilt_position(None) - res = await self._cover_cluster_handler.stop() - if res[1] is not Status.SUCCESS: - raise ZHAException(f"Failed to stop cover: {res[1]}") - self._determine_state(refresh=True) - self.maybe_emit_state_changed_event() + """Stop the cover tilt. + + This is handled by async_stop_cover because there is no tilt specific command for Zigbee covers. + """ + await self.async_stop_cover(**kwargs) @MULTI_MATCH( From 73d51b763d6b3003dfec6987148d89d704d90024 Mon Sep 17 00:00:00 2001 From: Jack <46714706+jeverley@users.noreply.github.com> Date: Thu, 13 Feb 2025 17:27:09 +0000 Subject: [PATCH 09/15] Fix _attr_device_class for WindowCoveringType.Rollershade The previous logic was failing to populate the `_attr_device_class` because WindowCoveringType.Rollershade == 0x00. We should instead check for `is not None`. --- zha/application/platforms/cover/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 9314b75e2..9ee1d4554 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -84,7 +84,7 @@ def __init__( self._cover_cluster_handler: WindowCoveringClusterHandler = cast( WindowCoveringClusterHandler, cluster_handler ) - if self._cover_cluster_handler.window_covering_type: + if self._cover_cluster_handler.window_covering_type is not None: self._attr_device_class: CoverDeviceClass | None = ( ZCL_TO_COVER_DEVICE_CLASS.get( self._cover_cluster_handler.window_covering_type From f368f06c0e27869bcb33a9c586f88d0ce4de4f9f Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Thu, 13 Feb 2025 17:50:29 +0000 Subject: [PATCH 10/15] Amend tests following shade device_class fix --- .../devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json | 2 +- .../data/devices/ikea-of-sweden-praktlysing-cellular-blind.json | 2 +- tests/data/devices/smartwings-wm25-l-z.json | 2 +- tests/data/devices/third-reality-inc-3rsb015bz.json | 2 +- tests/data/devices/tze200-9caxna4s-ts0301.json | 2 +- tests/data/devices/yooksmart-d10110.json | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json b/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json index 1e9a5f425..7cbcd9c59 100644 --- a/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json +++ b/tests/data/devices/ikea-of-sweden-fyrtur-block-out-roller-blind.json @@ -341,7 +341,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json b/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json index 7eb29b743..00119d3f4 100644 --- a/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json +++ b/tests/data/devices/ikea-of-sweden-praktlysing-cellular-blind.json @@ -347,7 +347,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/smartwings-wm25-l-z.json b/tests/data/devices/smartwings-wm25-l-z.json index ecff3bf8d..8cff074bf 100644 --- a/tests/data/devices/smartwings-wm25-l-z.json +++ b/tests/data/devices/smartwings-wm25-l-z.json @@ -341,7 +341,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/third-reality-inc-3rsb015bz.json b/tests/data/devices/third-reality-inc-3rsb015bz.json index ff7bb1a0e..25bb6af68 100644 --- a/tests/data/devices/third-reality-inc-3rsb015bz.json +++ b/tests/data/devices/third-reality-inc-3rsb015bz.json @@ -840,7 +840,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/tze200-9caxna4s-ts0301.json b/tests/data/devices/tze200-9caxna4s-ts0301.json index f5a497766..c911b87bf 100644 --- a/tests/data/devices/tze200-9caxna4s-ts0301.json +++ b/tests/data/devices/tze200-9caxna4s-ts0301.json @@ -268,7 +268,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, diff --git a/tests/data/devices/yooksmart-d10110.json b/tests/data/devices/yooksmart-d10110.json index b267530c7..5663acc27 100644 --- a/tests/data/devices/yooksmart-d10110.json +++ b/tests/data/devices/yooksmart-d10110.json @@ -280,7 +280,7 @@ "platform": "cover", "class_name": "Cover", "translation_key": "cover", - "device_class": null, + "device_class": "shade", "state_class": null, "entity_category": null, "entity_registry_enabled_default": true, From d91aea1b1ea64fea968611bc7a3f5bd5dc361333 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Fri, 14 Feb 2025 12:23:06 +0000 Subject: [PATCH 11/15] Include missing ATTR_CURRENT_TILT_POSITION in state property --- zha/application/platforms/cover/__init__.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 9ee1d4554..2f0f0194c 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -15,6 +15,7 @@ from zha.application.platforms import PlatformEntity from zha.application.platforms.cover.const import ( ATTR_CURRENT_POSITION, + ATTR_CURRENT_TILT_POSITION, ATTR_POSITION, ATTR_TILT_POSITION, POSITION_CLOSED, @@ -101,10 +102,10 @@ def __init__( self._lift_state: CoverState | None = None self._tilt_state: CoverState | None = None self._lift_position_history: deque[int | None] = deque( - (self.current_cover_position,), maxlen=2 + [self.current_cover_position], maxlen=2 ) self._tilt_position_history: deque[int | None] = deque( - (self.current_cover_tilt_position,), maxlen=2 + [self.current_cover_tilt_position], maxlen=2 ) self._loop = asyncio.get_running_loop() self._movement_timer: asyncio.TimerHandle | None = None @@ -139,6 +140,7 @@ def state(self) -> dict[str, Any]: response.update( { ATTR_CURRENT_POSITION: self.current_cover_position, + ATTR_CURRENT_TILT_POSITION: self.current_cover_tilt_position, "state": self._state, "is_opening": self.is_opening, "is_closing": self.is_closing, @@ -171,13 +173,19 @@ def current_cover_position(self) -> int | None: In HA None is unknown, 0 is closed, 100 is fully open. In ZCL 0 is fully open, 100 is fully closed. Keep in mind the values have already been flipped to match HA - in the WindowCovering cluster handler + in the WindowCovering cluster handler. """ return self._cover_cluster_handler.current_position_lift_percentage @property def current_cover_tilt_position(self) -> int | None: - """Return the current tilt position of the cover.""" + """Return the current tilt position of the cover. + + In HA None is unknown, 0 is closed, 100 is fully open. + In ZCL 0 is fully open, 100 is fully closed. + Keep in mind the values have already been flipped to match HA + in the WindowCovering cluster handler. + """ return self._cover_cluster_handler.current_position_tilt_percentage def _determine_supported_features(self) -> CoverEntityFeature: From fd9f968260f28751ba40143c43b6b45356d7a74d Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:00:04 +0000 Subject: [PATCH 12/15] Docstring updates and use of converter functions for Shade PlatformEntity --- zha/application/platforms/cover/__init__.py | 46 ++++++++++++--------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 2f0f0194c..34aa6a6f8 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -224,8 +224,7 @@ def _determine_axis_state( When a target is defined the logic aims to mitigate split-brain scenarios where a HA command is interrupted by a device button press/physical obstruction. - The logic considers previous position to determine if the cover is moving, - if the position has not changed between two device updates it is not moving. + The logic considers previous position to determine if the cover is moving. """ if current is None: @@ -330,7 +329,7 @@ def _determine_state( def _dynamic_timeout(self) -> float: """Return a timer duration in seconds based on expected movement distance. - This is required because some devices only report position updates after stopping. + This is required because some devices only report a position update after stopping. """ lift_timeout = 0.0 @@ -374,7 +373,7 @@ def _dynamic_timeout(self) -> float: return max(lift_timeout, tilt_timeout) def _start_movement_timer(self, seconds: float = 0) -> None: - """Start timer for clearing the current movement state.""" + """Start timer for clearing the movement state (opening/closing).""" if self._movement_timer: self._movement_timer.cancel() duration = seconds or self._dynamic_timeout() or DEFAULT_MOVEMENT_TIMEOUT @@ -386,14 +385,14 @@ def _start_movement_timer(self, seconds: float = 0) -> None: ) def _cancel_movement_timer(self) -> None: - """Cancel the current movement timer.""" + """Cancel the movement timer.""" _LOGGER.debug("Movement timer cancelled") if self._movement_timer: self._movement_timer.cancel() self._movement_timer = None def _clear_movement_state(self, duration: float, _=None) -> None: - """Clear the moving state of the cover due to inactivity.""" + """Clear the movement state due to inactivity.""" _LOGGER.debug("No movement reported for %s seconds", duration) self._target_lift_position = None self._target_tilt_position = None @@ -401,21 +400,21 @@ def _clear_movement_state(self, duration: float, _=None) -> None: self.maybe_emit_state_changed_event() def _track_target_lift_position(self, position: int | None): - """Track locally instigated lift movement.""" + """Track locally instigated lift target.""" self._target_lift_position = position if position is not None: self._lift_update_received = False self._lift_state = None def _track_target_tilt_position(self, position: int | None): - """Track locally instigated tilt movement.""" + """Track locally instigated tilt target.""" self._target_tilt_position = position if position is not None: self._tilt_update_received = False self._tilt_state = None @staticmethod - def _invert_position_for_zcl(position: int) -> int: + def _ha_to_zcl_position(position: int) -> int: """Convert the HA position to the ZCL position range. In HA None is unknown, 0 is closed, 100 is fully open. @@ -462,7 +461,7 @@ async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable """Open the cover tilt.""" self._track_target_tilt_position(POSITION_OPEN) res = await self._cover_cluster_handler.go_to_tilt_percentage( - self._invert_position_for_zcl(POSITION_OPEN) + self._ha_to_zcl_position(POSITION_OPEN) ) if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) @@ -482,7 +481,7 @@ async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disabl """Close the cover tilt.""" self._track_target_tilt_position(POSITION_CLOSED) res = await self._cover_cluster_handler.go_to_tilt_percentage( - self._invert_position_for_zcl(POSITION_CLOSED) + self._ha_to_zcl_position(POSITION_CLOSED) ) if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) @@ -496,7 +495,7 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: assert target_position is not None self._track_target_lift_position(target_position) res = await self._cover_cluster_handler.go_to_lift_percentage( - self._invert_position_for_zcl(target_position) + self._ha_to_zcl_position(target_position) ) if res[1] is not Status.SUCCESS: self._track_target_lift_position(None) @@ -514,7 +513,7 @@ async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: assert target_position is not None self._track_target_tilt_position(target_position) res = await self._cover_cluster_handler.go_to_tilt_percentage( - self._invert_position_for_zcl(target_position) + self._ha_to_zcl_position(target_position) ) if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) @@ -580,8 +579,7 @@ def __init__( self._is_open: bool = bool(self._on_off_cluster_handler.on_off) position = self._level_cluster_handler.current_level if position is not None: - position = max(0, min(255, position)) - position = int(position * 100 / 255) + position = self._zcl_to_ha_position(position) self._position: int | None = position self._on_off_cluster_handler.on_event( CLUSTER_HANDLER_ATTRIBUTE_UPDATED, @@ -657,8 +655,7 @@ def handle_cluster_handler_attribute_updated( def handle_cluster_handler_set_level(self, event: LevelChangeEvent) -> None: """Set the reported position.""" - value = max(0, min(255, event.level)) - self._position = int(value * 100 / 255) + self._position = self._zcl_to_ha_position(event.level) self.maybe_emit_state_changed_event() async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -683,7 +680,7 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: """Move the roller shutter to a specific position.""" new_pos = kwargs[ATTR_POSITION] res = await self._level_cluster_handler.move_to_level_with_on_off( - new_pos * 255 / 100, 1 + self._ha_to_zcl_position(new_pos), 1 ) if res[1] != Status.SUCCESS: @@ -698,6 +695,17 @@ async def async_stop_cover(self, **kwargs: Any) -> None: # pylint: disable=unus if res[1] != Status.SUCCESS: raise ZHAException(f"Failed to stop cover: {res[1]}") + @staticmethod + def _zcl_to_ha_position(level: int) -> int: + """Convert the ZCL level to the HA position range.""" + level = max(0, min(255, level)) + return round(level * 100 / 255) + + @staticmethod + def _ha_to_zcl_position(position: int) -> int: + """Convert the HA position to the ZCL level range.""" + return round(position * 255 / 100) + @MULTI_MATCH( cluster_handler_names={CLUSTER_HANDLER_LEVEL, CLUSTER_HANDLER_ON_OFF}, @@ -714,7 +722,7 @@ async def async_open_cover(self, **kwargs: Any) -> None: position = self._position or 100 await asyncio.gather( self._level_cluster_handler.move_to_level_with_on_off( - position * 255 / 100, 1 + self._ha_to_zcl_position(position), 1 ), self._on_off_cluster_handler.on(), ) From cbd377a6a1ba184c0a7ae4dd0b7ec530c8efb37d Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:58:56 +0000 Subject: [PATCH 13/15] Guard against state change when target is equal to current position This prevents ZHA from incorrectly marking the state as opening/closing if the cover is already at its target. This could occur if the cover immediately reported an attribute update after receiving the command. --- tests/test_cover.py | 30 +++++++++++++++++++++ zha/application/platforms/cover/__init__.py | 12 +++++++++ 2 files changed, 42 insertions(+) diff --git a/tests/test_cover.py b/tests/test_cover.py index 07ef838ed..dc4581ae6 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -236,8 +236,18 @@ async def test_cover( assert entity.state["state"] == CoverState.CLOSED + # verify that a subsequent close command does not change the state to closing + await entity.async_close_cover() + assert entity.state["state"] == CoverState.CLOSED + # tilt close from client with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): + # reset the tilt to 0% (open) + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + ) + assert entity.state["state"] == CoverState.OPEN + await entity.async_close_cover_tilt() await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 @@ -258,6 +268,10 @@ async def test_cover( assert entity.state["state"] == CoverState.CLOSED + # verify that a subsequent close command does not change the state to closing + await entity.async_close_cover_tilt() + assert entity.state["state"] == CoverState.CLOSED + # open from client with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): await entity.async_open_cover() @@ -276,6 +290,10 @@ async def test_cover( assert entity.state["state"] == CoverState.OPEN + # verify that a subsequent open command does not change the state to opening + await entity.async_open_cover() + assert entity.state["state"] == CoverState.OPEN + # open tilt from client with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): await entity.async_open_cover_tilt() @@ -298,6 +316,10 @@ async def test_cover( assert entity.state["state"] == CoverState.OPEN + # verify that a subsequent open command does not change the state to opening + await entity.async_open_cover_tilt() + assert entity.state["state"] == CoverState.OPEN + # test set position command, starting at 100 % / 0 ZCL (open) from previous lift test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): await entity.async_set_cover_position(position=47) # 53 when inverted for ZCL @@ -326,6 +348,10 @@ async def test_cover( assert entity.state["current_position"] == 47 assert entity.state["state"] == CoverState.OPEN + # verify that a subsequent go_to command does not change the state to closing/opening + await entity.async_set_cover_position(position=47) + assert entity.state["state"] == CoverState.OPEN + # test set tilt position command, starting at 100 % / 0 ZCL (open) from previous tilt test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): await entity.async_set_cover_tilt_position( @@ -356,6 +382,10 @@ async def test_cover( assert entity.state["state"] == CoverState.OPEN + # verify that a subsequent go_to command does not change the state to closing/opening + await entity.async_set_cover_tilt_position(tilt_position=47) + assert entity.state["state"] == CoverState.OPEN + # test interrupted movement (e.g. device button press), starting from 47 % with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): await entity.async_set_cover_position(position=0) # 100 when inverted for ZCL diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 34aa6a6f8..121e07826 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -455,6 +455,8 @@ async def async_open_cover(self, **kwargs: Any) -> None: # pylint: disable=unus if res[1] is not Status.SUCCESS: self._track_target_lift_position(None) raise ZHAException(f"Failed to open cover: {res[1]}") + if self.current_cover_position == POSITION_OPEN: + return self.async_update_state(CoverState.OPENING) async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -466,6 +468,8 @@ async def async_open_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) raise ZHAException(f"Failed to open cover tilt: {res[1]}") + if self.current_cover_tilt_position == POSITION_OPEN: + return self.async_update_state(CoverState.OPENING) async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -475,6 +479,8 @@ async def async_close_cover(self, **kwargs: Any) -> None: # pylint: disable=unu if res[1] is not Status.SUCCESS: self._track_target_lift_position(None) raise ZHAException(f"Failed to close cover: {res[1]}") + if self.current_cover_position == POSITION_CLOSED: + return self.async_update_state(CoverState.CLOSING) async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disable=unused-argument @@ -486,6 +492,8 @@ async def async_close_cover_tilt(self, **kwargs: Any) -> None: # pylint: disabl if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) raise ZHAException(f"Failed to close cover tilt: {res[1]}") + if self.current_cover_tilt_position == POSITION_CLOSED: + return self.async_update_state(CoverState.CLOSING) async def async_set_cover_position(self, **kwargs: Any) -> None: @@ -500,6 +508,8 @@ async def async_set_cover_position(self, **kwargs: Any) -> None: if res[1] is not Status.SUCCESS: self._track_target_lift_position(None) raise ZHAException(f"Failed to set cover position: {res[1]}") + if target_position == self.current_cover_position: + return self.async_update_state( CoverState.CLOSING if target_position < self.current_cover_position @@ -518,6 +528,8 @@ async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: if res[1] is not Status.SUCCESS: self._track_target_tilt_position(None) raise ZHAException(f"Failed to set cover tilt position: {res[1]}") + if target_position == self.current_cover_tilt_position: + return self.async_update_state( CoverState.CLOSING if target_position < self.current_cover_tilt_position From af3aafdfd398b8fc3d642bac7556d51c2f77df14 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:57:36 +0000 Subject: [PATCH 14/15] Add test for dynamic movement timeout --- tests/test_cover.py | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/tests/test_cover.py b/tests/test_cover.py index dc4581ae6..ab811f9a8 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -141,6 +141,12 @@ async def test_cover( ) -> None: """Test zha cover platform.""" + # Timeout for device movement following a position attribute update + DEFAULT_MOVEMENT_TIMEOUT: float = 5 + + # Upper limit for dynamic timeout + LIFT_MOVEMENT_TIMEOUT_RANGE: float = 300 + zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE) cluster = zigpy_cover_device.endpoints.get(1).window_covering cluster.PLUGGED_ATTR_READS = { @@ -409,7 +415,7 @@ async def test_cover( assert entity.state["state"] == CoverState.CLOSING # wait the timer duration - await asyncio.sleep(5) + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) assert entity.state["current_position"] == 30 assert entity.state["state"] == CoverState.OPEN @@ -427,8 +433,35 @@ async def test_cover( assert entity.state["state"] == CoverState.OPENING # wait the default timer duration - await asyncio.sleep(5) + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + + assert entity.state["current_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + # test dynamic movement timeout, starting from 40 % and moving to 90 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + + await entity.async_set_cover_position(position=90) # 10 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x05 + assert cluster.request.call_args[0][2].command.name == "go_to_lift_percentage" + assert cluster.request.call_args[0][3] == 10 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration and verify status is still opening + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPENING + + # wait the remainder of the dynamic timeout and check if the movement timed out: (50% * 300 seconds) - default + await asyncio.sleep( + (50 * 0.01 * LIFT_MOVEMENT_TIMEOUT_RANGE) - DEFAULT_MOVEMENT_TIMEOUT + ) assert entity.state["current_position"] == 40 assert entity.state["state"] == CoverState.OPEN From cc2a967abc7dacffe3bf6dc36706dc842113c9e9 Mon Sep 17 00:00:00 2001 From: jeverley <46714706+jeverley@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:27:05 +0000 Subject: [PATCH 15/15] Full test coverage for tilt axis --- tests/test_cover.py | 79 ++++++++++++++++++++- zha/application/platforms/cover/__init__.py | 13 ++-- 2 files changed, 83 insertions(+), 9 deletions(-) diff --git a/tests/test_cover.py b/tests/test_cover.py index ab811f9a8..7b73ef906 100644 --- a/tests/test_cover.py +++ b/tests/test_cover.py @@ -146,6 +146,7 @@ async def test_cover( # Upper limit for dynamic timeout LIFT_MOVEMENT_TIMEOUT_RANGE: float = 300 + TILT_MOVEMENT_TIMEOUT_RANGE: float = 30 zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE) cluster = zigpy_cover_device.endpoints.get(1).window_covering @@ -328,6 +329,7 @@ async def test_cover( # test set position command, starting at 100 % / 0 ZCL (open) from previous lift test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_position"] == 100 await entity.async_set_cover_position(position=47) # 53 when inverted for ZCL await zha_gateway.async_block_till_done() assert cluster.request.call_count == 1 @@ -337,7 +339,6 @@ async def test_cover( assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True - assert entity.state["current_position"] == 100 assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( @@ -360,6 +361,7 @@ async def test_cover( # test set tilt position command, starting at 100 % / 0 ZCL (open) from previous tilt test with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_tilt_position"] == 100 await entity.async_set_cover_tilt_position( tilt_position=47 ) # 53 when inverted for ZCL @@ -380,12 +382,14 @@ async def test_cover( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 35} ) + assert entity.state["current_tilt_position"] == 65 assert entity.state["state"] == CoverState.CLOSING await send_attributes_report( zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 53} ) + assert entity.state["current_tilt_position"] == 47 assert entity.state["state"] == CoverState.OPEN # verify that a subsequent go_to command does not change the state to closing/opening @@ -416,8 +420,34 @@ async def test_cover( # wait the timer duration await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPEN - assert entity.state["current_position"] == 30 + # test interrupted tilt movement (e.g. device button press), starting from 47 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + await entity.async_set_cover_tilt_position( + tilt_position=0 + ) # 100 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 100 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["current_tilt_position"] == 47 + assert entity.state["state"] == CoverState.CLOSING + + # simulate a device position update to set timer to the default duration rather than dynamic + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 70} + ) + + assert entity.state["current_tilt_position"] == 30 + assert entity.state["state"] == CoverState.CLOSING + + # wait the timer duration + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) assert entity.state["state"] == CoverState.OPEN # test device instigated movement (e.g. device button press), starting from 30 % @@ -434,8 +464,22 @@ async def test_cover( # wait the default timer duration await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPEN - assert entity.state["current_position"] == 40 + # test device instigated tilt movement (e.g. device button press), starting from 30 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_tilt_position"] == 30 + assert entity.state["state"] == CoverState.OPEN + + await send_attributes_report( + zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 60} + ) + + assert entity.state["current_tilt_position"] == 40 + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) assert entity.state["state"] == CoverState.OPEN # test dynamic movement timeout, starting from 40 % and moving to 90 % @@ -465,6 +509,35 @@ async def test_cover( assert entity.state["current_position"] == 40 assert entity.state["state"] == CoverState.OPEN + # test dynamic tilt movement timeout, starting from 40 % and moving to 90 % + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): + assert entity.state["current_tilt_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + + await entity.async_set_cover_tilt_position( + tilt_position=90 + ) # 10 when inverted for ZCL + await zha_gateway.async_block_till_done() + assert cluster.request.call_count == 1 + assert cluster.request.call_args[0][0] is False + assert cluster.request.call_args[0][1] == 0x08 + assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert cluster.request.call_args[0][3] == 10 + assert cluster.request.call_args[1]["expect_reply"] is True + + assert entity.state["state"] == CoverState.OPENING + + # wait the default timer duration and verify status is still opening + await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT) + assert entity.state["state"] == CoverState.OPENING + + # wait the remainder of the dynamic timeout and check if the movement timed out: (50% * 30 seconds) - default + await asyncio.sleep( + (50 * 0.01 * TILT_MOVEMENT_TIMEOUT_RANGE) - DEFAULT_MOVEMENT_TIMEOUT + ) + assert entity.state["current_tilt_position"] == 40 + assert entity.state["state"] == CoverState.OPEN + # stop from client with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): await entity.async_stop_cover() diff --git a/zha/application/platforms/cover/__init__.py b/zha/application/platforms/cover/__init__.py index 121e07826..cb61f1fa6 100644 --- a/zha/application/platforms/cover/__init__.py +++ b/zha/application/platforms/cover/__init__.py @@ -315,12 +315,13 @@ def _determine_state( ): return - # An open tilt state overrides a closed lift state - if ( - self._tilt_state == CoverState.OPEN - and self._lift_state == CoverState.CLOSED - ): - self._state = CoverState.OPEN + # An open or moving tilt state overrides a static lift state + if self._tilt_state in ( + CoverState.OPEN, + CoverState.OPENING, + CoverState.CLOSING, + ) and self._lift_state in (CoverState.CLOSED, CoverState.OPEN): + self._state = self._tilt_state return # Pick lift state in preference over tilt