Skip to content

Commit

Permalink
perf: Fix cloud download speed regression (#19734)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Nov 12, 2024
1 parent 4fb7cd1 commit 36e5913
Showing 1 changed file with 12 additions and 47 deletions.
59 changes: 12 additions & 47 deletions crates/polars-io/src/cloud/polars_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>,

let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone);
// Number of fetched bytes excluding excess.
let mut current_n_bytes = 0;
let mut current_n_bytes = current_merged_range.len();

(0..ranges.len())
.filter_map(move |current_idx| {
Expand Down Expand Up @@ -318,28 +318,14 @@ fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>,
(r.abs_diff(l), r < l)
};

#[rustfmt::skip]
let should_merge =
is_overlapping // Always merge if overlapping
|| (
(
// Either one range is extremely small compared to the other, with a limit of 8MiB..
range.len().min(current_merged_range.len())
< (range.len().max(current_merged_range.len()) / 128).min(8 * 1024 * 1024)
// ..or the new size is closer to the chunk_size
|| new_merged.len().abs_diff(chunk_size) < current_merged_range.len().abs_diff(chunk_size)
)
&& (
// Either the gap is less than 1MiB..
distance <= 1024 * 1024
|| (
// ..or, the gap is less than 12.5% of the largest between `current_n_bytes`
// and the new `range`, capped at 8MiB.
distance <= current_n_bytes.max(range.len()) / 8
&& distance <= 8 * 1024 * 1024
)
)
);
let should_merge = is_overlapping || {
let leq_current_len_dist_to_chunk_size = new_merged.len().abs_diff(chunk_size)
<= current_merged_range.len().abs_diff(chunk_size);
let gap_tolerance =
(current_n_bytes.max(range.len()) / 8).clamp(1024 * 1024, 8 * 1024 * 1024);

leq_current_len_dist_to_chunk_size && distance <= gap_tolerance
};

if should_merge {
// Merge to existing range
Expand All @@ -351,10 +337,10 @@ fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>,
};
None
} else {
let v = current_merged_range.clone();
let out = (current_merged_range.clone(), current_idx);
current_merged_range = range;
current_n_bytes = 0;
Some((v, current_idx))
current_n_bytes = current_merged_range.len();
Some(out)
}
}
})
Expand Down Expand Up @@ -444,27 +430,6 @@ mod tests {
[(0..66584576, 0), (66584576..133169152, 2)]
);

assert_eq!(
merge_ranges(&[
0..1,
1..128 * 1024 * 1024,
1 + 128 * 1024 * 1024..2 + 128 * 1024 * 1024,
2 + 128 * 1024 * 1024..256 * 1024 * 1024
])
.collect::<Vec<_>>(),
[
(0..67108865, 0),
(67108865..134217730, 3),
(134217730..201326593, 0),
(201326593..268435456, 4)
]
);

assert_eq!(
merge_ranges(&[0..1, 1..128 * 1024 * 1024]).collect::<Vec<_>>(),
[(0..67108864, 0), (67108864..134217728, 2)]
);

// <= 1MiB gap, merge
assert_eq!(
merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::<Vec<_>>(),
Expand Down

0 comments on commit 36e5913

Please sign in to comment.