diff --git a/plugins/inputs/kinesis_consumer/kinesis_consumer.go b/plugins/inputs/kinesis_consumer/kinesis_consumer.go index 86e18721f0d2c..87b272e58b1fa 100644 --- a/plugins/inputs/kinesis_consumer/kinesis_consumer.go +++ b/plugins/inputs/kinesis_consumer/kinesis_consumer.go @@ -141,12 +141,15 @@ func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error { shardUpdateInterval: time.Duration(k.ShardUpdateInterval), log: k.Log, onMessage: func(ctx context.Context, shard string, r *types.Record) { + // Checking for number of messages in flight and wait for a free + // slot in case there are too many select { case <-ctx.Done(): return case k.sem <- struct{}{}: break } + if err := k.onMessage(k.acc, shard, r); err != nil { seqnr := *r.SequenceNumber k.Log.Errorf("Processing message with sequence number %q in shard %s failed: %v", seqnr, shard, err)