Skip to content

Commit

Permalink
Clean up comments, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jan 30, 2025
1 parent de78d9f commit 3413ea3
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 38 deletions.
22 changes: 9 additions & 13 deletions pkg/stanza/fileconsumer/internal/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestUntermintedLongLogEntry(t *testing.T) {
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period
// Advance time past the flush period to test behavior after the timer has expired
clock.Advance(2 * flushPeriod)

// Second ReadToEnd should emit the full untruncated token
Expand All @@ -273,9 +273,8 @@ func TestUntermintedLogEntryGrows(t *testing.T) {
temp := filetest.OpenTemp(t, tempDir)

// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
content := filetest.TokenWithLength(20 * 1024) // 20KB
additionalContext := filetest.TokenWithLength(1024) // 1KB
_, err := temp.WriteString(string(content)) // no newline
content := filetest.TokenWithLength(20 * 1024) // 20KB
_, err := temp.WriteString(string(content)) // no newline
require.NoError(t, err)

// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
Expand All @@ -302,25 +301,22 @@ func TestUntermintedLogEntryGrows(t *testing.T) {
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period
// Advance time past the flush period to test behavior after the timer has expired
clock.Advance(2 * flushPeriod)

// Write additional unterminated content to the file. We want to ensure this
// is all picked up in the same token.
// Importantly, this resets the flush timer so the next call still will not
// return anything
// Write additional unterminated content to ensure all is picked up in the same token
// The flusher should notice new data and not return anything on the next call
additionalContext := filetest.TokenWithLength(1024)
_, err = temp.WriteString(string(additionalContext)) // no newline
require.NoError(t, err)

// Next ReadToEnd should STILL not emit anything as flush period has been extended
// because we saw more data than last time
r.ReadToEnd(context.Background())
sink.ExpectNoCalls(t)

// Advance time past the flush period
// Advance time past the flush period to test behavior after the timer has expired
clock.Advance(2 * flushPeriod)

// Finally, since we haven't seen new data, we should emit the token
// Finally, since we haven't seen new data, flusher should emit the token
r.ReadToEnd(context.Background())
sink.ExpectToken(t, append(content, additionalContext...))

Expand Down
10 changes: 0 additions & 10 deletions pkg/stanza/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ type State struct {
LastDataLength int
}

func (s *State) Copy() *State {
if s == nil {
return nil
}
return &State{
LastDataChange: s.LastDataChange,
LastDataLength: s.LastDataLength,
}
}

// Func wraps a bufio.SplitFunc with a timer.
// When the timer expires, an incomplete token may be returned.
// The timer will reset any time the data parameter changes.
Expand Down
20 changes: 5 additions & 15 deletions pkg/stanza/tokenlen/tokenlen.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ type State struct {
PotentialLength int
}

func (s *State) Copy() *State {
if s == nil {
return nil
}
return &State{
PotentialLength: s.PotentialLength,
}
}

// Func wraps a bufio.SplitFunc to track potential token lengths
// Records the length of the data before delegating to the wrapped function
func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
Expand All @@ -27,20 +18,19 @@ func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
// Note the potential token length but don't update state yet
// Note the potential token length but don't update state until we know
// whether or not a token is actually returned
potentialLen := len(data)

// Delegate to the wrapped split function
advance, token, err = splitFunc(data, atEOF)

// Only update state if we didn't find a token (delegate returned 0, nil, nil)
if advance == 0 && token == nil && err == nil {
// The splitFunc is asking for more data. Remember how much
// we saw previously so the buffer can be sized appropriately.
s.PotentialLength = potentialLen
} else {
// Clear the state if we got a token or error
// A token was returned. This state represented that token, so clear it.
s.PotentialLength = 0
}

return advance, token, err
}
}
91 changes: 91 additions & 0 deletions pkg/stanza/tokenlen/tokenlen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen

import (
"bufio"
"testing"

"github.com/stretchr/testify/require"
)

// TestTokenLenState_Func verifies that the split function returned by
// State.Func records the length of pending data whenever the wrapped
// bufio.SplitFunc asks for more input (returns 0, nil, nil), and clears
// that record once a token is actually produced.
func TestTokenLenState_Func(t *testing.T) {
	testCases := []struct {
		name      string
		input     []byte
		atEOF     bool
		wantLen   int
		wantToken []byte
		wantAdv   int
		wantErr   error
	}{
		{
			name:    "no token yet",
			input:   []byte("partial"),
			atEOF:   false,
			wantLen: len("partial"),
		},
		{
			name:      "complete token",
			input:     []byte("complete\ntoken"),
			atEOF:     false,
			wantLen:   0, // should clear state after finding token
			wantToken: []byte("complete"),
			wantAdv:   len("complete\n"),
		},
		{
			name:    "growing token",
			input:   []byte("growing"),
			atEOF:   false,
			wantLen: len("growing"),
		},
		{
			name:      "flush at EOF",
			input:     []byte("flush"),
			atEOF:     true,
			wantLen:   0, // should clear state after flushing
			wantToken: []byte("flush"),
			wantAdv:   len("flush"),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Each subtest starts from a fresh, zero-valued State.
			state := &State{}
			wrapped := state.Func(bufio.ScanLines)

			advance, token, err := wrapped(tc.input, tc.atEOF)
			require.Equal(t, tc.wantErr, err)
			require.Equal(t, tc.wantToken, token)
			require.Equal(t, tc.wantAdv, advance)
			require.Equal(t, tc.wantLen, state.PotentialLength)
		})
	}
}

// TestTokenLenState_GrowingToken verifies that PotentialLength tracks the
// amount of pending data across successive calls as an unterminated token
// grows, and resets to zero once the token is finally emitted.
func TestTokenLenState_GrowingToken(t *testing.T) {
	state := &State{}
	split := state.Func(bufio.ScanLines)

	// Partial data: no token yet, but the pending length is recorded.
	advance, token, err := split([]byte("part"), false)
	require.NoError(t, err)
	require.Nil(t, token)
	require.Equal(t, 0, advance)
	require.Equal(t, len("part"), state.PotentialLength)

	// More data arrives, still unterminated: recorded length grows with it.
	advance, token, err = split([]byte("partial"), false)
	require.NoError(t, err)
	require.Nil(t, token)
	require.Equal(t, 0, advance)
	require.Equal(t, len("partial"), state.PotentialLength)

	// A newline completes the token; the state is cleared after emission.
	advance, token, err = split([]byte("partial\ntoken"), false)
	require.NoError(t, err)
	require.Equal(t, []byte("partial"), token)
	require.Equal(t, len("partial\n"), advance)
	require.Equal(t, 0, state.PotentialLength) // State should be cleared after emitting token
}

0 comments on commit 3413ea3

Please sign in to comment.