From ab6a9f91bfd8e23bbdd032af20bddd376a4ab47a Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 17 May 2024 16:14:44 +0800 Subject: [PATCH] ticdc: fix detecting kafka version (#11048) close pingcap/tiflow#11047 --- pkg/sink/kafka/options.go | 1 + pkg/sink/kafka/sarama.go | 32 +++++++++++++-------------- pkg/sink/kafka/sarama_factory_test.go | 4 ++++ 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/pkg/sink/kafka/options.go b/pkg/sink/kafka/options.go index 19ee3786a56..31f20d0234d 100644 --- a/pkg/sink/kafka/options.go +++ b/pkg/sink/kafka/options.go @@ -151,6 +151,7 @@ type Options struct { ReplicationFactor int16 Version string IsAssignedVersion bool + RequestVersion int16 MaxMessageBytes int Compression string ClientID string diff --git a/pkg/sink/kafka/sarama.go b/pkg/sink/kafka/sarama.go index 568270203f4..df8877412b3 100644 --- a/pkg/sink/kafka/sarama.go +++ b/pkg/sink/kafka/sarama.go @@ -142,7 +142,6 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { zap.String("desiredVersion", kafkaVersion.String())) } } - return config, nil } @@ -202,7 +201,7 @@ func getKafkaVersion(config *sarama.Config, o *Options) (sarama.KafkaVersion, er }) } for i := range addrs { - version, err := getKafkaVersionFromBroker(config, addrs[i]) + version, err := getKafkaVersionFromBroker(config, o.RequestVersion, addrs[i]) if err == nil { return version, err } @@ -210,7 +209,7 @@ func getKafkaVersion(config *sarama.Config, o *Options) (sarama.KafkaVersion, er return version, err } -func getKafkaVersionFromBroker(config *sarama.Config, addr string) (sarama.KafkaVersion, error) { +func getKafkaVersionFromBroker(config *sarama.Config, requestVersion int16, addr string) (sarama.KafkaVersion, error) { KafkaVersion := defaultKafkaVersion broker := sarama.NewBroker(addr) err := broker.Open(config) @@ -218,38 +217,37 @@ func getKafkaVersionFromBroker(config *sarama.Config, addr string) (sarama.Kafka broker.Close() }() if err != nil { - log.Warn("Kafka fail to open broker", zap.String("addr", addr)) + log.Warn("Kafka fail to open broker", zap.String("addr", addr), zap.Error(err)) return KafkaVersion, err } - apiResponse, err := broker.ApiVersions(&sarama.ApiVersionsRequest{Version: 3}) + apiResponse, err := broker.ApiVersions(&sarama.ApiVersionsRequest{Version: requestVersion}) if err != nil { - log.Warn("Kafka fail to get ApiVersions", zap.String("addr", addr)) + log.Warn("Kafka fail to get ApiVersions", zap.String("addr", addr), zap.Error(err)) return KafkaVersion, err } // ApiKey method // 0 Produce // 3 Metadata (default) version := apiResponse.ApiKeys[3].MaxVersion - switch version { - case 10: + if version >= 10 { KafkaVersion = sarama.V2_8_0_0 - case 9: + } else if version >= 9 { KafkaVersion = sarama.V2_4_0_0 - case 8: + } else if version >= 8 { KafkaVersion = sarama.V2_3_0_0 - case 7: + } else if version >= 7 { KafkaVersion = sarama.V2_1_0_0 - case 6: + } else if version >= 6 { KafkaVersion = sarama.V2_0_0_0 - case 5: + } else if version >= 5 { KafkaVersion = sarama.V1_0_0_0 - case 3, 4: + } else if version >= 3 { KafkaVersion = sarama.V0_11_0_0 - case 2: + } else if version >= 2 { KafkaVersion = sarama.V0_10_1_0 - case 1: + } else if version >= 1 { KafkaVersion = sarama.V0_10_0_0 - case 0: + } else if version >= 0 { KafkaVersion = sarama.V0_8_2_0 } return KafkaVersion, nil diff --git a/pkg/sink/kafka/sarama_factory_test.go b/pkg/sink/kafka/sarama_factory_test.go index d0c7201eed5..28d6b2e1aa6 100644 --- a/pkg/sink/kafka/sarama_factory_test.go +++ b/pkg/sink/kafka/sarama_factory_test.go @@ -57,6 +57,8 @@ func TestSyncProducer(t *testing.T) { o.IsAssignedVersion = true o.BrokerEndpoints = []string{leader.Addr()} o.ClientID = "sarama-test" + // specify request version for mock broker + o.RequestVersion = 3 f, err := NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test")) require.NoError(t, err) @@ -87,6 +89,8 @@ func TestAsyncProducer(t *testing.T) { o.IsAssignedVersion = true o.BrokerEndpoints = []string{leader.Addr()} o.ClientID = "sarama-test" + // specify request version for mock broker + o.RequestVersion = 3 f, err := NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test")) require.NoError(t, err)