From 75a50666da5fb9dee05e28dbeb0101ace0918fef Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 22 Jan 2025 16:36:20 +0800 Subject: [PATCH] kafka(ticdc): sarama do not retry if produce message failed to prevent out of order (#11870) (#12026) close pingcap/tiflow#11935 --- cdc/sink/mq/producer/kafka/config.go | 4 ++-- pkg/logutil/log.go | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/cdc/sink/mq/producer/kafka/config.go b/cdc/sink/mq/producer/kafka/config.go index 6967c3c4695..fcecc8109b7 100644 --- a/cdc/sink/mq/producer/kafka/config.go +++ b/cdc/sink/mq/producer/kafka/config.go @@ -455,14 +455,14 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { // For kafka cluster with a bad network condition, producer should not try to // waster too much time on sending a message, get response no matter success // or fail as soon as possible is preferred. - config.Producer.Retry.Max = 3 - config.Producer.Retry.Backoff = 100 * time.Millisecond + config.Producer.Retry.Max = 0 // make sure sarama producer flush messages as soon as possible. config.Producer.Flush.Bytes = 0 config.Producer.Flush.Messages = 0 config.Producer.Flush.Frequency = time.Duration(0) + config.Net.MaxOpenRequests = 1 config.Net.DialTimeout = c.DialTimeout config.Net.WriteTimeout = c.WriteTimeout config.Net.ReadTimeout = c.ReadTimeout diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index e4e59cc0946..8526530b7ca 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -195,14 +195,11 @@ func ZapErrorFilter(err error, filterErrors ...error) zap.Field { // initSaramaLogger hacks logger used in sarama lib func initSaramaLogger(level zapcore.Level) error { - // only available less than info level - if !zapcore.InfoLevel.Enabled(level) { - logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) - if err != nil { - return errors.Trace(err) - } - sarama.Logger = logger + logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) + if err != nil { + return errors.Trace(err) } + sarama.Logger = logger return nil }