From b32e524749b91460994f18865941e13acb6da5a7 Mon Sep 17 00:00:00 2001 From: Enrique Bris Date: Tue, 4 Jun 2019 16:21:21 -0400 Subject: [PATCH 1/6] fifo - DequeueOrWaitForNextElement --- fifo_queue.go | 56 +++++++++++++++- fifo_queue_test.go | 159 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+), 3 deletions(-) diff --git a/fifo_queue.go b/fifo_queue.go index d857e1c..bec27b8 100644 --- a/fifo_queue.go +++ b/fifo_queue.go @@ -5,12 +5,18 @@ import ( "sync" ) +const ( + WaitForNextElementChanCapacity = 100 +) + // FIFO (First In First Out) concurrent queue type FIFO struct { slice []interface{} rwmutex sync.RWMutex lockRWmutex sync.RWMutex isLocked bool + // queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution ) + waitForNextElementChan chan chan interface{} } // NewFIFO returns a new FIFO concurrent queue @@ -23,6 +29,7 @@ func NewFIFO() *FIFO { func (st *FIFO) initialize() { st.slice = make([]interface{}, 0) + st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity) } // Enqueue enqueues an element @@ -31,10 +38,20 @@ func (st *FIFO) Enqueue(value interface{}) error { return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") } - st.rwmutex.Lock() - defer st.rwmutex.Unlock() + // check if there is a listener waiting for the next element (this element) + select { + case listener := <-st.waitForNextElementChan: + // send the element through the listener's channel instead of enqueue it + listener <- value + + default: + // lock the object to enqueue the element into the slice + st.rwmutex.Lock() + defer st.rwmutex.Unlock() + // enqueue the element + st.slice = append(st.slice, value) + } - st.slice = append(st.slice, value) return nil } @@ -58,6 +75,39 @@ func (st *FIFO) Dequeue() (interface{}, error) { return elementToReturn, nil } +// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element get enqueued and returns it. +// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements. +func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error) { + if st.isLocked { + return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") + } + + len := len(st.slice) + if len == 0 { + // channel to wait for next enqueued element + waitChan := make(chan interface{}) + + select { + // enqueue a watcher into the watchForNextElementChannel to wait for the next element + case st.waitForNextElementChan <- waitChan: + // return the next enqueued element, if any + return <-waitChan, nil + default: + // too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements + return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element") + } + } + + st.rwmutex.Lock() + defer st.rwmutex.Unlock() + + // there is at least one element into the queue + elementToReturn := st.slice[0] + st.slice = st.slice[1:] + + return elementToReturn, nil +} + // Get returns an element's value and keeps the element at the queue func (st *FIFO) Get(index int) (interface{}, error) { if st.isLocked { diff --git a/fifo_queue_test.go b/fifo_queue_test.go index 29929ef..705d442 100644 --- a/fifo_queue_test.go +++ b/fifo_queue_test.go @@ -3,6 +3,7 @@ package goconcurrentqueue import ( "sync" "testing" + "time" "github.com/stretchr/testify/suite" ) @@ -65,6 +66,21 @@ func (suite *FIFOTestSuite) TestEnqueueLenSingleGR() { suite.Equalf(2, len, "Expected number of elements in queue: 2, currently: %v", len) } +// single enqueue and wait for next element +func (suite *FIFOTestSuite) TestEnqueueWaitForNextElementSingleGR() { + waitForNextElement := make(chan interface{}) + // add the listener manually (ONLY for testings purposes) + suite.fifo.waitForNextElementChan <- waitForNextElement + + value := 100 + // enqueue from a different GR to avoid blocking the listener channel + go suite.fifo.Enqueue(value) + // wait for the enqueued element + result := <-waitForNextElement + + suite.Equal(value, result) +} + // TestEnqueueLenMultipleGR enqueues elements concurrently // // Detailed steps: @@ -415,6 +431,149 @@ func (suite *FIFOTestSuite) TestDequeueMultipleGRs() { suite.Equalf(totalElementsToDequeue, val, "The expected last element's value should be: %v", totalElementsToEnqueue-totalElementsToDequeue) } +// *************************************************************************************** +// ** DequeueOrWaitForNextElement +// *************************************************************************************** + +// single GR Locked queue +func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementLockSingleGR() { + suite.fifo.Lock() + result, err := suite.fifo.DequeueOrWaitForNextElement() + suite.Nil(result, "No value expected if queue is locked") + suite.Error(err, "Locked queue does not allow to enqueue elements") + + // verify custom error: code: QueueErrorCodeLockedQueue + customError, ok := err.(*QueueError) + suite.True(ok, "Expected error type: QueueError") + // verify custom error code + suite.Equalf(QueueErrorCodeLockedQueue, customError.Code(), "Expected code: '%v'", QueueErrorCodeLockedQueue) +} + +// single GR DequeueOrWaitForNextElement with a previous enqueued element +func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithEnqueuedElementSingleGR() { + value := 100 + len := suite.fifo.GetLen() + suite.fifo.Enqueue(value) + + result, err := suite.fifo.DequeueOrWaitForNextElement() + + suite.NoError(err) + suite.Equal(value, result) + // length must be exactly the same as it was before + suite.Equal(len, suite.fifo.GetLen()) +} + +// single GR DequeueOrWaitForNextElement 1 element +func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue() { + var ( + value = 100 + result interface{} + err error + done = make(chan struct{}) + ) + + // waiting for next enqueued element + go func() { + result, err = suite.fifo.DequeueOrWaitForNextElement() + done <- struct{}{} + }() + + // enqueue an element + go func() { + suite.fifo.Enqueue(value) + }() + + select { + // wait for the dequeued element + case <-done: + suite.NoError(err) + suite.Equal(value, result) + + // the following comes first if more time than expected happened while waiting for the dequeued element + case <-time.After(1 * time.Second): + suite.Fail("too much time waiting for the enqueued element") + + } +} + +// single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error +func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() { + // enqueue WaitForNextElementChanCapacity listeners to future enqueued elements + for i := 0; i < WaitForNextElementChanCapacity; i++ { + suite.fifo.waitForNextElementChan <- make(chan interface{}) + } + + result, err := suite.fifo.DequeueOrWaitForNextElement() + suite.Nil(result) + suite.Error(err) + // verify custom error: code: QueueErrorCodeEmptyQueue + customError, ok := err.(*QueueError) + suite.True(ok, "Expected error type: QueueError") + // verify custom error code + suite.Equalf(QueueErrorCodeEmptyQueue, customError.Code(), "Expected code: '%v'", QueueErrorCodeEmptyQueue) +} + +// multiple GRs, calling DequeueOrWaitForNextElement from different GRs and enqueuing the expected values later +func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { + var ( + wg sync.WaitGroup + mp sync.Map + ) + + for i := 0; i < WaitForNextElementChanCapacity; i++ { + go func() { + // wait for the next enqueued element + result, err := suite.fifo.DequeueOrWaitForNextElement() + // no error && no nil result + suite.NoError(err) + suite.NotNil(result) + + // assure that each returned element wasn't returned earlier + _, ok := mp.Load(result) + suite.Falsef(ok, "Duplicated value: %v", result) + // save the result to let other GRs know that it was already returned + mp.Store(result, result) + + // let the wg.Wait() know that this GR is done + wg.Done() + }() + } + + // enqueue all needed elements + for i := 0; i < WaitForNextElementChanCapacity; i++ { + wg.Add(1) + suite.fifo.Enqueue(i) + } + + // wait until all GRs dequeue the elements + wg.Wait() +} + +// single GR, DequeueOrWaitForNextElement() should dequeue from 0 to WaitForNextElementChanCapacity in asc order +func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementSingleGR() { + var wg sync.WaitGroup + + go func(max int) { + for i := 0; i < max; i++ { + result, err := suite.fifo.DequeueOrWaitForNextElement() + + suite.NoError(err) + // the order should be from 0 to max + suite.Equal(result, i) + + wg.Done() + } + }(WaitForNextElementChanCapacity) + + for i := 0; i < WaitForNextElementChanCapacity; i++ { + wg.Add(1) + suite.fifo.Enqueue(i) + } + + // wait for the GR dequeueing elements + wg.Wait() +} + // *************************************************************************************** // ** Lock / Unlock / IsLocked // *************************************************************************************** From 627b6e6f037eee0388cde2cf6e98da575ab79e7a Mon Sep 17 00:00:00 2001 From: Enrique Bris Date: Tue, 4 Jun 2019 16:56:58 -0400 Subject: [PATCH 2/6] fifo - get rid of race condition for DequeueOrWaitForNextElement --- fifo_queue.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fifo_queue.go b/fifo_queue.go index bec27b8..0b3092f 100644 --- a/fifo_queue.go +++ b/fifo_queue.go @@ -32,7 +32,7 @@ func (st *FIFO) initialize() { st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity) } -// Enqueue enqueues an element +// Enqueue enqueues an element. Returns error if queue is locked. func (st *FIFO) Enqueue(value interface{}) error { if st.isLocked { return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") @@ -55,7 +55,7 @@ func (st *FIFO) Enqueue(value interface{}) error { return nil } -// Dequeue dequeues an element +// Dequeue dequeues an element. Returns error if queue is locked or empty. func (st *FIFO) Dequeue() (interface{}, error) { if st.isLocked { return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") @@ -75,14 +75,18 @@ func (st *FIFO) Dequeue() (interface{}, error) { return elementToReturn, nil } -// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element get enqueued and returns it. +// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it. // Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements. func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error) { if st.isLocked { return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") } + // get the slice's len + st.rwmutex.Lock() len := len(st.slice) + st.rwmutex.Unlock() + if len == 0 { // channel to wait for next enqueued element waitChan := make(chan interface{}) From 917db0d2e204f8c0e534fb6162a7601a170c6242 Mon Sep 17 00:00:00 2001 From: Enrique Bris Date: Tue, 4 Jun 2019 17:44:05 -0400 Subject: [PATCH 3/6] fifo - get rid of sync.Map for DequeueOrWaitForNextElement testings --- fifo_queue_test.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/fifo_queue_test.go b/fifo_queue_test.go index 705d442..ca1e395 100644 --- a/fifo_queue_test.go +++ b/fifo_queue_test.go @@ -517,7 +517,10 @@ func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChanne func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { var ( wg sync.WaitGroup - mp sync.Map + // channel to enqueue dequeued values + dequeuedValues = make(chan int, WaitForNextElementChanCapacity) + // map[dequeued_value] = times dequeued + mp = make(map[int]int) ) for i := 0; i < WaitForNextElementChanCapacity; i++ { @@ -528,11 +531,9 @@ func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { suite.NoError(err) suite.NotNil(result) - // assure that each returned element wasn't returned earlier - _, ok := mp.Load(result) - suite.Falsef(ok, "Duplicated value: %v", result) - // save the result to let other GRs know that it was already returned - mp.Store(result, result) + // send each dequeued element into the dequeuedValues channel + resultInt, _ := result.(int) + dequeuedValues <- resultInt // let the wg.Wait() know that this GR is done wg.Done() @@ -543,10 +544,26 @@ func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { for i := 0; i < WaitForNextElementChanCapacity; i++ { wg.Add(1) suite.fifo.Enqueue(i) + // save the enqueued value as index + mp[i] = 0 } // wait until all GRs dequeue the elements wg.Wait() + // close dequeuedValues channel in order to only read the previous enqueued values (from the channel) + close(dequeuedValues) + + // verify that all enqueued values were dequeued + for v := range dequeuedValues { + val, ok := mp[v] + suite.Truef(ok, "element dequeued but never enqueued: %v", val) + // increment the m[p] value meaning the value p was dequeued + mp[v] = val + 1 + } + // verify that there are no duplicates + for k, v := range mp { + suite.Equalf(1, v, "%v was dequeued %v times", k, v) + } } // single GR, DequeueOrWaitForNextElement() should dequeue from 0 to WaitForNextElementChanCapacity in asc order From 02140e38bab2da5badbe5ee43c78d4fdb7a21b9b Mon Sep 17 00:00:00 2001 From: Enrique Bris Date: Tue, 4 Jun 2019 18:57:31 -0400 Subject: [PATCH 4/6] fixedFifo - DequeueOrWaitForNextElement --- fixed_fifo_queue.go | 54 +++++++++++- fixed_fifo_queue_test.go | 176 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 227 insertions(+), 3 deletions(-) diff --git a/fixed_fifo_queue.go b/fixed_fifo_queue.go index 88cd6f0..2a63e4c 100644 --- a/fixed_fifo_queue.go +++ b/fixed_fifo_queue.go @@ -4,6 +4,8 @@ package goconcurrentqueue type FixedFIFO struct { queue chan interface{} lockChan chan struct{} + // queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution ) + waitForNextElementChan chan chan interface{} } func NewFixedFIFO(capacity int) *FixedFIFO { @@ -16,21 +18,34 @@ func NewFixedFIFO(capacity int) *FixedFIFO { func (st *FixedFIFO) initialize(capacity int) { st.queue = make(chan interface{}, capacity) st.lockChan = make(chan struct{}, 1) + st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity) } +// Enqueue enqueues an element. Returns error if queue is locked or it is at full capacity. func (st *FixedFIFO) Enqueue(value interface{}) error { if st.IsLocked() { return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") } + // check if there is a listener waiting for the next element (this element) select { - case st.queue <- value: - return nil + case listener := <-st.waitForNextElementChan: + // send the element through the listener's channel instead of enqueue it + listener <- value + default: - return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity") + // enqueue the element following the "normal way" + select { + case st.queue <- value: + default: + return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity") + } } + + return nil } +// Dequeue dequeues an element. Returns error if: queue is locked, queue is empty or internal channel is closed. func (st *FixedFIFO) Dequeue() (interface{}, error) { if st.IsLocked() { return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") @@ -47,6 +62,39 @@ func (st *FixedFIFO) Dequeue() (interface{}, error) { } } +// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it. +// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements. +func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error) { + if st.IsLocked() { + return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") + } + + select { + case value, ok := <-st.queue: + if ok { + return value, nil + } + return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed") + + // queue is empty, add a listener to wait until next enqueued element is ready + default: + // channel to wait for next enqueued element + waitChan := make(chan interface{}) + + select { + // enqueue a watcher into the watchForNextElementChannel to wait for the next element + case st.waitForNextElementChan <- waitChan: + // return the next enqueued element, if any + return <-waitChan, nil + default: + // too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements + return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element") + } + + //return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue") + } +} + // GetLen returns queue's length (total enqueued elements) func (st *FixedFIFO) GetLen() int { st.Lock() diff --git a/fixed_fifo_queue_test.go b/fixed_fifo_queue_test.go index 403d229..7943ef9 100644 --- a/fixed_fifo_queue_test.go +++ b/fixed_fifo_queue_test.go @@ -3,6 +3,7 @@ package goconcurrentqueue import ( "sync" "testing" + "time" "github.com/stretchr/testify/suite" ) @@ -285,6 +286,181 @@ func (suite *FixedFIFOTestSuite) TestDequeueMultipleGRs() { suite.Equalf(totalElementsToDequeue, val, "The expected last element's value should be: %v", totalElementsToEnqueue-totalElementsToDequeue) } +// *************************************************************************************** +// ** DequeueOrWaitForNextElement +// *************************************************************************************** + +// single GR Locked queue +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementLockSingleGR() { + suite.fifo.Lock() + result, err := suite.fifo.DequeueOrWaitForNextElement() + suite.Nil(result, "No value expected if queue is locked") + suite.Error(err, "Locked queue does not allow to enqueue elements") + + // verify custom error: code: QueueErrorCodeLockedQueue + customError, ok := err.(*QueueError) + suite.True(ok, "Expected error type: QueueError") + // verify custom error code + suite.Equalf(QueueErrorCodeLockedQueue, customError.Code(), "Expected code: '%v'", QueueErrorCodeLockedQueue) +} + +// single GR DequeueOrWaitForNextElement with a previous enqueued element +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEnqueuedElementSingleGR() { + value := 100 + len := suite.fifo.GetLen() + suite.fifo.Enqueue(value) + + result, err := suite.fifo.DequeueOrWaitForNextElement() + + suite.NoError(err) + suite.Equal(value, result) + // length must be exactly the same as it was before + suite.Equal(len, suite.fifo.GetLen()) +} + +// single GR DequeueOrWaitForNextElement 1 element +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue() { + var ( + value = 100 + result interface{} + err error + done = make(chan struct{}) + ) + + // waiting for next enqueued element + go func() { + result, err = suite.fifo.DequeueOrWaitForNextElement() + done <- struct{}{} + }() + + // enqueue an element + go func() { + suite.fifo.Enqueue(value) + }() + + select { + // wait for the dequeued element + case <-done: + suite.NoError(err) + suite.Equal(value, result) + + // the following comes first if more time than expected happened while waiting for the dequeued element + case <-time.After(1 * time.Second): + suite.Fail("too much time waiting for the enqueued element") + + } +} + +// single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() { + // enqueue WaitForNextElementChanCapacity listeners to future enqueued elements + for i := 0; i < WaitForNextElementChanCapacity; i++ { + suite.fifo.waitForNextElementChan <- make(chan interface{}) + } + + result, err := suite.fifo.DequeueOrWaitForNextElement() + suite.Nil(result) + suite.Error(err) + // verify custom error: code: QueueErrorCodeEmptyQueue + customError, ok := err.(*QueueError) + suite.True(ok, "Expected error type: QueueError") + // verify custom error code + suite.Equalf(QueueErrorCodeEmptyQueue, customError.Code(), "Expected code: '%v'", QueueErrorCodeEmptyQueue) +} + +// multiple GRs, calling DequeueOrWaitForNextElement from different GRs and enqueuing the expected values later +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { + var ( + wg sync.WaitGroup + // channel to enqueue dequeued values + dequeuedValues = make(chan int, WaitForNextElementChanCapacity) + // map[dequeued_value] = times dequeued + mp = make(map[int]int) + ) + + for i := 0; i < WaitForNextElementChanCapacity; i++ { + go func() { + // wait for the next enqueued element + result, err := suite.fifo.DequeueOrWaitForNextElement() + // no error && no nil result + suite.NoError(err) + suite.NotNil(result) + + // send each dequeued element into the dequeuedValues channel + resultInt, _ := result.(int) + dequeuedValues <- resultInt + + // let the wg.Wait() know that this GR is done + wg.Done() + }() + } + + // enqueue all needed elements + for i := 0; i < WaitForNextElementChanCapacity; i++ { + wg.Add(1) + suite.fifo.Enqueue(i) + // save the enqueued value as index + mp[i] = 0 + } + + // wait until all GRs dequeue the elements + wg.Wait() + // close dequeuedValues channel in order to only read the previous enqueued values (from the channel) + close(dequeuedValues) + + // verify that all enqueued values were dequeued + for v := range dequeuedValues { + val, ok := mp[v] + suite.Truef(ok, "element dequeued but never enqueued: %v", val) + // increment the m[p] value meaning the value p was dequeued + mp[v] = val + 1 + } + // verify that there are no duplicates + for k, v := range mp { + suite.Equalf(1, v, "%v was dequeued %v times", k, v) + } +} + +// single GR, DequeueOrWaitForNextElement() should dequeue from 0 to WaitForNextElementChanCapacity in asc order +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementSingleGR() { + var wg sync.WaitGroup + + go func(max int) { + for i := 0; i < max; i++ { + result, err := suite.fifo.DequeueOrWaitForNextElement() + + suite.NoError(err) + // the order should be from 0 to max + suite.Equal(result, i) + + wg.Done() + } + }(WaitForNextElementChanCapacity) + + for i := 0; i < WaitForNextElementChanCapacity; i++ { + wg.Add(1) + suite.fifo.Enqueue(i) + } + + // wait for the GR dequeueing elements + wg.Wait() +} + +// DequeueOrWaitForNextElement once queue's channel is closed +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementClosedChannel() { + close(suite.fifo.queue) + + result, err := suite.fifo.DequeueOrWaitForNextElement() + suite.Nil(result, "no result expected if internal queue's channel is closed") + suite.Error(err, "error expected if internal queue's channel is closed") + + // verify custom error: code: QueueErrorCodeEmptyQueue + customError, ok := err.(*QueueError) + suite.True(ok, "Expected error type: QueueError") + // verify custom error code + suite.Equalf(QueueErrorCodeInternalChannelClosed, customError.Code(), "Expected code: '%v'", QueueErrorCodeInternalChannelClosed) +} + // *************************************************************************************** // ** Lock / Unlock / IsLocked // *************************************************************************************** From 1c968a5c99258cd7ee55d588df88ccec631e9bb9 Mon Sep 17 00:00:00 2001 From: Enrique Bris Date: Tue, 4 Jun 2019 19:14:21 -0400 Subject: [PATCH 5/6] DequeueOrWaitForNextElement testings - 2s --- fifo_queue_test.go | 2 +- fixed_fifo_queue_test.go | 2 +- queue.go | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/fifo_queue_test.go b/fifo_queue_test.go index ca1e395..d733cc6 100644 --- a/fifo_queue_test.go +++ b/fifo_queue_test.go @@ -490,7 +490,7 @@ func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue() { suite.Equal(value, result) // the following comes first if more time than expected happened while waiting for the dequeued element - case <-time.After(1 * time.Second): + case <-time.After(2 * time.Second): suite.Fail("too much time waiting for the enqueued element") } diff --git a/fixed_fifo_queue_test.go b/fixed_fifo_queue_test.go index 7943ef9..6064a4e 100644 --- a/fixed_fifo_queue_test.go +++ b/fixed_fifo_queue_test.go @@ -345,7 +345,7 @@ func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue() suite.Equal(value, result) // the following comes first if more time than expected happened while waiting for the dequeued element - case <-time.After(1 * time.Second): + case <-time.After(2 * time.Second): suite.Fail("too much time waiting for the enqueued element") } diff --git a/queue.go b/queue.go index c4fbdba..879bde5 100644 --- a/queue.go +++ b/queue.go @@ -6,6 +6,9 @@ type Queue interface { Enqueue(interface{}) error // Dequeue element Dequeue() (interface{}, error) + // DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it. + // Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements. + DequeueOrWaitForNextElement() (interface{}, error) // Get number of enqueued elements GetLen() int // Get queue's capacity From ef2096ef95ae8fcfc331810e8733c25b0a6ebc87 Mon Sep 17 00:00:00 2001 From: Enrique Bris Date: Tue, 4 Jun 2019 21:54:51 -0400 Subject: [PATCH 6/6] readme.md - DequeueOrWaitForNextElement example --- fifo_queue_test.go | 25 ---------------------- fixed_fifo_queue_test.go | 25 ---------------------- readme.md | 45 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 43 insertions(+), 52 deletions(-) diff --git a/fifo_queue_test.go b/fifo_queue_test.go index d733cc6..01764e4 100644 --- a/fifo_queue_test.go +++ b/fifo_queue_test.go @@ -566,31 +566,6 @@ func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { } } -// single GR, DequeueOrWaitForNextElement() should dequeue from 0 to WaitForNextElementChanCapacity in asc order -func (suite *FIFOTestSuite) TestDequeueOrWaitForNextElementSingleGR() { - var wg sync.WaitGroup - - go func(max int) { - for i := 0; i < max; i++ { - result, err := suite.fifo.DequeueOrWaitForNextElement() - - suite.NoError(err) - // the order should be from 0 to max - suite.Equal(result, i) - - wg.Done() - } - }(WaitForNextElementChanCapacity) - - for i := 0; i < WaitForNextElementChanCapacity; i++ { - wg.Add(1) - suite.fifo.Enqueue(i) - } - - // wait for the GR dequeueing elements - wg.Wait() -} - // *************************************************************************************** // ** Lock / Unlock / IsLocked // *************************************************************************************** diff --git a/fixed_fifo_queue_test.go b/fixed_fifo_queue_test.go index 6064a4e..9fbe8a1 100644 --- a/fixed_fifo_queue_test.go +++ b/fixed_fifo_queue_test.go @@ -421,31 +421,6 @@ func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementMultiGR() { } } -// single GR, DequeueOrWaitForNextElement() should dequeue from 0 to WaitForNextElementChanCapacity in asc order -func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementSingleGR() { - var wg sync.WaitGroup - - go func(max int) { - for i := 0; i < max; i++ { - result, err := suite.fifo.DequeueOrWaitForNextElement() - - suite.NoError(err) - // the order should be from 0 to max - suite.Equal(result, i) - - wg.Done() - } - }(WaitForNextElementChanCapacity) - - for i := 0; i < WaitForNextElementChanCapacity; i++ { - wg.Add(1) - suite.fifo.Enqueue(i) - } - - // wait for the GR dequeueing elements - wg.Wait() -} - // DequeueOrWaitForNextElement once queue's channel is closed func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementClosedChannel() { close(suite.fifo.queue) diff --git a/readme.md b/readme.md index ee73966..ca53195 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,7 @@ -[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.4.0-yellowgreen.svg?style=flat "goconcurrentqueue v0.4.0") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) +[![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.5.0-yellowgreen.svg?style=flat "goconcurrentqueue v0.5.0") [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) # goconcurrentqueue - Concurrent safe queues -The package goconcurrentqueue offers a public interface Queue with the most common methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)). +The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)). It comes with multiple Queue's concurrent-safe implementations, meaning they could be used concurrently by multiple goroutines without adding race conditions. ## Topics @@ -117,6 +117,43 @@ func main() { } ``` +### Wait until an element gets enqueued + +```go +package main + +import ( + "fmt" + "time" + + "github.com/enriquebris/goconcurrentqueue" +) + +func main() { + var ( + fifo = goconcurrentqueue.NewFIFO() + done = make(chan struct{}) + ) + + go func() { + fmt.Println("1 - Waiting for next enqueued element") + value, _ := fifo.DequeueOrWaitForNextElement() + fmt.Printf("2 - Dequeued element: %v\n", value) + + done <- struct{}{} + }() + + fmt.Println("3 - Go to sleep for 3 seconds") + time.Sleep(3 * time.Second) + + fmt.Println("4 - Enqueue element") + fifo.Enqueue(100) + + <-done +} + +``` + ### Dependency Inversion Principle using concurrent-safe queues *High level modules should not depend on low level modules. Both should depend on abstractions.* Robert C. Martin @@ -163,6 +200,10 @@ func workWithQueue(queue goconcurrentqueue.Queue) error { ## History +### v0.5.0 + +- Added DequeueOrWaitForNextElement() + ### v0.4.0 - Added QueueError (custom error)