Skip to content

Commit

Permalink
kafka(ticdc): sarama do not retry if produce message failed to preven…
Browse files Browse the repository at this point in the history
…t out of order (#11870) (#12026)

close #11935
  • Loading branch information
ti-chi-bot authored Jan 22, 2025
1 parent 79bd0c0 commit 75a5066
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cdc/sink/mq/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,14 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// For kafka cluster with a bad network condition, producer should not try to
// waster too much time on sending a message, get response no matter success
// or fail as soon as possible is preferred.
config.Producer.Retry.Max = 3
config.Producer.Retry.Backoff = 100 * time.Millisecond
config.Producer.Retry.Max = 0

// make sure sarama producer flush messages as soon as possible.
config.Producer.Flush.Bytes = 0
config.Producer.Flush.Messages = 0
config.Producer.Flush.Frequency = time.Duration(0)

config.Net.MaxOpenRequests = 1
config.Net.DialTimeout = c.DialTimeout
config.Net.WriteTimeout = c.WriteTimeout
config.Net.ReadTimeout = c.ReadTimeout
Expand Down
11 changes: 4 additions & 7 deletions pkg/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,11 @@ func ZapErrorFilter(err error, filterErrors ...error) zap.Field {

// initSaramaLogger hacks logger used in sarama lib
func initSaramaLogger(level zapcore.Level) error {
// only available less than info level
if !zapcore.InfoLevel.Enabled(level) {
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
if err != nil {
return errors.Trace(err)
}
sarama.Logger = logger
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
if err != nil {
return errors.Trace(err)
}
sarama.Logger = logger
return nil
}

Expand Down

0 comments on commit 75a5066

Please sign in to comment.