From 26cd982a70c56977ddcac25c77534f70fda7bad4 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Mon, 9 Jan 2023 09:34:57 +0300 Subject: [PATCH] Fix support Hass v2023.1.2 --- .../xiaomi_gateway3/core/converters/silabs.py | 124 +++++++++--------- 1 file changed, 63 insertions(+), 61 deletions(-) diff --git a/custom_components/xiaomi_gateway3/core/converters/silabs.py b/custom_components/xiaomi_gateway3/core/converters/silabs.py index 9793ee8a..414caaaf 100644 --- a/custom_components/xiaomi_gateway3/core/converters/silabs.py +++ b/custom_components/xiaomi_gateway3/core/converters/silabs.py @@ -3,11 +3,14 @@ from zigpy.types import EUI64 from zigpy.zcl import Cluster -from zigpy.zcl.foundation import Command, ZCLHeader, Attribute, \ - ReadAttributeRecord, DATA_TYPES +from zigpy.zcl.foundation import ZCLHeader, Attribute, ReadAttributeRecord, DATA_TYPES from zigpy.zdo import ZDO -from zigpy.zdo.types import ZDOCmd, SizePrefixedSimpleDescriptor, \ - NodeDescriptor +from zigpy.zdo.types import ZDOCmd, SizePrefixedSimpleDescriptor, NodeDescriptor + +try: + from zigpy.zcl.foundation import GeneralCommand as Command +except Exception: + from zigpy.zcl.foundation import Command _LOGGER = logging.getLogger(__name__) @@ -27,16 +30,12 @@ def decode(data: dict): else: zdo = CLUSTERS["zdo"] - cluster_id = int(data['clusterId'], 0) - raw = bytes.fromhex(data['APSPlayload'][2:]) + cluster_id = int(data["clusterId"], 0) + raw = bytes.fromhex(data["APSPlayload"][2:]) hdr, args = zdo.deserialize(cluster_id, raw) cmd = ZDOCmd(hdr.command_id).name if hdr.command_id == ZDOCmd.Active_EP_rsp: - return { - "command": cmd, - "status": str(args[0]), - "endpoints": args[2] - } + return {"command": cmd, "status": str(args[0]), "endpoints": args[2]} elif hdr.command_id == ZDOCmd.Simple_Desc_rsp: desc: SizePrefixedSimpleDescriptor = args[2] return { @@ -47,7 +46,7 @@ def decode(data: dict): "endpoint": desc.endpoint, "input_clusters": desc.input_clusters, "output_clusters": desc.output_clusters, - "profile": desc.profile + "profile": desc.profile, } elif hdr.command_id == ZDOCmd.Node_Desc_rsp: desc: NodeDescriptor = args[2] @@ -63,9 +62,7 @@ def decode(data: dict): "command": cmd, "status": str(args[0]), } - elif hdr.command_id in ( - ZDOCmd.Node_Desc_req, ZDOCmd.Active_EP_req - ): + elif hdr.command_id in (ZDOCmd.Node_Desc_req, ZDOCmd.Active_EP_req): return {"command": cmd} elif hdr.command_id == ZDOCmd.Simple_Desc_req: return { @@ -78,7 +75,7 @@ def decode(data: dict): "src_addr": args[0], "src_endpoint": args[1], "cluster": args[2], - "dst_addr": args[3] + "dst_addr": args[3], } elif hdr.command_id == ZDOCmd.IEEE_addr_rsp: return { @@ -112,14 +109,14 @@ def decode(data: dict): raise NotImplemented # decode ZCL - cluster_id = int(data['clusterId'], 0) + cluster_id = int(data["clusterId"], 0) if cluster_id not in CLUSTERS: cluster = CLUSTERS[cluster_id] = Cluster.from_id(None, cluster_id) cluster._log = lambda *_, **__: None else: cluster = CLUSTERS[cluster_id] - raw = bytes.fromhex(data['APSPlayload'][2:]) + raw = bytes.fromhex(data["APSPlayload"][2:]) try: hdr, args = cluster.deserialize(raw) hdr: ZCLHeader @@ -141,9 +138,11 @@ def decode(data: dict): if hdr.frame_control.is_general: payload["command"] = Command(hdr.command_id).name - if (hdr.command_id == Command.Report_Attributes or - hdr.command_id == Command.Write_Attributes): - attrs, = args + if ( + hdr.command_id == Command.Report_Attributes + or hdr.command_id == Command.Write_Attributes + ): + (attrs,) = args for attr in attrs: assert isinstance(attr, Attribute) if attr.attrid in cluster.attributes: @@ -154,8 +153,7 @@ def decode(data: dict): value = attr.value.value if isinstance(value, bytes) and value: payload[name] = "0x" + value.hex() - elif isinstance(value, list) and \ - not isinstance(value, EUI64): + elif isinstance(value, list) and not isinstance(value, EUI64): payload[name] = [v.value for v in value] elif isinstance(value, int): payload[name] = int(value) @@ -163,7 +161,7 @@ def decode(data: dict): payload[name] = value elif hdr.command_id == Command.Read_Attributes_rsp: - attrs, = args + (attrs,) = args for attr in attrs: assert isinstance(attr, ReadAttributeRecord) if attr.attrid in cluster.attributes: @@ -185,20 +183,22 @@ def decode(data: dict): payload[name] = str(attr.status) elif hdr.command_id == Command.Read_Attributes: - attrs, = args + (attrs,) = args payload["value"] = attrs elif hdr.command_id == Command.Configure_Reporting: - attrs, = args + (attrs,) = args # fix __repr__ bug for attr in attrs: if not hasattr(attr, "reportable_change"): attr.reportable_change = None payload["value"] = attrs - elif (hdr.command_id == Command.Write_Attributes_rsp or - hdr.command_id == Command.Configure_Reporting_rsp): - resp, = args + elif ( + hdr.command_id == Command.Write_Attributes_rsp + or hdr.command_id == Command.Configure_Reporting_rsp + ): + (resp,) = args payload["status"] = [str(attr.status) for attr in resp] elif hdr.command_id == Command.Discover_Commands_Received_rsp: @@ -241,8 +241,7 @@ def decode(data: dict): def get_cluster(cluster: str) -> Cluster: # noinspection PyProtectedMember return next( - cls for cls in Cluster._registry.values() if - cls.ep_attribute == cluster + cls for cls in Cluster._registry.values() if cls.ep_attribute == cluster ) @@ -253,10 +252,7 @@ def get_attr(attributes: dict, attr) -> int: def get_attr_type(attributes: dict, attr: str) -> (int, int): - attr, attr_type = next( - (k, v[1]) for k, v in attributes.items() - if v[0] == attr - ) + attr, attr_type = next((k, v[1]) for k, v in attributes.items() if v[0] == attr) type = next(k for k, v in DATA_TYPES.items() if v[1] == attr_type) return attr, type @@ -274,10 +270,7 @@ def get_type_len(type: int) -> int: def zcl_on_off(nwk: str, ep: int, value: bool) -> list: """Generate Silabs Z3 command (cluster 6).""" value = "on" if value else "off" - return [ - {"commandcli": f"zcl on-off {value}"}, - {"commandcli": f"send {nwk} 1 {ep}"} - ] + return [{"commandcli": f"zcl on-off {value}"}, {"commandcli": f"send {nwk} 1 {ep}"}] # zcl level-control mv-to-level [level:1] [transitionTime:2] [optionMask:1] [optionOverride:1] @@ -286,7 +279,7 @@ def zcl_level(nwk: str, ep: int, br: int, tr: float) -> list: tr = int(tr * 10.0) # zcl format - tenths of a seconds return [ {"commandcli": f"zcl level-control o-mv-to-level {br} {tr}"}, - {"commandcli": f"send {nwk} 1 {ep}"} + {"commandcli": f"send {nwk} 1 {ep}"}, ] @@ -295,7 +288,7 @@ def zcl_color(nwk: str, ep: int, ct: int, tr: float) -> list: tr = int(tr * 10.0) # zcl format - tenths of a seconds return [ {"commandcli": f"zcl color-control movetocolortemp {ct} {tr} 0 0"}, - {"commandcli": f"send {nwk} 1 {ep}"} + {"commandcli": f"send {nwk} 1 {ep}"}, ] @@ -324,14 +317,20 @@ def zcl_read(nwk: str, ep: int, cluster: Union[str, int], *attrs) -> list: return [ {"commandcli": f"zcl global read {cid} {attrs[0]}"}, - {"commandcli": f"send {nwk} 1 {ep}"} + {"commandcli": f"send {nwk} 1 {ep}"}, ] # zcl global write [cluster:2] [attributeId:2] [type:4] [data:-1] def zcl_write( - nwk: str, ep: int, cluster: Union[str, int], attr: Union[str, int], - data: int, *, mfg: int = None, type: int = None + nwk: str, + ep: int, + cluster: Union[str, int], + attr: Union[str, int], + data: int, + *, + mfg: int = None, + type: int = None, ) -> list: """Generate Silabs Z3 write attribute command.""" if isinstance(cluster, str): @@ -347,12 +346,10 @@ def zcl_write( # data always string hex: {FFFF} data = data.to_bytes(type_len, "little").hex() - pre = [ - {"commandcli": f"zcl mfg-code {mfg}"} - ] if mfg is not None else [] + pre = [{"commandcli": f"zcl mfg-code {mfg}"}] if mfg is not None else [] return pre + [ {"commandcli": f"zcl global write {cluster} {attr} {type} {{{data}}}"}, - {"commandcli": f"send {nwk} 1 {ep}"} + {"commandcli": f"send {nwk} 1 {ep}"}, ] @@ -361,9 +358,7 @@ def zdo_bind(nwk: str, ep: int, cluster: str, src: str, dst: str) -> list: """Generate Silabs Z3 bind command.""" cluster = get_cluster(cluster) cid = cluster.cluster_id - return [{ - "commandcli": f"zdo bind {nwk} {ep} 1 {cid} {{{src}}} {{{dst}}}" - }] + return [{"commandcli": f"zdo bind {nwk} {ep} 1 {cid} {{{src}}} {{{dst}}}"}] # zdo unbind unicast [target:2] [source eui64:8] [source endpoint:1] [clusterID:2] [destinationEUI64:8] [destEndpoint:1] @@ -371,9 +366,9 @@ def zdo_unbind(nwk: str, ep: int, cluster: str, src: str, dst: str) -> list: """Generate Silabs Z3 bind command.""" cluster = get_cluster(cluster) cid = cluster.cluster_id - return [{ - "commandcli": f"zdo unbind unicast {nwk} {{{src}}} {ep} {cid} {{{dst}}} 1" - }] + return [ + {"commandcli": f"zdo unbind unicast {nwk} {{{src}}} {ep} {cid} {{{dst}}} 1"} + ] # zcl global send-me-a-report [cluster:2] [attributeId:2] [dataType:1] [minReportTime:2] [maxReportTime:2] [reportableChange:-1] @@ -381,8 +376,14 @@ def zdo_unbind(nwk: str, ep: int, cluster: str, src: str, dst: str) -> list: # maxReportTime Maximum number of seconds between reports # reportableChange Amount of change to trigger a report def zdb_report( - nwk: str, ep: int, cluster: str, attr: str, mint: int, maxt: int, - change: int, type: int = None + nwk: str, + ep: int, + cluster: str, + attr: str, + mint: int, + maxt: int, + change: int, + type: int = None, ): cluster = get_cluster(cluster) cid = cluster.cluster_id @@ -392,11 +393,12 @@ def zdb_report( type_len = get_type_len(type) change = change.to_bytes(type_len, "little").hex() - return [{ - "commandcli": f"zcl global send-me-a-report {cid} {attr} {type} {mint} {maxt} {{{change}}}" - }, { - "commandcli": f"send {nwk} 1 {ep}" - }] + return [ + { + "commandcli": f"zcl global send-me-a-report {cid} {attr} {type} {mint} {maxt} {{{change}}}" + }, + {"commandcli": f"send {nwk} 1 {ep}"}, + ] # zdo leave [target:2] [removeChildren:1] [rejoin:1]