From b5b01d3eba8fca303e1775f3c7b8b9b0fc275332 Mon Sep 17 00:00:00 2001 From: ahrav Date: Mon, 24 Jul 2023 19:30:29 -0700 Subject: [PATCH] [chore] - optimize chunker (#1535) * Use chunkbytes that includes the size of peek. * linter. * continue. * add TotalChunkSize const. --- pkg/sources/chunker.go | 24 +++++++++++++++--------- pkg/sources/chunker_test.go | 12 ++++++++++++ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/pkg/sources/chunker.go b/pkg/sources/chunker.go index 8c5c58e1b24d..6cb39bb285e2 100644 --- a/pkg/sources/chunker.go +++ b/pkg/sources/chunker.go @@ -12,6 +12,8 @@ const ( ChunkSize = 10 * 1024 // PeekSize is the size of the peek into the previous chunk. PeekSize = 3 * 1024 + // TotalChunkSize is the total size of a chunk with peek data. + TotalChunkSize = ChunkSize + PeekSize ) // Chunker takes a chunk and splits it into chunks of ChunkSize. @@ -19,27 +21,31 @@ func Chunker(originalChunk *Chunk) chan *Chunk { chunkChan := make(chan *Chunk) go func() { defer close(chunkChan) - if len(originalChunk.Data) <= ChunkSize+PeekSize { + if len(originalChunk.Data) <= TotalChunkSize { chunkChan <- originalChunk return } r := bytes.NewReader(originalChunk.Data) reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize) for { - chunkBytes := make([]byte, ChunkSize) + chunkBytes := make([]byte, TotalChunkSize) chunk := *originalChunk + chunkBytes = chunkBytes[:ChunkSize] n, err := reader.Read(chunkBytes) if err != nil && !errors.Is(err, io.EOF) { break } - peekData, _ := reader.Peek(PeekSize) - chunk.Data = append(chunkBytes[:n], peekData...) - if n > 0 { - chunkChan <- &chunk - } - if errors.Is(err, io.EOF) { - break + if n == 0 { + if errors.Is(err, io.EOF) { + break + } + continue } + peekData, _ := reader.Peek(PeekSize) + copy(chunkBytes[n:], peekData) + chunk.Data = chunkBytes[:n+len(peekData)] + + chunkChan <- &chunk } }() return chunkChan diff --git a/pkg/sources/chunker_test.go b/pkg/sources/chunker_test.go index d25963ec79e7..7ec81119fbf3 100644 --- a/pkg/sources/chunker_test.go +++ b/pkg/sources/chunker_test.go @@ -72,3 +72,15 @@ func TestChunker(t *testing.T) { } } + +func BenchmarkChunker(b *testing.B) { + data := bytes.Repeat([]byte("a"), ChunkSize*100) + chunk := &Chunk{ + Data: data, + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for range Chunker(chunk) { + } + } +}