Skip to content

Commit

Permalink
feat: add kafka authentication configs
Browse files Browse the repository at this point in the history
  • Loading branch information
lehainam-dev committed Jan 25, 2024
1 parent e0665eb commit 53b5b65
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 3 deletions.
9 changes: 7 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ func getPublisher(c *cli.Context, redisClient *redis.Client, topic string) (publ
publisherType := c.String(publisherTypeFlag.Name)
switch publisherType {
case publisherpkg.PublisherTypeKafka:
addresses := c.StringSlice(kafkaAddrsFlag.Name)
publisher, err = kafka.NewPublisher(&kafka.Config{Addresses: addresses})
config := &kafka.Config{
Addresses: c.StringSlice(kafkaAddrsFlag.Name),
UseAuthentication: c.Bool(kafkaUseAuthenticationFlag.Name),
Username: c.String(kafkaUsernameFlag.Name),
Password: c.String(kafkaPasswordFlag.Name),
}
publisher, err = kafka.NewPublisher(config)
if err != nil {
return nil, err
}
Expand Down
25 changes: 24 additions & 1 deletion internal/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ var (
Value: cli.NewStringSlice("localhost:9092"),
Usage: "A list of address for connecting to kafka. Default: localhost:9092",
}
kafkaUseAuthenticationFlag = &cli.BoolFlag{
Name: "kafka-use-authentication",
EnvVars: []string{"KAFKA_USE_AUTHENTICATION"},
Value: false,
Usage: "Whether or not to use authentication when connecting to the broker",
}
kafkaUsernameFlag = &cli.StringFlag{
Name: "kafka-username",
EnvVars: []string{"KAFKA_USERNAME"},
Value: "",
Usage: "Username for authenticating with kafka brokers",
}
kafkaPasswordFlag = &cli.StringFlag{
Name: "kafka-password",
EnvVars: []string{"KAFKA_PASSWORD"},
Value: "",
Usage: "Password for authenticating with kafka brokers",
}

encoderTypeFlag = &cli.StringFlag{
Name: "encoder-type",
Expand Down Expand Up @@ -165,7 +183,12 @@ func NewRedisFlags() []cli.Flag {

// NewKafkaFlags returns flags for kafka.
func NewKafkaFlags() []cli.Flag {
return []cli.Flag{kafkaAddrsFlag}
return []cli.Flag{
kafkaAddrsFlag,
kafkaUseAuthenticationFlag,
kafkaUsernameFlag,
kafkaPasswordFlag,
}
}

// NewEncoderFlags returns flags for encoder.
Expand Down
4 changes: 4 additions & 0 deletions pkg/publisher/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ package kafka

type Config struct {
Addresses []string

UseAuthentication bool
Username string
Password string
}
4 changes: 4 additions & 0 deletions pkg/publisher/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewPublisher(config *Config) (*Publisher, error) {
c.Producer.Return.Successes = true
c.Producer.Return.Errors = true

c.Net.SASL.Enable = config.UseAuthentication
c.Net.SASL.User = config.Username
c.Net.SASL.Password = config.Password

// Use SyncProducer since we want to ensure the message is published.
producer, err := sarama.NewSyncProducer(config.Addresses, c)
if err != nil {
Expand Down

0 comments on commit 53b5b65

Please sign in to comment.