diff --git a/internal/ingest/kafkaingest/collector.go b/internal/ingest/kafkaingest/collector.go index 50db7c0cb..788b8eece 100644 --- a/internal/ingest/kafkaingest/collector.go +++ b/internal/ingest/kafkaingest/collector.go @@ -152,7 +152,19 @@ func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *s // as the underlying client will automatically try to // recover from any errors encountered, the application // does not need to take action on them. - logger.Error("kafka error", "error", ev) + attrs := []any{ + slog.Int("code", int(ev.Code())), + slog.String("error", ev.Error()), + } + + // Log Kafka client "local" errors on warning level as those are mostly informational and the client is + // able to handle/recover from them automatically. + // See: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L415 + if ev.Code() <= -100 { + logger.Warn("kafka local error", attrs...) + } else { + logger.Error("kafka broker error", attrs...) + } } case <-ctx.Done(): return ctx.Err()