diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go index a092ff286..1753d851e 100644 --- a/protocol/kafka_confluent/v2/protocol.go +++ b/protocol/kafka_confluent/v2/protocol.go @@ -53,19 +53,22 @@ func New(opts ...Option) (*Protocol, error) { return nil, err } - if p.consumerTopics != nil && p.consumer == nil && p.kafkaConfigMap != nil { - consumer, err := kafka.NewConsumer(p.kafkaConfigMap) - if err != nil { - return nil, err + if p.kafkaConfigMap != nil { + if p.consumerTopics != nil && p.consumer == nil { + consumer, err := kafka.NewConsumer(p.kafkaConfigMap) + if err != nil { + return nil, err + } + p.consumer = consumer } - p.consumer = consumer - } else if p.producer == nil && p.kafkaConfigMap != nil { - producer, err := kafka.NewProducer(p.kafkaConfigMap) - if err != nil { - return nil, err + if p.producerDefaultTopic != "" && p.producer == nil { + producer, err := kafka.NewProducer(p.kafkaConfigMap) + if err != nil { + return nil, err + } + p.producer = producer + p.producerDeliveryChan = make(chan kafka.Event) } - p.producer = producer - p.producerDeliveryChan = make(chan kafka.Event) } if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {