From 36c2c0519fe8d9a609af66dbfe093c27b78b079d Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Fri, 7 Feb 2025 16:38:36 +0000 Subject: [PATCH] Wrap buffer in func --- pkg/dataobj/consumer/partition_processor.go | 39 +++++++++++---------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 7b7aee9547966..cf8345364b7ba 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -194,28 +194,29 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) { return } - flushBuffer := p.bufPool.Get().(*bytes.Buffer) - flushBuffer.Reset() + func() { + flushBuffer := p.bufPool.Get().(*bytes.Buffer) + defer p.bufPool.Put(flushBuffer) - flushedDataobjStats, err := p.builder.Flush(flushBuffer) - if err != nil { - level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) - p.bufPool.Put(flushBuffer) - return - } + flushBuffer.Reset() - objectPath, err := p.uploader.Upload(p.ctx, flushBuffer) - if err != nil { - level.Error(p.logger).Log("msg", "failed to upload object", "err", err) - p.bufPool.Put(flushBuffer) - return - } - p.bufPool.Put(flushBuffer) + flushedDataobjStats, err := p.builder.Flush(flushBuffer) + if err != nil { + level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) + return + } - if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil { - level.Error(p.logger).Log("msg", "failed to update metastore", "err", err) - return - } + objectPath, err := p.uploader.Upload(p.ctx, flushBuffer) + if err != nil { + level.Error(p.logger).Log("msg", "failed to upload object", "err", err) + return + } + + if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil { + level.Error(p.logger).Log("msg", "failed to update metastore", "err", err) + return + } + }() if err := p.commitRecords(record); err != nil { level.Error(p.logger).Log("msg", "failed to commit records", "err", err)