Skip to content

Commit

Permalink
ticdc: fix detecting kafka version (#11048)
Browse files Browse the repository at this point in the history
close #11047
  • Loading branch information
wk989898 authored May 17, 2024
1 parent 08aec53 commit ab6a9f9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/sink/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type Options struct {
ReplicationFactor int16
Version string
IsAssignedVersion bool
RequestVersion int16
MaxMessageBytes int
Compression string
ClientID string
Expand Down
32 changes: 15 additions & 17 deletions pkg/sink/kafka/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) {
zap.String("desiredVersion", kafkaVersion.String()))
}
}

return config, nil
}

Expand Down Expand Up @@ -202,54 +201,53 @@ func getKafkaVersion(config *sarama.Config, o *Options) (sarama.KafkaVersion, er
})
}
for i := range addrs {
version, err := getKafkaVersionFromBroker(config, addrs[i])
version, err := getKafkaVersionFromBroker(config, o.RequestVersion, addrs[i])
if err == nil {
return version, err
}
}
return version, err
}

func getKafkaVersionFromBroker(config *sarama.Config, addr string) (sarama.KafkaVersion, error) {
func getKafkaVersionFromBroker(config *sarama.Config, requestVersion int16, addr string) (sarama.KafkaVersion, error) {
KafkaVersion := defaultKafkaVersion
broker := sarama.NewBroker(addr)
err := broker.Open(config)
defer func() {
broker.Close()
}()
if err != nil {
log.Warn("Kafka fail to open broker", zap.String("addr", addr))
log.Warn("Kafka fail to open broker", zap.String("addr", addr), zap.Error(err))
return KafkaVersion, err
}
apiResponse, err := broker.ApiVersions(&sarama.ApiVersionsRequest{Version: 3})
apiResponse, err := broker.ApiVersions(&sarama.ApiVersionsRequest{Version: requestVersion})
if err != nil {
log.Warn("Kafka fail to get ApiVersions", zap.String("addr", addr))
log.Warn("Kafka fail to get ApiVersions", zap.String("addr", addr), zap.Error(err))
return KafkaVersion, err
}
// ApiKey method
// 0 Produce
// 3 Metadata (default)
version := apiResponse.ApiKeys[3].MaxVersion
switch version {
case 10:
if version >= 10 {
KafkaVersion = sarama.V2_8_0_0
case 9:
} else if version >= 9 {
KafkaVersion = sarama.V2_4_0_0
case 8:
} else if version >= 8 {
KafkaVersion = sarama.V2_3_0_0
case 7:
} else if version >= 7 {
KafkaVersion = sarama.V2_1_0_0
case 6:
} else if version >= 6 {
KafkaVersion = sarama.V2_0_0_0
case 5:
} else if version >= 5 {
KafkaVersion = sarama.V1_0_0_0
case 3, 4:
} else if version >= 3 {
KafkaVersion = sarama.V0_11_0_0
case 2:
} else if version >= 2 {
KafkaVersion = sarama.V0_10_1_0
case 1:
} else if version >= 1 {
KafkaVersion = sarama.V0_10_0_0
case 0:
} else if version >= 0 {
KafkaVersion = sarama.V0_8_2_0
}
return KafkaVersion, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/kafka/sarama_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func TestSyncProducer(t *testing.T) {
o.IsAssignedVersion = true
o.BrokerEndpoints = []string{leader.Addr()}
o.ClientID = "sarama-test"
// specify request version for mock broker
o.RequestVersion = 3

f, err := NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test"))
require.NoError(t, err)
Expand Down Expand Up @@ -87,6 +89,8 @@ func TestAsyncProducer(t *testing.T) {
o.IsAssignedVersion = true
o.BrokerEndpoints = []string{leader.Addr()}
o.ClientID = "sarama-test"
// specify request version for mock broker
o.RequestVersion = 3

f, err := NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test"))
require.NoError(t, err)
Expand Down

0 comments on commit ab6a9f9

Please sign in to comment.