Goroutines left behind after writing message even after closing writer #1358

Open
fjagugar opened this issue Dec 18, 2024 · 1 comment
@fjagugar

Describe the bug

While debugging high CPU consumption in my project, I noticed that each time an event is sent to Kafka, the number of goroutines running in my project increases.

The code sending the event is:

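// Push writes a single message to topicName. Note that it builds and closes
// a new kafka.Writer (and a new kafka.Transport) on every call.
func (k *Client) Push(ctx context.Context, topicName string, message *kafka.Message) error {
	writer := &kafka.Writer{
		Addr:     kafka.TCP(k.brokers...),
		Topic:    topicName,
		Balancer: &kafka.Murmur2Balancer{},
		Transport: &kafka.Transport{
			TLS: k.dialer.TLS,
		},
	}

	// Close the writer once the write returns; a failed close is only logged.
	defer func() {
		if err := writer.Close(); err != nil {
			logger.Warnf("Error closing writer for topic %s: %v", topicName, err)
		}
	}()
	return writer.WriteMessages(ctx, *message)
}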

After running that, I see in the IntelliJ debug console that the number of goroutines increases:

[Screenshot: goroutine count in the IntelliJ debug console]

The more events are sent, the more goroutines keep running.
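
The growth can also be checked without a debugger. The following is a minimal sketch, assuming only the standard library: `leakyOp` is a hypothetical stand-in for an operation that leaves a goroutine behind (it is not the kafka-go writer), and `goroutineDelta` is an illustrative helper that compares `runtime.NumGoroutine()` before and after a number of calls.

```go
package main

import (
	"fmt"
	"runtime"
)

// leakyOp is a hypothetical stand-in for an operation that starts a
// background goroutine and never stops it.
func leakyOp() {
	block := make(chan struct{}) // never closed, so the goroutine never exits
	go func() { <-block }()
}

// goroutineDelta runs op n times and reports how many more goroutines
// exist afterwards than before.
func goroutineDelta(op func(), n int) int {
	before := runtime.NumGoroutine()
	for i := 0; i < n; i++ {
		op()
	}
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("goroutines left behind:", goroutineDelta(leakyOp, 5))
}
```

In a real check, `op` would wrap the call under suspicion (for example a call to `Push`). Goroutines that shut down asynchronously may need a short wait before the second measurement, so a single reading is a hint rather than proof.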

Checking the code in the transport file, I see that a connection pool is initialized with refc: 2. Calling the ref() function increments that value. Calling the unref() function decrements it and, if the result is 0, cancels the context of that goroutine. The unref() function is properly called when closing the transport connections.

Adding breakpoints, I see that when an event is written, a pool is created and the code calls unref(), ref(), unref(), ref(), unref(). That leaves refc at 1, so the context is never canceled and the discover goroutine keeps running forever. Each write operation creates a new pool and leaves another discover goroutine behind.

I suspect the refc attribute is used to track whether the pool is still in use. I don't fully understand why the code uses a uintptr for this instead of an int or another type, or why it is initialized to 2.
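
To make the counting concrete, here is a toy model of the sequence described above. It is only a sketch: the `pool` type, `newPool`, `ref`, and `unref` below are hypothetical stand-ins written for illustration, not kafka-go's actual code. It assumes a counter that starts at 2 and a context that is canceled only when the counter reaches 0.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// pool is a toy stand-in for a reference-counted connection pool.
// It is NOT kafka-go's implementation; only the counting is modeled.
type pool struct {
	refc   uintptr // reference count, starts at 2 as in the report
	ctx    context.Context
	cancel context.CancelFunc
}

func newPool() *pool {
	ctx, cancel := context.WithCancel(context.Background())
	return &pool{refc: 2, ctx: ctx, cancel: cancel}
}

// ref takes one more reference.
func (p *pool) ref() { atomic.AddUintptr(&p.refc, 1) }

// unref drops one reference and cancels the context when none remain.
func (p *pool) unref() {
	// Adding ^uintptr(0) wraps around, which subtracts 1.
	if atomic.AddUintptr(&p.refc, ^uintptr(0)) == 0 {
		p.cancel()
	}
}

func main() {
	p := newPool()
	// Call sequence observed with breakpoints in the report.
	p.unref()
	p.ref()
	p.unref()
	p.ref()
	p.unref()
	// Prints: refc: 1 canceled: false
	fmt.Println("refc:", atomic.LoadUintptr(&p.refc), "canceled:", p.ctx.Err() != nil)
}
```

In this model the count goes 2, 1, 2, 1, 2, 1, so the context stays alive until one more `unref()` arrives. Whether that final call is missing in the library, or is expected from some other owner of the pool, is exactly the open question in this report.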

Kafka Version

@fjagugar fjagugar added the bug label Dec 18, 2024
@petedannemann
Contributor

While there may be a goroutine leak, I suspect the reason this is actually a problem for you is that you create a new writer each time you write a message. If you create a single writer and reuse it, I imagine this will be a non-issue.
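
One way to follow that suggestion when the topic varies per call is to keep one writer per topic and reuse it. The sketch below is a standard-library-only illustration under stated assumptions: `writerCache`, `get`, `closeAll`, and `fakeWriter` are hypothetical names, and `fakeWriter` stands in for the real writer so the example runs without a broker. In real code `W` would presumably be `*kafka.Writer`, which has a `Close() error` method, and `newWriter` would build it the way `Push` does today.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// writerCache lazily creates one writer per topic and reuses it afterwards.
type writerCache[W io.Closer] struct {
	mu        sync.Mutex
	writers   map[string]W
	newWriter func(topic string) W // called once per topic
}

func newWriterCache[W io.Closer](newWriter func(topic string) W) *writerCache[W] {
	return &writerCache[W]{writers: make(map[string]W), newWriter: newWriter}
}

// get returns the cached writer for topic, creating it on first use.
func (c *writerCache[W]) get(topic string) W {
	c.mu.Lock()
	defer c.mu.Unlock()
	w, ok := c.writers[topic]
	if !ok {
		w = c.newWriter(topic)
		c.writers[topic] = w
	}
	return w
}

// closeAll closes every cached writer (for example at shutdown) and
// returns the first error encountered, if any.
func (c *writerCache[W]) closeAll() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	var firstErr error
	for topic, w := range c.writers {
		if err := w.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
		delete(c.writers, topic)
	}
	return firstErr
}

// fakeWriter is a stand-in for a real writer, used only for this demo.
type fakeWriter struct {
	topic  string
	closed bool
}

func (f *fakeWriter) Close() error { f.closed = true; return nil }

func main() {
	created := 0
	cache := newWriterCache(func(topic string) *fakeWriter {
		created++
		return &fakeWriter{topic: topic}
	})
	for i := 0; i < 3; i++ {
		cache.get("events") // same writer every time
	}
	fmt.Println("writers created:", created)
	_ = cache.closeAll()
}
```

If I recall the kafka-go documentation correctly, another option is a single writer with `Topic` left unset on the writer and set on each `kafka.Message` instead; that would avoid the map entirely, but it is worth checking against the library docs before relying on it.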
