Skip to content

Commit

Permalink
kafka(ticdc): sarama do not retry if produce message failed to preven…
Browse files Browse the repository at this point in the history
…t out of order (#11870) (#11961)

close #11935
  • Loading branch information
ti-chi-bot authored Jan 9, 2025
1 parent 0eb416d commit 4f11a8e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
11 changes: 4 additions & 7 deletions pkg/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,11 @@ func initMySQLLogger() error {

// initSaramaLogger hacks logger used in sarama lib
func initSaramaLogger(level zapcore.Level) error {
// only available less than info level
if !zapcore.InfoLevel.Enabled(level) {
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
if err != nil {
return errors.Trace(err)
}
sarama.Logger = logger
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
if err != nil {
return errors.Trace(err)
}
sarama.Logger = logger
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/sink/kafka/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) {
// For kafka cluster with a bad network condition, producer should not try to
// waster too much time on sending a message, get response no matter success
// or fail as soon as possible is preferred.
config.Producer.Retry.Max = 3
config.Producer.Retry.Backoff = 100 * time.Millisecond
// According to the https://github.com/IBM/sarama/issues/2619,
// sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1,
// when the kafka cluster is unhealthy and trigger the internal retry mechanism.
config.Producer.Retry.Max = 0

// make sure sarama producer flush messages as soon as possible.
config.Producer.Flush.Bytes = 0
config.Producer.Flush.Messages = 0
config.Producer.Flush.Frequency = time.Duration(0)
config.Producer.Flush.MaxMessages = o.MaxMessages

config.Net.MaxOpenRequests = 1
config.Net.DialTimeout = o.DialTimeout
config.Net.WriteTimeout = o.WriteTimeout
config.Net.ReadTimeout = o.ReadTimeout
Expand Down

0 comments on commit 4f11a8e

Please sign in to comment.