-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathevents.go
136 lines (114 loc) · 2.46 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package hermes
import (
"context"
"errors"
"sync"
)
var _ (Pubsub) = (*pubsub)(nil)
// Status defines delivery status
type Status string
// Possible delivery statuses
const (
Unknown Status = "unknown"
Sent Status = "sent"
Failed Status = "failed"
Pending Status = "pending"
Submitted Status = "submitted"
Rejected Status = "rejected"
)
// St converts int to Status
func St(in int) Status {
switch in {
case 1:
return Sent
case 2:
return Failed
case 4:
return Pending
case 8:
return Submitted
case 16:
return Rejected
default:
return Unknown
}
}
// Event defines delivery event
type Event struct {
ID string `json:"id"`
Status Status `json:"status,string"`
Recipient string `json:"recipient"`
}
// Pubsub recieves message delivery events and generates events
type Pubsub interface {
//Subcribe to receive message delivery event
Subscribe(context.Context, string) (<-chan Event, error)
//Publish a new message delivery event
Publish(ctx context.Context, event Event)
// Done instructs pubsub to close the event channel when we are done reading
Done(context.Context, string) error
//Close the Pubsub
Close()
}
type pubsub struct {
mu sync.RWMutex
sink map[string]chan Event
closed bool
}
// NewPubsub creates a new in-memory pubsub instance
func NewPubsub() Pubsub {
ps := &pubsub{}
ps.sink = make(map[string]chan Event)
ps.closed = false
return ps
}
// the channel is closed of by the on-off subscriber
func (ps *pubsub) Subscribe(ctx context.Context, topic string) (<-chan Event, error) {
if ps.exists(topic) {
return nil, errors.New("already subscribed to this events")
}
ps.mu.Lock()
event := make(chan Event, 1)
ps.sink[topic] = event
ps.mu.Unlock()
go func() {
<-ctx.Done()
ps.mu.Lock()
delete(ps.sink, topic)
ps.mu.Unlock()
}()
return event, nil
}
func (ps *pubsub) Publish(ctx context.Context, event Event) {
ps.mu.RLock()
defer ps.mu.RUnlock()
sub := ps.sink[event.ID]
sub <- event
}
func (ps *pubsub) Done(ctx context.Context, topic string) error {
ps.mu.RLock()
defer ps.mu.RUnlock()
ch, ok := ps.sink[topic]
if !ok {
return errors.New("topic doesn't exists")
}
close(ch)
delete(ps.sink, topic)
return nil
}
func (ps *pubsub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.closed {
ps.closed = true
for _, ch := range ps.sink {
close(ch)
}
}
}
func (ps *pubsub) exists(topic string) bool {
ps.mu.Lock()
defer ps.mu.Unlock()
_, ok := ps.sink[topic]
return ok
}