Skip to content

Commit

Permalink
[chore] - optimize chunker (#1535)
Browse files Browse the repository at this point in the history
* Use chunkbytes that includes the size of peek.

* linter.

* continue.

* add TotalChunkSize const.
  • Loading branch information
ahrav authored Jul 25, 2023
1 parent 85f363f commit b5b01d3
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
24 changes: 15 additions & 9 deletions pkg/sources/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,40 @@ const (
ChunkSize = 10 * 1024
// PeekSize is the size of the peek into the data that follows the current chunk.
PeekSize = 3 * 1024
// TotalChunkSize is the total size of a chunk with peek data.
TotalChunkSize = ChunkSize + PeekSize
)

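// Chunker takes a chunk and splits it into chunks of ChunkSize, each followed
// by up to PeekSize bytes peeked from the data that comes next. A chunk whose
// data is no larger than ChunkSize + PeekSize is passed through unchanged.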
func Chunker(originalChunk *Chunk) chan *Chunk {
chunkChan := make(chan *Chunk)
go func() {
defer close(chunkChan)
if len(originalChunk.Data) <= ChunkSize+PeekSize {
if len(originalChunk.Data) <= TotalChunkSize {
chunkChan <- originalChunk
return
}
r := bytes.NewReader(originalChunk.Data)
reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize)
for {
chunkBytes := make([]byte, ChunkSize)
chunkBytes := make([]byte, TotalChunkSize)
chunk := *originalChunk
chunkBytes = chunkBytes[:ChunkSize]
n, err := reader.Read(chunkBytes)
if err != nil && !errors.Is(err, io.EOF) {
break
}
peekData, _ := reader.Peek(PeekSize)
chunk.Data = append(chunkBytes[:n], peekData...)
if n > 0 {
chunkChan <- &chunk
}
if errors.Is(err, io.EOF) {
break
if n == 0 {
if errors.Is(err, io.EOF) {
break
}
continue
}
peekData, _ := reader.Peek(PeekSize)
copy(chunkBytes[n:], peekData)
chunk.Data = chunkBytes[:n+len(peekData)]

chunkChan <- &chunk
}
}()
return chunkChan
Expand Down
12 changes: 12 additions & 0 deletions pkg/sources/chunker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,15 @@ func TestChunker(t *testing.T) {
}

}

// BenchmarkChunker measures how long Chunker takes to split, and the caller
// to fully drain, a payload of 100*ChunkSize bytes.
func BenchmarkChunker(b *testing.B) {
	// Build the fixture before the timer is reset so setup cost is excluded.
	payload := bytes.Repeat([]byte("a"), ChunkSize*100)
	input := &Chunk{Data: payload}

	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		// Receive until the channel is closed; the chunks themselves are discarded.
		out := Chunker(input)
		for {
			if _, open := <-out; !open {
				break
			}
		}
	}
}

0 comments on commit b5b01d3

Please sign in to comment.