Skip to content

Commit

Permalink
Merge pull request #31 from enriquebris/lock_issue
Browse files Browse the repository at this point in the history
Preventing FIFO.DequeueOrWaitForNextElement to gets blocked
  • Loading branch information
enriquebris authored Mar 1, 2022
2 parents e189583 + d5d4fef commit a3c75ce
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
22 changes: 18 additions & 4 deletions fifo_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type FIFO struct {
isLocked bool
// queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution )
waitForNextElementChan chan chan interface{}
// queue to unlock consumers that were locked when queue was empty (during DequeueOrWaitForNextElement execution)
unlockDequeueOrWaitForNextElementChan chan struct{}
}

// NewFIFO returns a new FIFO concurrent queue
Expand All @@ -33,6 +35,7 @@ func NewFIFO() *FIFO {
func (st *FIFO) initialize() {
st.slice = make([]interface{}, 0)
st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity)
st.unlockDequeueOrWaitForNextElementChan = make(chan struct{}, WaitForNextElementChanCapacity)
}

// Enqueue enqueues an element. Returns error if queue is locked.
Expand All @@ -41,6 +44,13 @@ func (st *FIFO) Enqueue(value interface{}) error {
return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
}

// let consumers (DequeueOrWaitForNextElement) know there is a new element
select {
case st.unlockDequeueOrWaitForNextElementChan <- struct{}{}:
default:
// message could not be sent
}

// check if there is a listener waiting for the next element (this element)
select {
case listener := <-st.waitForNextElementChan:
Expand Down Expand Up @@ -134,10 +144,14 @@ func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interfa

// return the next enqueued element, if any
select {
case item := <-waitChan:
return item, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-st.unlockDequeueOrWaitForNextElementChan:
// new enqueued element, no need to keep waiting
break

case item := <-waitChan:
return item, nil
case <-ctx.Done():
return nil, ctx.Err()
}
default:
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
Expand Down
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/enriquebris/goconcurrentqueue

go 1.17

require github.com/stretchr/testify v1.7.0

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
11 changes: 9 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.1-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.1") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.2-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.2") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)

# goconcurrentqueue - Concurrent safe queues
The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)).
Expand Down Expand Up @@ -31,6 +31,9 @@ This package is compatible with the following golang versions:
- 1.12.x
- 1.13.x
- 1.14.x
- 1.15.x
- 1.16.x
- 1.17.x

## Documentation
Visit [goconcurrentqueue at go.dev](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue)
Expand Down Expand Up @@ -242,9 +245,13 @@ func workWithQueue(queue goconcurrentqueue.Queue) error {

## History

### v0.6.2

- Prevents FIFO.DequeueOrWaitForNextElement to gets blocked when waiting for an enqueued element

### v0.6.1

- FixedFifo.Enqueue prevents to gets blocked trying to send the item over an invalid waitForNextElementChan channel
- FixedFifo.Enqueue prevents to get blocked trying to send the item over an invalid waitForNextElementChan channel

### v0.6.0

Expand Down

0 comments on commit a3c75ce

Please sign in to comment.