Skip to content

Commit

Permalink
introduce ringbuffer and refactor repo structure
Browse files Browse the repository at this point in the history
  • Loading branch information
marctrem committed Sep 6, 2024
1 parent 3857dc3 commit 2d21b06
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 50 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ A kitchen sink of data structures and algorithms in Go

# Contents

## io
### LineReader
Read lines from a reader while truncating lines that exceed the destination buffer's size.
* collections
* io
* math
126 changes: 126 additions & 0 deletions collections/ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package ringbuffer

import (
"fmt"

armath "github.com/asymmetric-research/go-commons/math"
)

type T[C any] struct {
buflen uint
buf []C

// head points to the next free slot
head uint
}

func New[C any](size int) (*T[C], error) {
ret := &T[C]{}
return ret, NewInto(ret, make([]C, size))

}

// NewInto assumes that
func NewInto[C any](dst *T[C], buf []C) error {
if len(buf) <= 0 {
return fmt.Errorf("backing buffer must have a greater than zero")
}
*dst = T[C]{
buflen: uint(len(buf)),
buf: buf,
head: 0,
}
return nil
}

func (r *T[C]) Push(item C) {
r.buf[r.head%r.buflen] = item
r.head += 1
}

func (r *T[C]) Last(dst []C) int {
// how many entries can we write?
maxWritable := armath.Min(r.head, r.buflen)

// if the dst is larger than the amount of entries we can write, let's clamp it.
if len(dst) > int(maxWritable) {
// only consider the first available slots of dst
dst = dst[:maxWritable]
}

headmod := int(r.head % r.buflen)

// we must do at most 2 copies
n := 0
// copy the head of our internal buffer to the tail of dst
{
// end of src is the head slot
srcend := headmod

srcstart := armath.Max(0, headmod-len(dst))
src := r.buf[srcstart:srcend]

dststart := armath.Max(0, len(dst)-headmod)
dst := dst[dststart:]

n += copy(dst, src)
}

// if we haven't filled the buffer, copy the tail of our internal buffer to the head of dst
if n != len(dst) {
// copy start of src to end of dst
dst := dst[:len(dst)-n]

srcstart := int(maxWritable) - len(dst)
src := r.buf[srcstart:]

n += copy(dst, src)
}

return n
}

func (r *T[C]) Len() uint {
used := armath.Min(r.buflen, r.head)
return used
}

type SeqMode int

const (
SEQ_MODE_FIFO SeqMode = iota
SEQ_MODE_FILO
)

func (r *T[C]) Seq(seqMode SeqMode) func(yield func(uint, C) bool) {
return func(yield func(uint, C) bool) {
if r.buflen == 0 {
return
}

// how many entries can we write?
maxWritable := armath.Min(r.head, r.buflen)

if seqMode == SEQ_MODE_FIFO {
start := (((r.head - 1) % r.buflen) - maxWritable) % r.buflen

for i := range maxWritable {
idx := (start + i) % r.buflen
if !yield(i, r.buf[idx]) {
return
}
}
return
}
if seqMode == SEQ_MODE_FILO {
start := r.head - 1
for i := range maxWritable {
idx := (start - i) % r.buflen
if !yield(i, r.buf[idx]) {
return
}
}
return
}
}
}
132 changes: 132 additions & 0 deletions collections/ringbuffer/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package ringbuffer

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestRange(t *testing.T) {
rb, err := New[string](5)
require.NoError(t, err)

for i := range 7 {
rb.Push(fmt.Sprintf("%d", i))
}

expected := []string{"2", "3", "4", "5", "6"}
for i, v := range rb.Seq(SEQ_MODE_FIFO) {
require.Equal(t, expected[i], v)
}

expected = []string{"6", "5", "4", "3", "2"}
for i, v := range rb.Seq(SEQ_MODE_FILO) {
require.Equal(t, expected[i], v)
}
}

func TestCycledRingBuffer(t *testing.T) {
rb, err := New[string](5)
require.NoError(t, err)

for i := range 7 {
rb.Push(fmt.Sprintf("%d", i))
}

// ask for 3 last items
lastN := make([]string, 3)
n := rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"4", "5", "6"}, lastN)

// ask for 5 last
lastN = make([]string, 5)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"2", "3", "4", "5", "6"}, lastN)

// ask for 2 last
lastN = make([]string, 2)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"5", "6"}, lastN)

// ask for 1 last
lastN = make([]string, 1)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"6"}, lastN)
}

func TestNotFilledRingBuffer(t *testing.T) {
rb, err := New[string](5)
require.NoError(t, err)

for i := range 3 {
rb.Push(fmt.Sprintf("%d", i))
}

// ask for 3 last items
lastN := make([]string, 3)
n := rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"0", "1", "2"}, lastN)

// ask for 5 last
lastN = make([]string, 5)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"0", "1", "2"}, lastN)

// ask for 2 last
lastN = make([]string, 2)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"1", "2"}, lastN)

// ask for 1 last
lastN = make([]string, 1)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{"2"}, lastN)

expected := []string{"0", "1", "2"}
for i, v := range rb.Seq(SEQ_MODE_FIFO) {
require.Equal(t, expected[i], v)
}
}

func TestEmptyRingBuffer(t *testing.T) {
rb, err := New[string](5)
require.NoError(t, err)

// ask for 3 last items
lastN := make([]string, 3)
n := rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{}, lastN)

// ask for 5 last
lastN = make([]string, 5)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{}, lastN)

// ask for 2 last
lastN = make([]string, 2)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{}, lastN)

// ask for 1 last
lastN = make([]string, 1)
n = rb.Last(lastN)
lastN = lastN[:n]
require.Equal(t, []string{}, lastN)
}

func TestRingbufferWithoutRoom(t *testing.T) {
_, err := New[string](0)
require.Error(t, err)
}
32 changes: 0 additions & 32 deletions io/README.md

This file was deleted.

30 changes: 30 additions & 0 deletions io/linereader/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# LineReader
## Usage
```go
import "github.com/asymmetric-research/go-commons/io/linereader"

lr := linereader.New(reader, 4096 /* blockSize */)
line := [12288]byte{}

var err error
for err == nil {
n, ntrunc, err := lr.Read(line[:])
lastline := line[:n]
fmt.Println("%d bytes didn't fit", ntrunc)
}
```

## Benchmarks
```
go test -benchmem -benchtime=5s -bench=. ./io/linereader/...
goos: linux
goarch: amd64
pkg: github.com/asymmetric-research/go-commons/io/linereader
cpu: AMD Ryzen 9 5950X 16-Core Processor
BenchmarkLineReaderUnbuffered-32 1241234 4680 ns/op 22560 B/op 5 allocs/op
BenchmarkHashicorpsUnbuffered-32 4722 1349399 ns/op 2295410 B/op 29602 allocs/op
BenchmarkGoCmdUnbuffered-32 234085 24217 ns/op 41636 B/op 289 allocs/op
BenchmarkLineReaderLargeReads-32 2210827 2713 ns/op 12328 B/op 4 allocs/op
BenchmarkHashicorpsLargeReads-32 4208 1406119 ns/op 2285073 B/op 29601 allocs/op
BenchmarkGoCmdLargeReads-32 274774 21724 ns/op 31563 B/op 292 allocs/op
```
18 changes: 9 additions & 9 deletions io/line_reader.go → io/linereader/linereader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io
package linereader

import (
"bytes"
Expand All @@ -7,21 +7,21 @@ import (
armath "github.com/asymmetric-research/go-commons/math"
)

type LineReader struct {
type T struct {
reader io.Reader
readbufbase []byte
readbuf []byte
blocksize uint
}

func NewLineReader(reader io.Reader, blockSize uint) *LineReader {
lr := &LineReader{}
NewlineReaderInto(lr, reader, blockSize)
func New(reader io.Reader, blockSize uint) *T {
lr := &T{}
NewInto(lr, reader, blockSize)
return lr
}

func NewlineReaderInto(dst *LineReader, reader io.Reader, blockSize uint) {
*dst = LineReader{
func NewInto(dst *T, reader io.Reader, blockSize uint) {
*dst = T{
reader: reader,
readbufbase: make([]byte, blockSize),
blocksize: blockSize,
Expand All @@ -30,7 +30,7 @@ func NewlineReaderInto(dst *LineReader, reader io.Reader, blockSize uint) {

// Read reads as much as possible into p, until the next newline or EOF is reached.
// Every new call to read starts on a new line. The remainder of the previous line will be discarted.
func (lr *LineReader) Read(dst []byte) (nread int, ndiscarted int, err error) {
func (lr *T) Read(dst []byte) (nread int, ndiscarted int, err error) {
// copy as much of read buffer as possible to dst
if len(lr.readbuf) > 0 {
// fast path: can we get a new line from the read buffer?
Expand Down Expand Up @@ -98,7 +98,7 @@ func (lr *LineReader) Read(dst []byte) (nread int, ndiscarted int, err error) {
}
}

func (lr *LineReader) discardRestOfLine() int {
func (lr *T) discardRestOfLine() int {
// discard the rest of the line in the read buffer

if len(lr.readbuf) > 0 {
Expand Down
Loading

0 comments on commit 2d21b06

Please sign in to comment.