Skip to content

Commit

Permalink
Merge pull request #10 from paaaaay5/support-batch-length
Browse files Browse the repository at this point in the history
Add Test for Stats
  • Loading branch information
mash authored Mar 13, 2024
2 parents 05d865c + 071f24b commit 9d7af8b
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 7 deletions.
2 changes: 2 additions & 0 deletions firequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type Queue struct {
func (q *Queue) Stats() Stats {
return Stats{
QueueLength: q.len(),
BatchLength: q.batchLength.Value(),
Success: q.successCount.Value(),
RetryCount: q.retryCount.Value(),
UnretryableError: q.unretryableErrorCount.Value(),
Expand Down Expand Up @@ -260,6 +261,7 @@ func (q *Queue) sendBatch(bf backoff.BackOff) time.Duration {
}
batch := q.createBatch()
q.batchLength.Set(int64(len(batch)))

if err := q.attemptToSendBatch(batch); err != nil {
// Handle non-retryable errors directly.
q.handleError(err, batch)
Expand Down
141 changes: 136 additions & 5 deletions firequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,27 @@ type testAWSError struct {
}

func (te *testAWSError) Error() string {
return "retryable error!"
return "retryable error"
}

type errCodeMode string

const (
retryable errCodeMode = "retryable"
unretryable errCodeMode = "unretryable"
)

var ecm = retryable

func (te *testAWSError) Code() string {
return request.ErrCodeResponseTimeout
switch ecm {
case retryable:
return request.ErrCodeResponseTimeout
case unretryable:
return request.CanceledErrorCode
default:
return ""
}
}

func (te *testAWSError) Message() string {
Expand All @@ -47,8 +63,12 @@ type testFirehose struct {
counter uint32
}

// fhErrorRate is a rate of error occurrence.
// 0 means no error, 10 means always error.
var fhErrorRate = 3

func (tf *testFirehose) PutRecordBatch(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
if rand.Intn(10) > 3 {
if rand.Intn(10) >= fhErrorRate {
return &firehose.PutRecordBatchOutput{
FailedPutCount: aws.Int64(0),
}, nil
Expand All @@ -58,7 +78,7 @@ func (tf *testFirehose) PutRecordBatch(input *firehose.PutRecordBatchInput) (*fi
inputLength := len(input.Records)
firehoseResp := make([]*firehose.PutRecordBatchResponseEntry, inputLength)
for i := 0; i < inputLength; i++ {
if rand.Intn(10) < 2 {
if rand.Intn(10) < fhErrorRate {
firehoseResp[i] = &firehose.PutRecordBatchResponseEntry{
ErrorCode: aws.String("test_error"),
ErrorMessage: aws.String("test_error_message"),
Expand Down Expand Up @@ -87,6 +107,7 @@ func (tf *testFirehose) PutRecordBatchWithContext(_ context.Context, r *firehose

// success
func TestQueue(t *testing.T) {
fhErrorRate = 3
testCases := []struct {
name string
times int
Expand Down Expand Up @@ -158,7 +179,7 @@ func TestQueue_Para(t *testing.T) {
{
name: "parallel 100",
times: 1000,
para: 10,
para: 100,
},
}

Expand Down Expand Up @@ -317,3 +338,113 @@ func TestQueue_DrainProcessPara(t *testing.T) {
})
}
}

// Test for stats work correctly.
var qLength bool

func TestStats(t *testing.T) {
tf := &testFirehose{}
q := firequeue.New(tf, "env", firequeue.BatchInterval(1*time.Second))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)

for i := 0; i < 100; i++ {
err := q.Enqueue(&firehose.Record{
Data: []byte("test"),
})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
if q.Stats().QueueLength > 0 {
qLength = true
}
}
time.Sleep(10 * time.Second)
stats := q.Stats()
if stats.Success != 100 {
t.Log(stats)
t.Errorf("Not all records were success. expected: %d, actual: %d", 100, stats.Success)
}
if stats.BatchLength == 0 {
t.Errorf("BatchLength should be more than 0 but: %d", stats.BatchLength)
}
if !qLength {
t.Errorf("QueueLength should be true: %t", qLength)
}
t.Log(q.Stats())
}

func TestStats_QueueFullError(t *testing.T) {
tf := &testFirehose{}
q := firequeue.New(tf, "env", firequeue.BatchInterval(10*time.Second), firequeue.MaxQueueLength(1))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)

err := q.Enqueue(&firehose.Record{})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
err = q.Enqueue(&firehose.Record{})
if err == nil {
t.Errorf("error should be occurred but not")
}
stats := q.Stats()
if stats.QueueFullError != 1 {
t.Errorf("QueueFullError should be 1 but: %d", stats.QueueFullError)
}
t.Log(stats)
}

func TestStat_RetryCount(t *testing.T) {
// It means always return retryable error.
fhErrorRate = 10
tf := &testFirehose{}
q := firequeue.New(tf, "env")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)
for i := 0; i < 10; i++ {
err := q.Enqueue(&firehose.Record{})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
}
time.Sleep(5 * time.Second)
stats := q.Stats()
if stats.RetryCount == 0 {
t.Errorf("retryCount should be more than 0 but: %d", stats.RetryCount)
}
t.Log(stats)
}

func TestStat_UnRetryableErrorCount(t *testing.T) {
// It means always return unretryable error.
fhErrorRate = 10
ecm = unretryable

tf := &testFirehose{}
q := firequeue.New(tf, "env", firequeue.BatchSize(1))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)
err := q.Enqueue(&firehose.Record{})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
time.Sleep(5 * time.Second)
stats := q.Stats()
if stats.UnretryableError == 0 {
t.Errorf("retryCount should be more than 0 but: %d", stats.RetryCount)
}
t.Log(stats)
}
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
2 changes: 1 addition & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package firequeue
// Stats contains firequeue statistics.
type Stats struct {
QueueLength int
BatchLength int
BatchLength int64
Success int64
RetryCount int64
UnretryableError int64
Expand Down

0 comments on commit 9d7af8b

Please sign in to comment.