Skip to content

Commit

Permalink
Handle return values
Browse files Browse the repository at this point in the history
- Make sendWallboxRequest public
- Add sendWallboxSetRequest
- Turn set_wallbox_phases into toggle_wallbox_phases
- Specify return values
- Use RscpTag & RscpType
  • Loading branch information
mstv committed Apr 14, 2024
1 parent a59580b commit fc812bd
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 70 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,14 @@ Poll returns a dictionary like the following:
- `set_power_limits()`
- `set_powersave()`
- `set_wallbox_max_charge_current()`
- `set_wallbox_phases()`
- `set_wallbox_schuko()`
- `set_wallbox_sunmode()`
- `set_weather_regulated_charge()`
- `toggle_wallbox_charging()`
- `toggle_wallbox_phases()`

- `sendWallboxRequest()`
- `sendWallboxSetRequest()`

See the full documentation on [ReadTheDocs](https://python-e3dc.readthedocs.io/en/latest/)

Expand Down
220 changes: 151 additions & 69 deletions e3dc/_e3dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,28 +1013,28 @@ def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False):
"""
req = self.sendRequest(
(
"WB_REQ_DATA",
"Container",
RscpTag.WB_REQ_DATA,
RscpType.Container,
[
("WB_INDEX", "UChar8", wbIndex),
("WB_REQ_EXTERN_DATA_ALG", "None", None),
("WB_REQ_EXTERN_DATA_SUN", "None", None),
("WB_REQ_EXTERN_DATA_NET", "None", None),
("WB_REQ_APP_SOFTWARE", "None", None),
("WB_REQ_KEY_STATE", "None", None),
(RscpTag.WB_INDEX, RscpType.UChar8, wbIndex),
(RscpTag.WB_REQ_EXTERN_DATA_ALG, RscpType.NoneType, None),
(RscpTag.WB_REQ_EXTERN_DATA_SUN, RscpType.NoneType, None),
(RscpTag.WB_REQ_EXTERN_DATA_NET, RscpType.NoneType, None),
(RscpTag.WB_REQ_APP_SOFTWARE, RscpType.NoneType, None),
(RscpTag.WB_REQ_KEY_STATE, RscpType.NoneType, None),
],
),
keepAlive=True,
)

outObj = {
"index": rscpFindTagIndex(req, "WB_INDEX"),
"appSoftware": rscpFindTagIndex(req, "WB_APP_SOFTWARE"),
"index": rscpFindTagIndex(req, RscpTag.WB_INDEX),
"appSoftware": rscpFindTagIndex(req, RscpTag.WB_APP_SOFTWARE),
}

extern_data_alg = rscpFindTag(req, "WB_EXTERN_DATA_ALG")
extern_data_alg = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_ALG)
if extern_data_alg is not None:
extern_data = rscpFindTagIndex(extern_data_alg, "WB_EXTERN_DATA")
extern_data = rscpFindTagIndex(extern_data_alg, RscpTag.WB_EXTERN_DATA)
status_byte = extern_data[2]
outObj["sunModeOn"] = (status_byte & 128) != 0
outObj["chargingCanceled"] = (status_byte & 64) != 0
Expand All @@ -1046,149 +1046,231 @@ def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False):
outObj["maxChargeCurrent"] = extern_data[3]
outObj["schukoOn"] = extern_data[5] != 0

extern_data_sun = rscpFindTag(req, "WB_EXTERN_DATA_SUN")
extern_data_sun = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_SUN)
if extern_data_sun is not None:
extern_data = rscpFindTagIndex(extern_data_sun, "WB_EXTERN_DATA")
extern_data = rscpFindTagIndex(extern_data_sun, RscpTag.WB_EXTERN_DATA)
outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0]
outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0]

extern_data_net = rscpFindTag(req, "WB_EXTERN_DATA_NET")
extern_data_net = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_NET)
if extern_data_net is not None:
extern_data = rscpFindTagIndex(extern_data_net, "WB_EXTERN_DATA")
extern_data = rscpFindTagIndex(extern_data_net, RscpTag.WB_EXTERN_DATA)
outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0]
outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0]

if "energySun" in outObj and "energyNet" in outObj:
outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"]

key_state = rscpFindTag(req, "WB_KEY_STATE")
key_state = rscpFindTag(req, RscpTag.WB_KEY_STATE)
if key_state is not None:
outObj["keyState"] = rscpFindTagIndex(key_state, "WB_KEY_STATE")
outObj["keyState"] = rscpFindTagIndex(key_state, RscpTag.WB_KEY_STATE)

req = self.sendRequest(
("EMS_REQ_BATTERY_TO_CAR_MODE", "None", None),
(RscpTag.EMS_REQ_BATTERY_TO_CAR_MODE, RscpType.NoneType, None),
keepAlive=keepAlive,
)
battery_to_car = rscpFindTag(req, "EMS_BATTERY_TO_CAR_MODE")
battery_to_car = rscpFindTag(req, RscpTag.EMS_BATTERY_TO_CAR_MODE)
if battery_to_car is not None:
outObj["batteryToCar"] = rscpFindTagIndex(
battery_to_car, "EMS_BATTERY_TO_CAR_MODE"
battery_to_car, RscpTag.EMS_BATTERY_TO_CAR_MODE
)

outObj = {k: v for k, v in sorted(outObj.items())}
return outObj

def set_wallbox_sunmode(
self, enable: bool, wbIndex: int = 0, keepAlive: bool = False
):
) -> bool:
"""Sets the sun mode of the wallbox via rscp protocol locally.
Args:
enable (bool): True to enable sun mode, otherwise false,
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
return self.__wallbox_set_extern(0, 1 if enable else 2, wbIndex, keepAlive)
return self.sendWallboxSetRequest(
dataIndex=0, value=1 if enable else 2, wbIndex=wbIndex, keepAlive=keepAlive
)

def set_wallbox_schuko(self, on: bool, wbIndex: int = 0, keepAlive: bool = False):
def set_wallbox_schuko(
self, on: bool, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Sets the Schuko of the wallbox via rscp protocol locally.
Args:
on (bool): True to activate the Schuko, otherwise false
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success (wallbox has understood the request, but might have ignored an unsupported value)
False if error
"""
return self.__wallbox_set_extern(5, 1 if on else 0, wbIndex, keepAlive)
return self.sendWallboxSetRequest(
dataIndex=5, value=1 if on else 0, wbIndex=wbIndex, keepAlive=keepAlive
)

def set_wallbox_max_charge_current(
self, max_charge_current: int, wbIndex: int = 0, keepAlive: bool = False
):
) -> bool:
"""Sets the maximum charge current of the wallbox via rscp protocol locally.
Args:
max_charge_current (int): maximum allowed charge current in A
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success (wallbox has understood the request, but might have clipped the value)
False if error
"""
barry = bytearray([0, 0, max_charge_current, 0, 0, 0])
return self.sendRequest(
(
"WB_REQ_DATA",
"Container",
[
("WB_INDEX", "UChar8", wbIndex),
(
"WB_REQ_SET_PARAM_1",
"Container",
[
("WB_EXTERN_DATA", "ByteArray", barry),
("WB_EXTERN_DATA_LEN", "UChar8", 6),
],
),
],
),
return self.sendWallboxSetRequest(
dataIndex=2,
value=max_charge_current,
request=RscpTag.WB_REQ_SET_PARAM_1,
wbIndex=wbIndex,
keepAlive=keepAlive,
)

def set_wallbox_phases(
self, phases: int, wbIndex: int = 0, keepAlive: bool = False
):
"""Sets the number of phases used for charging on the wallbox via rscp protocol locally.
def toggle_wallbox_charging(
self, wbIndex: int = 0, keepAlive: bool = False
) -> bool:
"""Toggles charging of the wallbox via rscp protocol locally.
Args:
phases (int): number of phases used, valid values are 1 or 3
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
if phases not in [1, 3]:
raise Exception("Invalid phase given, valid values are 1 or 3")
return self.__wallbox_set_extern(3, phases, wbIndex, keepAlive)
return self.sendWallboxSetRequest(
dataIndex=4, value=1, wbIndex=wbIndex, keepAlive=keepAlive
)

def toggle_wallbox_charging(self, wbIndex: int = 0, keepAlive: bool = False):
"""Toggles charging of the wallbox via rscp protocol locally.
def toggle_wallbox_phases(self, wbIndex: int = 0, keepAlive: bool = False) -> bool:
"""Toggles the number of phases used for charging by the wallbox between 1 and 3 via rscp protocol locally.
Args:
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
return self.__wallbox_set_extern(4, 1, wbIndex, keepAlive)
return self.sendWallboxSetRequest(
dataIndex=3, value=1, wbIndex=wbIndex, keepAlive=keepAlive
)

def __wallbox_set_extern(
self, index: int, value: int, wbIndex: int, keepAlive: bool = False
):
barry = bytearray([0, 0, 0, 0, 0, 0])
barry[index] = value
self.sendRequest(
def sendWallboxRequest(
self,
dataIndex: int,
value: int,
request: RscpTag = RscpTag.WB_REQ_SET_EXTERN,
wbIndex: int = 0,
keepAlive: bool = False,
) -> Tuple[str | int | RscpTag, str | int | RscpType, Any]:
"""Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally.
Args:
dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5)
value (int): byte value to be set in the WB_EXTERN_DATA array at the given index
request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2),
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
An object with the received data
"""
dataArray = bytearray([0, 0, 0, 0, 0, 0])
dataArray[dataIndex] = value
result = self.sendRequest(
(
"WB_REQ_DATA",
"Container",
RscpTag.WB_REQ_DATA,
RscpType.Container,
[
("WB_INDEX", "UChar8", wbIndex),
(RscpTag.WB_INDEX, RscpType.UChar8, wbIndex),
(
"WB_REQ_SET_EXTERN",
"Container",
request,
RscpType.Container,
[
("WB_EXTERN_DATA", "ByteArray", barry),
("WB_EXTERN_DATA_LEN", "UChar8", 6),
(RscpTag.WB_EXTERN_DATA, RscpType.ByteArray, dataArray),
(
RscpTag.WB_EXTERN_DATA_LEN,
RscpType.UChar8,
len(dataArray),
),
],
),
],
),
keepAlive=keepAlive,
)
return result

def sendWallboxSetRequest(
self,
dataIndex: int,
value: int,
request: RscpTag = RscpTag.WB_REQ_SET_EXTERN,
wbIndex: int = 0,
keepAlive: bool = False,
) -> bool:
"""Sends a low-level set request with WB_EXTERN_DATA to the wallbox via rscp protocol locally and evaluates the response.
Args:
dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5)
value (int): byte value to be set in the WB_EXTERN_DATA array at the given index
request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2),
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
response = self.sendWallboxRequest(
dataIndex, value, request, wbIndex, keepAlive
)

if response[0] != RscpTag.WB_DATA.name:
return False
responseData = response[2][-1]
return (
responseData[0][2:] == request.name[6:]
and responseData[1] != RscpType.Error.name
)

def set_battery_to_car_mode(self, enabled: bool, keepAlive: bool = False):
"""Sets whether the wallbox may use the battery.
Args:
enabled (bool): True to enable charging the car using the battery
keepAlive (Optional[bool]): True to keep connection alive
Returns:
True if success
False if error
"""
_ = self.sendRequest(
("EMS_REQ_SET_BATTERY_TO_CAR_MODE", "UChar8", 1 if enabled else 0),
enabledValue = 1 if enabled else 0

response = self.sendRequest(
(RscpTag.EMS_REQ_SET_BATTERY_TO_CAR_MODE, RscpType.UChar8, enabledValue),
keepAlive=keepAlive,
)

return response == (
RscpTag.EMS_SET_BATTERY_TO_CAR_MODE.name,
RscpType.UChar8.name,
enabledValue,
)

def get_batteries(self, keepAlive: bool = False):
"""Scans for installed batteries via rscp protocol.
Expand Down

0 comments on commit fc812bd

Please sign in to comment.