forked from itsmontoya/mailbox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmailbox.go
205 lines (171 loc) · 4.13 KB
/
mailbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package mailbox
import (
"sync"
"sync/atomic"
)
// New constructs an open, empty Mailbox whose ring buffer has sz slots.
// Both condition variables (one for senders, one for receivers) are bound to
// the mailbox's own mutex.
func New(sz int) *Mailbox {
	mb := new(Mailbox)
	mb.s = make([]interface{}, sz)
	mb.cap = sz
	// send advances tail before it writes, so starting at -1 makes the very
	// first message land in slot 0.
	mb.tail = -1

	// Sender and receiver conds share the single mailbox mutex
	mb.sc = sync.NewCond(&mb.mux)
	mb.rc = sync.NewCond(&mb.mux)
	return mb
}
// Mailbox is a bounded message queue used to send and receive messages.
// Messages are stored in a fixed-size ring buffer; senders wait while it is
// full and receivers wait while it is empty.
type Mailbox struct {
	// mux is the lock the exported methods hold while touching the queue state.
	mux sync.Mutex
	// sc is the condition senders wait on while no slot is free.
	sc *sync.Cond
	// rc is the condition receivers wait on while no message is pending.
	rc *sync.Cond

	// s is the ring buffer holding the pending messages.
	s []interface{}
	// cap is the number of slots in s.
	cap int
	// len is the number of unread messages currently stored in s.
	len int
	// head is the index of the next message to hand to a receiver.
	head int
	// tail is the index of the most recently written message
	// (-1 until the first message is sent on a mailbox built by New).
	tail int

	// closed is 1 once the mailbox has been closed and 0 otherwise.
	// It is read and written with sync/atomic.
	closed int32
}
// isClosed reports whether the mailbox's closed flag is set (value 1).
// The flag is loaded atomically.
func (m *Mailbox) isClosed() bool {
	const closedFlag int32 = 1
	current := atomic.LoadInt32(&m.closed)
	return current == closedFlag
}
// rWait parks a receiver until there is something to read or nothing more
// will ever arrive. It returns true as soon as at least one unread message is
// stored, and false when the mailbox is empty and closed.
// It waits on m.rc, so the caller must already hold m.mux.
func (m *Mailbox) rWait() (ok bool) {
	for m.len <= 0 {
		if m.isClosed() {
			// Empty and closed: no message is coming
			return false
		}

		// Sleep until a sender or Close signals, then re-evaluate
		m.rc.Wait()
	}

	return true
}
// receive pops the oldest message from the ring buffer, waiting for one if
// necessary. It returns (msg, StateOK) on success and (nil, StateClosed) when
// the mailbox is empty and closed. The caller must hold m.mux.
func (m *Mailbox) receive() (msg interface{}, state StateCode) {
	if ok := m.rWait(); !ok {
		return nil, StateClosed
	}

	slot := m.head
	msg = m.s[slot]
	// Drop the buffer's reference so the message is not retained after delivery
	m.s[slot] = nil

	// Advance head, wrapping back to the start of the buffer at the end
	m.head = slot + 1
	if m.head == m.cap {
		m.head = 0
	}

	// Senders only ever wait on a full buffer, so they only need waking when
	// this read is the one that frees the first slot.
	wasFull := m.len == m.cap
	m.len--
	if wasFull {
		m.sc.Broadcast()
	}

	return msg, StateOK
}
// send is the internal function used for sending messages. It appends msg to
// the ring buffer, waiting on m.sc while no slot is free. The caller must hold
// m.mux.
//
// If the mailbox is closed, either on entry or while waiting for a free slot,
// msg is dropped and send returns. Without this check a sender blocked on a
// full buffer would go straight back to sleep after being woken by Close and
// could wait forever when no receiver drains the buffer.
func (m *Mailbox) send(msg interface{}) {
	for {
		if m.isClosed() {
			// A closed mailbox accepts no further messages
			return
		}

		if m.len < m.cap {
			// There is at least one vacant spot
			break
		}

		// There are no vacant spots in the inbox, wait for a receiver or Close
		m.sc.Wait()
	}

	// Goto the next index
	if m.tail++; m.tail == m.cap {
		// Our increment falls out of the bounds of our internal slice, reset to 0
		m.tail = 0
	}

	// Store the provided message at the new tail
	m.s[m.tail] = msg

	// Increment the length
	if m.len++; m.len == 1 {
		// The buffer was empty, notify the receivers that we have a new message
		m.rc.Broadcast()
	}
}
// Send enqueues a single message. The call holds the mailbox mutex and hands
// msg to send, which waits while the buffer is full. If the mailbox is already
// closed the message is silently discarded.
func (m *Mailbox) Send(msg interface{}) {
	m.mux.Lock()

	if !m.isClosed() {
		m.send(msg)
	}

	m.mux.Unlock()
}
// Batch enqueues several messages, in argument order, within one Lock/Unlock
// pair. Batch itself checks the closed flag once, before the first message;
// if the mailbox is already closed at that point nothing is sent.
func (m *Mailbox) Batch(msgs ...interface{}) {
	m.mux.Lock()

	if !m.isClosed() {
		// Hand every message to send, one at a time
		for i := range msgs {
			m.send(msgs[i])
		}
	}

	m.mux.Unlock()
}
// Receive takes the oldest message out of the mailbox, waiting until one is
// available. The returned state is StateOK when msg is valid and StateClosed
// when the mailbox is empty and closed (see the StateCode constants).
func (m *Mailbox) Receive() (msg interface{}, state StateCode) {
	m.mux.Lock()
	defer m.mux.Unlock()

	return m.receive()
}
// Listen feeds every current and future message to fn, one at a time, until
// either:
//   - the mailbox is empty and closed (returns StateClosed), or
//   - fn returns true (returns StateEnded).
//
// fn is invoked while the mailbox mutex is held, so it must not call methods
// that lock the same mailbox (Send, Batch, Receive, Listen).
func (m *Mailbox) Listen(fn func(msg interface{}) (end bool)) (state StateCode) {
	m.mux.Lock()

	// Keep going for as long as the last step left us in the OK state
	for state = StateOK; state == StateOK; {
		var msg interface{}
		msg, state = m.receive()

		// Only successfully received messages are handed to fn; a true
		// result from fn ends the loop.
		if state == StateOK && fn(msg) {
			state = StateEnded
		}
	}

	m.mux.Unlock()
	return state
}
// Close will close a mailbox. Senders and receivers that are currently
// waiting are woken so they can observe the closed state. Calling Close on a
// mailbox that is already closed is a no-op.
//
// The closed flag is flipped and the broadcasts are issued while holding the
// mailbox mutex. The waiters check the flag and call Wait under that same
// mutex; changing the flag without it allows a waiter to read "open", miss the
// broadcast that follows, and then sleep forever.
//
// Because Close takes the mailbox mutex, it must not be called from inside a
// Listen callback, which runs with that mutex held.
func (m *Mailbox) Close() {
	m.mux.Lock()
	defer m.mux.Unlock()

	// Attempt to set closed state to 1 (from 0)
	if !atomic.CompareAndSwapInt32(&m.closed, 0, 1) {
		// Already closed, return early
		return
	}

	// Notify senders so they can stop waiting for a free slot
	m.sc.Broadcast()
	// Notify receivers to attempt to receive again
	m.rc.Broadcast()
}
// StateCode describes the outcome of a receive-style call on a Mailbox.
type StateCode uint8

// The possible StateCode values, numbered from zero in declaration order.
const (
	// StateOK means the call succeeded; it is also the zero value.
	StateOK StateCode = iota

	// StateEmpty means there was nothing to return.
	// Note: reserved for the planned reject option; nothing in this file
	// returns it yet.
	StateEmpty

	// StateEnded means the caller's Listen callback asked to stop listening.
	StateEnded

	// StateClosed means the mailbox is closed (and has no messages left).
	StateClosed
)