From 28d91e26f8767825664b1c376dba830af919fbc6 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 12 Nov 2024 14:00:26 +1100 Subject: [PATCH] c --- .../src/cloud/polars_object_store.rs | 59 +++++++++++++++++-- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/crates/polars-io/src/cloud/polars_object_store.rs b/crates/polars-io/src/cloud/polars_object_store.rs index 8b28b0652a87..295a1e473f59 100644 --- a/crates/polars-io/src/cloud/polars_object_store.rs +++ b/crates/polars-io/src/cloud/polars_object_store.rs @@ -287,7 +287,8 @@ fn merge_ranges(ranges: &[Range]) -> impl Iterator, let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone); // Number of fetched bytes excluding excess. let mut current_n_bytes = current_merged_range.len(); - let mut current_n_merged: usize = 1; + // Number of ranges merged due to being relatively small. + let mut n_small_merged: usize = 1; (0..ranges.len()) .filter_map(move |current_idx| { @@ -335,11 +336,15 @@ fn merge_ranges(ranges: &[Range]) -> impl Iterator, let one_side_is_relatively_small = { let fraction = range.len().max(current_merged_range.len()) - / current_n_merged.saturating_mul(16); + / n_small_merged.saturating_mul(16); range.len().min(current_merged_range.len()) <= fraction }; + if !new_merged_is_closer_to_chunk_size && one_side_is_relatively_small { + n_small_merged += 1; + } + distance_within_1MiB_or_12perc_max_8MiB && (new_merged_is_closer_to_chunk_size || one_side_is_relatively_small) }; @@ -352,13 +357,12 @@ fn merge_ranges(ranges: &[Range]) -> impl Iterator, } else { range.len() }; - current_n_merged += 1; None } else { let out = (current_merged_range.clone(), current_idx); current_merged_range = range; current_n_bytes = current_merged_range.len(); - current_n_merged = 1; + n_small_merged = 1; Some(out) } @@ -471,6 +475,53 @@ mod tests { ] ); + // Test small ranges merge rule. 
+ assert_eq!( + merge_ranges(&[ + 0..64 * 1024 * 1024, + // 64MiB / 16 => 4MiB + 64 * 1024 * 1024..68 * 1024 * 1024, + // 68MiB / 32 => 2228224B + 68 * 1024 * 1024..68 * 1024 * 1024 + 2228224, + ]) + .collect::<Vec<_>>(), + [(0..73531392, 3)] + ); + + assert_eq!( + merge_ranges(&[ + 0..64 * 1024 * 1024, + // 64MiB / 16 => 4MiB + 64 * 1024 * 1024..68 * 1024 * 1024, + // 68MiB / 32 => 2228224B + 68 * 1024 * 1024..68 * 1024 * 1024 + 2228224 + 1, + ]) + .collect::<Vec<_>>(), + [(0..71303168, 2), (71303168..73531393, 3)] + ); + + // This specifically tests that the reduction in the small-range merge + // threshold is affected only by the number of small-range merges + // that have already occurred. + assert_eq!( + merge_ranges(&[ + 0..16 * 1024 * 1024, + 16 * 1024 * 1024..32 * 1024 * 1024, + 32 * 1024 * 1024..48 * 1024 * 1024, + 48 * 1024 * 1024..64 * 1024 * 1024 - 3, + // These are a block of small ranges, but they don't get counted + // as the distance to the `chunk_size` is still decreasing at this + // point. + 64 * 1024 * 1024 - 3..64 * 1024 * 1024 - 2, + 64 * 1024 * 1024 - 2..64 * 1024 * 1024 - 1, + 64 * 1024 * 1024 - 1..64 * 1024 * 1024 - 0, + // Up to 4MiB (64MiB / 16) should be accepted + 64 * 1024 * 1024..68 * 1024 * 1024, + ]) + .collect::<Vec<_>>(), + [(0..71303168, 8)] + ); + // <= 1MiB gap, merge assert_eq!( merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::<Vec<_>>(),