Skip to content

Commit

Permalink
discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
neilbacon committed Aug 28, 2023
1 parent 39e7187 commit d25f93d
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 60 deletions.
14 changes: 13 additions & 1 deletion adapters/devices/sofar/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,20 @@ func NewSofarLogger(serialNumber uint, connPort ports.CommunicationPort, attrWhi
}
}

func (s *Logger) nameFilter(k string) bool {
ok := len(s.attrWhiteList) == 0
if !ok { // TODO: also handle attrBlackList
_, ok = s.attrWhiteList[k]
}
return ok
}

func (s *Logger) GetDiscoveryFields() []ports.DiscoveryField {
return getDiscoveryFields(s.nameFilter)
}

func (s *Logger) Query() (map[string]interface{}, error) {
return readData(s.connPort, s.serialNumber, s.attrWhiteList, s.attrBlackList)
return readData(s.connPort, s.serialNumber, s.nameFilter)
}

func (s *Logger) Name() string {
Expand Down
9 changes: 2 additions & 7 deletions adapters/devices/sofar/lsw.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (l LSWRequest) checksum(buf []byte) uint8 {
return checksum
}

func readData(connPort ports.CommunicationPort, serialNumber uint, attrWhiteList map[string]struct{}, attrBlackList []string) (map[string]interface{}, error) {
func readData(connPort ports.CommunicationPort, serialNumber uint, nameFilter func(string) bool) (map[string]interface{}, error) {
result := make(map[string]interface{})

for _, rr := range allRegisterRanges {
Expand All @@ -82,12 +82,7 @@ func readData(connPort ports.CommunicationPort, serialNumber uint, attrWhiteList
return nil, err
}
for k, v := range reply {
ok := len(attrWhiteList) == 0
if !ok { // TODO: also handle attrBlackList
_, ok = attrWhiteList[k]
}
// log.Printf("readData: %s %v", k, ok)
if ok {
if nameFilter(k) {
result[k] = v
}
}
Expand Down
16 changes: 16 additions & 0 deletions adapters/devices/sofar/sofar_protocol.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package sofar

import (
"github.com/kubaceg/sofar_g3_lsw3_logger_reader/ports"
)

type field struct {
register int
name string
Expand Down Expand Up @@ -37,6 +41,18 @@ func GetAllRegisterNames() []string {
return result
}

func getDiscoveryFields(nameFilter func(string) bool) []ports.DiscoveryField {
result := make([]ports.DiscoveryField, 0)
for _, rr := range allRegisterRanges {
for _, f := range rr.replyFields {
if f.name != "" && f.valueType != "" && nameFilter(f.name) {
result = append(result, ports.DiscoveryField{Name: f.name, Factor: f.factor, Unit: f.unit})
}
}
}
return result
}

var rrSystemInfo = registerRange{
start: 0x400,
end: 0x43a,
Expand Down
74 changes: 47 additions & 27 deletions adapters/export/mosquitto/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/kubaceg/sofar_g3_lsw3_logger_reader/ports"
)

type MqttConfig struct {
Url string `yaml:"url"`
User string `yaml:"user"`
Password string `yaml:"password"`
Prefix string `yaml:"prefix"`
Url string `yaml:"url"`
User string `yaml:"user"`
Password string `yaml:"password"`
Discovery string `yaml:"discovery"`
State string `yaml:"state"`
}

type Connection struct {
client mqtt.Client
prefix string
state string
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
Expand Down Expand Up @@ -46,7 +49,7 @@ func New(config *MqttConfig) (*Connection, error) {

conn := &Connection{}
conn.client = mqtt.NewClient(opts)
conn.prefix = config.Prefix
conn.state = config.State
if token := conn.client.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
Expand All @@ -55,41 +58,58 @@ func New(config *MqttConfig) (*Connection, error) {

}

func publish(conn *Connection, k string, v interface{}) {
token := conn.client.Publish(fmt.Sprintf("%s/%s", conn.prefix, k), 0, true, fmt.Sprintf("%v", v))
func (conn *Connection) publish(topic string, msg string, retain bool) {
token := conn.client.Publish(topic, 0, retain, msg)
res := token.WaitTimeout(1 * time.Second)
if !res || token.Error() != nil {
log.Printf("error inserting to MQTT: %s", token.Error())
}
}

// next thing to do is add discovery
// func discovery() {
// MQTT Discovery: https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery

// homeassistant/sensor/inverter/config
// payload
// unique_id: PV_Generation_Today01ad
// state_topic: "homeassistant/sensor/inverter/PV_Generation_Today"
// device_class: energy
// state_class: measurement
// unit_of_measurement: 'kWh'
// value_template: "{{ value|int * 0.01 }}"```
// return "power" for kW etc., "energy" for kWh etc.
func unit2DeviceClass(unit string) string {
if strings.HasSuffix(unit, "Wh") {
return "energy"
} else if strings.HasSuffix(unit, "W") {
return "power"
} else {
return ""
}
}

// {"name": null, "device_class": "motion", "state_topic": "homeassistant/binary_sensor/garden/state", "unique_id": "motion01ad", "device": {"identifiers": ["01ad"], "name": "Garden" }}
//}
// MQTT Discovery: https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery
func (conn *Connection) InsertDiscoveryRecord(discovery string, state string, fields []ports.DiscoveryField) error {
uniq := "01ad" // TODO: get from config?
for _, f := range fields {
topic := fmt.Sprintf("%s/%s/config", discovery, f.Name)
json, _ := json.Marshal(map[string]interface{}{
"name": f.Name,
"unique_id": fmt.Sprintf("%s_%s", f.Name, uniq),
"device_class": unit2DeviceClass(f.Unit),
// "state_class": "measurement",
"state_topic": state,
"unit_of_measurement": f.Unit,
"value_template": fmt.Sprintf("{{ value_json.%s|int * %s }}", f.Name, f.Factor),
"device": map[string]interface{}{
"identifiers": [...]string{fmt.Sprintf("Inverter_%s", uniq)},
"name": "Inverter",
},
})
conn.publish(topic, string(json), true) // MQTT Discovery messages should be retained, but in dev it can become a pain
}
return nil
}

func (conn *Connection) InsertRecord(measurement map[string]interface{}) error {
// make a copy
m := make(map[string]interface{}, len(measurement))
for k, v := range measurement {
m[k] = v
}
// add LastTimestamp
m["LastTimestamp"] = time.Now().UnixNano() / int64(time.Millisecond)
all, _ := json.Marshal(m)
m["All"] = string(all)
for k, v := range m {
publish(conn, k, v)
}
json, _ := json.Marshal(m)
conn.publish(conn.state, string(json), false) // state messages should not be retained
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions config-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ inverter:
#
# if there is a non-empty attrWhiteList then only these explicitly listed attributes are output
attrWhiteList:
- ActivePower_Output_Total # Total PV generation (10W)
- ActivePower_Output_Total # Total PV generation (in units of 10W)
- Power_PV1 # PV output of string 1 (10W)
- Power_PV2 # PV output of string 2 (10W); PV1 + PV2 > ActivePower_Output_Total by about 3.5% due to inverter inefficiency?
- ActivePower_Load_Sys # total power consumption (10W)
- ActivePower_PCC_Total # grid export (+) or import (-) (10W) = ActivePower_Output_Total - ActivePower_Load_Sys (so it is redundant data)
- PV_Generation_Today # generation since midnight (10Wh)
# else attributes are output unless they match a regex in attrBlackList
attrBlackList:
- "^[ST]_" # prefix R_, S_, T_ for 3 phases, only R_ used in single phase systems
Expand All @@ -21,7 +22,8 @@ mqtt: # MQTT disabled if url & prefix both blank
url: 1.2.3.4:1883 # MQTT broker URL (e.g. 1.2.3.4:1883)
user: # MQTT username (leave empty when not needed)
password: # MQTT password (leave empty when not needed)
prefix: /sensors/energy/inverter # topic prefix on which data will be sent
discovery: homeassistant/sensor # topic prefix for MQTT Discovery
state: energy/inverter/state # topic for state

otlp: # OTLP disabled if both urls blank
grpc:
Expand Down
41 changes: 18 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/kubaceg/sofar_g3_lsw3_logger_reader/adapters/devices/sofar"
"github.com/kubaceg/sofar_g3_lsw3_logger_reader/adapters/export/mosquitto"
"github.com/kubaceg/sofar_g3_lsw3_logger_reader/adapters/export/otlp"

"github.com/kubaceg/sofar_g3_lsw3_logger_reader/ports"
)

Expand All @@ -40,7 +39,7 @@ func initialize() {
log.Fatalln(err)
}

hasMQTT = config.Mqtt.Url != "" && config.Mqtt.Prefix != ""
hasMQTT = config.Mqtt.Url != "" && config.Mqtt.State != ""
hasOTLP = config.Otlp.Grpc.Url != "" || config.Otlp.Http.Url != ""

if isSerialPort(config.Inverter.Port) {
Expand Down Expand Up @@ -71,6 +70,11 @@ func initialize() {

func main() {
initialize()

if hasMQTT {
mqtt.InsertDiscoveryRecord(config.Mqtt.Discovery, config.Mqtt.State, device.GetDiscoveryFields())
}

failedConnections := 0

for {
Expand All @@ -92,35 +96,26 @@ func main() {
failedConnections = 0

if hasMQTT {
go func() {
err = mqtt.InsertRecord(measurements)
if err != nil {
log.Printf("failed to insert record to MQTT: %s\n", err)
} else {
log.Println("measurements pushed to MQTT")
}
}()
// removed from async go func 'goroutine', not needed and proper usage requires WaitGroup to wait for completion
mqtt.InsertRecord(measurements) // logs errors, always returns nil
}

if hasOTLP {
go func() {
err = telem.CollectAndPushMetrics(context.Background(), measurements)
if err != nil {
log.Printf("error recording telemetry: %s\n", err)
} else {
log.Println("measurements pushed via OLTP")
}
}()
// removed from async go func 'goroutine'
err = telem.CollectAndPushMetrics(context.Background(), measurements)
if err != nil {
log.Printf("error recording telemetry: %s\n", err)
} else {
log.Println("measurements pushed via OLTP")
}
}

// if mqtt & otlp were done async then the WaitGroup to wait for completion would go here
duration := time.Since(timeStart)

delay := time.Duration(config.Inverter.ReadInterval)*time.Second - duration
if delay <= 0 {
delay = 1 * time.Second
if delay > 0 {
time.Sleep(delay)
}

time.Sleep(delay)
}

}
Expand Down
1 change: 1 addition & 0 deletions ports/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ports
import mqtt "github.com/eclipse/paho.mqtt.golang"

type Database interface {
InsertDiscoveryRecord(discovery string, prefix string, fields []DiscoveryField) error
InsertRecord(measurement map[string]interface{}) error
}

Expand Down
8 changes: 8 additions & 0 deletions ports/devices.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package ports

// support MQTT Discovery
type DiscoveryField struct {
Name string
Factor string
Unit string
}

type Device interface {
Name() string
GetDiscoveryFields() []DiscoveryField
Query() (map[string]interface{}, error)
}
13 changes: 13 additions & 0 deletions sh/cleanmqtt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# delete all "retained" messages
echo "cleaning " $1 " :: usage: cleanmqtt <host>"
host="$1"
# get your MQTT credentials from HA UI: Settings > Devices & Services > MQTT > Configure > Reconfigure MQTT
user=homeassistant
paswd="yourPassword"

mosquitto_sub -h "$host" -u "$user" -P "$paswd" -t "#" -v --retained-only | while read topic value
do
echo "cleaning topic $topic"
mosquitto_pub -h "$host" -u "$user" -P "$paswd" -t "$topic" -r -n
done

0 comments on commit d25f93d

Please sign in to comment.