Skip to content

Commit

Permalink
more accurate memory accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Feb 28, 2025
1 parent 5669d5f commit 96957f8
Showing 1 changed file with 26 additions and 16 deletions.
42 changes: 26 additions & 16 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@ impl ShuffleRepartitioner {
break;
}
AppendRowStatus::StartIndex(new_start) => {
// Cannot allocate enough memory for the array builders in the partition,
// spill partitions and retry.
// Cannot allocate enough memory for the array builders in this partition,
// spill all partitions and retry.
self.spill().await?;

start_index = new_start;
Expand Down Expand Up @@ -736,7 +736,7 @@ impl PartitionBuffer {

/// Initializes active builders if necessary.
/// Returns error if memory reservation fails.
fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> {
fn allocate_active_builders(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> {
if self.active.is_empty() {
let mut mempool_timer = metrics.mempool_time.timer();
self.reservation.try_grow(self.active_slots_mem_size)?;
Expand All @@ -763,11 +763,10 @@ impl PartitionBuffer {
while start < indices.len() {
let end = (start + self.batch_size).min(indices.len());

// lazy init because some partition may be empty
// could not allocate memory for builders, so abort
// and return the current index
let init = self.init_active_if_necessary(metrics);
if init.is_err() {
// allocate builders
if self.allocate_active_builders(metrics).is_err() {
// could not allocate memory for builders, so abort
// and return the current index
return Ok(AppendRowStatus::StartIndex(start));
}

Expand Down Expand Up @@ -801,18 +800,30 @@ impl PartitionBuffer {
let num_rows = self.num_active_rows;
self.num_active_rows = 0;

let mut mempool_timer = metrics.mempool_time.timer();
self.reservation.try_shrink(self.active_slots_mem_size)?;
mempool_timer.stop();

let mut repart_timer = metrics.repart_time.timer();
let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
repart_timer.stop();

let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
self.shuffle_block_writer
.write_batch(&frozen_batch, &mut cursor, &metrics.encode_time)?;
let bytes_written = self.shuffle_block_writer.write_batch(
&frozen_batch,
&mut cursor,
&metrics.encode_time,
)?;

// we typically expect the frozen bytes to take up less memory than
// the builders due to compression but there could be edge cases where
// this is not the case
let mut mempool_timer = metrics.mempool_time.timer();
if self.active_slots_mem_size >= bytes_written {
self.reservation
.try_shrink(self.active_slots_mem_size - bytes_written)?;
} else {
self.reservation
.try_grow(bytes_written - self.active_slots_mem_size)?;
}
mempool_timer.stop();

Ok(())
}
Expand Down Expand Up @@ -1033,8 +1044,7 @@ mod test {
);
assert_eq!(0, buffer.num_active_rows);
assert_eq!(9914, buffer.frozen.len());
// note that the reservation does not include the frozen bytes
assert_eq!(0, buffer.reservation.size());
assert_eq!(9914, buffer.reservation.size());
assert!(buffer.spill_file.is_none());

// spill
Expand Down

0 comments on commit 96957f8

Please sign in to comment.