Skip to content

Commit

Permalink
introduce maxBatchBytes option (soft limit)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxsl13 committed Jan 3, 2024
1 parent 87a5e93 commit 0fc52d5
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 36 deletions.
90 changes: 73 additions & 17 deletions pool/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler) {

s.log.WithFields(withConsumerIfSet(handler.ConsumeOptions().ConsumerTag,
map[string]any{
"subscriber": s.pool.Name(),
"queue": opts.Queue,
"maxBatchSize": opts.MaxBatchSize, // TODO: optimize so that we don't call getters multiple times (mutex contention)
"flushTimeout": handler.FlushTimeout,
"subscriber": s.pool.Name(),
"queue": opts.Queue,
"maxBatchBytes": opts.MaxBatchBytes,
"maxBatchSize": opts.MaxBatchSize,
"flushTimeout": opts.FlushTimeout,
})).Info("registered batch message handler")
}

Expand Down Expand Up @@ -445,7 +446,14 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
// There is no way to recover form this state in case an error is returned from the Nack call.
nackErr := batch[len(batch)-1].Nack(true, true)
if nackErr != nil {
s.warnBatchHandler(opts.ConsumerTag, opts.Queue, opts.MaxBatchSize, err, "failed to nack and requeue batch upon shutdown")
s.warnBatchHandler(
opts.ConsumerTag,
opts.Queue,
opts.MaxBatchSize,
opts.MaxBatchBytes,
err,
"failed to nack and requeue batch upon shutdown",
)

Check warning on line 456 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L449-L456

Added lines #L449 - L456 were not covered by tests
}
}
}()
Expand All @@ -456,10 +464,12 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
)
defer closeTimer(timer, &drained)

var batchBytes = 0
for {
// reset batch slice
// reuse memory
batch = batch[:0]
batchBytes = 0

collectBatch:
for {
Expand All @@ -474,8 +484,14 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
if !ok {
return ErrDeliveryClosed
}

batchBytes += len(msg.Body)
batch = append(batch, msg)
if len(batch) == opts.MaxBatchSize {
if opts.MaxBatchSize > 0 && len(batch) == opts.MaxBatchSize {
break collectBatch
}

if opts.MaxBatchBytes > 0 && batchBytes >= opts.MaxBatchBytes {
break collectBatch
}

Expand All @@ -485,6 +501,13 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
if len(batch) > 0 {
// timeout reached, process batch that might not contain
// a full batch, yet.
s.infoBatchHandler(
opts.ConsumerTag,
opts.Queue,
len(batch),
batchBytes,
"flush timeout reached",
)
break collectBatch
}

Expand All @@ -498,26 +521,37 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {
lastDeliveryTag = batch[len(batch)-1].DeliveryTag
)

s.infoBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, "received batch")
s.infoBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, batchBytes, "received batch")
err = opts.HandlerFunc(batch)
// no acks required
if opts.AutoAck {
if err != nil {
// we cannot really do anything to recover from a processing error in this case
s.errorBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, fmt.Errorf("processing failed: dropping batch: %w", err))
s.errorBatchHandler(opts.ConsumerTag,
opts.Queue,
batchSize,
batchBytes,
fmt.Errorf("processing failed: dropping batch: %w", err),
)

Check warning on line 535 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L530-L535

Added lines #L530 - L535 were not covered by tests
} else {
s.infoBatchHandler(opts.ConsumerTag, opts.Queue, batchSize, "processed batch")
s.infoBatchHandler(
opts.ConsumerTag,
opts.Queue,
batchSize,
batchBytes,
"processed batch",
)

Check warning on line 543 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L537-L543

Added lines #L537 - L543 were not covered by tests
}
} else {
poolErr := s.ackBatchPostHandle(opts, lastDeliveryTag, batchSize, session, err)
poolErr := s.ackBatchPostHandle(opts, lastDeliveryTag, batchSize, batchBytes, session, err)
if poolErr != nil {
return poolErr
}
}
}
}

func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag uint64, currentBatchSize int, session *Session, handlerErr error) (err error) {
func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag uint64, currentBatchSize, currentBatchBytes int, session *Session, handlerErr error) (err error) {
var ackErr error
// processing failed
if handlerErr != nil {
Expand All @@ -531,7 +565,14 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag
// if (n)ack fails, we know that the connection died
// potentially before processing already.
if ackErr != nil {
s.warnBatchHandler(opts.ConsumerTag, opts.Queue, currentBatchSize, ackErr, "batch (n)ack failed")
s.warnBatchHandler(
opts.ConsumerTag,
opts.Queue,
currentBatchSize,
currentBatchBytes,
ackErr,
"batch (n)ack failed",
)

Check warning on line 575 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L568-L575

Added lines #L568 - L575 were not covered by tests
poolErr := session.Recover()
if poolErr != nil {
// only returns an error upon shutdown
Expand All @@ -545,9 +586,21 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag

// (n)acked successfully
if handlerErr != nil {
s.infoBatchHandler(opts.ConsumerTag, opts.Queue, currentBatchSize, "nacked batch")
s.infoBatchHandler(
opts.ConsumerTag,
opts.Queue,
currentBatchSize,
currentBatchBytes,
"nacked batch",
)

Check warning on line 595 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L589-L595

Added lines #L589 - L595 were not covered by tests
} else {
s.infoBatchHandler(opts.ConsumerTag, opts.Queue, currentBatchSize, "acked batch")
s.infoBatchHandler(
opts.ConsumerTag,
opts.Queue,
currentBatchSize,
currentBatchBytes,
"acked batch",
)
}
// successfully handled message
return nil
Expand Down Expand Up @@ -586,28 +639,31 @@ func (s *Subscriber) catchShutdown() <-chan struct{} {
return s.ctx.Done()
}

func (s *Subscriber) infoBatchHandler(consumer, queue string, batchSize int, a ...any) {
func (s *Subscriber) infoBatchHandler(consumer, queue string, batchSize, batchBytes int, a ...any) {
s.log.WithFields(withConsumerIfSet(consumer,
map[string]any{
"batchSize": batchSize,
"batchBytes": batchBytes,
"subscriber": s.pool.Name(),
"queue": queue,
})).Info(a...)
}

func (s *Subscriber) warnBatchHandler(consumer, queue string, batchSize int, err error, a ...any) {
func (s *Subscriber) warnBatchHandler(consumer, queue string, batchSize, batchBytes int, err error, a ...any) {

Check warning on line 652 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L652

Added line #L652 was not covered by tests
s.log.WithFields(withConsumerIfSet(consumer, map[string]any{
"batchSize": batchSize,
"batchBytes": batchBytes,

Check warning on line 655 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L655

Added line #L655 was not covered by tests
"subscriber": s.pool.Name(),
"queue": queue,
"error": err,
})).Warn(a...)
}

func (s *Subscriber) errorBatchHandler(consumer, queue string, batchSize int, err error, a ...any) {
func (s *Subscriber) errorBatchHandler(consumer, queue string, batchSize, batchBytes int, err error, a ...any) {

Check warning on line 662 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L662

Added line #L662 was not covered by tests
s.log.WithFields(withConsumerIfSet(consumer,
map[string]any{
"batchSize": batchSize,
"batchBytes": batchBytes,

Check warning on line 666 in pool/subscriber.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber.go#L666

Added line #L666 was not covered by tests
"subscriber": s.pool.Name(),
"queue": queue,
"error": err,
Expand Down
54 changes: 37 additions & 17 deletions pool/subscriber_batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ func NewBatchHandler(queue string, hf BatchHandlerFunc, options ...BatchHandlerO

// sane defaults
h := &BatchHandler{
sc: newStateContext(context.Background()),
queue: queue,
handlerFunc: hf,
maxBatchSize: defaultMaxBatchSize,
flushTimeout: defaultFlushTimeout,
sc: newStateContext(context.Background()),
queue: queue,
handlerFunc: hf,
maxBatchSize: defaultMaxBatchSize,
maxBatchBytes: 0, // unlimited by default
flushTimeout: defaultFlushTimeout,
consumeOpts: ConsumeOptions{
ConsumerTag: "",
AutoAck: false,
Expand Down Expand Up @@ -55,6 +56,11 @@ type BatchHandler struct {
// before processing is triggered
maxBatchSize int

// In case that the batch size exceeds this limit, the batch is passed to the handler function.
// This indicates that a batch will contains at least one message for processing.
// If the value is set to 0, the batch size is unlimited.
maxBatchBytes int

// FlushTimeout is the duration that is waited for the next message from a queue before
// the batch is closed and passed for processing.
// This value should be less than 30m (which is the (n)ack timeout of RabbitMQ)
Expand All @@ -69,8 +75,15 @@ type BatchHandlerConfig struct {
Queue string
ConsumeOptions

HandlerFunc BatchHandlerFunc
HandlerFunc BatchHandlerFunc

// Maximum number of messages
MaxBatchSize int

// Maximum size of a batch in bytes (soft limit which triggers a batch to be processed)
// does not guarantee that the batch size is not exceeded.
MaxBatchBytes int

FlushTimeout time.Duration
}

Expand Down Expand Up @@ -110,6 +123,7 @@ func (h *BatchHandler) configUnguarded() BatchHandlerConfig {
Queue: h.queue,
HandlerFunc: h.handlerFunc,
MaxBatchSize: h.maxBatchSize,
MaxBatchBytes: h.maxBatchBytes,
FlushTimeout: h.flushTimeout,
ConsumeOptions: h.consumeOpts,
}
Expand Down Expand Up @@ -186,22 +200,31 @@ func (h *BatchHandler) ConsumeOptions() ConsumeOptions {
func (h *BatchHandler) SetConsumeOptions(consumeOpts ConsumeOptions) {
h.mu.Lock()
defer h.mu.Unlock()
h.consumeOpts = consumeOpts
WithBatchConsumeOptions(consumeOpts)(h)

Check warning on line 203 in pool/subscriber_batch_handler.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber_batch_handler.go#L203

Added line #L203 was not covered by tests
}

func (h *BatchHandler) MaxBatchSize() int {
h.mu.Lock()
defer h.mu.Unlock()
h.mu.RLock()
defer h.mu.RUnlock()
return h.maxBatchSize
}

func (h *BatchHandler) SetMaxBatchSize(maxBatchSize int) {
h.mu.Lock()
defer h.mu.Unlock()
if maxBatchSize <= 0 {
maxBatchSize = defaultMaxBatchSize
}
h.maxBatchSize = maxBatchSize
WithMaxBatchSize(maxBatchSize)(h)

Check warning on line 215 in pool/subscriber_batch_handler.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber_batch_handler.go#L215

Added line #L215 was not covered by tests
}

func (h *BatchHandler) MaxBatchBytes() int {
h.mu.RLock()
defer h.mu.RUnlock()
return h.maxBatchBytes
}

func (h *BatchHandler) SetMaxBatchBytes(maxBatchBytes int) {
h.mu.Lock()
defer h.mu.Unlock()
WithMaxBatchBytes(maxBatchBytes)(h)

Check warning on line 227 in pool/subscriber_batch_handler.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber_batch_handler.go#L224-L227

Added lines #L224 - L227 were not covered by tests
}

func (h *BatchHandler) FlushTimeout() time.Duration {
Expand All @@ -213,8 +236,5 @@ func (h *BatchHandler) FlushTimeout() time.Duration {
func (h *BatchHandler) SetFlushTimeout(flushTimeout time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
if flushTimeout <= 0 {
flushTimeout = defaultFlushTimeout
}
h.flushTimeout = flushTimeout
WithBatchFlushTimeout(flushTimeout)(h)

Check warning on line 239 in pool/subscriber_batch_handler.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber_batch_handler.go#L239

Added line #L239 was not covered by tests
}
44 changes: 42 additions & 2 deletions pool/subscriber_handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,47 @@ import "time"

type BatchHandlerOption func(*BatchHandler)

// WithMaxBatchSize sets the maximum size of a batch.
// If set to 0 the batch size is not limited.
// This means that the batch size is only limited by the maximum batch size in bytes.
func WithMaxBatchSize(size int) BatchHandlerOption {
return func(bh *BatchHandler) {
if size <= 0 {

switch {
case and(size <= 0, bh.maxBatchBytes == 0):
// we need to set a sane default
bh.maxBatchSize = defaultMaxBatchSize
} else {
case and(size <= 0, bh.maxBatchBytes > 0):
// we need to set the batch size to unlimited
// because the batch is limited by bytes
bh.maxBatchSize = 0
case size > 0:
// we need to set the batch size to the provided value
bh.maxBatchSize = size
}
}
}

// WithMaxBatchBytes sets the maximum size of a batch in bytes.
// If the batch size exceeds this limit, the batch is passed to the handler function.
// If the value is set to 0, the batch size is not limited by bytes.
func WithMaxBatchBytes(size int) BatchHandlerOption {
return func(bh *BatchHandler) {
switch {
case and(size <= 0, bh.maxBatchSize == 0):
// do not change the current value
return

Check warning on line 36 in pool/subscriber_handler_options.go

View check run for this annotation

Codecov / codecov/patch

pool/subscriber_handler_options.go#L34-L36

Added lines #L34 - L36 were not covered by tests
case and(size <= 0, bh.maxBatchSize > 0):
// we need to set the batch size to unlimited
// because the batch is limited by number of messages
bh.maxBatchBytes = 0
case size > 0:
// we need to set the batch size to the provided value
bh.maxBatchBytes = size
}
}
}

func WithBatchFlushTimeout(d time.Duration) BatchHandlerOption {
return func(bh *BatchHandler) {
if d <= 0 {
Expand All @@ -29,3 +60,12 @@ func WithBatchConsumeOptions(opts ConsumeOptions) BatchHandlerOption {
bh.consumeOpts = opts
}
}

func and(b ...bool) bool {
for _, v := range b {
if !v {
return false
}
}
return true
}
29 changes: 29 additions & 0 deletions pool/subscriber_handler_options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pool

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWithMaxBatchSize(t *testing.T) {
dummyHandler := func([]Delivery) error { return nil }
bh := NewBatchHandler("test", dummyHandler, WithMaxBatchSize(0), WithMaxBatchBytes(0))

assert.Equal(t, defaultMaxBatchSize, bh.MaxBatchSize())
assert.Equal(t, 0, bh.MaxBatchBytes())

bh = NewBatchHandler("test", dummyHandler, WithMaxBatchBytes(0), WithMaxBatchSize(0))
assert.Equal(t, defaultMaxBatchSize, bh.MaxBatchSize())
assert.Equal(t, 0, bh.MaxBatchBytes())

bh = NewBatchHandler("test", dummyHandler, WithMaxBatchBytes(1), WithMaxBatchSize(1))
assert.Equal(t, 1, bh.MaxBatchSize())
assert.Equal(t, 1, bh.MaxBatchBytes())

// if you want to set specific limits to infinite, you may first set all the != 0 options and then set the
// rest of the options to 0.
bh = NewBatchHandler("test", dummyHandler, WithMaxBatchBytes(50), WithMaxBatchSize(0))
assert.Equal(t, 0, bh.MaxBatchSize())
assert.Equal(t, 50, bh.MaxBatchBytes())
}
Loading

0 comments on commit 0fc52d5

Please sign in to comment.