Skip to content

Commit

Permalink
improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Feb 28, 2025
1 parent fba15c7 commit eee60ae
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,12 @@ impl ShuffleRepartitioner {
// If the range of indices is not big enough, just append the rows into
// active array builders instead of directly adding them as a record batch.
let mut start_index: usize = 0;
let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics);
let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics)?;

loop {
match output_ret {
AppendRowStatus::MemDiff(l) => {
mem_diff += l?;
mem_diff += l;
break;
}
AppendRowStatus::StartIndex(new_start) => {
Expand All @@ -635,7 +635,8 @@ impl ShuffleRepartitioner {

start_index = new_start;
let output = &mut self.buffered_partitions[partition_id];
output_ret = output.append_rows(columns, indices, start_index, &self.metrics);
output_ret =
output.append_rows(columns, indices, start_index, &self.metrics)?;

if let AppendRowStatus::StartIndex(new_start) = output_ret {
if new_start == start_index {
Expand Down Expand Up @@ -670,7 +671,7 @@ impl Debug for ShuffleRepartitioner {
#[derive(Debug)]
enum AppendRowStatus {
/// The difference in memory usage after appending rows
MemDiff(Result<isize>),
MemDiff(isize),
/// The index of the next row to append
StartIndex(usize),
}
Expand Down Expand Up @@ -762,14 +763,14 @@ impl PartitionBuffer {
indices: &[usize],
start_index: usize,
metrics: &ShuffleRepartitionerMetrics,
) -> AppendRowStatus {
) -> Result<AppendRowStatus> {
let mut mem_diff = 0;
let mut start = start_index;

// lazy init because some partitions may be empty
let init = self.init_active_if_necessary(metrics);
if init.is_err() {
return AppendRowStatus::StartIndex(start);
return Ok(AppendRowStatus::StartIndex(start));
}
mem_diff += init.unwrap();

Expand All @@ -789,19 +790,19 @@ impl PartitionBuffer {
if self.num_active_rows >= self.batch_size {
let flush = self.flush(metrics);
if let Err(e) = flush {
return AppendRowStatus::MemDiff(Err(e));
return Err(e);
}
mem_diff += flush.unwrap();

let init = self.init_active_if_necessary(metrics);
if init.is_err() {
return AppendRowStatus::StartIndex(end);
return Ok(AppendRowStatus::StartIndex(end));
}
mem_diff += init.unwrap();
}
start = end;
}
AppendRowStatus::MemDiff(Ok(mem_diff))
Ok(AppendRowStatus::MemDiff(mem_diff))
}

/// Flush active data into frozen bytes. This can reduce memory usage because the frozen
Expand Down Expand Up @@ -1029,21 +1030,25 @@ mod test {
assert!(buffer.spill_file.is_none());

// append first batch - should fit in memory
let status = buffer.append_rows(batch.columns(), &indices, 0, &metrics);
let status = buffer
.append_rows(batch.columns(), &indices, 0, &metrics)
.unwrap();
assert_eq!(
format!("{status:?}"),
format!("{:?}", AppendRowStatus::MemDiff(Ok(106496)))
format!("{:?}", AppendRowStatus::MemDiff(106496))
);
assert_eq!(900, buffer.num_active_rows);
assert_eq!(106496, buffer.reservation.size());
assert_eq!(0, buffer.frozen.len());
assert!(buffer.spill_file.is_none());

// append second batch - should trigger flush to frozen bytes
let status = buffer.append_rows(batch.columns(), &indices, 0, &metrics);
let status = buffer
.append_rows(batch.columns(), &indices, 0, &metrics)
.unwrap();
assert_eq!(
format!("{status:?}"),
format!("{:?}", AppendRowStatus::MemDiff(Ok(126316)))
format!("{:?}", AppendRowStatus::MemDiff(126316))
);
assert_eq!(0, buffer.num_active_rows);
assert_eq!(9914, buffer.frozen.len());
Expand All @@ -1060,10 +1065,12 @@ mod test {
assert_eq!(9914, buffer.spill_file.as_ref().unwrap().file.len());

// append after spill
let status = buffer.append_rows(batch.columns(), &indices, 0, &metrics);
let status = buffer
.append_rows(batch.columns(), &indices, 0, &metrics)
.unwrap();
assert_eq!(
format!("{status:?}"),
format!("{:?}", AppendRowStatus::MemDiff(Ok(0)))
format!("{:?}", AppendRowStatus::MemDiff(0))
);
assert_eq!(900, buffer.num_active_rows);
// TODO reservation should not be zero because there are active builders again
Expand Down

0 comments on commit eee60ae

Please sign in to comment.