-
-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathbroadcaster.go
53 lines (45 loc) · 1016 Bytes
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package neffos
import (
"sync"
"sync/atomic"
"unsafe"
)
// async broadcaster, doesn't wait for a publish to complete to all clients before any
// next broadcast call.
type broadcaster struct {
messages []Message
mu *sync.Mutex
awaiter unsafe.Pointer
}
func newBroadcaster() *broadcaster {
ch := make(chan struct{})
awaiter := unsafe.Pointer(&ch)
return &broadcaster{
mu: new(sync.Mutex),
awaiter: awaiter,
}
}
func (b *broadcaster) getAwaiter() <-chan struct{} {
ptr := atomic.LoadPointer(&b.awaiter)
return *((*chan struct{})(ptr))
}
func (b *broadcaster) broadcast(msgs []Message) {
b.mu.Lock()
b.messages = msgs
b.mu.Unlock()
ch := make(chan struct{})
old := atomic.SwapPointer(&b.awaiter, unsafe.Pointer(&ch))
close(*(*chan struct{})(old))
}
func (b *broadcaster) waitUntilClosed(closeCh <-chan struct{}) (msgs []Message, ok bool) {
ch := b.getAwaiter()
b.mu.Unlock()
select {
case <-ch:
msgs = b.messages[:]
ok = true
case <-closeCh:
}
b.mu.Lock()
return
}