Skip to content

Commit

Permalink
Wrap buffer in func
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Feb 7, 2025
1 parent 818421b commit 36c2c05
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions pkg/dataobj/consumer/partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,29 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
return
}

flushBuffer := p.bufPool.Get().(*bytes.Buffer)
flushBuffer.Reset()
func() {
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
defer p.bufPool.Put(flushBuffer)

flushedDataobjStats, err := p.builder.Flush(flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
p.bufPool.Put(flushBuffer)
return
}
flushBuffer.Reset()

objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
p.bufPool.Put(flushBuffer)
return
}
p.bufPool.Put(flushBuffer)
flushedDataobjStats, err := p.builder.Flush(flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return
}

if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
return
}
objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
return
}

if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
return
}
}()

if err := p.commitRecords(record); err != nil {
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
Expand Down

0 comments on commit 36c2c05

Please sign in to comment.