diff --git a/firequeue_test.go b/firequeue_test.go index 99ede33..600eec5 100644 --- a/firequeue_test.go +++ b/firequeue_test.go @@ -47,8 +47,12 @@ type testFirehose struct { counter uint32 } +// fhErrorRate is a rate of error occurrence. +// 0 means no error, 10 means always error. +var fhErrorRate = 3 + func (tf *testFirehose) PutRecordBatch(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error) { - if rand.Intn(10) > 3 { + if rand.Intn(10) >= fhErrorRate { return &firehose.PutRecordBatchOutput{ FailedPutCount: aws.Int64(0), }, nil @@ -58,7 +62,7 @@ func (tf *testFirehose) PutRecordBatch(input *firehose.PutRecordBatchInput) (*fi inputLength := len(input.Records) firehoseResp := make([]*firehose.PutRecordBatchResponseEntry, inputLength) for i := 0; i < inputLength; i++ { - if rand.Intn(10) < 2 { + if rand.Intn(10) < fhErrorRate { firehoseResp[i] = &firehose.PutRecordBatchResponseEntry{ ErrorCode: aws.String("test_error"), ErrorMessage: aws.String("test_error_message"), @@ -87,6 +91,7 @@ func (tf *testFirehose) PutRecordBatchWithContext(_ context.Context, r *firehose // success func TestQueue(t *testing.T) { + fhErrorRate = 3 testCases := []struct { name string times int @@ -317,3 +322,89 @@ func TestQueue_DrainProcessPara(t *testing.T) { }) } } + +// Test for stats work correctly. +var qLength bool + +func TestStats(t *testing.T) { + tf := &testFirehose{} + q := firequeue.New(tf, "env", firequeue.BatchInterval(1*time.Second)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go q.Loop(ctx) + time.Sleep(1000 * time.Millisecond) + + for i := 0; i < 100; i++ { + err := q.Enqueue(&firehose.Record{ + Data: []byte("test"), + }) + if err != nil { + t.Errorf("error should not be occurred but: %s", err) + } + if q.Stats().QueueLength > 0 { + qLength = true + } + } + time.Sleep(10 * time.Second) + stats := q.Stats() + if stats.Success != 100 { + t.Log(stats) + t.Errorf("Not all records were success. expected: %d, actual: %d", 100, stats.Success) + } + if stats.BatchLength == 0 { + t.Errorf("BatchLength should be more than 0 but: %d", stats.BatchLength) + } + if !qLength { + t.Errorf("QueueLength should be true: %t", qLength) + } + t.Log(q.Stats()) +} + +func TestStats_QueueFullError(t *testing.T) { + tf := &testFirehose{} + q := firequeue.New(tf, "env", firequeue.BatchInterval(10*time.Second), firequeue.MaxQueueLength(1)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go q.Loop(ctx) + time.Sleep(1000 * time.Millisecond) + + err := q.Enqueue(&firehose.Record{}) + if err != nil { + t.Errorf("error should not be occurred but: %s", err) + } + err = q.Enqueue(&firehose.Record{}) + if err == nil { + t.Errorf("error should be occurred but not") + } + stats := q.Stats() + if stats.QueueFullError != 1 { + t.Errorf("QueueFullError should be 1 but: %d", stats.QueueFullError) + } + t.Log(stats) +} + +func TestStat_RetryCount(t *testing.T) { + // It means always return retryable error. + fhErrorRate = 10 + tf := &testFirehose{} + q := firequeue.New(tf, "env") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go q.Loop(ctx) + time.Sleep(1000 * time.Millisecond) + for i := 0; i < 10; i++ { + err := q.Enqueue(&firehose.Record{}) + if err != nil { + t.Errorf("error should not be occurred but: %s", err) + } + } + time.Sleep(5 * time.Second) + stats := q.Stats() + if stats.RetryCount == 0 { + t.Errorf("retryCount should be more than 0 but: %d", stats.RetryCount) + } + t.Log(stats) +}