From 36c232681258c271b286c3632907d1fb1e099f7c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 12 Dec 2024 17:56:45 +0800 Subject: [PATCH 1/3] set the sarama producer retry to 0 --- pkg/sink/kafka/sarama.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/sink/kafka/sarama.go b/pkg/sink/kafka/sarama.go index df8877412b3..98030d158d2 100644 --- a/pkg/sink/kafka/sarama.go +++ b/pkg/sink/kafka/sarama.go @@ -58,8 +58,10 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { // For kafka cluster with a bad network condition, producer should not try to // waster too much time on sending a message, get response no matter success // or fail as soon as possible is preferred. - config.Producer.Retry.Max = 3 - config.Producer.Retry.Backoff = 100 * time.Millisecond + // According to the https://github.com/IBM/sarama/issues/2619, + // sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1, + // when the kafka cluster is unhealthy and trigger the internal retry mechanism. + config.Producer.Retry.Max = 0 // make sure sarama producer flush messages as soon as possible. config.Producer.Flush.Bytes = 0 @@ -67,6 +69,7 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { config.Producer.Flush.Frequency = time.Duration(0) config.Producer.Flush.MaxMessages = o.MaxMessages + config.Net.MaxOpenRequests = 1 config.Net.DialTimeout = o.DialTimeout config.Net.WriteTimeout = o.WriteTimeout config.Net.ReadTimeout = o.ReadTimeout From 1478354091a3401ba921274d9a0bd17b3af78d54 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 26 Dec 2024 16:26:08 +0800 Subject: [PATCH 2/3] enable the sarama log --- pkg/logutil/log.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 1f7afe61d79..2ba39320cc1 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -16,12 +16,12 @@ package logutil import ( "bytes" "context" + "github.com/IBM/sarama" "io" "os" "strconv" "strings" - "github.com/IBM/sarama" "github.com/gin-gonic/gin" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" @@ -223,14 +223,11 @@ func initMySQLLogger() error { // initSaramaLogger hacks logger used in sarama lib func initSaramaLogger(level zapcore.Level) error { - // only available less than info level - if !zapcore.InfoLevel.Enabled(level) { - logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) - if err != nil { - return errors.Trace(err) - } - sarama.Logger = logger + logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) + if err != nil { + return errors.Trace(err) } + sarama.Logger = logger return nil } From f1d8b2e1c5de6a0123a838a58049717702ec6262 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 31 Dec 2024 14:21:29 +0800 Subject: [PATCH 3/3] fix make fmt --- pkg/logutil/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 2ba39320cc1..138db763b9c 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -16,12 +16,12 @@ package logutil import ( "bytes" "context" - "github.com/IBM/sarama" "io" "os" "strconv" "strings" + "github.com/IBM/sarama" "github.com/gin-gonic/gin" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors"