diff --git a/internal/app/app.go b/internal/app/app.go index c583762..0ba9643 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -130,8 +130,13 @@ func getPublisher(c *cli.Context, redisClient *redis.Client, topic string) (publ publisherType := c.String(publisherTypeFlag.Name) switch publisherType { case publisherpkg.PublisherTypeKafka: - addresses := c.StringSlice(kafkaAddrsFlag.Name) - publisher, err = kafka.NewPublisher(&kafka.Config{Addresses: addresses}) + config := &kafka.Config{ + Addresses: c.StringSlice(kafkaAddrsFlag.Name), + UseAuthentication: c.Bool(kafkaUseAuthenticationFlag.Name), + Username: c.String(kafkaUsernameFlag.Name), + Password: c.String(kafkaPasswordFlag.Name), + } + publisher, err = kafka.NewPublisher(config) if err != nil { return nil, err } diff --git a/internal/app/flags.go b/internal/app/flags.go index cdb76d1..4e3763e 100644 --- a/internal/app/flags.go +++ b/internal/app/flags.go @@ -105,6 +105,24 @@ var ( Value: cli.NewStringSlice("localhost:9092"), Usage: "A list of address for connecting to kafka. Default: localhost:9092", } + kafkaUseAuthenticationFlag = &cli.BoolFlag{ + Name: "kafka-use-authentication", + EnvVars: []string{"KAFKA_USE_AUTHENTICATION"}, + Value: false, + Usage: "Whether or not to use authentication when connecting to the broker", + } + kafkaUsernameFlag = &cli.StringFlag{ + Name: "kafka-username", + EnvVars: []string{"KAFKA_USERNAME"}, + Value: "", + Usage: "Username for authenticating with kafka brokers", + } + kafkaPasswordFlag = &cli.StringFlag{ + Name: "kafka-password", + EnvVars: []string{"KAFKA_PASSWORD"}, + Value: "", + Usage: "Password for authenticating with kafka brokers", + } encoderTypeFlag = &cli.StringFlag{ Name: "encoder-type", @@ -165,7 +183,12 @@ func NewRedisFlags() []cli.Flag { // NewKafkaFlags returns flags for kafka. func NewKafkaFlags() []cli.Flag { - return []cli.Flag{kafkaAddrsFlag} + return []cli.Flag{ + kafkaAddrsFlag, + kafkaUseAuthenticationFlag, + kafkaUsernameFlag, + kafkaPasswordFlag, + } } // NewEncoderFlags returns flags for encoder. diff --git a/pkg/publisher/kafka/config.go b/pkg/publisher/kafka/config.go index 00de566..7c2a8b0 100644 --- a/pkg/publisher/kafka/config.go +++ b/pkg/publisher/kafka/config.go @@ -2,4 +2,8 @@ package kafka type Config struct { Addresses []string + + UseAuthentication bool + Username string + Password string } diff --git a/pkg/publisher/kafka/kafka.go b/pkg/publisher/kafka/kafka.go index bcb6938..2045c12 100644 --- a/pkg/publisher/kafka/kafka.go +++ b/pkg/publisher/kafka/kafka.go @@ -30,6 +30,10 @@ func NewPublisher(config *Config) (*Publisher, error) { c.Producer.Return.Successes = true c.Producer.Return.Errors = true + c.Net.SASL.Enable = config.UseAuthentication + c.Net.SASL.User = config.Username + c.Net.SASL.Password = config.Password + // Use SyncProducer since we want to ensure the message is published. producer, err := sarama.NewSyncProducer(config.Addresses, c) if err != nil {