Skip to content

Commit

Permalink
Fix support Hass v2023.1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Jan 9, 2023
1 parent aba79b4 commit 26cd982
Showing 1 changed file with 63 additions and 61 deletions.
124 changes: 63 additions & 61 deletions custom_components/xiaomi_gateway3/core/converters/silabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

from zigpy.types import EUI64
from zigpy.zcl import Cluster
from zigpy.zcl.foundation import Command, ZCLHeader, Attribute, \
ReadAttributeRecord, DATA_TYPES
from zigpy.zcl.foundation import ZCLHeader, Attribute, ReadAttributeRecord, DATA_TYPES
from zigpy.zdo import ZDO
from zigpy.zdo.types import ZDOCmd, SizePrefixedSimpleDescriptor, \
NodeDescriptor
from zigpy.zdo.types import ZDOCmd, SizePrefixedSimpleDescriptor, NodeDescriptor

try:
from zigpy.zcl.foundation import GeneralCommand as Command
except Exception:
from zigpy.zcl.foundation import Command

_LOGGER = logging.getLogger(__name__)

Expand All @@ -27,16 +30,12 @@ def decode(data: dict):
else:
zdo = CLUSTERS["zdo"]

cluster_id = int(data['clusterId'], 0)
raw = bytes.fromhex(data['APSPlayload'][2:])
cluster_id = int(data["clusterId"], 0)
raw = bytes.fromhex(data["APSPlayload"][2:])
hdr, args = zdo.deserialize(cluster_id, raw)
cmd = ZDOCmd(hdr.command_id).name
if hdr.command_id == ZDOCmd.Active_EP_rsp:
return {
"command": cmd,
"status": str(args[0]),
"endpoints": args[2]
}
return {"command": cmd, "status": str(args[0]), "endpoints": args[2]}
elif hdr.command_id == ZDOCmd.Simple_Desc_rsp:
desc: SizePrefixedSimpleDescriptor = args[2]
return {
Expand All @@ -47,7 +46,7 @@ def decode(data: dict):
"endpoint": desc.endpoint,
"input_clusters": desc.input_clusters,
"output_clusters": desc.output_clusters,
"profile": desc.profile
"profile": desc.profile,
}
elif hdr.command_id == ZDOCmd.Node_Desc_rsp:
desc: NodeDescriptor = args[2]
Expand All @@ -63,9 +62,7 @@ def decode(data: dict):
"command": cmd,
"status": str(args[0]),
}
elif hdr.command_id in (
ZDOCmd.Node_Desc_req, ZDOCmd.Active_EP_req
):
elif hdr.command_id in (ZDOCmd.Node_Desc_req, ZDOCmd.Active_EP_req):
return {"command": cmd}
elif hdr.command_id == ZDOCmd.Simple_Desc_req:
return {
Expand All @@ -78,7 +75,7 @@ def decode(data: dict):
"src_addr": args[0],
"src_endpoint": args[1],
"cluster": args[2],
"dst_addr": args[3]
"dst_addr": args[3],
}
elif hdr.command_id == ZDOCmd.IEEE_addr_rsp:
return {
Expand Down Expand Up @@ -112,14 +109,14 @@ def decode(data: dict):
raise NotImplemented

# decode ZCL
cluster_id = int(data['clusterId'], 0)
cluster_id = int(data["clusterId"], 0)
if cluster_id not in CLUSTERS:
cluster = CLUSTERS[cluster_id] = Cluster.from_id(None, cluster_id)
cluster._log = lambda *_, **__: None
else:
cluster = CLUSTERS[cluster_id]

raw = bytes.fromhex(data['APSPlayload'][2:])
raw = bytes.fromhex(data["APSPlayload"][2:])
try:
hdr, args = cluster.deserialize(raw)
hdr: ZCLHeader
Expand All @@ -141,9 +138,11 @@ def decode(data: dict):
if hdr.frame_control.is_general:
payload["command"] = Command(hdr.command_id).name

if (hdr.command_id == Command.Report_Attributes or
hdr.command_id == Command.Write_Attributes):
attrs, = args
if (
hdr.command_id == Command.Report_Attributes
or hdr.command_id == Command.Write_Attributes
):
(attrs,) = args
for attr in attrs:
assert isinstance(attr, Attribute)
if attr.attrid in cluster.attributes:
Expand All @@ -154,16 +153,15 @@ def decode(data: dict):
value = attr.value.value
if isinstance(value, bytes) and value:
payload[name] = "0x" + value.hex()
elif isinstance(value, list) and \
not isinstance(value, EUI64):
elif isinstance(value, list) and not isinstance(value, EUI64):
payload[name] = [v.value for v in value]
elif isinstance(value, int):
payload[name] = int(value)
else:
payload[name] = value

elif hdr.command_id == Command.Read_Attributes_rsp:
attrs, = args
(attrs,) = args
for attr in attrs:
assert isinstance(attr, ReadAttributeRecord)
if attr.attrid in cluster.attributes:
Expand All @@ -185,20 +183,22 @@ def decode(data: dict):
payload[name] = str(attr.status)

elif hdr.command_id == Command.Read_Attributes:
attrs, = args
(attrs,) = args
payload["value"] = attrs

elif hdr.command_id == Command.Configure_Reporting:
attrs, = args
(attrs,) = args
# fix __repr__ bug
for attr in attrs:
if not hasattr(attr, "reportable_change"):
attr.reportable_change = None
payload["value"] = attrs

elif (hdr.command_id == Command.Write_Attributes_rsp or
hdr.command_id == Command.Configure_Reporting_rsp):
resp, = args
elif (
hdr.command_id == Command.Write_Attributes_rsp
or hdr.command_id == Command.Configure_Reporting_rsp
):
(resp,) = args
payload["status"] = [str(attr.status) for attr in resp]

elif hdr.command_id == Command.Discover_Commands_Received_rsp:
Expand Down Expand Up @@ -241,8 +241,7 @@ def decode(data: dict):
def get_cluster(cluster: str) -> Cluster:
# noinspection PyProtectedMember
return next(
cls for cls in Cluster._registry.values() if
cls.ep_attribute == cluster
cls for cls in Cluster._registry.values() if cls.ep_attribute == cluster
)


Expand All @@ -253,10 +252,7 @@ def get_attr(attributes: dict, attr) -> int:


def get_attr_type(attributes: dict, attr: str) -> (int, int):
attr, attr_type = next(
(k, v[1]) for k, v in attributes.items()
if v[0] == attr
)
attr, attr_type = next((k, v[1]) for k, v in attributes.items() if v[0] == attr)
type = next(k for k, v in DATA_TYPES.items() if v[1] == attr_type)
return attr, type

Expand All @@ -274,10 +270,7 @@ def get_type_len(type: int) -> int:
def zcl_on_off(nwk: str, ep: int, value: bool) -> list:
"""Generate Silabs Z3 command (cluster 6)."""
value = "on" if value else "off"
return [
{"commandcli": f"zcl on-off {value}"},
{"commandcli": f"send {nwk} 1 {ep}"}
]
return [{"commandcli": f"zcl on-off {value}"}, {"commandcli": f"send {nwk} 1 {ep}"}]


# zcl level-control mv-to-level [level:1] [transitionTime:2] [optionMask:1] [optionOverride:1]
Expand All @@ -286,7 +279,7 @@ def zcl_level(nwk: str, ep: int, br: int, tr: float) -> list:
tr = int(tr * 10.0) # zcl format - tenths of a seconds
return [
{"commandcli": f"zcl level-control o-mv-to-level {br} {tr}"},
{"commandcli": f"send {nwk} 1 {ep}"}
{"commandcli": f"send {nwk} 1 {ep}"},
]


Expand All @@ -295,7 +288,7 @@ def zcl_color(nwk: str, ep: int, ct: int, tr: float) -> list:
tr = int(tr * 10.0) # zcl format - tenths of a seconds
return [
{"commandcli": f"zcl color-control movetocolortemp {ct} {tr} 0 0"},
{"commandcli": f"send {nwk} 1 {ep}"}
{"commandcli": f"send {nwk} 1 {ep}"},
]


Expand Down Expand Up @@ -324,14 +317,20 @@ def zcl_read(nwk: str, ep: int, cluster: Union[str, int], *attrs) -> list:

return [
{"commandcli": f"zcl global read {cid} {attrs[0]}"},
{"commandcli": f"send {nwk} 1 {ep}"}
{"commandcli": f"send {nwk} 1 {ep}"},
]


# zcl global write [cluster:2] [attributeId:2] [type:4] [data:-1]
def zcl_write(
nwk: str, ep: int, cluster: Union[str, int], attr: Union[str, int],
data: int, *, mfg: int = None, type: int = None
nwk: str,
ep: int,
cluster: Union[str, int],
attr: Union[str, int],
data: int,
*,
mfg: int = None,
type: int = None,
) -> list:
"""Generate Silabs Z3 write attribute command."""
if isinstance(cluster, str):
Expand All @@ -347,12 +346,10 @@ def zcl_write(
# data always string hex: {FFFF}
data = data.to_bytes(type_len, "little").hex()

pre = [
{"commandcli": f"zcl mfg-code {mfg}"}
] if mfg is not None else []
pre = [{"commandcli": f"zcl mfg-code {mfg}"}] if mfg is not None else []
return pre + [
{"commandcli": f"zcl global write {cluster} {attr} {type} {{{data}}}"},
{"commandcli": f"send {nwk} 1 {ep}"}
{"commandcli": f"send {nwk} 1 {ep}"},
]


Expand All @@ -361,28 +358,32 @@ def zdo_bind(nwk: str, ep: int, cluster: str, src: str, dst: str) -> list:
"""Generate Silabs Z3 bind command."""
cluster = get_cluster(cluster)
cid = cluster.cluster_id
return [{
"commandcli": f"zdo bind {nwk} {ep} 1 {cid} {{{src}}} {{{dst}}}"
}]
return [{"commandcli": f"zdo bind {nwk} {ep} 1 {cid} {{{src}}} {{{dst}}}"}]


# zdo unbind unicast [target:2] [source eui64:8] [source endpoint:1] [clusterID:2] [destinationEUI64:8] [destEndpoint:1]
def zdo_unbind(nwk: str, ep: int, cluster: str, src: str, dst: str) -> list:
"""Generate Silabs Z3 bind command."""
cluster = get_cluster(cluster)
cid = cluster.cluster_id
return [{
"commandcli": f"zdo unbind unicast {nwk} {{{src}}} {ep} {cid} {{{dst}}} 1"
}]
return [
{"commandcli": f"zdo unbind unicast {nwk} {{{src}}} {ep} {cid} {{{dst}}} 1"}
]


# zcl global send-me-a-report [cluster:2] [attributeId:2] [dataType:1] [minReportTime:2] [maxReportTime:2] [reportableChange:-1]
# minReportTime Minimum number of seconds between reports
# maxReportTime Maximum number of seconds between reports
# reportableChange Amount of change to trigger a report
def zdb_report(
nwk: str, ep: int, cluster: str, attr: str, mint: int, maxt: int,
change: int, type: int = None
nwk: str,
ep: int,
cluster: str,
attr: str,
mint: int,
maxt: int,
change: int,
type: int = None,
):
cluster = get_cluster(cluster)
cid = cluster.cluster_id
Expand All @@ -392,11 +393,12 @@ def zdb_report(

type_len = get_type_len(type)
change = change.to_bytes(type_len, "little").hex()
return [{
"commandcli": f"zcl global send-me-a-report {cid} {attr} {type} {mint} {maxt} {{{change}}}"
}, {
"commandcli": f"send {nwk} 1 {ep}"
}]
return [
{
"commandcli": f"zcl global send-me-a-report {cid} {attr} {type} {mint} {maxt} {{{change}}}"
},
{"commandcli": f"send {nwk} 1 {ep}"},
]


# zdo leave [target:2] [removeChildren:1] [rejoin:1]
Expand Down

0 comments on commit 26cd982

Please sign in to comment.