From 7e7f6127fdf32f6eb19178753ca2531db367ba41 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 31 Oct 2024 13:08:08 -0400 Subject: [PATCH] dekaf: Refactor to avoid leaking modified `HeapNodes` when using `DeletionMode::CDC` --- crates/dekaf/src/read.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 7bc259c0b9..1363b10f64 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -37,7 +37,6 @@ pub struct Read { offset_start: i64, deletes: DeletionMode, - alloc: bumpalo::Bump, pub(crate) rewrite_offsets_from: Option, } @@ -105,7 +104,6 @@ impl Read { journal_name: partition.spec.name.clone(), rewrite_offsets_from, deletes, - alloc: Default::default(), offset_start: offset, } } @@ -120,6 +118,8 @@ impl Read { Compression, Record, RecordBatchEncoder, RecordEncodeOptions, }; + let mut alloc = bumpalo::Bump::new(); + let mut records: Vec = Vec::new(); let mut records_bytes: usize = 0; @@ -277,14 +277,16 @@ impl Read { tmp.extend(self.value_schema_id.to_be_bytes()); if matches!(self.deletes, DeletionMode::CDC) { - let mut heap_node = HeapNode::from_node(root.get(), &self.alloc); + let mut heap_node = HeapNode::from_node(root.get(), &alloc); let foo = DELETION_INDICATOR_PTR - .create_heap_node(&mut heap_node, &self.alloc) + .create_heap_node(&mut heap_node, &alloc) .context("Unable to add deletion meta indicator")?; *foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 }); () = avro::encode(&mut tmp, &self.value_schema, &heap_node)?; + + alloc.reset(); } else { () = avro::encode(&mut tmp, &self.value_schema, root.get())?; }