Skip to content

Commit

Permalink
Merge pull request #15 from enriquebris/DequeueOrWaitForNextElement
Browse files Browse the repository at this point in the history
Dequeue or wait for next element
  • Loading branch information
enriquebris authored Jun 5, 2019
2 parents 75a9832 + ef2096e commit 4212a49
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 10 deletions.
64 changes: 59 additions & 5 deletions fifo_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import (
"sync"
)

const (
WaitForNextElementChanCapacity = 100
)

// FIFO (First In First Out) concurrent queue
type FIFO struct {
slice []interface{}
rwmutex sync.RWMutex
lockRWmutex sync.RWMutex
isLocked bool
// queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution )
waitForNextElementChan chan chan interface{}
}

// NewFIFO returns a new FIFO concurrent queue
Expand All @@ -23,22 +29,33 @@ func NewFIFO() *FIFO {

func (st *FIFO) initialize() {
st.slice = make([]interface{}, 0)
st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity)
}

// Enqueue enqueues an element
// Enqueue enqueues an element. Returns error if queue is locked.
func (st *FIFO) Enqueue(value interface{}) error {
if st.isLocked {
return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}

st.rwmutex.Lock()
defer st.rwmutex.Unlock()
// check if there is a listener waiting for the next element (this element)
select {
case listener := <-st.waitForNextElementChan:
// send the element through the listener's channel instead of enqueue it
listener <- value

default:
// lock the object to enqueue the element into the slice
st.rwmutex.Lock()
defer st.rwmutex.Unlock()
// enqueue the element
st.slice = append(st.slice, value)
}

st.slice = append(st.slice, value)
return nil
}

// Dequeue dequeues an element
// Dequeue dequeues an element. Returns error if queue is locked or empty.
func (st *FIFO) Dequeue() (interface{}, error) {
if st.isLocked {
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
Expand All @@ -58,6 +75,43 @@ func (st *FIFO) Dequeue() (interface{}, error) {
return elementToReturn, nil
}

// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.
func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error) {
if st.isLocked {
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}

// get the slice's len
st.rwmutex.Lock()
len := len(st.slice)
st.rwmutex.Unlock()

if len == 0 {
// channel to wait for next enqueued element
waitChan := make(chan interface{})

select {
// enqueue a watcher into the watchForNextElementChannel to wait for the next element
case st.waitForNextElementChan <- waitChan:
// return the next enqueued element, if any
return <-waitChan, nil
default:
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element")
}
}

st.rwmutex.Lock()
defer st.rwmutex.Unlock()

// there is at least one element into the queue
elementToReturn := st.slice[0]
st.slice = st.slice[1:]

return elementToReturn, nil
}

// Get returns an element's value and keeps the element at the queue
func (st *FIFO) Get(index int) (interface{}, error) {
if st.isLocked {
Expand Down
151 changes: 151 additions & 0 deletions fifo_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goconcurrentqueue
import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/suite"
)
Expand Down Expand Up @@ -65,6 +66,21 @@ func (suite *FIFOTestSuite) TestEnqueueLenSingleGR() {
suite.Equalf(2, len, "Expected number of elements in queue: 2, currently: %v", len)
}

// single enqueue and wait for next element
func (suite *FIFOTestSuite) TestEnqueueWaitForNextElementSingleGR() {
waitForNextElement := make(chan interface{})
// add the listener manually (ONLY for testings purposes)
suite.fifo.waitForNextElementChan <- waitForNextElement

value := 100
// enqueue from a different GR to avoid blocking the listener channel
go suite.fifo.Enqueue(value)
// wait for the enqueued element
result := <-waitForNextElement

suite.Equal(value, result)
}

// TestEnqueueLenMultipleGR enqueues elements concurrently
//
// Detailed steps:
Expand Down Expand Up @@ -415,6 +431,141 @@ func (suite *FIFOTestSuite) TestDequeueMultipleGRs() {
suite.Equalf(totalElementsToDequeue, val, "The expected last element's value should be: %v", totalElementsToEnqueue-totalElementsToDequeue)
}

// ***************************************************************************************
// ** DequeueOrWaitForNextElement
// ***************************************************************************************

// single GR Locked queue
func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementLockSingleGR() {
suite.fifo.Lock()
result, err := suite.fifo.DequeueOrWaitForNextElement()
suite.Nil(result, "No value expected if queue is locked")
suite.Error(err, "Locked queue does not allow to enqueue elements")

// verify custom error: code: QueueErrorCodeLockedQueue
customError, ok := err.(*QueueError)
suite.True(ok, "Expected error type: QueueError")
// verify custom error code
suite.Equalf(QueueErrorCodeLockedQueue, customError.Code(), "Expected code: '%v'", QueueErrorCodeLockedQueue)
}

// single GR DequeueOrWaitForNextElement with a previous enqueued element
func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithEnqueuedElementSingleGR() {
value := 100
len := suite.fifo.GetLen()
suite.fifo.Enqueue(value)

result, err := suite.fifo.DequeueOrWaitForNextElement()

suite.NoError(err)
suite.Equal(value, result)
// length must be exactly the same as it was before
suite.Equal(len, suite.fifo.GetLen())
}

// single GR DequeueOrWaitForNextElement 1 element
func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue() {
var (
value = 100
result interface{}
err error
done = make(chan struct{})
)

// waiting for next enqueued element
go func() {
result, err = suite.fifo.DequeueOrWaitForNextElement()
done <- struct{}{}
}()

// enqueue an element
go func() {
suite.fifo.Enqueue(value)
}()

select {
// wait for the dequeued element
case <-done:
suite.NoError(err)
suite.Equal(value, result)

// the following comes first if more time than expected happened while waiting for the dequeued element
case <-time.After(2 * time.Second):
suite.Fail("too much time waiting for the enqueued element")

}
}

// single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error
func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() {
// enqueue WaitForNextElementChanCapacity listeners to future enqueued elements
for i := 0; i < WaitForNextElementChanCapacity; i++ {
suite.fifo.waitForNextElementChan <- make(chan interface{})
}

result, err := suite.fifo.DequeueOrWaitForNextElement()
suite.Nil(result)
suite.Error(err)
// verify custom error: code: QueueErrorCodeEmptyQueue
customError, ok := err.(*QueueError)
suite.True(ok, "Expected error type: QueueError")
// verify custom error code
suite.Equalf(QueueErrorCodeEmptyQueue, customError.Code(), "Expected code: '%v'", QueueErrorCodeEmptyQueue)
}

// multiple GRs, calling DequeueOrWaitForNextElement from different GRs and enqueuing the expected values later
func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() {
var (
wg sync.WaitGroup
// channel to enqueue dequeued values
dequeuedValues = make(chan int, WaitForNextElementChanCapacity)
// map[dequeued_value] = times dequeued
mp = make(map[int]int)
)

for i := 0; i < WaitForNextElementChanCapacity; i++ {
go func() {
// wait for the next enqueued element
result, err := suite.fifo.DequeueOrWaitForNextElement()
// no error && no nil result
suite.NoError(err)
suite.NotNil(result)

// send each dequeued element into the dequeuedValues channel
resultInt, _ := result.(int)
dequeuedValues <- resultInt

// let the wg.Wait() know that this GR is done
wg.Done()
}()
}

// enqueue all needed elements
for i := 0; i < WaitForNextElementChanCapacity; i++ {
wg.Add(1)
suite.fifo.Enqueue(i)
// save the enqueued value as index
mp[i] = 0
}

// wait until all GRs dequeue the elements
wg.Wait()
// close dequeuedValues channel in order to only read the previous enqueued values (from the channel)
close(dequeuedValues)

// verify that all enqueued values were dequeued
for v := range dequeuedValues {
val, ok := mp[v]
suite.Truef(ok, "element dequeued but never enqueued: %v", val)
// increment the m[p] value meaning the value p was dequeued
mp[v] = val + 1
}
// verify that there are no duplicates
for k, v := range mp {
suite.Equalf(1, v, "%v was dequeued %v times", k, v)
}
}

// ***************************************************************************************
// ** Lock / Unlock / IsLocked
// ***************************************************************************************
Expand Down
54 changes: 51 additions & 3 deletions fixed_fifo_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package goconcurrentqueue
type FixedFIFO struct {
queue chan interface{}
lockChan chan struct{}
// queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution )
waitForNextElementChan chan chan interface{}
}

func NewFixedFIFO(capacity int) *FixedFIFO {
Expand All @@ -16,21 +18,34 @@ func NewFixedFIFO(capacity int) *FixedFIFO {
func (st *FixedFIFO) initialize(capacity int) {
st.queue = make(chan interface{}, capacity)
st.lockChan = make(chan struct{}, 1)
st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity)
}

// Enqueue enqueues an element. Returns error if queue is locked or it is at full capacity.
func (st *FixedFIFO) Enqueue(value interface{}) error {
if st.IsLocked() {
return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}

// check if there is a listener waiting for the next element (this element)
select {
case st.queue <- value:
return nil
case listener := <-st.waitForNextElementChan:
// send the element through the listener's channel instead of enqueue it
listener <- value

default:
return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity")
// enqueue the element following the "normal way"
select {
case st.queue <- value:
default:
return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity")
}
}

return nil
}

// Dequeue dequeues an element. Returns error if: queue is locked, queue is empty or internal channel is closed.
func (st *FixedFIFO) Dequeue() (interface{}, error) {
if st.IsLocked() {
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
Expand All @@ -47,6 +62,39 @@ func (st *FixedFIFO) Dequeue() (interface{}, error) {
}
}

// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.
func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error) {
if st.IsLocked() {
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}

select {
case value, ok := <-st.queue:
if ok {
return value, nil
}
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")

// queue is empty, add a listener to wait until next enqueued element is ready
default:
// channel to wait for next enqueued element
waitChan := make(chan interface{})

select {
// enqueue a watcher into the watchForNextElementChannel to wait for the next element
case st.waitForNextElementChan <- waitChan:
// return the next enqueued element, if any
return <-waitChan, nil
default:
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element")
}

//return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue")
}
}

// GetLen returns queue's length (total enqueued elements)
func (st *FixedFIFO) GetLen() int {
st.Lock()
Expand Down
Loading

0 comments on commit 4212a49

Please sign in to comment.