Skip to content

Commit

Permalink
Merge pull request #2 from paaaaay5/stats_test
Browse files Browse the repository at this point in the history
[add] stats test
  • Loading branch information
paaaaay5 authored Feb 28, 2024
2 parents bfeb7c9 + fcfe8c2 commit f1d0704
Showing 1 changed file with 93 additions and 2 deletions.
95 changes: 93 additions & 2 deletions firequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ type testFirehose struct {
counter uint32
}

// fhErrorRate is a rate of error occurrence.
// 0 means no error, 10 means always error.
var fhErrorRate = 3

func (tf *testFirehose) PutRecordBatch(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) {
if rand.Intn(10) > 3 {
if rand.Intn(10) >= fhErrorRate {
return &firehose.PutRecordBatchOutput{
FailedPutCount: aws.Int64(0),
}, nil
Expand All @@ -58,7 +62,7 @@ func (tf *testFirehose) PutRecordBatch(input *firehose.PutRecordBatchInput) (*fi
inputLength := len(input.Records)
firehoseResp := make([]*firehose.PutRecordBatchResponseEntry, inputLength)
for i := 0; i < inputLength; i++ {
if rand.Intn(10) < 2 {
if rand.Intn(10) < fhErrorRate {
firehoseResp[i] = &firehose.PutRecordBatchResponseEntry{
ErrorCode: aws.String("test_error"),
ErrorMessage: aws.String("test_error_message"),
Expand Down Expand Up @@ -87,6 +91,7 @@ func (tf *testFirehose) PutRecordBatchWithContext(_ context.Context, r *firehose

// success
func TestQueue(t *testing.T) {
fhErrorRate = 3
testCases := []struct {
name string
times int
Expand Down Expand Up @@ -317,3 +322,89 @@ func TestQueue_DrainProcessPara(t *testing.T) {
})
}
}

// Test for stats work correctly.
var qLength bool

func TestStats(t *testing.T) {
tf := &testFirehose{}
q := firequeue.New(tf, "env", firequeue.BatchInterval(1*time.Second))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)

for i := 0; i < 100; i++ {
err := q.Enqueue(&firehose.Record{
Data: []byte("test"),
})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
if q.Stats().QueueLength > 0 {
qLength = true
}
}
time.Sleep(10 * time.Second)
stats := q.Stats()
if stats.Success != 100 {
t.Log(stats)
t.Errorf("Not all records were success. expected: %d, actual: %d", 100, stats.Success)
}
if stats.BatchLength == 0 {
t.Errorf("BatchLength should be more than 0 but: %d", stats.BatchLength)
}
if !qLength {
t.Errorf("QueueLength should be true: %t", qLength)
}
t.Log(q.Stats())
}

func TestStats_QueueFullError(t *testing.T) {
tf := &testFirehose{}
q := firequeue.New(tf, "env", firequeue.BatchInterval(10*time.Second), firequeue.MaxQueueLength(1))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)

err := q.Enqueue(&firehose.Record{})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
err = q.Enqueue(&firehose.Record{})
if err == nil {
t.Errorf("error should be occurred but not")
}
stats := q.Stats()
if stats.QueueFullError != 1 {
t.Errorf("QueueFullError should be 1 but: %d", stats.QueueFullError)
}
t.Log(stats)
}

func TestStat_RetryCount(t *testing.T) {
// It means always return retryable error.
fhErrorRate = 10
tf := &testFirehose{}
q := firequeue.New(tf, "env")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go q.Loop(ctx)
time.Sleep(1000 * time.Millisecond)
for i := 0; i < 10; i++ {
err := q.Enqueue(&firehose.Record{})
if err != nil {
t.Errorf("error should not be occurred but: %s", err)
}
}
time.Sleep(5 * time.Second)
stats := q.Stats()
if stats.RetryCount == 0 {
t.Errorf("retryCount should be more than 0 but: %d", stats.RetryCount)
}
t.Log(stats)
}

0 comments on commit f1d0704

Please sign in to comment.