Skip to content

Commit

Permalink
init consumer and producer on 1 client
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa committed Feb 26, 2024
1 parent fb99907 commit f2a38c7
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions protocol/kafka_confluent/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,22 @@ func New(opts ...Option) (*Protocol, error) {
return nil, err
}

if p.consumerTopics != nil && p.consumer == nil && p.kafkaConfigMap != nil {
consumer, err := kafka.NewConsumer(p.kafkaConfigMap)
if err != nil {
return nil, err
if p.kafkaConfigMap != nil {
if p.consumerTopics != nil && p.consumer == nil {
consumer, err := kafka.NewConsumer(p.kafkaConfigMap)
if err != nil {
return nil, err
}
p.consumer = consumer
}
p.consumer = consumer
} else if p.producer == nil && p.kafkaConfigMap != nil {
producer, err := kafka.NewProducer(p.kafkaConfigMap)
if err != nil {
return nil, err
if p.producerDefaultTopic != "" && p.producer == nil {
producer, err := kafka.NewProducer(p.kafkaConfigMap)
if err != nil {
return nil, err
}
p.producer = producer
p.producerDeliveryChan = make(chan kafka.Event)
}
p.producer = producer
p.producerDeliveryChan = make(chan kafka.Event)
}

if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
Expand Down

0 comments on commit f2a38c7

Please sign in to comment.