-
Notifications
You must be signed in to change notification settings - Fork 32
/
fixed_fifo_queue.go
169 lines (144 loc) · 5.12 KB
/
fixed_fifo_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package goconcurrentqueue
import (
"context"
)
// Fixed capacity FIFO (First In First Out) concurrent queue
type FixedFIFO struct {
queue chan interface{}
lockChan chan struct{}
// queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution )
waitForNextElementChan chan chan interface{}
}
func NewFixedFIFO(capacity int) *FixedFIFO {
queue := &FixedFIFO{}
queue.initialize(capacity)
return queue
}
func (st *FixedFIFO) initialize(capacity int) {
st.queue = make(chan interface{}, capacity)
st.lockChan = make(chan struct{}, 1)
st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity)
}
// Enqueue enqueues an element. Returns error if queue is locked or it is at full capacity.
func (st *FixedFIFO) Enqueue(value interface{}) error {
if st.IsLocked() {
return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}
// check if there is a listener waiting for the next element (this element)
select {
case listener := <-st.waitForNextElementChan:
// verify whether it is possible to notify the listener (it could be the listener is no longer
// available because the context expired: DequeueOrWaitForNextElementContext)
select {
// sends the element through the listener's channel instead of enqueueing it
case listener <- value:
default:
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
return st.enqueueIntoQueue(value)
}
default:
// enqueue the element into the queue
return st.enqueueIntoQueue(value)
}
return nil
}
// enqueueIntoQueue enqueues the given item directly into the regular queue
func (st *FixedFIFO) enqueueIntoQueue(value interface{}) error {
select {
case st.queue <- value:
default:
return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity")
}
return nil
}
// Dequeue dequeues an element. Returns error if: queue is locked, queue is empty or internal channel is closed.
func (st *FixedFIFO) Dequeue() (interface{}, error) {
if st.IsLocked() {
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}
select {
case value, ok := <-st.queue:
if ok {
return value, nil
}
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
default:
return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue")
}
}
// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.
func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error) {
return st.DequeueOrWaitForNextElementContext(context.Background())
}
// DequeueOrWaitForNextElementContext dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
// Multiple calls to DequeueOrWaitForNextElementContext() would enqueue multiple "listeners" for future enqueued elements.
// When the passed context expires this function exits and returns the context' error
func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interface{}, error) {
if st.IsLocked() {
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}
select {
case value, ok := <-st.queue:
if ok {
return value, nil
}
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
case <-ctx.Done():
return nil, ctx.Err()
// queue is empty, add a listener to wait until next enqueued element is ready
default:
// channel to wait for next enqueued element
waitChan := make(chan interface{})
select {
// enqueue a watcher into the watchForNextElementChannel to wait for the next element
case st.waitForNextElementChan <- waitChan:
// return the next enqueued element, if any
select {
case item := <-waitChan:
return item, nil
case <-ctx.Done():
return nil, ctx.Err()
// try again to get the element from the regular queue (in case waitChan doesn't provide any item)
case value, ok := <-st.queue:
if ok {
return value, nil
}
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
}
default:
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element")
}
//return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue")
}
}
// GetLen returns queue's length (total enqueued elements)
func (st *FixedFIFO) GetLen() int {
st.Lock()
defer st.Unlock()
return len(st.queue)
}
// GetCap returns the queue's capacity
func (st *FixedFIFO) GetCap() int {
st.Lock()
defer st.Unlock()
return cap(st.queue)
}
func (st *FixedFIFO) Lock() {
// non-blocking fill the channel
select {
case st.lockChan <- struct{}{}:
default:
}
}
func (st *FixedFIFO) Unlock() {
// non-blocking flush the channel
select {
case <-st.lockChan:
default:
}
}
func (st *FixedFIFO) IsLocked() bool {
return len(st.lockChan) >= 1
}