Skip to content

Commit

Permalink
Merge pull request #9 from paaaaay5/firequeue-async
Browse files Browse the repository at this point in the history
Send to firehose asynchronously
  • Loading branch information
mash authored Jan 4, 2024
2 parents 521324c + bd344af commit ef13c99
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 154 deletions.
261 changes: 162 additions & 99 deletions firequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/firehose"
Expand All @@ -29,22 +30,60 @@ func Parallel(i int) Option {
}

// ErrorHandler is an option to specify the error handler
func ErrorHandler(fn func(error, *firehose.PutRecordInput)) Option {
func ErrorHandler(fn func(error, []*firehose.Record)) Option {
return func(q *Queue) {
q.errorHandler = fn
}
}

// MaxQueueLength is an option to specify max length of in-memory queue
func MaxQueueLength(length int) Option {
if length <= 0 {
panic("max queue length must be positive")
}
return func(q *Queue) {
q.maxQueueLength = length
}
}

// BatchSize is an option to specify batch size
func BatchSize(length int) Option {
if length <= 0 {
panic("max queue length must be positive")
}
// "Each PutRecordBatch request supports up to 500 records."
// ref. https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/firehose#Client.PutRecordBatch
if length > 500 {
panic("max queue length must be less than 500")
}
return func(q *Queue) {
q.batchSize = length
}
}

// BatchInterval is an option to specify batch interval
func BatchInterval(period time.Duration) Option {
// API limit is 1000 requests per second per shard(default).
// This means an average of 1 request per millisecond.
// ref. https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/limits.html#:~:text=Direct%20PUT%20%E3%81%8C%E3%83%87%E3%83%BC%E3%82%BF%E3%82%BD%E3%83%BC%E3%82%B9%E3%81%A8%E3%81%97%E3%81%A6%E8%A8%AD%E5%AE%9A%E3%81%95%E3%82%8C%E3%81%A6%E3%81%84%E3%82%8B%E5%A0%B4%E5%90%88%E3%80%81%E5%90%84%20Kinesis%20Data%20Firehose%20%E3%83%87%E3%83%AA%E3%83%90%E3%83%AA%E3%83%BC%E3%82%B9%E3%83%88%E3%83%AA%E3%83%BC%E3%83%A0%E3%81%AF%E3%80%81PutRecord%E3%81%8A%E3%82%88%E3%81%B3%E3%83%AA%E3%82%AF%E3%82%A8%E3%82%B9%E3%83%88%E3%81%AB%E5%AF%BE%E3%81%97%E3%81%A6%E4%BB%A5%E4%B8%8B%E3%81%AE%E5%90%88%E8%A8%88%E3%82%AF%E3%82%A9%E3%83%BC%E3%82%BF%E3%82%92%E6%8F%90%E4%BE%9B%E3%81%97%E3%81%BE%E3%81%99%E3%80%82PutRecordBatch
if period <= 1*time.Microsecond {
panic("batch interval must be greater than 1 microsecond")
}

return func(q *Queue) {
q.batchInterval = period
}
}

// New return new Queue
func New(fh firehoseiface.FirehoseAPI, opts ...Option) *Queue {
q := &Queue{firehose: fh, initialized: make(chan struct{})}
func New(fh firehoseiface.FirehoseAPI, DeliveryStreamName string, opts ...Option) *Queue {
q := &Queue{
queue: make([]*firehose.Record, 0),
workerCh: make(chan []*firehose.Record),
firehose: fh,
initialized: make(chan struct{}),
DeliveryStreamName: aws.String(DeliveryStreamName),
}
for _, opt := range opts {
opt(q)
}
Expand All @@ -53,34 +92,42 @@ func New(fh firehoseiface.FirehoseAPI, opts ...Option) *Queue {

// Queue manages a sending list for firehose
type Queue struct {
queue []*firehose.PutRecordInput
mu sync.RWMutex
queue []*firehose.Record
mu sync.RWMutex
workerCh chan []*firehose.Record

initialized chan struct{}
initMu sync.Mutex

para int
inFlightCounter int32
firehose firehoseiface.FirehoseAPI
errorHandler func(error, *firehose.PutRecordInput)
maxQueueLength int

successCount expvar.Int
retrySuccessCount expvar.Int
para int
inFlightBatchCounter int32
maxQueueLength int
firehose firehoseiface.FirehoseAPI
errorHandler func(error, []*firehose.Record)
batchSize int
batchInterval time.Duration
DeliveryStreamName *string

// successCount counts successfully sent records when attemptToSendBatch() is called.
successCount expvar.Int
// retryCount counts retryable records when attemptToSendBatch() is called.
retryCount expvar.Int
// unretryableErrorCount counts non-retryable batch when attemptToSendBatch() is called.
unretryableErrorCount expvar.Int
queueFullErrorCount expvar.Int
giveupErrorCount expvar.Int
// queueFullErrorCount counts queue full errors when Enqueue() is called.
queueFullErrorCount expvar.Int
// batchLength is length of the latest batch.
batchLength expvar.Int
}

// Stats return queue stats
func (q *Queue) Stats() Stats {
return Stats{
QueueLength: q.len(),
Success: q.successCount.Value(),
RetrySuccess: q.retrySuccessCount.Value(),
RetryCount: q.retryCount.Value(),
UnretryableError: q.unretryableErrorCount.Value(),
QueueFullError: q.queueFullErrorCount.Value(),
GiveupError: q.giveupErrorCount.Value(),
}
}

Expand All @@ -98,6 +145,12 @@ func (q *Queue) init() error {
if q.para == 0 {
q.para = 1
}
if q.batchSize == 0 {
q.batchSize = 500
}
if q.batchInterval == 0 {
q.batchInterval = 2 * time.Millisecond
}
close(q.initialized)
return nil
}
Expand All @@ -107,6 +160,7 @@ func (q *Queue) Loop(ctx context.Context) error {
if err := q.init(); err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(q.para)
for i := 0; i < q.para; i++ {
Expand All @@ -118,15 +172,15 @@ func (q *Queue) Loop(ctx context.Context) error {
wg.Wait()

// draining process:
// The draining will continue if enqueuing continues or the Firehose is in failure,
// but it should be taken care of it at higher levels.
var bf = backoff.NewExponentialBackOff()
bf.MaxElapsedTime = 0

for {
for q.remaining() {
q.put(&backoff.ZeroBackOff{})
if q.remaining() {
nextInterval := q.sendBatch(bf)
time.Sleep(nextInterval)
continue
}
// Wait 2 seconds and wait to see if the jobs will accumulate, because they might
// come in queue
time.Sleep(time.Second * 2)
if !q.remaining() {
break
}
Expand All @@ -137,9 +191,8 @@ func (q *Queue) Loop(ctx context.Context) error {
func (q *Queue) loop(ctx context.Context) {
var bf = backoff.NewExponentialBackOff()
bf.MaxElapsedTime = 0
// Use 1 instead of 0 because time.NewTicker and ticker.Reset disallow zero
var nextInterval time.Duration = 1
ticker := time.NewTicker(1)
var nextInterval time.Duration = q.batchInterval
ticker := time.NewTicker(q.batchInterval)
defer ticker.Stop()

for {
Expand All @@ -148,7 +201,7 @@ func (q *Queue) loop(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
nextInterval = q.put(bf)
nextInterval = q.sendBatch(bf)
}
}
}
Expand Down Expand Up @@ -179,13 +232,8 @@ func isErrConnectionResetByPeer(err error) bool {
return strings.Contains(err.Error(), "read: connection reset by peer")
}

// Send firehorseInput
func (q *Queue) Send(r *firehose.PutRecordInput) error {
return q.SendWithContext(context.Background(), r)
}

// SendWithContext firehorseInput with context
func (q *Queue) SendWithContext(ctx context.Context, r *firehose.PutRecordInput) error {
// Enqueue add data to queue
func (q *Queue) Enqueue(r *firehose.Record) error {
select {
case <-q.initialized:
// nop and continue to send
Expand All @@ -198,58 +246,93 @@ func (q *Queue) SendWithContext(ctx context.Context, r *firehose.PutRecordInput)
// nop and continue to send
}
}
_, err := q.firehose.PutRecordWithContext(ctx, r)
if err == nil {
q.successCount.Add(1)
return nil
}
if !isRetryable(err) {
q.unretryableErrorCount.Add(1)
return err
}
if l := q.len(); l >= q.maxQueueLength {
q.queueFullErrorCount.Add(1)
return fmt.Errorf("too many jobs accumlated: %d, %w", l, err)
return fmt.Errorf("too many jobs accumlated: %d", l)
}
// Actually, race conditions may occur here and make the queue a little longer than maxQueueLength
// temporarily, but it's not a big problem and we don't care.
q.push(r)
return nil
}

func (q *Queue) handleError(err error, r *firehose.PutRecordInput) {
if q.errorHandler != nil {
q.errorHandler(err, r)
return
func (q *Queue) sendBatch(bf backoff.BackOff) time.Duration {
if q.len() == 0 {
return q.batchInterval
}
log.Println(err)
batch := q.createBatch()
q.batchLength.Set(int64(len(batch)))
if err := q.attemptToSendBatch(batch); err != nil {
// Handle non-retryable errors directly.
q.handleError(err, batch)
return bf.NextBackOff()
}
bf.Reset()
return q.batchInterval
}

// put puts item and returns interval to put next
func (q *Queue) put(bf backoff.BackOff) time.Duration {
r, done := q.shift()
if r == nil {
bf.Reset()
return 3 * time.Second
func (q *Queue) createBatch() []*firehose.Record {
q.mu.Lock()
defer q.mu.Unlock()

batchSize := q.batchSize
if len(q.queue) < batchSize {
batchSize = len(q.queue)
}
defer done()

if _, err := q.firehose.PutRecord(r); err != nil {
if isRetryable(err) {
// If an error occurs, move it back to the top of the queue and wait a while.
// It might be better to stuff it at the back of the queue, to avoid continuous
// error due to invalid input, but we do it that for now.
q.unshift(r)
} else {
q.giveupErrorCount.Add(1)
q.handleError(err, r)

batch := make([]*firehose.Record, batchSize)
copy(batch, q.queue)
q.queue = q.queue[batchSize:]
return batch
}

func (q *Queue) attemptToSendBatch(batch []*firehose.Record) error {
atomic.AddInt32(&q.inFlightBatchCounter, 1)
defer atomic.AddInt32(&q.inFlightBatchCounter, -1)

input := &firehose.PutRecordBatchInput{
DeliveryStreamName: q.DeliveryStreamName,
Records: batch,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

resp, err := q.firehose.PutRecordBatchWithContext(ctx, input)
if err != nil {
// Handle non-retryable errors directly.
if !isRetryable(err) {
q.unretryableErrorCount.Add(1)
q.handleError(err, batch)
return nil
}
return bf.NextBackOff()
// Retry failed records.
if resp != nil && *resp.FailedPutCount > 0 {
q.successCount.Add(int64(len(batch)) - int64(*resp.FailedPutCount))
q.retryFailedRecords(batch, resp)
return nil
}
return err
}
q.retrySuccessCount.Add(1)
bf.Reset()
// go to the next immediately when jobs are still in the queue
return 1

q.successCount.Add(int64(len(batch)))
return nil
}

func (q *Queue) retryFailedRecords(batch []*firehose.Record, resp *firehose.PutRecordBatchOutput) {
retryRecords := make([]*firehose.Record, 0, *resp.FailedPutCount)

for i, record := range batch {
if resp.RequestResponses[i].ErrorCode != nil {
q.retryCount.Add(1)
retryRecords = append(retryRecords, record)
}
}
q.push(retryRecords...)
}
func (q *Queue) handleError(err error, r []*firehose.Record) {
if q.errorHandler != nil {
q.errorHandler(err, r)
return
}
log.Println(err)
}

func (q *Queue) len() int {
Expand All @@ -258,37 +341,17 @@ func (q *Queue) len() int {
return len(q.queue)
}

func (q *Queue) inFlight() bool {
return atomic.LoadInt32(&q.inFlightCounter) > 0
func (q *Queue) inFlightBatch() bool {
return atomic.LoadInt32(&q.inFlightBatchCounter) > 0
}

func (q *Queue) remaining() bool {
// XXX race
return q.len() > 0 || q.inFlight()
return q.len() > 0 || q.inFlightBatch()
}

func (q *Queue) push(r *firehose.PutRecordInput) {
func (q *Queue) push(r ...*firehose.Record) {
q.mu.Lock()
defer q.mu.Unlock()
q.queue = append(q.queue, r)
}

func (q *Queue) shift() (*firehose.PutRecordInput, func()) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.queue) == 0 {
return nil, nil
}

atomic.AddInt32(&q.inFlightCounter, 1)
r := q.queue[0]
q.queue = q.queue[1:]
return r, func() { atomic.AddInt32(&q.inFlightCounter, -1) }
}

func (q *Queue) unshift(r *firehose.PutRecordInput) {
q.mu.Lock()
defer q.mu.Unlock()

q.queue = append([]*firehose.PutRecordInput{r}, q.queue...)
q.queue = append(q.queue, r...)
}
Loading

0 comments on commit ef13c99

Please sign in to comment.