Skip to content

Commit

Permalink
Reuse notify channel when blocked on read
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe committed Apr 28, 2024
1 parent 465cf31 commit c72ccc1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
45 changes: 23 additions & 22 deletions packetio/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type Buffer struct {
data []byte
head, tail int

notify chan struct{} // non-nil when we have blocked readers
closed bool
notify chan struct{}
waiting bool
closed bool

count int
limitCount, limitSize int
Expand All @@ -56,6 +57,7 @@ const (
// NewBuffer creates a new Buffer.
func NewBuffer() *Buffer {
return &Buffer{
notify: make(chan struct{}, 1),
readDeadline: deadline.New(),
}
}
Expand Down Expand Up @@ -149,14 +151,6 @@ func (b *Buffer) Write(packet []byte) (int, error) {
}
}

var notify chan struct{}
if b.notify != nil {
// Prepare to notify readers, but only
// actually do it after we release the lock.
notify = b.notify
b.notify = nil
}

// store the length of the packet
b.data[b.tail] = uint8(len(packet) >> 8)
b.tail++
Expand All @@ -178,10 +172,17 @@ func (b *Buffer) Write(packet []byte) (int, error) {
b.tail = m
}
b.count++

waiting := b.waiting
b.waiting = false

b.mutex.Unlock()

if notify != nil {
close(notify)
if waiting {
select {
case b.notify <- struct{}{}:
default:
}
}

return len(packet), nil
Expand Down Expand Up @@ -244,7 +245,7 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
}

b.count--

b.waiting = false
b.mutex.Unlock()

if copied < count {
Expand All @@ -258,16 +259,13 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
return 0, io.EOF
}

if b.notify == nil {
b.notify = make(chan struct{})
}
notify := b.notify
b.waiting = true
b.mutex.Unlock()

select {
case <-b.readDeadline.Done():
return 0, &netError{ErrTimeout, true, true}
case <-notify:
case <-b.notify:
}
}
}
Expand All @@ -282,14 +280,17 @@ func (b *Buffer) Close() (err error) {
return nil
}

notify := b.notify
b.notify = nil
waiting := b.waiting
b.waiting = false
b.closed = true

b.mutex.Unlock()

if notify != nil {
close(notify)
if waiting {
select {
case b.notify <- struct{}{}:
default:
}
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions packetio/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ func TestBufferAlloc(t *testing.T) {
}
}

t.Run("100 writes", test(w, 100, 10))
t.Run("100 writes", test(w, 100, 11))
t.Run("200 writes", test(w, 200, 14))
t.Run("400 writes", test(w, 400, 16))
t.Run("1000 writes", test(w, 1000, 20))
t.Run("400 writes", test(w, 400, 17))
t.Run("1000 writes", test(w, 1000, 21))

wr := func(count int) func() {
return func() {
Expand All @@ -451,9 +451,9 @@ func TestBufferAlloc(t *testing.T) {
}
}

t.Run("100 writes and reads", test(wr, 100, 4))
t.Run("1000 writes and reads", test(wr, 1000, 4))
t.Run("10000 writes and reads", test(wr, 10000, 4))
t.Run("100 writes and reads", test(wr, 100, 5))
t.Run("1000 writes and reads", test(wr, 1000, 5))
t.Run("10000 writes and reads", test(wr, 10000, 5))
}

func benchmarkBufferWR(b *testing.B, size int64, write bool, grow int) { // nolint:unparam
Expand Down

0 comments on commit c72ccc1

Please sign in to comment.