
Provide report channel Events() for confluent kafka producer #1031

Merged: 1 commit merged into cloudevents:main on Apr 13, 2024

Conversation

@yanmxa (Contributor) commented on Apr 1, 2024

Signed-off-by: myan [email protected]
Resolved: #1030

@embano1 (Member) left a comment

I was just starting the review when I noticed we are already passing an error channel to Produce, which according to the Confluent docs is an option to avoid polling the Events() channel and should not lead to a memory leak. Do you see memory leaks in our implementation?

It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce()

https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html
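
For reference, a minimal sketch of that delivery-channel pattern (assuming p is a configured *kafka.Producer, topic is a string, and the code runs inside a Send-like function returning error); roughly what the protocol's Send does at this point:

deliveryChan := make(chan kafka.Event)
msg := &kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          []byte("hello"),
}
if err := p.Produce(msg, deliveryChan); err != nil {
	return err
}
// Block until the delivery report for exactly this message arrives.
e := <-deliveryChan
if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
	return m.TopicPartition.Error
}
return nil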

protocol/kafka_confluent/v2/option.go (review thread resolved)
protocol/kafka_confluent/v2/protocol.go (review thread resolved)

@yanmxa (Contributor, Author) commented on Apr 1, 2024

I was just starting the review when I noticed we are already passing an error channel to Produce, which according to the Confluent docs is an option to avoid polling the Events() channel and should not lead to a memory leak. Do you see memory leaks in our implementation?

It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce()

https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html

After running a service with our current implementation for a few days, the memory usage grew from 75 MB to about 180 MB:
[image: memory usage graph]
Then I checked the memory through pprof and found it is mainly caused by the func _Cfunc_GoBytes; related issues: confluentinc/confluent-kafka-go#578 (comment), confluentinc/confluent-kafka-go#1043 (comment)

[image: pprof memory profile]

Note: all the Confluent producer samples start a goroutine to retrieve events from the producer.Events() chan, which I had missed before.
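
For context, a minimal sketch of the drain goroutine used in those samples (assumes p is a *kafka.Producer and the standard log package; not code from this PR):

go func() {
	for e := range p.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			// per-message delivery report
			if ev.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
			}
		case kafka.Error:
			// client-level error, e.g. broker connection problems
			log.Printf("producer error: %v", ev)
		}
	}
}()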

@yanmxa requested a review from embano1 on April 1, 2024 09:08
@embano1 (Member) commented on Apr 1, 2024

Sorry for being a bit pedantic here, but since we follow Confluent's producer logic and pass an event channel, either something is fundamentally wrong with the Confluent library or we are not using it correctly. I don't see adding another goroutine to handle this solving the problem IF the Confluent library behaves as expected, allowing a return channel on produce which we use accordingly:

err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)

Can you please debug further whether that channel is not drained correctly, and whether you also see events in Events(), which should not be the case then?

@embano1 (Member) commented on Apr 1, 2024

Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the .Events() channel as *kafka.Message and you should check msg.TopicPartition.Error for nil to find out if the message was successfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce(). If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Btw (unrelated to this, but still important): we aren't calling Flush(), are we?

When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or transmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function .Flush() that will block until all message deliveries are done or the provided timeout elapses.
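
For reference, a hedged sketch of the Flush pattern from the quoted docs (p is a *kafka.Producer; 10000 is the per-call timeout in milliseconds; log import assumed):

// Block until all buffered messages are delivered or the timeout elapses,
// then close the producer.
for p.Flush(10000) > 0 {
	log.Println("still waiting for outstanding deliveries")
}
p.Close()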

@yanmxa (Contributor, Author) commented on Apr 1, 2024

Sorry for being a bit pedantic here, but since we follow Confluent's producer logic and pass an event channel, either something is fundamentally wrong with the Confluent library or we are not using it correctly. I don't see adding another goroutine to handle this solving the problem IF the Confluent library behaves as expected, allowing a return channel on produce which we use accordingly:

err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)

Can you please debug further whether that channel is not drained correctly, and whether you also see events in Events(), which should not be the case then?

@embano1 You get the point. It does seem weird to use deliveryChan and producer.Events() at the same time. Let's hold this PR; I need to gather more information to understand why the samples do this.

@yanmxa (Contributor, Author) commented on Apr 2, 2024

Producing is an asynchronous operation so the client notifies the application of per-message produce success or failure through something called delivery reports. Delivery reports are by default emitted on the .Events() channel as *kafka.Message and you should check msg.TopicPartition.Error for nil to find out if the message was successfully delivered or not. It is also possible to direct delivery reports to alternate channels by providing a non-nil chan Event channel to .Produce(). If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Btw (unrelated to this, but still important): we aren't calling Flush(), are we?

When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or transmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function .Flush() that will block until all message deliveries are done or the provided timeout elapses.

Yes, we don't need to call Flush(), because each time we send a message we wait directly for the result from the deliveryChan.

@embano1 (Member) left a comment

Yes, we don't need to call Flush(), because each time we send a message we wait directly for the result from the deliveryChan.

Makes sense.

However, I wonder what happens if a user constructs the producer with the following option:

If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Would the producer block indefinitely or panic on nil event? Can you please test this and make sure your Send still works?

@yanmxa (Contributor, Author) commented on Apr 2, 2024

Yes, we don't need to call Flush(), because each time we send a message we wait directly for the result from the deliveryChan.

Makes sense.

However, I wonder what happens if a user constructs the producer with the following option:

If no delivery reports are wanted they can be completely disabled by setting configuration property "go.delivery.reports": false.

Would the producer block indefinitely or panic on nil event? Can you please test this and make sure your Send still works?

Sure! I verified the following cases:

1. What happens when both the producer.Events() chan and the customized deliveryChan are set?

  • Start a goroutine to fetch events from the producer.Events() chan.
  • Set a customized deliveryChan.
  • Result: the producer can only get events from the deliveryChan after producing the message.

2. What happens with the configuration "go.delivery.reports": false under the current implementation?

  • Set "go.delivery.reports": false when initializing the cloudevents client.
  • Set a customized deliveryChan when sending the message (current implementation).
  • Result: the sender still works and we can still get the event from the deliveryChan. The configuration has no impact on the current implementation! (See the sketch at the end of this comment.)

3. What happens with the configuration "go.delivery.reports": false and the producer.Events() chan?

  • Start a goroutine to fetch events from the producer.Events() chan.
  • Set "go.delivery.reports": false when creating the kafka producer.
  • Result: nothing arrives on the producer.Events() chan after sending a message without the deliveryChan, i.e. producer.Produce(msg, nil).

I also found that issue #1030 is not necessarily related to our current implementation, so I am planning to close the issue and the current PR.
But I will keep investigating why the Confluent sample uses producer.Events() and the deliveryChan at the same time, and once I reach a conclusion I will update it here.

@embano1 What do you think?
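
For reference, a minimal sketch of case 2 above (assumptions: a local broker at localhost:9092, a topic named "test", fmt/log imports and full error handling elided):

p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":   "localhost:9092",
	"go.delivery.reports": false, // delivery reports disabled globally
})
if err != nil {
	log.Fatal(err)
}
topic := "test"
deliveryChan := make(chan kafka.Event)
// A non-nil deliveryChan still receives the per-message report,
// as observed in case 2.
if err := p.Produce(&kafka.Message{
	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
	Value:          []byte("hello"),
}, deliveryChan); err != nil {
	log.Fatal(err)
}
fmt.Printf("delivery report: %v\n", <-deliveryChan)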

@embano1 (Member) commented on Apr 3, 2024

What happens when both the producer.Events() chan and the customized deliveryChan are set?

That result is what we are expecting, thanks.

What happens with the configuration "go.delivery.reports": false under the current implementation?

Interesting, it seems the custom event delivery channel overrides this setting then. Weird configuration behavior, but in line with the underlying Kafka callback implementation, so perhaps reasonable.

What happens with the configuration "go.delivery.reports": false and the producer.Events() chan?

OK, so that setting works then.

Thx for your investigations, proposed next steps SGTM!

@yanmxa closed this on Apr 3, 2024
@yanmxa reopened this on Apr 7, 2024
@yanmxa (Contributor, Author) commented on Apr 7, 2024

@embano1
I posted an issue in the community; it looks like producer.Events() is also provided to handle events not associated with the message being sent: confluentinc/confluent-kafka-go#1163

It does report connection events on the producer.Events() chan, even when the deliveryChan exists:

{"level":"info","ts":1712455974.5608,"logger":"fallback","caller":"v2/protocol.go:291","msg":"get an error event from producer.Events() chan: 127.0.0.1:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)"}

@embano1 (Member) commented on Apr 7, 2024

@yanmxa thx for verifying! So I suggest implementing a select {} pattern on both channels then. Assuming producer.Events() only fires if there's an error which is not related to the provided send/produce call channel, we should be good not leaking resources, i.e., there should never be an event on the other channel we pass to produce/send and we can close it on return.
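
A rough sketch of that select{} idea inside Send (deliveryChan is the per-call channel passed to Produce, p.producer.Events() carries client-level errors; an illustration only, not the final code):

select {
case e := <-deliveryChan:
	// delivery report for the message just produced
	if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		return m.TopicPartition.Error
	}
	return nil
case e := <-p.producer.Events():
	// client-level error unrelated to this particular message
	if kerr, ok := e.(kafka.Error); ok {
		return kerr
	}
	return nil
case <-ctx.Done():
	return ctx.Err()
}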

@yanmxa force-pushed the br_confluent_bug branch 2 times, most recently from 9720fb1 to ea2f222 on April 8, 2024 03:42
@yanmxa (Contributor, Author) commented on Apr 8, 2024

@yanmxa thx for verifying! So I suggest implementing a select {} pattern on both channels then.

@embano1 I'm worried that switching the deliveryChan to a select{} pattern would make the Sender asynchronous, meaning it would always return success immediately?

Assuming producer.Events() only fires if there's an error which is not related to the provided send/produce call channel, we should be good not leaking resources, i.e., there should never be an event on the other channel we pass to produce/send and we can close it on return.

Yes, we release the deliveryChan passed to produce/send once the producer is done.

Is the current implementation consistent with your ideas?

@embano1 (Member) commented on Apr 8, 2024

Select blocks, i.e., it does not make the code async. Let me see if I can propose something to simplify the current PR.

@embano1 (Member) commented on Apr 8, 2024

Quick question: what was the reason we decided to use a custom event delivery channel instead of reading everything from producer.Events()?

@yanmxa (Contributor, Author) commented on Apr 8, 2024

Quick question: what was the reason we decided to use a custom event delivery channel instead of reading everything from producer.Events()?

@embano1 We use the deliveryChan to get the result after each send so that we can turn the asynchronous Produce into a synchronous method that fits the Sender.

I chose the deliveryChan instead of producer.Events() because I couldn't find any official example that blocks on the producer.Events() chan to wait for the result of produce/send; they all consume producer.Events() in a separate goroutine. However, there are many examples where produce blocks and waits for the send result using a deliveryChan.

Now we know that they do this because producer.Events() also contains events that aren't related to Produce itself.

@embano1 (Member) commented on Apr 8, 2024

This is really non-intuitive API design in the Confluent go SDK if you ask me :/

I'm proposing a slightly different implementation to

  1. Simplify the developer experience for sdk-go with Confluent Kafka users
  2. Simplify our implementation/complexity/maintenance

Produce() is fundamentally async and we would need to wrap sync behavior around it because Send has synchronous semantics in sdk-go today. However, wrapping is not easy because Produce() uses buffers and batching under the covers, so we'd need success and error handlers so users can correlate whether a Send() was successful, i.e., inspect each event in Events() and correlate it to the appropriate Send() call. Instead, we could just surface that Confluent SDK behavior to users, given they're familiar with that SDK/behavior, and give them full control to follow SDK best practices/usage without complicating our implementation and the developer experience.

diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go
index 8aa9068..e98ab51 100644
--- a/protocol/kafka_confluent/v2/protocol.go
+++ b/protocol/kafka_confluent/v2/protocol.go
@@ -12,9 +12,10 @@
 	"io"
 	"sync"
 
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+
 	"github.com/cloudevents/sdk-go/v2/binding"
 	"github.com/cloudevents/sdk-go/v2/protocol"
-	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 
 	cecontext "github.com/cloudevents/sdk-go/v2/context"
 )
@@ -33,15 +34,14 @@ type Protocol struct {
 	consumerTopics       []string
 	consumerRebalanceCb  kafka.RebalanceCb                          // optional
 	consumerPollTimeout  int                                        // optional
-	consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional
+	consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional
 	consumerMux          sync.Mutex
 	consumerIncoming     chan *kafka.Message
 	consumerCtx          context.Context
 	consumerCancel       context.CancelFunc
 
 	producer             *kafka.Producer
-	producerDeliveryChan chan kafka.Event // optional
-	producerDefaultTopic string           // optional
+	producerDefaultTopic string // optional
 
 	closerMux sync.Mutex
 }
@@ -85,9 +85,6 @@ func New(opts ...Option) (*Protocol, error) {
 	if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
 		return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
 	}
-	if p.producer != nil {
-		p.producerDeliveryChan = make(chan kafka.Event)
-	}
 	return p, nil
 }
 
@@ -128,23 +125,25 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
 		kafkaMsg.Key = []byte(messageKey)
 	}
 
-	err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...)
-	if err != nil {
-		return err
+	if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil {
+		return fmt.Errorf("create producer message: %w", err)
 	}
 
-	err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
-	if err != nil {
-		return err
-	}
-	e := <-p.producerDeliveryChan
-	m := e.(*kafka.Message)
-	if m.TopicPartition.Error != nil {
-		return m.TopicPartition.Error
+	if err = p.producer.Produce(kafkaMsg, nil); err != nil {
+		return fmt.Errorf("send message: %w", err)
 	}
+
 	return nil
 }
 
+func (p *Protocol) Events() (chan kafka.Event, error) {
+	if p.producer == nil {
+		return nil, errors.New("producer not set")
+	}
+
+	return p.producer.Events(), nil
+}
+
 func (p *Protocol) OpenInbound(ctx context.Context) error {
 	if p.consumer == nil {
 		return errors.New("the consumer client must be set")
@@ -238,7 +237,6 @@ func (p *Protocol) Close(ctx context.Context) error {
 
 	if p.producer != nil && !p.producer.IsClosed() {
 		p.producer.Close()
-		close(p.producerDeliveryChan)
 	}
 
 	return nil

@embano1 (Member) commented on Apr 8, 2024

If we agree on the above, needs code comments for public methods and usage example
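
For illustration, one possible usage sketch of the proposed surface (only New, Send, Events, and Close appear in the diff above; option construction, the binding.Message value msg, and imports are elided and assumed):

p, err := kafka_confluent.New( /* config map / topic options elided */ )
if err != nil {
	log.Fatal(err)
}
defer p.Close(ctx)

events, err := p.Events()
if err != nil {
	log.Fatal(err)
}
// The application drains Events() itself: delivery reports and client errors.
go func() {
	for e := range events {
		switch ev := e.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", ev.TopicPartition.Error)
			}
		case kafka.Error:
			log.Printf("client error: %v", ev)
		}
	}
}()

// Send now only surfaces synchronous Produce errors.
if err := p.Send(ctx, msg); err != nil {
	log.Printf("send failed: %v", err)
}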

@yanmxa (Contributor, Author) commented on Apr 8, 2024

Hello, @embano1!

I'd like to ask your opinion on something related to KubeCon China. Since I couldn't find your contact information, I'm reaching out to you here.

Over the past year, we've brought several message-queue updates to this repo. If you're interested, I can draft a KubeCon China proposal on CloudEvents message-queue enhancements. The main people involved would be the two of us. The topics would include bringing an MQTT binding to support IoT cases, and some Kafka improvements such as message confirmation, consumption from a specific offset, batch subscription to topics, and ordered partition offsets.

How do you feel about it?

@yanmxa (Contributor, Author) commented on Apr 8, 2024

If we agree on the above, needs code comments for public methods and usage example

It's a good choice to expose producer.Events() to the user!

If we deprecate the deliveryChan, then we need to think about another question: when should we invoke producer.Flush() to deliver all the buffered messages to the Kafka broker? In the Sender or in the Closer? I agree with the above once this question is settled.

  • When you are done producing messages you will need to make sure all messages are indeed delivered to the broker (or failed), remember that this is an asynchronous client so some of your messages may be lingering in internal channels or transmission queues. To do this you can either keep track of the messages you've produced and wait for their corresponding delivery reports, or call the convenience function .Flush() that will block until all message deliveries are done or the provided timeout elapses.

@embano1 (Member) commented on Apr 8, 2024

If we deprecate the deliveryChan, then we need to think about another question: when should we invoke producer.Flush() to deliver all the buffered messages to the Kafka broker? In the Sender or in the Closer? I agree with the above once this question is settled.

Do we need formal deprecation of deliveryChan since we don't expose it? Or do you just mean removing that code? I thought about Sender calling Flush too, but that might be too much (perf). So Close() might be the better option since it would also release Events() eventually (would need to check the Confluent SDK on the behavior again).

@embano1 (Member) commented on Apr 8, 2024

I'd like to ask your opinion on something related to KubeCon China. Since I couldn't find your contact information, I'm reaching out to you here.

Can you please reach out on the CloudEvents Slack channel and ping me there?

@yanmxa (Contributor, Author) commented on Apr 8, 2024

If we deprecate the deliveryChan, then we need to think about another question: when should we invoke producer.Flush() to deliver all the buffered messages to the Kafka broker? In the Sender or in the Closer? I agree with the above once this question is settled.

Do we need formal deprecation of deliveryChan since we don't expose it? Or do you just mean removing that code? I thought about Sender calling Flush too, but that might be too much (perf). So Close() might be the better option since it would also release Events() eventually (would need to check the Confluent SDK on the behavior again).

I think we can just remove the code since the user can't see it anyway.

Yes, calling Flush() in the Sender would definitely affect performance. We'll do it in the Closer.

It looks like we've reached a consensus. I will proceed with these changes and add some examples.

protocol/kafka_confluent/v2/option.go (review thread resolved)
@yanmxa changed the title from "Add eventHandler option for confluent kafka producer" to "Provide report channel Events() for confluent kafka producer" on Apr 8, 2024
protocol/kafka_confluent/v2/protocol.go (review thread resolved)
	}

-	err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
-	if err != nil {
+	if err = p.producer.Produce(kafkaMsg, nil); err != nil {
		return err
Member:

no formatted error like above?

@yanmxa (Contributor, Author) commented on Apr 9, 2024:

I wanted to return a kafka.Error here because users might also want to check its type using a type assertion like this.

Member:

That's why I'm wrapping the error with %w to allow Go idiomatic use of https://pkg.go.dev/errors#As

Member:

@yanmxa WDYT?

@yanmxa (Contributor, Author) commented on Apr 9, 2024:

@embano1
kafka.Error is not a plain error value; it's a concrete type that only implements the error interface. So once the kafka.Error is wrapped with %w, a direct type assertion back to kafka.Error fails:

err = p.producer.Produce(kafkaMsg, nil)
if err != nil {
	err = fmt.Errorf("send message: %w", err)
}
...
if err != nil {
	if err.(kafka.Error).Code() == kafka.ErrQueueFull {
		// Producer queue is full, wait 1s for messages
		// to be delivered then try again.
		time.Sleep(time.Second)
		continue
	}
	fmt.Printf("Failed to produce message: %v\n", err)
}
panic: interface conversion: error is *fmt.wrapError, not kafka.Error

Member:

Perhaps we're talking past each other :)

This doesn't work?

var kafkaError kafka.Error
if errors.As(err, &kafkaError) {
	// use kafkaError to handle and access fields
}
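
For completeness, a small hedged sketch tying the two together: wrap the Produce error with %w in Send, then recover the kafka.Error downstream with errors.As (wrapped stands for whatever Send returns; time/errors imports assumed):

var kafkaErr kafka.Error
if errors.As(wrapped, &kafkaErr) {
	if kafkaErr.Code() == kafka.ErrQueueFull {
		// back off and retry, mirroring the sample above
		time.Sleep(time.Second)
	}
}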

Member:

@yanmxa ICYMI ^

	}
	return p.producer.Events(), nil
}

Member:

can you please add a comment on Send to reference/explain that callers should also consume the Events channel?

Contributor Author:

Done.

-c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
+// The 'WithBlockingCallback()' option makes event processing serialized (no concurrency); use it along with WithPollGoroutines(1).
+// These two options make sure the events from a kafka partition are processed in order.
+c, err := cloudevents.NewClient(receiver, client.WithBlockingCallback(), client.WithPollGoroutines(1))
Member:

oh wow, man...this is really not intuitive to use

Comment on lines 89 to 91
// Producer queue is full, wait 1s for messages
// to be delivered then try again.
time.Sleep(time.Second)
Member:

Is this a best practice? Wondering if it complicates the example, unless you think it's always important to implement this logic.

Contributor Author:

I've seen them use an assertion like the above when sending messages asynchronously.

But I tried setting the producer buffer channel size to 1 ("go.produce.channel.size": 1) and sending 10,000 messages within 1 second several times; it never hit the kafka.ErrQueueFull error. So let's remove it.

continue
}
}
if cloudevents.IsUndelivered(result) {
Member:

this is probably sufficient for error handling instead of the above?

Contributor Author:

emm... remove it now.

samples/kafka_confluent/sender/main.go (review thread resolved)
defer sender.Close(ctx)

// Listen to all the events on the default events channel
// It's important to read these events otherwise the events channel will eventually fill up
go func() {
Member:

nit: you're not waiting for this goroutine to finish before returning from main, which could prevent your goroutine from printing anything. Typically you'd use https://pkg.go.dev/golang.org/x/sync/errgroup
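
For illustration, a hedged sketch of that errgroup suggestion (events is the channel returned by sender.Events() in the sample; ctx, log, and golang.org/x/sync/errgroup imports assumed):

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
	for {
		select {
		case e, ok := <-events:
			if !ok {
				// sender.Close() should eventually close the events channel
				return nil
			}
			log.Printf("event: %v", e)
		case <-gctx.Done():
			return gctx.Err()
		}
	}
})

// ... send events, then sender.Close(ctx) ...

if err := g.Wait(); err != nil {
	log.Printf("events goroutine: %v", err)
}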

Contributor Author:

Done

@embano1 (Member) commented on Apr 8, 2024

Thx @yanmxa ! Definitely easier code on our side - however, that Confluent SDK is really hard to use given all the asynchrony and error handling one has to perform to catch all possible scenarios - guess not much further we can do here :/

@yanmxa (Contributor, Author) commented on Apr 9, 2024

Thx @yanmxa ! Definitely easier code on our side - however, that Confluent SDK is really hard to use given all the asynchrony and error handling one has to perform to catch all possible scenarios - guess not much further we can do here :/

Absolutely! It seems like they traded ease of use for improved performance with their asynchronous approach. That's made us put in extra work to handle it.

I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch producer.Events() channel additionally. Like the following snippet:

err = producer.Produce(msg, nil)
if err != nil {
    return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events()

@embano1 (Member) commented on Apr 9, 2024

I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch producer.Events() channel additionally. Like the following snippet:

err = producer.Produce(msg, nil)
if err != nil {
    return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events()

Initially I tried that, but IMHO it's not that easy. Because Produce to Kafka is async, you don't know which event you're getting/returning here, making it problematic for callers to understand whether the returned event/error was for the same CloudEvent just sent with Send(). We'd need additional correlation logic and also handle errors not caused by a specific event, but also returned on this channel (as I understood the Confluent docs).

	}

-	err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
-	if err != nil {
+	if err = p.producer.Produce(kafkaMsg, nil); err != nil {
		return err
Member:

That's why I'm wrapping the error with %w to allow Go idiomatic use of https://pkg.go.dev/errors#As


	if p.consumerCancel != nil {
		p.consumerCancel()
	}

	if p.producer != nil && !p.producer.IsClosed() {
		// Flush and close the producer and the events channel
		for p.producer.Flush(10000) > 0 {
			logger.Info("Still waiting to flush outstanding messages")
Member:

Suggested change:
-			logger.Info("Still waiting to flush outstanding messages")
+			logger.Info("Flushing outstanding messages")

Question: is 10000 a 10-second timeout or a delay? How often is this message printed? Risk of flooding logs?

Contributor Author:

10000 is a 10-second timeout.

Actually, I haven't seen the message in the normal case. It only shows up when Close() is invoked after sending to a disconnected broker, and with the current 10-second timeout it prints the message every 10 seconds.

@yanmxa (Contributor, Author) commented on Apr 9, 2024

I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch producer.Events() channel additionally. Like the following snippet:

err = producer.Produce(msg, nil)
if err != nil {
    return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events()

Initially I tried that, but IMHO it's not that easy. Because Produce to Kafka is async, you don't know which event you're getting/returning here, making it problematic for callers to understand whether the returned event/error was for the same CloudEvent just sent with Send(). We'd need additional correlation logic and also handle errors not caused by a specific event, but also returned on this channel (as I understood the Confluent docs).

Exactly! There are currently 3 options for making the producer synchronous:

  1. Using producer.Events() just like the above snippet. As you said, we'd need additional logic to handle the errors not associated with the produced events.
  2. Using the customized deliveryChan to handle the produced events, and watching the producer.Events() channel in a separate goroutine to handle other errors.
  3. Calling producer.Flush() after sending the message, as also discussed above.

However, should we convert it to synchronous at all? Which option should we use? If it's necessary, we can discuss it later.

@yanmxa requested a review from embano1 on April 9, 2024 15:12
@embano1 (Member) commented on Apr 13, 2024

Which option should we use? If it's necessary, we can discuss it later.

Yeah, let's keep this out of this PR for now

protocol/kafka_confluent/v2/protocol.go (review thread resolved)
	}

-	err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
-	if err != nil {
+	if err = p.producer.Produce(kafkaMsg, nil); err != nil {
		return err
Member:

@yanmxa ICYMI ^

protocol/kafka_confluent/v2/protocol.go Outdated Show resolved Hide resolved
Signed-off-by: Meng Yan <[email protected]>

reply review

Signed-off-by: Meng Yan <[email protected]>

Update protocol/kafka_confluent/v2/protocol.go

Co-authored-by: Michael Gasch <[email protected]>
Signed-off-by: Meng Yan <[email protected]>
@embano1 merged commit f765e03 into cloudevents:main on Apr 13, 2024
9 checks passed
Labels: none yet
Projects: none yet
Development: successfully merging this pull request may close these issues: Memory leak in the confluent kafka producer
2 participants