Skip to content

Commit

Permalink
Merge pull request #35 from enriquebris/issue-34
Browse files Browse the repository at this point in the history
Issue 34
  • Loading branch information
enriquebris authored Nov 17, 2022
2 parents ea2e52e + c7e9f8b commit c09fe97
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
20 changes: 14 additions & 6 deletions fixed_fifo_queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package goconcurrentqueue

import "context"
import (
"context"
)

// Fixed capacity FIFO (First In First Out) concurrent queue
type FixedFIFO struct {
Expand Down Expand Up @@ -35,11 +37,11 @@ func (st *FixedFIFO) Enqueue(value interface{}) error {
// verify whether it is possible to notify the listener (it could be the listener is no longer
// available because the context expired: DequeueOrWaitForNextElementContext)
select {
// sends the element through the listener's channel instead of enqueueing it
case listener <- value:
default:
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
return st.enqueueIntoQueue(value)
// sends the element through the listener's channel instead of enqueueing it
case listener <- value:
default:
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
return st.enqueueIntoQueue(value)
}

default:
Expand Down Expand Up @@ -114,6 +116,12 @@ func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (in
return item, nil
case <-ctx.Done():
return nil, ctx.Err()
// try again to get the element from the regular queue (in case waitChan doesn't provide any item)
case value, ok := <-st.queue:
if ok {
return value, nil
}
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
}
default:
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
Expand Down
39 changes: 37 additions & 2 deletions fixed_fifo_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueFullCapacitySingleGR() {
func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
var (
uselessChan = make(chan interface{})
value = "my-test-value"
value = "my-test-value"
)

// let Enqueue knows there is a channel to send the next item instead of enqueueing it into the queue
Expand All @@ -98,6 +98,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
// TestEnqueueLenMultipleGR enqueues elements concurrently
//
// Detailed steps:
//
// 1 - Enqueue totalGRs concurrently (from totalGRs different GRs)
// 2 - Verifies the len, it should be equal to totalGRs
// 3 - Verifies that all elements from 0 to totalGRs were enqueued
Expand Down Expand Up @@ -269,6 +270,7 @@ func (suite *FixedFIFOTestSuite) TestDequeueClosedChannelSingleGR() {
// TestDequeueMultipleGRs dequeues elements concurrently
//
// Detailed steps:
//
// 1 - Enqueues totalElementsToEnqueue consecutive integers
// 2 - Dequeues totalElementsToDequeue concurrently from totalElementsToDequeue GRs
// 3 - Verifies the final len, should be equal to totalElementsToEnqueue - totalElementsToDequeue
Expand Down Expand Up @@ -376,6 +378,39 @@ func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue()
}
}

// calling DequeueOrWaitForNextElement with empty queue, then adding an item directly into queue's internal channel
func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithStuckWaitChan() {
var (
dummyValue = "dummyValue"
doneChan = make(chan struct{})
)

// consumer
go func(queue *FixedFIFO, expectedValue interface{}, done chan struct{}) {
item, err := queue.DequeueOrWaitForNextElement()
suite.NoError(err)
suite.Equal(expectedValue, item)

done <- struct{}{}
}(suite.fifo, dummyValue, doneChan)

// a second should be enough for the consumer to start consuming ...
time.Sleep(time.Second)

// add an item (enqueue) directly into queue's internal channel
suite.fifo.queue <- dummyValue

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

select {
case <-doneChan:

case <-ctx.Done():
suite.Fail("too much time waiting ...")
}
}

// single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error
func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() {
// enqueue WaitForNextElementChanCapacity listeners to future enqueued elements
Expand Down Expand Up @@ -554,4 +589,4 @@ func (suite *FixedFIFOTestSuite) TestContextAlreadyCanceled() {
case <-time.After(2 * time.Second):
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
}
}
}
19 changes: 6 additions & 13 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.3-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.3") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.7.0-yellowgreen.svg?style=flat "goconcurrentqueue v0.7.0") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)

# goconcurrentqueue - Concurrent safe queues
The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)).
Expand All @@ -22,18 +22,7 @@ Execute
go get github.com/enriquebris/goconcurrentqueue
```

This package is compatible with the following golang versions:
- 1.7.x
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- 1.13.x
- 1.14.x
- 1.15.x
- 1.16.x
- 1.17.x
This package is compatible with all golang versions >= 1.7.x

## Documentation
Visit [goconcurrentqueue at go.dev](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue)
Expand Down Expand Up @@ -245,6 +234,10 @@ func workWithQueue(queue goconcurrentqueue.Queue) error {

## History

### v0.7.0

- Prevents FIFO.DequeueOrWaitForNextElement to keep waiting for a waitChan while internal queues contain items

### v0.6.3

- Prevents FIFO.DequeueOrWaitForNextElement to add useless wait channels
Expand Down

0 comments on commit c09fe97

Please sign in to comment.