Skip to content

Commit

Permalink
Rewrited buffer to make enable repeating read & added tests (#11)
Browse files Browse the repository at this point in the history
* Added test for slow data source

* Added

* Fixed slow reader test

* Fixed slowReader.Read

* Removed outdated comments from readBuffer
  • Loading branch information
EinKrebs authored Oct 6, 2021
1 parent 031e035 commit b87ba1c
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 15 deletions.
62 changes: 62 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package json

import (
"encoding/json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"testing"
"time"
)

var _ io.Reader = &slowReader{}

type slowReader struct {
src []byte
index int
len int
pause time.Duration
}

func newSlowReader(src []byte, pause time.Duration) *slowReader {
return &slowReader{
src: src,
len: 1,
pause: pause,
}
}

func (s *slowReader) Work() {
for i := s.len; i <= len(s.src); i++ {
time.Sleep(s.pause)
s.len = i
}
}

func (s *slowReader) Read(p []byte) (int, error) {
n := len(p)
readerLen := s.len
if s.index == len(s.src) {
return 0, io.EOF
}
if s.index+n <= readerLen {
copy(p, s.src[s.index:s.index+n])
s.index += n
return n, nil
}
length := readerLen - s.index
copy(p, s.src[s.index:readerLen])
s.index = readerLen
return length, nil
}

func TestUnmarshal_SlowReader(t *testing.T) {
initBig()
data := newSlowReader(jsonBig, time.Microsecond)
res := new(map[string]interface{})
expected := new(map[string]interface{})
require.NoError(t, json.Unmarshal(jsonBig, expected))
go data.Work()
require.NoError(t, Unmarshal(data, res))
assert.Equal(t, expected, res)
}
39 changes: 24 additions & 15 deletions internal/readbuffer/readbuffer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package readbuffer

import "io"
import (
"io"
"time"
)

const readBufSize = 1 << 10
const (
readBufSize = 1 << 10
readTimeout = 100 * time.Millisecond
)

type ReadBuffer struct {
buf []byte
Expand All @@ -24,21 +30,24 @@ func (r *ReadBuffer) Get(n int) ([]byte, error) {
r.index += n
return res, nil
}
got := make([]byte, r.len-r.index)
copy(got, r.buf[r.index:r.len])
res := make([]byte, r.len-r.index)
copy(res, r.buf[r.index:r.len])
r.index = r.len
n -= len(got)
if err := r.load(); err != nil {
return got, err
}
if r.len-r.index >= n {
res := append(got, r.buf[r.index:r.index+n]...)
r.index += n
return res, nil
n -= len(res)
var err error
for err = r.load(); err == nil; err = r.load() {
if r.len-r.index >= n {
res = append(res, r.buf[r.index:r.index+n]...)
r.index += n
return res, nil
}

res = append(res, r.buf[r.index:r.len]...)
n -= r.len - r.index
r.index = r.len
time.Sleep(readTimeout)
}
res := append(got, r.buf[r.index:r.len]...)
r.index = r.len
return res, nil
return res, err
}

func (r *ReadBuffer) load() error {
Expand Down

0 comments on commit b87ba1c

Please sign in to comment.