diff --git a/inskinesis/README.md b/inskinesis/README.md index 96669dd..733cf11 100644 --- a/inskinesis/README.md +++ b/inskinesis/README.md @@ -65,6 +65,7 @@ Here's a quick guide on how to get started with the `inskinesis` package: | MaxGroup | 1 | The maximum number of concurrent groups for sending records. If you want to send records concurrently, set this value to a number greater than 1. | | RetryCount | 3 | The number of times to retry sending a batch of records to the stream. | | RetryInterval | 100 ms | The interval between retries. | +| Verbose | false | Whether to enable verbose logging. | Please note that `N/A` in the Default Value column indicates that these fields are required and do not have default values. diff --git a/inskinesis/inskinesis.go b/inskinesis/inskinesis.go index 547e7d4..48d0402 100644 --- a/inskinesis/inskinesis.go +++ b/inskinesis/inskinesis.go @@ -14,6 +14,7 @@ import ( ) const outputSeparator = byte('\n') +const errorChannelSize = 100 type KinesisInterface interface { PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) @@ -61,6 +62,8 @@ type stream struct { failedCount int // Counter for the number of failed record submissions. totalCount int // Counter for the total number of records sent to the stream. + + verbose bool // Verbose mode } type Config struct { @@ -73,6 +76,7 @@ type Config struct { MaxGroup int RetryCount int RetryWaitTime time.Duration + Verbose bool } // NewKinesis creates a new Kinesis stream. @@ -105,14 +109,16 @@ func NewKinesis(config Config) (StreamInterface, error) { wgLogChan: &sync.WaitGroup{}, wgBatchChan: &sync.WaitGroup{}, - logChannel: make(chan interface{}, 1000), + logChannel: make(chan interface{}, 2000), batchChannel: make(chan []interface{}, 100), - errChannel: make(chan error), - stopChannel: make(chan bool), - stopBatchChannel: make(chan bool), + errChannel: make(chan error, errorChannelSize), + stopChannel: make(chan bool, 10), + stopBatchChannel: make(chan bool, 10), retryCount: config.RetryCount, retryWaitTime: 100 * time.Millisecond, + + verbose: config.Verbose, } if s.logBufferSize == 0 { @@ -161,7 +167,9 @@ func (s *stream) startStreaming() { batches, err := createBatches(batch, s.maxStreamBatchSize, s.maxStreamBatchByteSize) if err != nil { - s.errChannel <- err + if len(s.errChannel) < errorChannelSize { + s.errChannel <- err + } s.wgLogChan.Done() continue } @@ -237,13 +245,15 @@ func (s *stream) sendSingleBatch(batch []interface{}, concurrentLimiter chan str failedCount, err := s.PutRecords(batch) s.failedCount += failedCount if err != nil { - fmt.Printf("Error sending records to Kinesis stream %s: %v\n", s.name, err) - s.errChannel <- err + s.printf("Error sending records to Kinesis stream %s: %v\n", s.name, err) + if len(s.errChannel) < errorChannelSize { + s.errChannel <- err + } return } - fmt.Printf("Sent %d records to Kinesis stream %s\n", len(batch), s.name) + s.printf("Sent %d records to Kinesis stream %s\n", len(batch), s.name) }() } @@ -255,7 +265,7 @@ func (s *stream) start() { func (s *stream) FlushAndStopStreaming() { s.stopAndWaitLogStreaming() - fmt.Printf("%d/%d records sent to Kinesis stream %s\n", s.totalCount-s.failedCount, s.totalCount, s.name) + s.printf("%d/%d records sent to Kinesis stream %s\n", s.totalCount-s.failedCount, s.totalCount, s.name) } // PutRecords sends records to the Kinesis stream. @@ -277,9 +287,9 @@ func (s *stream) Put(record interface{}) { } func (s *stream) putRecords(batch []*kinesis.PutRecordsRequestEntry, retryCount int) (int, error) { - fmt.Printf("Sending %d records to Kinesis stream %s\n", len(batch), s.name) + s.printf("Sending %d records to Kinesis stream %s\n", len(batch), s.name) if retryCount < 0 { - fmt.Printf("Retry count exceeded for Kinesis stream %s\n", s.name) + s.printf("Retry count exceeded for Kinesis stream %s\n", s.name) return len(batch), errors.New("retry count exceeded") } @@ -289,17 +299,17 @@ func (s *stream) putRecords(batch []*kinesis.PutRecordsRequestEntry, retryCount }) if err != nil { - fmt.Printf("Error sending records to Kinesis stream %s: %v\n", s.name, err) + s.printf("Error sending records to Kinesis stream %s: %v\n", s.name, err) return len(batch), err } if res != nil && res.FailedRecordCount != nil && *res.FailedRecordCount > 0 { - fmt.Printf("Failed to send %d records to Kinesis stream %s\n", *res.FailedRecordCount, s.name) + s.printf("Failed to send %d records to Kinesis stream %s\n", *res.FailedRecordCount, s.name) failedRecords := s.wrapWithPutRecordsRequestEntry(getFailedRecords(res, batch)) batch = failedRecords retryCount-- - fmt.Printf("Retrying %d records to Kinesis stream %s\n", len(batch), s.name) + s.printf("Retrying %d records to Kinesis stream %s\n", len(batch), s.name) time.Sleep(s.retryWaitTime) failed, err := s.putRecords(batch, retryCount) if err != nil { @@ -328,7 +338,7 @@ func (s *stream) transformRecords(records []interface{}) ([]*kinesis.PutRecordsR } if failedRecords > 0 { - fmt.Printf("Failed to transform %d records to Kinesis stream %s\n", failedRecords, s.name) + s.printf("Failed to transform %d records to Kinesis stream %s\n", failedRecords, s.name) } return transformedRecords, err @@ -366,5 +376,13 @@ func addOutputSeparatorIfNeeded(record []byte) []byte { if record[len(record)-1] != outputSeparator { return append(record, outputSeparator) } + return record } + +// custom printf if verbose mode is enabled +func (s *stream) printf(format string, a ...interface{}) { + if s.verbose { + fmt.Printf(format, a...) + } +}