Skip to content

Commit

Permalink
improve: add eventChan to execute any in the event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Jul 10, 2024
1 parent 1152cfc commit 65308ab
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
42 changes: 21 additions & 21 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ type partitionProducer struct {

// Channel where app is posting messages to be published
dataChan chan *sendRequest
cmdChan chan interface{}
connectClosedCh chan *connectionClosed
eventChan chan func()
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
lastSequenceID int64
Expand Down Expand Up @@ -167,8 +166,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
options: options,
producerID: client.rpcClient.NewProducerID(),
dataChan: make(chan *sendRequest, maxPendingMessages),
cmdChan: make(chan interface{}, 10),
connectClosedCh: make(chan *connectionClosed, 10),
eventChan: make(chan func()),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType),
compression.Level(options.CompressionLevel)),
Expand Down Expand Up @@ -294,6 +292,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {

res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
cnx.UnregisterListener(p.producerID)
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
if errors.Is(err, internal.ErrRequestTimeOut) {
id := p.client.rpcClient.NewRequestID()
Expand Down Expand Up @@ -408,9 +407,10 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu
assignedBrokerURL = p.client.selectServiceURL(
closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls())
}
p.connectClosedCh <- &connectionClosed{
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker(&connectionClosed{
assignedBrokerURL: assignedBrokerURL,
}
})
}

func (p *partitionProducer) SetRedirectedClusterURI(redirectedClusterURI string) {
Expand Down Expand Up @@ -545,27 +545,22 @@ func (p *partitionProducer) runEventsLoop() {
return
}
p.internalSend(data)
case cmd, ok := <-p.cmdChan:
// when doClose() is call, p.dataChan will be closed, cmd will be nil
case event, ok := <-p.eventChan:
// when doClose() is call, p.eventChan will be closed, cmd will be nil
if !ok {
return
}
switch v := cmd.(type) {
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
p.internalClose(v)
return
}
case connectionClosed := <-p.connectClosedCh:
p.log.Info("runEventsLoop will reconnect in producer")
p.reconnectToBroker(connectionClosed)
event()
case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
}
}

func (p *partitionProducer) execute(fn func()) {
p.eventChan <- fn
}

func (p *partitionProducer) Topic() string {
return p.topic
}
Expand Down Expand Up @@ -1399,7 +1394,7 @@ func (p *partitionProducer) doClose(reason error) {

p.log.Info("Closing producer")
defer close(p.dataChan)
defer close(p.cmdChan)
defer close(p.eventChan)

id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
Expand Down Expand Up @@ -1481,7 +1476,10 @@ func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case p.cmdChan <- flushReq:
default:
p.execute(func() {
p.internalFlush(flushReq)
})
}

// wait for the flush request to complete
Expand Down Expand Up @@ -1514,7 +1512,9 @@ func (p *partitionProducer) Close() {
}

cp := &closeProducer{doneCh: make(chan struct{})}
p.cmdChan <- cp
p.execute(func() {
p.internalClose(cp)
})

// wait for close producer request to complete
<-cp.doneCh
Expand Down
2 changes: 1 addition & 1 deletion pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
testProducer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
DisableBlockIfQueueFull: false,
BatchingMaxPublishDelay: 100000,
BatchingMaxPublishDelay: 10 * time.Second,
BatchingMaxMessages: 1000,
})

Expand Down

0 comments on commit 65308ab

Please sign in to comment.