Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memory leaked and error silenced with go.batch.producer set to true and big messages sent to produceChannel #1332

Open
4 of 7 tasks
jizhilong opened this issue Nov 7, 2024 · 9 comments
Assignees

Comments

@jizhilong
Copy link

Description

We have a kafka producer built on confluent-kafka-go, to improve the throughput, we set go.batch.producer to true, and send messages to producer.ProduceChannel.

Everything works find, except the process memory(RSS) keeps growing slowly, and leads to a OOM kill in the end.

image

After some digging, we suspect and confirmed this issue being related to the batch mode.

How to reproduce

Start a producer process with the following snippet:

package main

import (
	"fmt"
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"strings"
	"time"
)

func main() {
	conf := &kafka.ConfigMap{"bootstrap.servers": "192.168.1.99:9092"}
        conf.SetKey("go.batch.producer", true)
	p, err := kafka.NewProducer(conf)
	if err != nil {
		panic(err)
	}
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				m := ev
				if m.TopicPartition.Error != nil {
					fmt.Printf("ERROR Delivery failed: %v\n", m.TopicPartition.Error)
				} else {
				}
			case kafka.Error:
				fmt.Printf("Error: %v\n", ev)
			default:
				fmt.Printf("WARN Ignored event: %s\n", ev)
			}
		}
	}()
	topic := "test"
	for {
	        p.ProduceChannel() <- &kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                        Value:          []byte(strings.Repeat("H", 1024*1024)),
		}
		time.Sleep(2 * time.Millisecond)
	}
}

go run this snnipet, watch its output, monitor the process's cpu/memory usage with top. Two abnormal scenes were observed:

  • memory usage spikes to several giga bytes quickly in tens of seconds.
  • no error log found in the process's stdout/stderr

image

If we comment out the line conf.SetKey("go.batch.producer", true), things go back to normal:

  • memory usage keeps steady
  • tons of error message reported

image

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): v2.6.0
  • Apache Kafka broker version: 2.12-2.1.1
  • Client configuration: {"bootstrap.servers": "192.168.1.99:9092", "go.batch.producer": true}
  • Operating system: linux amd64
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@jizhilong
Copy link
Author

this issue maybe related to #1326 #1060

@ucanme
Copy link

ucanme commented Nov 8, 2024

this problem trobles me a lot and i'm so sad there are some bugs with this repo but nobody come out to help us.

@ucanme
Copy link

ucanme commented Nov 9, 2024

after struggling a lot, i choose to set go.batch.producer to false and set batch_size to 16384, go.events.quene.size to 1000,go.produce.queue.size to 1000, memory turns normal for one day running.
memory usage
image

kafka write qps monitor
image

@ucanme
Copy link

ucanme commented Nov 12, 2024

about two days later, oom again

@ucanme
Copy link

ucanme commented Nov 25, 2024

nobody comes out to help us. so sad.

@jizhilong
Copy link
Author

Dear maintainer,
I believe the root cause lies in the memory management of the C function rd_kafka_produce_batch, while this function calls rd_kafka_msg_new0 to create mesages, it fails to properly handle the RD_KAFKA_MSG_F_FREE flag when rd_kafka_msg_new0 returns NULL due to errors.

                /* Create message */
                rkm = rd_kafka_msg_new0(
                    rkt,
                    (msgflags & RD_KAFKA_MSG_F_PARTITION)
                        ? rkmessages[i].partition
                        : partition,
                    msgflags, rkmessages[i].payload, rkmessages[i].len,
                    rkmessages[i].key, rkmessages[i].key_len,
                    rkmessages[i]._private, &rkmessages[i].err, NULL, NULL,
                    utc_now, now);
                if (unlikely(!rkm)) {
                        if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
                                all_err = rkmessages[i].err;
                        continue;
                }

When a message payload exceeds the configured maximum size (in my setup, it was 1MB), rd_kafka_msg_new0 returns NULL with RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE

       if (unlikely(len > INT32_MAX || keylen > INT32_MAX ||
                     rd_kafka_msg_max_wire_size(keylen, len, hdrs_size) >
                         (size_t)rkt->rkt_rk->rk_conf.max_msg_size)) {
                *errp = RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
                if (errnop)
                        *errnop = EMSGSIZE;
                return NULL;
        }

Even though the golang wrapper produceBatch correctly sets the flag RD_KAFKA_MSG_F_FREE when calling C.rd_kafka_produce_batch, the C functions fails to honor this flag in its error path, resulting in memory leaks when messages are rejected due to size limitations:

unc (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
	crkt := p.handle.getRkt(topic)

	cmsgs := make([]C.rd_kafka_message_t, len(msgs))
	for i, m := range msgs {
		p.handle.messageToC(m, &cmsgs[i])
	}
	r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
		(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
	if r == -1 {
		return newError(C.rd_kafka_last_error())
	}

	return nil
}

The following patch to to rd_kafka_produce_batch addresses this issue by properly freeing the payload when message creation fails and the RD_KAFKA_MSG_F_FREE flag is set:

diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c
index 3fc3967c..00f47879 100644
--- a/src/rdkafka_msg.c
+++ b/src/rdkafka_msg.c
@@ -757,6 +757,9 @@ int rd_kafka_produce_batch(rd_kafka_topic_t *app_rkt,
                 if (unlikely(!rkm)) {
                         if (rkmessages[i].err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
                                 all_err = rkmessages[i].err;
+                        if (msgflags & RD_KAFKA_MSG_F_FREE && rkmessages[i].payload) {
+                                rd_free(rkmessages[i].payload);
+                        }
                         continue;
                 }

@jizhilong
Copy link
Author

@ucanme I hope this issue provides some hints for the problem you encountered. However, it's important to note that this issue only reports one possible path of memory leakage, and not all memory-related problems are connected to what's reported here.

@ucanme
Copy link

ucanme commented Nov 29, 2024

@jizhilong i guess i met the same problem as you, sometimes big messages more 1M may be sent.

@ucanme
Copy link

ucanme commented Dec 5, 2024

@ucanme I hope this issue provides some hints for the problem you encountered. However, it's important to note that this issue only reports one possible path of memory leakage, and not all memory-related problems are connected to what's reported here.

@jizhilong
i droped more than 1m msg , the memory leak didn't disappear,so the main cause may not be msg too large's bug.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants