diff --git a/fifo_queue.go b/fifo_queue.go index daaae88..2205c1d 100644 --- a/fifo_queue.go +++ b/fifo_queue.go @@ -20,6 +20,8 @@ type FIFO struct { isLocked bool // queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution ) waitForNextElementChan chan chan interface{} + // queue to unlock consumers that were locked when queue was empty (during DequeueOrWaitForNextElement execution) + unlockDequeueOrWaitForNextElementChan chan struct{} } // NewFIFO returns a new FIFO concurrent queue @@ -33,6 +35,7 @@ func NewFIFO() *FIFO { func (st *FIFO) initialize() { st.slice = make([]interface{}, 0) st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity) + st.unlockDequeueOrWaitForNextElementChan = make(chan struct{}, WaitForNextElementChanCapacity) } // Enqueue enqueues an element. Returns error if queue is locked. @@ -41,6 +44,13 @@ func (st *FIFO) Enqueue(value interface{}) error { return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked") } + // let consumers (DequeueOrWaitForNextElement) know there is a new element + select { + case st.unlockDequeueOrWaitForNextElementChan <- struct{}{}: + default: + // message could not be sent + } + // check if there is a listener waiting for the next element (this element) select { case listener := <-st.waitForNextElementChan: @@ -134,10 +144,14 @@ func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interfa // return the next enqueued element, if any select { - case item := <-waitChan: - return item, nil - case <-ctx.Done(): - return nil, ctx.Err() + case <-st.unlockDequeueOrWaitForNextElementChan: + // new enqueued element, no need to keep waiting + break + + case item := <-waitChan: + return item, nil + case <-ctx.Done(): + return nil, ctx.Err() } default: // too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3227168 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/enriquebris/goconcurrentqueue + +go 1.17 + +require github.com/stretchr/testify v1.7.0 + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..acb88a4 --- /dev/null +++ b/go.sum @@ -0,0 +1,11 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/readme.md b/readme.md index 1a70ab0..eddefbe 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.1-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.1") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) +[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.2-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.2") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) # goconcurrentqueue - Concurrent safe queues The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)). @@ -31,6 +31,9 @@ This package is compatible with the following golang versions: - 1.12.x - 1.13.x - 1.14.x + - 1.15.x + - 1.16.x + - 1.17.x ## Documentation Visit [goconcurrentqueue at go.dev](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) @@ -242,9 +245,13 @@ func workWithQueue(queue goconcurrentqueue.Queue) error { ## History +### v0.6.2 + +- Prevents FIFO.DequeueOrWaitForNextElement to gets blocked when waiting for an enqueued element + ### v0.6.1 -- FixedFifo.Enqueue prevents to gets blocked trying to send the item over an invalid waitForNextElementChan channel +- FixedFifo.Enqueue prevents to get blocked trying to send the item over an invalid waitForNextElementChan channel ### v0.6.0