Skip to content

Commit

Permalink
perf: improve and fix rolling windows by linear scanning (#11326)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Sep 26, 2023
1 parent 1f0450a commit f885ace
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 123 deletions.
20 changes: 20 additions & 0 deletions crates/polars-time/src/windows/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@ impl Bounds {
}
}

#[inline]
pub(crate) fn is_member_entry(&self, t: i64, closed: ClosedWindow) -> bool {
match closed {
ClosedWindow::Right => t > self.start,
ClosedWindow::Left => t >= self.start,
ClosedWindow::None => t > self.start,
ClosedWindow::Both => t >= self.start,
}
}

#[inline]
pub(crate) fn is_member_exit(&self, t: i64, closed: ClosedWindow) -> bool {
match closed {
ClosedWindow::Right => t <= self.stop,
ClosedWindow::Left => t < self.stop,
ClosedWindow::None => t < self.stop,
ClosedWindow::Both => t <= self.stop,
}
}

#[inline]
pub(crate) fn is_future(&self, t: i64, closed: ClosedWindow) -> bool {
match closed {
Expand Down
223 changes: 104 additions & 119 deletions crates/polars-time/src/windows/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use polars_core::prelude::*;
use polars_core::utils::_split_offsets;
use polars_core::utils::flatten::flatten_par;
use polars_core::POOL;
use polars_utils::slice::GetSaferUnchecked;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -224,9 +223,11 @@ pub fn group_by_windows(
(groups, lower_bound, upper_bound)
}

// this assumes that the given time point is the right endpoint of the window
// there could duplicates rhs still
// t is right at the end of the window
// ------t---
// [------]
#[inline]
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_lookbehind(
period: Duration,
offset: Duration,
Expand All @@ -235,6 +236,7 @@ pub(crate) fn group_by_values_iter_lookbehind(
tu: TimeUnit,
tz: Option<Tz>,
start_offset: usize,
upper_bound: Option<usize>,
) -> impl Iterator<Item = PolarsResult<(IdxSize, IdxSize)>> + TrustedLen + '_ {
debug_assert!(offset.duration_ns() == period.duration_ns());
debug_assert!(offset.negative);
Expand All @@ -244,46 +246,54 @@ pub(crate) fn group_by_values_iter_lookbehind(
TimeUnit::Milliseconds => Duration::add_ms,
};

let mut last_lookbehind_i = 0;
time[start_offset..]
let upper_bound = upper_bound.unwrap_or(time.len());
// Use binary search to find the initial start as that is behind.
let mut start = if let Some(&t) = time.get(start_offset) {
let lower = add(&offset, t, tz.as_ref()).unwrap();
let upper = add(&period, lower, tz.as_ref()).unwrap();
let b = Bounds::new(lower, upper);
let slice = &time[..start_offset];
slice.partition_point(|v| !b.is_member(*v, closed_window))
} else {
0
};
let mut end = start;
time[start_offset..upper_bound]
.iter()
.enumerate()
.map(move |(mut i, mut lower)| {
// Consume duplicates, this is very uncommon.
while let Some(next_val) = time.get(i + 1) {
if next_val == lower {
lower = next_val;
i += 1;
} else {
break;
}
}
.map(move |(mut i, lower)| {
i += start_offset;
let lower = add(&offset, *lower, tz.as_ref())?;
let upper = add(&period, lower, tz.as_ref())?;

let b = Bounds::new(lower, upper);

// we have a complete lookbehind so we know that `i` is the upper bound.
// Safety
// we are in bounds
let slice = unsafe { time.get_unchecked_release(last_lookbehind_i..i) };
let offset = slice.partition_point(|v| !b.is_member(*v, closed_window));

let lookbehind_i = offset + last_lookbehind_i;
// -1 for window boundary effects
last_lookbehind_i = lookbehind_i.saturating_sub(1);
for &t in &time[start..] {
if b.is_member_entry(t, closed_window) || start == i {
break;
}
start += 1;
}

let mut len = i - lookbehind_i;
if matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both) {
len += 1;
end = std::cmp::max(start, end);
for &t in &time[end..] {
if !b.is_member_exit(t, closed_window) {
break;
}
end += 1;
}

Ok((lookbehind_i as IdxSize, len as IdxSize))
let len = end - start;
let offset = start as IdxSize;

Ok((offset, len as IdxSize))
})
}

// this one is correct for all lookbehind/lookaheads, but is slower
// window is completely behind t and t itself is not a member
// ---------------t---
// [---]
pub(crate) fn group_by_values_iter_window_behind_t(
period: Duration,
offset: Duration,
Expand All @@ -298,42 +308,42 @@ pub(crate) fn group_by_values_iter_window_behind_t(
TimeUnit::Milliseconds => Duration::add_ms,
};

let mut lagging_offset = 0;
time.iter().enumerate().map(move |(i, lower)| {
let mut start = 0;
let mut end = start;
time.iter().map(move |lower| {
let lower = add(&offset, *lower, tz.as_ref())?;
let upper = add(&period, lower, tz.as_ref())?;

let b = Bounds::new(lower, upper);
if b.is_future(time[0], closed_window) {
Ok((0, 0))
} else {
// find starting point of window
// we can start searching from lagging offset as that is the minimum boundary because data is sorted
// and every iteration this boundary shifts right
// we cannot use binary search as a window is not binary,
// it is false left from the window, true inside, and false right of the window
let mut count = 0;
for &t in &time[lagging_offset..] {
if b.is_member(t, closed_window) || lagging_offset + count == i {
for &t in &time[start..] {
if b.is_member_entry(t, closed_window) {
break;
}
count += 1
start += 1;
}
if lagging_offset + count != i {
lagging_offset += count;

end = std::cmp::max(start, end);
for &t in &time[end..] {
if !b.is_member_exit(t, closed_window) {
break;
}
end += 1;
}

// Safety
// we just iterated over value i.
let slice = unsafe { time.get_unchecked(lagging_offset..) };
let len = slice.partition_point(|v| b.is_member(*v, closed_window));
let len = end - start;
let offset = start as IdxSize;

Ok((lagging_offset as IdxSize, len as IdxSize))
Ok((offset, len as IdxSize))
}
})
}

// this one is correct for all lookbehind/lookaheads, but is slower
// window is with -1 periods of t
// ----t---
// [---]
pub(crate) fn group_by_values_iter_partial_lookbehind(
period: Duration,
offset: Duration,
Expand All @@ -348,68 +358,41 @@ pub(crate) fn group_by_values_iter_partial_lookbehind(
TimeUnit::Milliseconds => Duration::add_ms,
};

let mut lagging_offset = 0;
let mut start = 0;
let mut end = start;
time.iter().enumerate().map(move |(i, lower)| {
let lower = add(&offset, *lower, tz.as_ref())?;
let upper = add(&period, lower, tz.as_ref())?;

let b = Bounds::new(lower, upper);

for &t in &time[lagging_offset..] {
if b.is_member(t, closed_window) || lagging_offset == i {
for &t in &time[start..] {
if b.is_member_entry(t, closed_window) || start == i {
break;
}
start += 1;
}

end = std::cmp::max(start, end);
for &t in &time[end..] {
if !b.is_member_exit(t, closed_window) {
break;
}
lagging_offset += 1;
end += 1;
}

// Safety
// we just iterated over value i.
let slice = unsafe { time.get_unchecked(lagging_offset..) };
let len = slice.partition_point(|v| b.is_member(*v, closed_window));
let len = end - start;
let offset = start as IdxSize;

Ok((lagging_offset as IdxSize, len as IdxSize))
Ok((offset, len as IdxSize))
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_partial_lookahead(
period: Duration,
offset: Duration,
time: &[i64],
closed_window: ClosedWindow,
tu: TimeUnit,
tz: Option<Tz>,
start_offset: usize,
upper_bound: Option<usize>,
) -> impl Iterator<Item = PolarsResult<(IdxSize, IdxSize)>> + TrustedLen + '_ {
let upper_bound = upper_bound.unwrap_or(time.len());
debug_assert!(!offset.negative);

let add = match tu {
TimeUnit::Nanoseconds => Duration::add_ns,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Milliseconds => Duration::add_ms,
};

time[start_offset..upper_bound]
.iter()
.enumerate()
.map(move |(mut i, lower)| {
i += start_offset;
let lower = add(&offset, *lower, tz.as_ref())?;
let upper = add(&period, lower, tz.as_ref())?;

let b = Bounds::new(lower, upper);

debug_assert!(i < time.len());
let slice = unsafe { time.get_unchecked(i..) };
let len = slice.partition_point(|v| b.is_member(*v, closed_window));

Ok((i as IdxSize, len as IdxSize))
})
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn group_by_values_iter_full_lookahead(
// window is completely ahead of t and t itself is not a member
// --t-----------
// [---]
pub(crate) fn group_by_values_iter_lookahead(
period: Duration,
offset: Duration,
time: &[i64],
Expand All @@ -420,40 +403,41 @@ pub(crate) fn group_by_values_iter_full_lookahead(
upper_bound: Option<usize>,
) -> impl Iterator<Item = PolarsResult<(IdxSize, IdxSize)>> + TrustedLen + '_ {
let upper_bound = upper_bound.unwrap_or(time.len());
debug_assert!(!offset.negative);

let add = match tu {
TimeUnit::Nanoseconds => Duration::add_ns,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Milliseconds => Duration::add_ms,
};
let mut start = start_offset;
let mut end = start;

time[start_offset..upper_bound]
.iter()
.enumerate()
.map(move |(mut i, lower)| {
i += start_offset;
let lower = add(&offset, *lower, tz.as_ref())?;
let upper = add(&period, lower, tz.as_ref())?;
time[start_offset..upper_bound].iter().map(move |lower| {
let lower = add(&offset, *lower, tz.as_ref())?;
let upper = add(&period, lower, tz.as_ref())?;

let b = Bounds::new(lower, upper);
let b = Bounds::new(lower, upper);

// find starting point of window
for &t in &time[i..] {
if b.is_member(t, closed_window) {
break;
}
i += 1;
for &t in &time[start..] {
if b.is_member_entry(t, closed_window) {
break;
}
if i >= time.len() {
return Ok((i as IdxSize, 0));
start += 1;
}

end = std::cmp::max(start, end);
for &t in &time[end..] {
if !b.is_member_exit(t, closed_window) {
break;
}
end += 1;
}

let slice = unsafe { time.get_unchecked(i..) };
let len = slice.partition_point(|v| b.is_member(*v, closed_window));
let len = end - start;
let offset = start as IdxSize;

Ok((i as IdxSize, len as IdxSize))
})
Ok((offset, len as IdxSize))
})
}

#[cfg(feature = "rolling_window")]
Expand All @@ -468,7 +452,7 @@ pub(crate) fn group_by_values_iter(
let mut offset = period;
offset.negative = true;
// t is at the right endpoint of the window
group_by_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0)
group_by_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0, None)
}

/// Checks if the boundary elements don't split on duplicates
Expand Down Expand Up @@ -521,11 +505,12 @@ pub fn group_by_values(
let iter = group_by_values_iter_lookbehind(
period,
offset,
&time[..upper_bound],
time,
closed_window,
tu,
tz,
base_offset,
Some(upper_bound),
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
Expand Down Expand Up @@ -578,7 +563,7 @@ pub fn group_by_values(
.map(|(base_offset, len)| {
let lower_bound = base_offset;
let upper_bound = base_offset + len;
let iter = group_by_values_iter_full_lookahead(
let iter = group_by_values_iter_lookahead(
period,
offset,
time,
Expand Down Expand Up @@ -606,7 +591,7 @@ pub fn group_by_values(
.map(|(base_offset, len)| {
let lower_bound = base_offset;
let upper_bound = base_offset + len;
let iter = group_by_values_iter_partial_lookahead(
let iter = group_by_values_iter_lookahead(
period,
offset,
time,
Expand Down
Loading

0 comments on commit f885ace

Please sign in to comment.