From cc178e8654cfbfc52f02ff1a312e1fd507260a90 Mon Sep 17 00:00:00 2001
From: Krisztian Gacsal
Date: Fri, 6 Sep 2024 10:20:28 +0200
Subject: [PATCH] refactor: kafka client configuration

---
 config.example.yaml           |  45 +++++
 config/config.go              |   6 +
 config/config_test.go         |  29 +++
 config/kafka.go               |  50 +++++
 config/testdata/complete.yaml |  23 +++
 pkg/kafka/config.go           | 339 ++++++++++++++++++++++++++++++++++
 6 files changed, 492 insertions(+)
 create mode 100644 config/kafka.go

diff --git a/config.example.yaml b/config.example.yaml
index 1fe95dca7..15d5661d4 100644
--- a/config.example.yaml
+++ b/config.example.yaml
@@ -96,3 +96,48 @@ notification:
   svix:
     apiKey: secret
     serverURL: http://localhost:8071
+
+kafka:
+  brokers: 127.0.0.1:9092,127.0.0.2:9092
+  securityProtocol: SASL_SSL
+  saslMechanisms: PLAIN
+  saslUsername: user
+  saslPassword: pass
+  # To enable stats reporting, set this value to >=5s.
+  # Setting this value to 0 explicitly disables reporting.
+  statsInterval: 5s
+  # Set the IP address family used for communicating with the Kafka cluster
+  brokerAddressFamily: v4
+  # Use this configuration parameter to define how frequently the local metadata cache needs to be updated.
+  # It cannot be lower than 10 seconds.
+  topicMetadataRefreshInterval: 1m
+  # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker from closing idle network connections.
+  socketKeepAliveEnabled: true
+  # Set the list of debug contexts to enable for librdkafka.
+  # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
+  debugContexts:
+    - broker
+    - topic
+  # Consumer/Producer identifier
+  clientID: kafka-client-1
+  # Consumer group identifier
+  consumerGroupID: consumer-group
+  # Static membership identifier in consumer group
+  consumerGroupInstanceID: consumer-group-1
+  # Consumer group session and failure detection timeout.
+  # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker.
+  # If no heartbeats are received by the broker for a group member within the session timeout,
+  # the broker will remove the consumer from the group and trigger a rebalance.
+  sessionTimeout: 5m
+  # Consumer group session keepalive heartbeat interval
+  heartbeatInterval: 3s
+  # Automatically and periodically commit offsets in the background
+  enableAutoCommit: true
+  # Automatically store the offset of the last message provided to the application.
+  # The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
+  enableAutoOffsetStore: false
+  # AutoOffsetReset defines the action to take when there is no initial offset in the offset store or the desired offset is out of range:
+  # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
+  # * "largest","latest","end": automatically reset the offset to the largest offset
+  # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
+  autoOffsetReset: "error"
diff --git a/config/config.go b/config/config.go
index ac321bf9f..abf7569d5 100644
--- a/config/config.go
+++ b/config/config.go
@@ -33,6 +33,7 @@ type Configuration struct {
 	BalanceWorker BalanceWorkerConfiguration
 	Notification  NotificationConfiguration
 	Svix          SvixConfig
+	Kafka         KafkaConfig
 }
 
 // Validate validates the configuration.
@@ -105,6 +106,10 @@ func (c Configuration) Validate() error {
 		}
 	}
 
+	if err := c.Kafka.Validate(); err != nil {
+		return fmt.Errorf("kafka: %w", err)
+	}
+
 	return nil
 }
 
@@ -140,4 +145,5 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) {
 	ConfigureEvents(v)
 	ConfigureBalanceWorker(v)
 	ConfigureNotification(v)
+	ConfigureKafkaConfiguration(v)
 }
diff --git a/config/config_test.go b/config/config_test.go
index 41af984b6..669edb864 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -250,6 +250,35 @@ func TestComplete(t *testing.T) {
 			ServerURL: "http://127.0.0.1:8071",
 			Debug:     true,
 		},
+		Kafka: KafkaConfig{
+			CommonConfigParams: pkgkafka.CommonConfigParams{
+				Brokers:                      "127.0.0.1:9092",
+				SecurityProtocol:             "SASL_SSL",
+				SaslMechanisms:               "PLAIN",
+				SaslUsername:                 "user",
+				SaslPassword:                 "pass",
+				ClientID:                     "kafka-client-1",
+				StatsInterval:                pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
+				BrokerAddressFamily:          pkgkafka.BrokerAddressFamilyAny,
+				TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
+				SocketKeepAliveEnabled:       true,
+				DebugContexts: pkgkafka.DebugContexts{
+					"broker",
+					"topic",
+					"consumer",
+				},
+			},
+			ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
+				ConsumerGroupID:         "consumer-group",
+				ConsumerGroupInstanceID: "consumer-group-1",
+				SessionTimeout:          pkgkafka.TimeDurationMilliSeconds(5 * time.Minute),
+				HeartbeatInterval:       pkgkafka.TimeDurationMilliSeconds(3 * time.Second),
+				EnableAutoCommit:        true,
+				EnableAutoOffsetStore:   false,
+				AutoOffsetReset:         "error",
+			},
+			ProducerConfigParams: pkgkafka.ProducerConfigParams{},
+		},
 	}
 
 	assert.Equal(t, expected, actual)
diff --git a/config/kafka.go b/config/kafka.go
new file mode 100644
index 000000000..b73ad6f4d
--- /dev/null
+++ b/config/kafka.go
@@ -0,0 +1,50 @@
+package config
+
+import (
+	"github.com/spf13/viper"
+
+	pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
+)
+
+var _ pkgkafka.ConfigValidator = (*KafkaConfig)(nil)
+
+type KafkaConfig struct {
+	pkgkafka.CommonConfigParams   `mapstructure:",squash"`
+	pkgkafka.ConsumerConfigParams `mapstructure:",squash"`
+	pkgkafka.ProducerConfigParams `mapstructure:",squash"`
+}
+
+func (c KafkaConfig) AsProducerConfig() pkgkafka.ProducerConfig {
+	return pkgkafka.ProducerConfig{
+		CommonConfigParams:   c.CommonConfigParams,
+		ProducerConfigParams: c.ProducerConfigParams,
+	}
+}
+
+func (c KafkaConfig) AsConsumerConfig() pkgkafka.ConsumerConfig {
+	return pkgkafka.ConsumerConfig{
+		CommonConfigParams:   c.CommonConfigParams,
+		ConsumerConfigParams: c.ConsumerConfigParams,
+	}
+}
+
+func (c KafkaConfig) Validate() error {
+	validators := []pkgkafka.ConfigValidator{
+		c.CommonConfigParams,
+		c.ConsumerConfigParams,
+		c.ProducerConfigParams,
+	}
+
+	for _, validator := range validators {
+		if err := validator.Validate(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// ConfigureKafkaConfiguration sets defaults in the Viper instance.
+func ConfigureKafkaConfiguration(v *viper.Viper) {
+	v.SetDefault("kafka.brokers", "127.0.0.1:29092")
+}
diff --git a/config/testdata/complete.yaml b/config/testdata/complete.yaml
index f29ddd81b..962529d8d 100644
--- a/config/testdata/complete.yaml
+++ b/config/testdata/complete.yaml
@@ -97,3 +97,26 @@ svix:
   apiKey: test-svix-token
   serverURL: http://127.0.0.1:8071
   debug: true
+
+kafka:
+  brokers: 127.0.0.1:9092
+  securityProtocol: SASL_SSL
+  saslMechanisms: PLAIN
+  saslUsername: user
+  saslPassword: pass
+  statsInterval: 5s
+  brokerAddressFamily: any
+  socketKeepAliveEnabled: true
+  topicMetadataRefreshInterval: 1m
+  debugContexts:
+    - broker
+    - topic
+    - consumer
+  clientID: kafka-client-1
+  consumerGroupID: consumer-group
+  consumerGroupInstanceID: consumer-group-1
+  sessionTimeout: 5m
+  heartbeatInterval: 3s
+  enableAutoCommit: true
+  enableAutoOffsetStore: false
+  autoOffsetReset: "error"
diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go
index 7f5036b2b..211ad6bdb 100644
--- a/pkg/kafka/config.go
+++ b/pkg/kafka/config.go
@@ -3,12 +3,351 @@ package kafka
 import (
 	"encoding"
 	"encoding/json"
+	"errors"
 	"fmt"
+	"slices"
 	"strconv"
 	"strings"
 	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 )
+
+type ConfigValidator interface {
+	Validate() error
+}
+
+type ConfigMapper interface {
+	AsConfigMap() (kafka.ConfigMap, error)
+}
+
+var (
+	_ ConfigMapper    = (*CommonConfigParams)(nil)
+	_ ConfigValidator = (*CommonConfigParams)(nil)
+)
+
+type CommonConfigParams struct {
+	Brokers          string
+	SecurityProtocol string
+	SaslMechanisms   string
+	SaslUsername     string
+	SaslPassword     string
+
+	StatsInterval TimeDurationMilliSeconds
+
+	// BrokerAddressFamily defines the IP address family to be used for network communication with the Kafka cluster.
+	BrokerAddressFamily BrokerAddressFamily
+	// SocketKeepAliveEnabled defines whether TCP socket keep-alive is enabled to prevent Kafka brokers
+	// from closing idle connections.
+	SocketKeepAliveEnabled bool
+	// TopicMetadataRefreshInterval defines how frequently the Kafka client needs to fetch metadata information
+	// (brokers, topics, partitions, etc.) from the Kafka cluster.
+	// The default value of 5 minutes is appropriate for mostly static Kafka clusters, but needs to be lowered
+	// in case of large clusters where changes are more frequent.
+	// This value must not be set lower than 10s.
+	TopicMetadataRefreshInterval TimeDurationMilliSeconds
+
+	// Enable contexts for extensive debugging of librdkafka.
+	// See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
+	DebugContexts DebugContexts
+
+	// ClientID sets the Consumer/Producer identifier.
+	ClientID string
+}
+
+func (c CommonConfigParams) AsConfigMap() (kafka.ConfigMap, error) {
+	m := kafka.ConfigMap{
+		// Required for logging
+		"go.logs.channel.enable": true,
+	}
+
+	if err := m.SetKey("bootstrap.servers", c.Brokers); err != nil {
+		return nil, err
+	}
+
+	// This is needed when using localhost brokers on OSX,
+	// since the OSX resolver will return the IPv6 addresses first.
+	// See: https://github.com/openmeterio/openmeter/issues/321
+	if c.BrokerAddressFamily != "" {
+		if err := m.SetKey("broker.address.family", c.BrokerAddressFamily); err != nil {
+			return nil, err
+		}
+	} else if strings.Contains(c.Brokers, "localhost") || strings.Contains(c.Brokers, "127.0.0.1") {
+		if err := m.SetKey("broker.address.family", BrokerAddressFamilyIPv4); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.SecurityProtocol != "" {
+		if err := m.SetKey("security.protocol", c.SecurityProtocol); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.SaslMechanisms != "" {
+		if err := m.SetKey("sasl.mechanism", c.SaslMechanisms); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.SaslUsername != "" {
+		if err := m.SetKey("sasl.username", c.SaslUsername); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.SaslPassword != "" {
+		if err := m.SetKey("sasl.password", c.SaslPassword); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.StatsInterval > 0 {
+		if err := m.SetKey("statistics.interval.ms", c.StatsInterval); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.SocketKeepAliveEnabled {
+		if err := m.SetKey("socket.keepalive.enable", c.SocketKeepAliveEnabled); err != nil {
+			return nil, err
+		}
+	}
+
+	// `topic.metadata.refresh.interval.ms` defines how frequently the Kafka client needs to retrieve metadata
+	// from the Kafka cluster, while `metadata.max.age.ms` defines the interval after which the metadata cache
+	// maintained on the client side becomes invalid. Setting the former automatically adjusts the value of the
+	// latter to avoid a misconfiguration where entries in the metadata cache are evicted prior to the metadata refresh.
+	if c.TopicMetadataRefreshInterval > 0 {
+		if err := m.SetKey("topic.metadata.refresh.interval.ms", c.TopicMetadataRefreshInterval); err != nil {
+			return nil, err
+		}
+
+		if err := m.SetKey("metadata.max.age.ms", 3*c.TopicMetadataRefreshInterval); err != nil {
+			return nil, err
+		}
+	}
+
+	if len(c.DebugContexts) > 0 {
+		if err := m.SetKey("debug", c.DebugContexts.String()); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.ClientID != "" {
+		if err := m.SetKey("client.id", c.ClientID); err != nil {
+			return nil, err
+		}
+	}
+
+	return m, nil
+}
+
+func (c CommonConfigParams) Validate() error {
+	if c.Brokers == "" {
+		return errors.New("broker is required")
+	}
+
+	if c.StatsInterval > 0 && c.StatsInterval.Duration() < 5*time.Second {
+		return errors.New("stats interval must be >=5s")
+	}
+
+	if c.TopicMetadataRefreshInterval > 0 && c.TopicMetadataRefreshInterval.Duration() < 10*time.Second {
+		return errors.New("topic metadata refresh interval must be >=10s")
+	}
+
+	return nil
+}
+
+var (
+	_ ConfigMapper    = (*ConsumerConfigParams)(nil)
+	_ ConfigValidator = (*ConsumerConfigParams)(nil)
+)
+
+type ConsumerConfigParams struct {
+	// ConsumerGroupID defines the group ID. All clients sharing the same ConsumerGroupID belong to the same group.
+	ConsumerGroupID string
+	// ConsumerGroupInstanceID defines the instance ID in the consumer group. Setting this parameter enables static group membership.
+	// Static group members are able to leave and rejoin a group within the configured SessionTimeout without prompting a group rebalance.
+	// This should be used in combination with a larger SessionTimeout to avoid group rebalances caused by transient unavailability (e.g. process restarts).
+	ConsumerGroupInstanceID string
+
+	// SessionTimeout defines the consumer group session and failure detection timeout.
+	// The consumer sends periodic heartbeats (HeartbeatInterval) to indicate its liveness to the broker.
+	// If no heartbeats are received by the broker for a group member within the session timeout,
+	// the broker will remove the consumer from the group and trigger a rebalance.
+	SessionTimeout TimeDurationMilliSeconds
+	// HeartbeatInterval defines the consumer group session keepalive heartbeat interval.
+	HeartbeatInterval TimeDurationMilliSeconds
+
+	// EnableAutoCommit enables automatically and periodically committing offsets in the background.
+	EnableAutoCommit bool
+	// EnableAutoOffsetStore enables automatically storing the offset of the last message provided to the application.
+	// The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
+	EnableAutoOffsetStore bool
+	// AutoOffsetReset defines the action to take when there is no initial offset in the offset store or the desired offset is out of range:
+	// * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
+	// * "largest","latest","end": automatically reset the offset to the largest offset
+	// * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
+	AutoOffsetReset string
+}
+
+func (c ConsumerConfigParams) Validate() error {
+	if c.ConsumerGroupInstanceID != "" && c.ConsumerGroupID == "" {
+		return errors.New("consumer group id is required when the consumer group instance id is set")
+	}
+
+	if c.AutoOffsetReset != "" && !slices.Contains([]string{
+		"smallest", "earliest", "beginning",
+		"largest", "latest", "end",
+		"error",
+	}, c.AutoOffsetReset) {
+		return errors.New("invalid auto offset reset")
+	}
+
+	return nil
+}
+
+func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error) {
+	m := kafka.ConfigMap{
+		"go.application.rebalance.enable": true,
+	}
+
+	if c.ConsumerGroupID != "" {
+		if err := m.SetKey("group.id", c.ConsumerGroupID); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.ConsumerGroupInstanceID != "" {
+		if err := m.SetKey("group.instance.id", c.ConsumerGroupInstanceID); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.SessionTimeout > 0 {
+		if err := m.SetKey("session.timeout.ms", c.SessionTimeout); err != nil {
+			return nil, err
+		}
+	}
+
+	if c.HeartbeatInterval > 0 {
+		if err := m.SetKey("heartbeat.interval.ms", c.HeartbeatInterval); err != nil {
+			return nil, err
+		}
+	}
+
+	if err := m.SetKey("enable.auto.commit", c.EnableAutoCommit); err != nil {
+		return nil, err
+	}
+
+	if err := m.SetKey("enable.auto.offset.store", c.EnableAutoOffsetStore); err != nil {
+		return nil, err
+	}
+
+	if c.AutoOffsetReset != "" {
+		if err := m.SetKey("auto.offset.reset", c.AutoOffsetReset); err != nil {
+			return nil, err
+		}
+	}
+
+	return m, nil
+}
+
+var (
+	_ ConfigMapper    = (*ProducerConfigParams)(nil)
+	_ ConfigValidator = (*ProducerConfigParams)(nil)
+)
+
+type ProducerConfigParams struct{}
+
+func (p ProducerConfigParams) Validate() error {
+	return nil
+}
+
+func (p ProducerConfigParams) AsConfigMap() (kafka.ConfigMap, error) {
+	return nil, nil
+}
+
+var (
+	_ ConfigMapper    = (*ConsumerConfig)(nil)
+	_ ConfigValidator = (*ConsumerConfig)(nil)
+)
+
+type ConsumerConfig struct {
+	CommonConfigParams
+	ConsumerConfigParams
+}
+
+func (c ConsumerConfig) AsConfigMap() (kafka.ConfigMap, error) {
+	return mergeConfigsToMap(c.CommonConfigParams, c.ConsumerConfigParams)
+}
+
+func (c ConsumerConfig) Validate() error {
+	validators := []ConfigValidator{
+		c.CommonConfigParams,
+		c.ConsumerConfigParams,
+	}
+
+	for _, validator := range validators {
+		if err := validator.Validate(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+var (
+	_ ConfigMapper    = (*ProducerConfig)(nil)
+	_ ConfigValidator = (*ProducerConfig)(nil)
+)
+
+type ProducerConfig struct {
+	CommonConfigParams
+	ProducerConfigParams
+}
+
+func (c ProducerConfig) AsConfigMap() (kafka.ConfigMap, error) {
+	return mergeConfigsToMap(c.CommonConfigParams, c.ProducerConfigParams)
+}
+
+func (c ProducerConfig) Validate() error {
+	validators := []ConfigValidator{
+		c.CommonConfigParams,
+		c.ProducerConfigParams,
+	}
+
+	for _, validator := range validators {
+		if err := validator.Validate(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func mergeConfigsToMap(mappers ...ConfigMapper) (kafka.ConfigMap, error) {
+	if len(mappers) == 0 {
+		return nil, nil
+	}
+
+	configMap := kafka.ConfigMap{}
+
+	for _, mapper := range mappers {
+		m, err := mapper.AsConfigMap()
+		if err != nil {
+			return nil, err
+		}
+
+		// Later mappers take precedence: duplicate keys are overwritten.
+		for k, v := range m {
+			configMap[k] = v
+		}
+	}
+
+	return configMap, nil
+}
+
 type configValue interface {
 	fmt.Stringer
 	encoding.TextUnmarshaler
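
Usage note (not part of the patch): a minimal sketch of how the new types are meant to compose — validate the unified KafkaConfig once, narrow it to a consumer view with AsConsumerConfig, render it to a librdkafka ConfigMap, and hand that to confluent-kafka-go. The surrounding main function and hand-built config values are illustrative only; in OpenMeter the config is populated by Viper.

	package main

	import (
		"log"

		"github.com/confluentinc/confluent-kafka-go/v2/kafka"

		"github.com/openmeterio/openmeter/config"
		pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
	)

	func main() {
		// Hypothetical hand-built config; normally decoded from YAML by Viper.
		cfg := config.KafkaConfig{
			CommonConfigParams: pkgkafka.CommonConfigParams{
				Brokers:  "127.0.0.1:9092",
				ClientID: "kafka-client-1",
			},
			ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
				ConsumerGroupID: "consumer-group",
				AutoOffsetReset: "earliest",
			},
		}

		// Validate runs the common, consumer, and producer validators in order.
		if err := cfg.Validate(); err != nil {
			log.Fatalf("invalid kafka config: %v", err)
		}

		// AsConsumerConfig drops producer-only params; AsConfigMap merges the
		// common and consumer maps (later mappers win on duplicate keys).
		configMap, err := cfg.AsConsumerConfig().AsConfigMap()
		if err != nil {
			log.Fatalf("building config map: %v", err)
		}

		consumer, err := kafka.NewConsumer(&configMap)
		if err != nil {
			log.Fatalf("creating consumer: %v", err)
		}
		defer consumer.Close()
	}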
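
The `mapstructure:",squash"` tags on KafkaConfig are what let the flat kafka: YAML block populate three embedded structs without nesting. A self-contained illustration of that mechanism (struct and key names here are illustrative, not from this PR):

	package main

	import (
		"bytes"
		"fmt"

		"github.com/spf13/viper"
	)

	type Common struct {
		Brokers string
	}

	type Consumer struct {
		ConsumerGroupID string
	}

	// Squash hoists the embedded structs' fields to the top level,
	// so "brokers" and "consumerGroupID" decode without extra nesting.
	type Config struct {
		Common   `mapstructure:",squash"`
		Consumer `mapstructure:",squash"`
	}

	func main() {
		v := viper.New()
		v.SetConfigType("yaml")
		_ = v.ReadConfig(bytes.NewBufferString("kafka:\n  brokers: 127.0.0.1:9092\n  consumerGroupID: cg\n"))

		var c Config
		if err := v.UnmarshalKey("kafka", &c); err != nil {
			panic(err)
		}

		fmt.Println(c.Brokers, c.ConsumerGroupID) // 127.0.0.1:9092 cg
	}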
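
The trailing context of the last hunk shows the pre-existing configValue interface (fmt.Stringer plus encoding.TextUnmarshaler) that types such as TimeDurationMilliSeconds satisfy. Its definition is outside this diff, so the following is only a plausible shape, assuming it wraps time.Duration, parses human-friendly values like "5s", and renders whole milliseconds for the *.ms librdkafka keys:

	package kafka

	import (
		"strconv"
		"time"
	)

	// Assumed shape — the real definition lives in pkg/kafka/config.go outside this hunk.
	type TimeDurationMilliSeconds time.Duration

	func (d TimeDurationMilliSeconds) Duration() time.Duration {
		return time.Duration(d)
	}

	// String renders the value as whole milliseconds, matching librdkafka's *.ms keys.
	func (d TimeDurationMilliSeconds) String() string {
		return strconv.FormatInt(time.Duration(d).Milliseconds(), 10)
	}

	// UnmarshalText lets Viper/YAML parse values such as "5s" or "1m".
	func (d *TimeDurationMilliSeconds) UnmarshalText(text []byte) error {
		v, err := time.ParseDuration(string(text))
		if err != nil {
			return err
		}

		*d = TimeDurationMilliSeconds(v)
		return nil
	}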