# Source file: dbus-hass-smartmeter.py (executable file, 191 lines / 162 loc, 6.83 KB),
# taken from a repository forked from RalfZim/venus.dbus-fronius-smartmeter.
#!/usr/bin/python3 -u
# -*- coding: utf-8 -*-
"""
Created by Ralf Zimmermann ([email protected]) in 2020.
This code and its documentation can be found on: https://github.com/RalfZim/venus.dbus-fronius-smartmeter
Used https://github.com/victronenergy/velib_python/blob/master/dbusdummyservice.py as basis for this service.
Reads information from the Home Assistant Smart Meter via the HTTP REST API and puts it on D-Bus.
"""
try:
import gobject # Python 2.x
except:
from gi.repository import GLib as gobject # Python 3.x
import platform
import logging
import sys
import os
import requests # for http GET
try:
import thread # for daemon = True / Python 2.x
except:
import _thread as thread # for daemon = True / Python 3.x
# our own packages
sys.path.insert(1, os.path.join(os.path.dirname(__file__), "../ext/velib_python"))
from vedbus import VeDbusService
from ve_utils import exit_on_error
# Configure the root logger once, at import time: DEBUG level, with a
# timestamp, level name and logger name in front of every message.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    level=logging.DEBUG,
)

# D-Bus path of the counter that is bumped after every successful update.
path_UpdateIndex = "/UpdateIndex"

# Bearer token sent with every Home Assistant request.
# NOTE(review): ".token" is resolved against the current working directory,
# not the script directory - confirm the service is started from the right place.
with open(".token", mode="rt") as f:
    TOKEN = f.read().strip()
def get_value(entity_id, base_url="http://192.168.227.11:8123", timeout=5):
    """Return the state of a Home Assistant entity as a float.

    Performs a GET on ``{base_url}/api/states/{entity_id}``, authenticated with
    the module-level ``TOKEN``, and converts the ``"state"`` field of the JSON
    reply to ``float``.

    Args:
        entity_id: Home Assistant entity id, e.g. ``"sensor.netz_bezug"``.
        base_url: Scheme, host and port of the Home Assistant instance
            (no trailing slash). Defaults to the previously hard-coded address.
        timeout: Request timeout in seconds, passed to ``requests.get``.

    Returns:
        The entity state converted to ``float``.

    Raises:
        requests.RequestException: on connection problems, timeouts, or an
            HTTP error status (4xx/5xx).
        KeyError: if the reply has no ``"state"`` field.
        ValueError: if the state is not numeric (the message names the entity
            and the offending state).
    """
    meter_url = f"{base_url}/api/states/{entity_id}"
    header = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json",
    }
    meter_r = requests.get(
        url=meter_url,
        headers=header,
        timeout=timeout,
    )  # request data from Home Assistant
    # Fail here on 4xx/5xx (bad token, unknown entity, ...) instead of further
    # down with a KeyError on an error body that has no "state" field.
    meter_r.raise_for_status()
    meter_data = meter_r.json()  # convert JSON data
    state = meter_data["state"]
    try:
        return float(state)
    except ValueError as err:
        # Re-raise with context: a bare "could not convert string to float"
        # does not say which sensor is affected.
        raise ValueError(
            f"Home Assistant entity {entity_id!r} has non-numeric state {state!r}"
        ) from err
class DbusDummyService:
    """Publish Home Assistant grid-meter readings as a D-Bus service.

    The constructor registers the management, mandatory and measurement paths
    on a ``VeDbusService`` and schedules ``_update`` to run once per second on
    the GLib main loop. ``_update`` polls Home Assistant through ``get_value``
    and writes the readings to the registered paths.
    """

    def __init__(
        self,
        servicename,
        deviceinstance,
        paths,
        productname="Home Assistant Smart Meter",
        connection="Home Assistant Smart Meter service",
    ):
        """Register all D-Bus paths and start the periodic update.

        Args:
            servicename: D-Bus service name handed to ``VeDbusService``.
            deviceinstance: Value published at ``/DeviceInstance``.
            paths: Mapping of D-Bus path -> settings dict. Each settings dict
                must contain ``"initial"`` (initial value) and may contain
                ``"unit"`` (a callable used as the path's text formatter).
            productname: Value published at ``/ProductName``.
            connection: Value published at ``/Mgmt/Connection``.
        """
        self._dbusservice = VeDbusService(servicename)
        self._paths = paths

        logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance))

        # Create the management objects, as specified in the ccgx dbus-api document
        self._dbusservice.add_path("/Mgmt/ProcessName", __file__)
        self._dbusservice.add_path(
            "/Mgmt/ProcessVersion",
            "Unkown version, and running on Python " + platform.python_version(),
        )
        self._dbusservice.add_path("/Mgmt/Connection", connection)

        # Create the mandatory objects
        self._dbusservice.add_path("/DeviceInstance", deviceinstance)
        self._dbusservice.add_path(
            "/ProductId", 16
        )  # value used in ac_sensor_bridge.cpp of dbus-cgwacs
        self._dbusservice.add_path("/ProductName", productname)
        self._dbusservice.add_path("/FirmwareVersion", 0.1)
        self._dbusservice.add_path("/HardwareVersion", 0)
        self._dbusservice.add_path("/Connected", 1)

        for path, settings in self._paths.items():
            self._dbusservice.add_path(
                path,
                settings["initial"],
                writeable=True,
                onchangecallback=self._handlechangedvalue,
                gettextcallback=settings.get("unit"),
            )

        # Poll Home Assistant every 1000 ms; _update returns True to stay scheduled.
        gobject.timeout_add(1000, self._update)

    @staticmethod
    def _split_phase_power(total_power, apparent_powers):
        """Distribute ``total_power`` over the phases.

        The split is proportional to each phase's ``U * I`` product. When the
        products sum to zero (no current on any phase) a proportional split is
        undefined, so the total is shared equally; the per-phase values then
        still add up to ``total_power``.

        Args:
            total_power: Total power to distribute.
            apparent_powers: Iterable with one ``U * I`` product per phase.

        Returns:
            A list with one power value per phase, in input order.
        """
        apparent_powers = list(apparent_powers)
        if not apparent_powers:
            return []
        # Plain left-to-right addition, matching the original expression.
        apparent_sum = 0.0
        for apparent in apparent_powers:
            apparent_sum += apparent
        if apparent_sum == 0:
            share = total_power / len(apparent_powers)
            return [share for _ in apparent_powers]
        faktor = total_power / apparent_sum
        return [apparent * faktor for apparent in apparent_powers]

    def _update(self):
        """Fetch the current readings and publish them on D-Bus.

        Every Home Assistant read goes through ``exit_on_error``.
        NOTE(review): ``exit_on_error`` comes from ``ve_utils`` and presumably
        terminates the process when the wrapped call raises - confirm. An
        exception escaping this GLib callback would not end the process, so
        the service would likely stop updating without anyone noticing.

        Returns:
            True, so that the GLib timeout keeps calling this method.
        """
        bezug = exit_on_error(get_value, "sensor.netz_bezug")
        einspeisung = exit_on_error(get_value, "sensor.netz_einspeisung")
        u1 = exit_on_error(get_value, "sensor.netz_u1")
        u2 = exit_on_error(get_value, "sensor.netz_u2")
        u3 = exit_on_error(get_value, "sensor.netz_u3")
        i1 = exit_on_error(get_value, "sensor.netz_i1")
        i2 = exit_on_error(get_value, "sensor.netz_i2")
        i3 = exit_on_error(get_value, "sensor.netz_i3")
        # The energy counters are read the same guarded way as the other values.
        energy_forward = exit_on_error(get_value, "sensor.zahlerstand_bezug")
        energy_reverse = exit_on_error(get_value, "sensor.zahlerstand_einspeisung")
        logging.debug(f"{bezug=}, {einspeisung=}")

        # Net grid exchange; scaled by 1000 before it is published at /Ac/Power.
        # NOTE(review): presumably the sensors report kW and the paths expect W - confirm.
        meter_consumption = bezug - einspeisung
        total_power = 1000.0 * meter_consumption
        power_l1, power_l2, power_l3 = self._split_phase_power(
            total_power, (u1 * i1, u2 * i2, u3 * i3)
        )

        self._dbusservice["/Ac/Power"] = (
            total_power
        )  # positive: consumption, negative: feed into grid
        self._dbusservice["/Ac/L1/Voltage"] = u1
        self._dbusservice["/Ac/L2/Voltage"] = u2
        self._dbusservice["/Ac/L3/Voltage"] = u3
        self._dbusservice["/Ac/L1/Current"] = i1
        self._dbusservice["/Ac/L2/Current"] = i2
        self._dbusservice["/Ac/L3/Current"] = i3
        self._dbusservice["/Ac/L1/Power"] = power_l1
        self._dbusservice["/Ac/L2/Power"] = power_l2
        self._dbusservice["/Ac/L3/Power"] = power_l3
        self._dbusservice["/Ac/Energy/Forward"] = energy_forward
        self._dbusservice["/Ac/Energy/Reverse"] = energy_reverse
        logging.info("House Consumption: {:.2f}".format(meter_consumption))

        # increment UpdateIndex - to show that new data is available
        index = self._dbusservice[path_UpdateIndex] + 1  # increment index
        if index > 255:  # maximum value of the index
            index = 0  # overflow from 255 to 0
        self._dbusservice[path_UpdateIndex] = index
        return True

    def _handlechangedvalue(self, path, value):
        """Log a write made to one of our paths by another D-Bus client.

        Returns:
            True, to accept the change.
        """
        logging.debug("someone else updated %s to %s" % (path, value))
        return True  # accept the change
def main():
    """Create the grid-meter D-Bus service and run the GLib main loop.

    Installs the GLib-based D-Bus main loop as default, instantiates
    ``DbusDummyService`` under ``com.victronenergy.grid.ha1`` with all
    measurement paths initialised to 0, then blocks in the main loop.
    """
    # NOTE(review): logging.basicConfig() was already called at module level
    # in this file, so this second call presumably has no effect - confirm.
    logging.basicConfig(level=logging.INFO)  # use .INFO for less logging
    thread.daemon = True  # allow the program to quit

    from dbus.mainloop.glib import DBusGMainLoop

    # A main loop is required to send/receive asynchronous calls to and from dbus
    DBusGMainLoop(set_as_default=True)

    # Text formatters for the D-Bus paths; each is called with (path, value).
    def _kwh(p, v):
        return f"{v:.0f}kWh"

    def _a(p, v):
        return f"{v:.1f}A"

    def _w(p, v):
        return f"{v:.0f}W"

    def _v(p, v):
        return f"{v:.1f}V"

    def _entry(formatter):
        # Every measurement path starts at 0 and carries its own formatter.
        return {"initial": 0, "unit": formatter}

    meter_paths = {
        "/Ac/Power": _entry(_w),
        "/Ac/L1/Voltage": _entry(_v),
        "/Ac/L2/Voltage": _entry(_v),
        "/Ac/L3/Voltage": _entry(_v),
        "/Ac/L1/Current": _entry(_a),
        "/Ac/L2/Current": _entry(_a),
        "/Ac/L3/Current": _entry(_a),
        "/Ac/L1/Power": _entry(_w),
        "/Ac/L2/Power": _entry(_w),
        "/Ac/L3/Power": _entry(_w),
        "/Ac/Energy/Forward": _entry(_kwh),  # energy bought from the grid
        "/Ac/Energy/Reverse": _entry(_kwh),  # energy sold to the grid
        path_UpdateIndex: {"initial": 0},
    }

    # Keep a local reference to the service for as long as the loop runs.
    service = DbusDummyService(
        servicename="com.victronenergy.grid.ha1",
        deviceinstance=30,
        paths=meter_paths,
    )

    logging.info(
        "Connected to dbus, and switching over to gobject.MainLoop() (= event based)"
    )
    gobject.MainLoop().run()
# Script entry point: start the service only when executed directly,
# not when this file is imported as a module.
if __name__ == "__main__":
    main()