-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriber.go
111 lines (100 loc) · 2.71 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"encoding/json"
"fmt"
"log"
"net"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/grandcat/zeroconf"
)
// subscribedService pairs a service record decoded from an MQTT message with
// the zeroconf server that was registered to announce it.
type subscribedService struct {
// record is the service description as last received; handleMessage compares
// incoming records against it to decide whether a re-announcement is needed.
record ServiceRecord
// server is the value returned by zeroconf.RegisterProxy for record; Shutdown
// is called on it when the service is replaced or removed.
server *zeroconf.Server
}
// Subscriber receives service records over MQTT and announces them through
// zeroconf, keeping one registered server per service identity.
type Subscriber struct {
// interfaces is the interface list passed to zeroconf.RegisterProxy.
interfaces []net.Interface
// mutex is intended to guard services; HasService and handleMessage lock it
// around their accesses to the map.
mutex sync.Mutex
// services maps each currently tracked service identity to its record and
// zeroconf server.
services map[ServiceIdentity]subscribedService
}
// NewSubscriber builds a Subscriber that announces on iface and registers its
// message handler for topic on client (subscription QoS argument is 1).
//
// It waits up to 10 seconds for the subscription token to complete. If the
// wait expires a timeout error is returned; if the token completes with an
// error that error is returned. In both cases the returned Subscriber is nil.
func NewSubscriber(client mqtt.Client, topic string, iface net.Interface) (*Subscriber, error) {
	sub := &Subscriber{
		interfaces: []net.Interface{iface},
		services:   make(map[ServiceIdentity]subscribedService),
	}

	token := client.Subscribe(topic, 1, sub.handleMessage)
	if !token.WaitTimeout(10 * time.Second) {
		return nil, fmt.Errorf("timeout subscribing to MQTT topic")
	}
	if err := token.Error(); err != nil {
		return nil, err
	}
	return sub, nil
}
// HasService reports whether a service with the given identity is currently
// present in the subscriber's service map. The lookup is done under s.mutex.
func (s *Subscriber) HasService(identity ServiceIdentity) bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	_, present := s.services[identity]
	return present
}
// handleMessage is the MQTT callback registered by NewSubscriber. It brings
// the set of announced services in line with the records carried by message.
//
// An empty payload shuts down every announced service and clears the map.
// Otherwise the payload is decoded as a JSON Message and, for each record:
//   - an unchanged record (per record.Equal) is left alone;
//   - a changed record has its old server shut down and is re-registered;
//   - a new record is registered.
//
// Finally, every tracked service that does not appear in the message is shut
// down and dropped. Decode and registration failures are logged and skipped.
//
// s.mutex is held for the whole reconciliation, including the final pruning
// pass, so HasService and other handler invocations never observe or mutate a
// half-updated map.
func (s *Subscriber) handleMessage(_ mqtt.Client, message mqtt.Message) {
	payload := message.Payload()

	if len(payload) == 0 {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		for _, service := range s.services {
			s.removeService(service)
		}
		s.services = map[ServiceIdentity]subscribedService{}
		return
	}

	// Decode before taking the lock; parsing does not touch shared state.
	m := Message{}
	if err := json.Unmarshal(payload, &m); err != nil {
		log.Println("Failed to deserialize message", err)
		return
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	seenServices := make(map[ServiceIdentity]struct{}, len(m.Records))
	for _, record := range m.Records {
		seenServices[record.ServiceIdentity] = struct{}{}

		existingService, ok := s.services[record.ServiceIdentity]
		if ok {
			if record.Equal(existingService.record) {
				continue
			}
			existingService.server.Shutdown()
			// Drop the stale entry right away: if re-registration fails below,
			// the map must not keep pointing at a server that was shut down.
			delete(s.services, record.ServiceIdentity)
			log.Println("Updated remote service, announcing", record.Instance)
		} else {
			log.Println("Received remote service, announcing", record.Instance)
		}

		ips := make([]string, 0, len(record.IPs))
		for _, ip := range record.IPs {
			ips = append(ips, ip.String())
		}

		server, err := zeroconf.RegisterProxy(record.Instance, record.Service, record.Domain, record.Port, record.HostName, ips, record.TXTRecords, s.interfaces)
		if err != nil {
			log.Println("Failed to publish zone", err)
			continue
		}
		s.services[record.ServiceIdentity] = subscribedService{record: record, server: server}
	}

	// Prune services that were not mentioned in this message. Deleting from a
	// map while ranging over it is permitted in Go.
	for identity, service := range s.services {
		if _, ok := seenServices[identity]; !ok {
			s.removeService(service)
			delete(s.services, identity)
		}
	}
}
// removeService logs that the given service is no longer announced and shuts
// down its zeroconf server. It does not modify s.services and does not take
// s.mutex itself.
func (s *Subscriber) removeService(service subscribedService) {
	instance, server := service.record.Instance, service.server
	log.Println("Remote service has stopped, no longer announcing", instance)
	server.Shutdown()
}