Skip to content

Commit

Permalink
fix: ring buffer deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
IronCore864 committed Oct 15, 2024
1 parent c3cb594 commit 9cb6317
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions internals/servicelog/ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func NewRingBuffer(size int) *RingBuffer {
func (rb *RingBuffer) Close() error {
rb.rwlock.Lock()
defer rb.rwlock.Unlock()
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
if rb.writeClosed {
return nil
}
Expand All @@ -73,6 +75,10 @@ func (rb *RingBuffer) Close() error {
func (rb *RingBuffer) Closed() bool {
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
return rb.closed()
}

func (rb *RingBuffer) closed() bool {
return rb.writeClosed
}

Expand Down Expand Up @@ -158,6 +164,10 @@ func (rb *RingBuffer) Size() int {
func (rb *RingBuffer) Positions() (start RingPos, end RingPos) {
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
return rb.positions()
}

func (rb *RingBuffer) positions() (start RingPos, end RingPos) {
return rb.readIndex, rb.writeIndex
}

Expand Down Expand Up @@ -237,16 +247,18 @@ func (rb *RingBuffer) WriteTo(writer io.Writer, start RingPos) (next RingPos, n

// TailIterator returns an iterator from the tail of the buffer.
func (rb *RingBuffer) TailIterator() Iterator {
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
start, _ := rb.Positions()
start, _ := rb.positions()
iter := &iterator{
rb: rb,
index: start,
nextChan: make(chan bool, 1),
closeChan: make(chan struct{}),
}
if rb.Closed() {
if rb.closed() {
close(iter.closeChan)
}
rb.iteratorList = append(rb.iteratorList, iter)
Expand All @@ -258,6 +270,8 @@ func (rb *RingBuffer) TailIterator() Iterator {
// backwards from the head.
func (rb *RingBuffer) HeadIterator(lines int) Iterator {
firstLine := rb.reverseLinePosition(lines)
rb.rwlock.RLock()
defer rb.rwlock.RUnlock()
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
iter := &iterator{
Expand All @@ -266,7 +280,7 @@ func (rb *RingBuffer) HeadIterator(lines int) Iterator {
nextChan: make(chan bool, 1),
closeChan: make(chan struct{}),
}
if rb.Closed() {
if rb.closed() {
close(iter.closeChan)
}
rb.iteratorList = append(rb.iteratorList, iter)
Expand Down Expand Up @@ -339,8 +353,6 @@ func (rb *RingBuffer) signalIterators() {
}

func (rb *RingBuffer) releaseIterators() {
rb.iteratorMutex.Lock()
defer rb.iteratorMutex.Unlock()
for _, iter := range rb.iteratorList {
// Close closeChan if not already closed
select {
Expand Down

0 comments on commit 9cb6317

Please sign in to comment.