Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix-rows-by-key
Browse files · Browse the repository at this point in the history
  • Loading branch information
lukemanley committed Nov 13, 2024
2 parents c591da0 + 6cbc7c3 commit ab405cd
Show file tree
Hide file tree
Showing 135 changed files with 5,550 additions and 2,843 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/polars-core/src/chunked_array/ops/sort/arg_bottom_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub fn _arg_bottom_k(
_broadcast_bools(by_column.len(), &mut sort_options.descending);
_broadcast_bools(by_column.len(), &mut sort_options.nulls_last);

// Don't go into row encoding.
if by_column.len() == 1 && sort_options.limit.is_some() && !sort_options.maintain_order {
return Ok(NoNull::new(by_column[0].arg_sort((&*sort_options).into())));
}

let encoded = _get_rows_encoded(
by_column,
&sort_options.descending,
Expand Down
64 changes: 58 additions & 6 deletions crates/polars-core/src/chunked_array/ops/sort/arg_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(super) fn arg_sort<I, J, T>(
iters: I,
options: SortOptions,
null_count: usize,
len: usize,
mut len: usize,
) -> IdxCa
where
I: IntoIterator<Item = J>,
Expand Down Expand Up @@ -49,14 +49,46 @@ where
vals.extend(iter);
}

sort_impl(vals.as_mut_slice(), options);
let vals = if let Some((limit, desc)) = options.limit {
let limit = limit as usize;
// Overwrite output len.
len = limit;
let out = if limit >= vals.len() {
vals.as_mut_slice()
} else if desc {
let (lower, _el, _upper) = vals
.as_mut_slice()
.select_nth_unstable_by(limit, |a, b| b.1.tot_cmp(&a.1));
lower
} else {
let (lower, _el, _upper) = vals
.as_mut_slice()
.select_nth_unstable_by(limit, |a, b| a.1.tot_cmp(&b.1));
lower
};

sort_impl(out, options);
out
} else {
sort_impl(vals.as_mut_slice(), options);
vals.as_slice()
};

let iter = vals.into_iter().map(|(idx, _v)| idx);
let iter = vals.iter().map(|(idx, _v)| idx).copied();
let idx = if nulls_last {
let mut idx = Vec::with_capacity(len);
idx.extend(iter);
idx.extend(nulls_idx);

let nulls_idx = if options.limit.is_some() {
&nulls_idx[..len - idx.len()]
} else {
&nulls_idx
};
idx.extend_from_slice(nulls_idx);
idx
} else if options.limit.is_some() {
nulls_idx.extend(iter.take(len - nulls_idx.len()));
nulls_idx
} else {
let ptr = nulls_idx.as_ptr() as usize;
nulls_idx.extend(iter);
Expand Down Expand Up @@ -90,9 +122,29 @@ where
}));
}

sort_impl(vals.as_mut_slice(), options);
let vals = if let Some((limit, desc)) = options.limit {
let limit = limit as usize;
let out = if limit >= vals.len() {
vals.as_mut_slice()
} else if desc {
let (lower, _el, _upper) = vals
.as_mut_slice()
.select_nth_unstable_by(limit, |a, b| b.1.tot_cmp(&a.1));
lower
} else {
let (lower, _el, _upper) = vals
.as_mut_slice()
.select_nth_unstable_by(limit, |a, b| a.1.tot_cmp(&b.1));
lower
};
sort_impl(out, options);
out
} else {
sort_impl(vals.as_mut_slice(), options);
vals.as_slice()
};

let iter = vals.into_iter().map(|(idx, _v)| idx);
let iter = vals.iter().map(|(idx, _v)| idx).copied();
let idx: Vec<_> = iter.collect_trusted();

ChunkedArray::with_chunk(name, IdxArr::from_data_default(Buffer::from(idx), None))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl CategoricalChunked {
descending,
multithreaded: true,
maintain_order: false,
limit: None,
})
}

Expand Down
10 changes: 10 additions & 0 deletions crates/polars-core/src/chunked_array/ops/sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ impl ChunkSort<StringType> for StringChunked {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
})
}

Expand Down Expand Up @@ -406,6 +407,7 @@ impl ChunkSort<BinaryType> for BinaryChunked {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
})
}

Expand Down Expand Up @@ -536,6 +538,7 @@ impl ChunkSort<BinaryOffsetType> for BinaryOffsetChunked {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
})
}

Expand Down Expand Up @@ -672,6 +675,7 @@ impl ChunkSort<BooleanType> for BooleanChunked {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
})
}

Expand Down Expand Up @@ -797,6 +801,7 @@ mod test {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
});
assert_eq!(
Vec::from(&out),
Expand All @@ -816,6 +821,7 @@ mod test {
nulls_last: true,
multithreaded: true,
maintain_order: false,
limit: None,
});
assert_eq!(
Vec::from(&out),
Expand Down Expand Up @@ -925,6 +931,7 @@ mod test {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
});
let expected = &[None, None, Some("a"), Some("b"), Some("c")];
assert_eq!(Vec::from(&out), expected);
Expand All @@ -934,6 +941,7 @@ mod test {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
});

let expected = &[None, None, Some("c"), Some("b"), Some("a")];
Expand All @@ -944,6 +952,7 @@ mod test {
nulls_last: true,
multithreaded: true,
maintain_order: false,
limit: None,
});
let expected = &[Some("a"), Some("b"), Some("c"), None, None];
assert_eq!(Vec::from(&out), expected);
Expand All @@ -953,6 +962,7 @@ mod test {
nulls_last: true,
multithreaded: true,
maintain_order: false,
limit: None,
});
let expected = &[Some("c"), Some("b"), Some("a"), None, None];
assert_eq!(Vec::from(&out), expected);
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-core/src/chunked_array/ops/sort/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub struct SortOptions {
/// If true maintain the order of equal elements.
/// Default `false`.
pub maintain_order: bool,
/// Limit a sort output, this is for optimization purposes and might be ignored.
/// - Len
/// - Descending
pub limit: Option<(IdxSize, bool)>,
}

/// Sort options for multi-series sorting.
Expand Down Expand Up @@ -96,6 +100,10 @@ pub struct SortMultipleOptions {
pub multithreaded: bool,
/// Whether maintain the order of equal elements. Default `false`.
pub maintain_order: bool,
/// Limit a sort output, this is for optimization purposes and might be ignored.
/// - Len
/// - Descending
pub limit: Option<(IdxSize, bool)>,
}

impl Default for SortOptions {
Expand All @@ -105,6 +113,7 @@ impl Default for SortOptions {
nulls_last: false,
multithreaded: true,
maintain_order: false,
limit: None,
}
}
}
Expand All @@ -116,6 +125,7 @@ impl Default for SortMultipleOptions {
nulls_last: vec![false],
multithreaded: true,
maintain_order: false,
limit: None,
}
}
}
Expand Down Expand Up @@ -224,6 +234,7 @@ impl From<&SortOptions> for SortMultipleOptions {
nulls_last: vec![value.nulls_last],
multithreaded: value.multithreaded,
maintain_order: value.maintain_order,
limit: value.limit,
}
}
}
Expand All @@ -235,6 +246,7 @@ impl From<&SortMultipleOptions> for SortOptions {
nulls_last: value.nulls_last.first().copied().unwrap_or(false),
multithreaded: value.multithreaded,
maintain_order: value.maintain_order,
limit: value.limit,
}
}
}
46 changes: 46 additions & 0 deletions crates/polars-core/src/datatypes/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,52 @@ impl DataType {
}
}

/// Try to get the maximum value for this datatype.
///
/// Returns a [`Scalar`] holding the dtype's upper bound, or a `ComputeError`
/// for dtypes without a well-defined upper bound.
pub fn max(&self) -> PolarsResult<Scalar> {
    use DataType::*;
    // Unsigned ints, signed ints, then floats. Floats use positive infinity
    // as the upper bound (not f32::MAX / f64::MAX).
    Ok(match self {
        #[cfg(feature = "dtype-u8")]
        UInt8 => Scalar::from(u8::MAX),
        #[cfg(feature = "dtype-u16")]
        UInt16 => Scalar::from(u16::MAX),
        UInt32 => Scalar::from(u32::MAX),
        UInt64 => Scalar::from(u64::MAX),
        #[cfg(feature = "dtype-i8")]
        Int8 => Scalar::from(i8::MAX),
        #[cfg(feature = "dtype-i16")]
        Int16 => Scalar::from(i16::MAX),
        Int32 => Scalar::from(i32::MAX),
        Int64 => Scalar::from(i64::MAX),
        Float32 => Scalar::from(f32::INFINITY),
        Float64 => Scalar::from(f64::INFINITY),
        dt => polars_bail!(ComputeError: "cannot determine upper bound for dtype `{}`", dt),
    })
}

/// Try to get the minimum value for this datatype.
///
/// Returns a [`Scalar`] holding the dtype's lower bound, or a `ComputeError`
/// for dtypes without a well-defined lower bound.
pub fn min(&self) -> PolarsResult<Scalar> {
    use DataType::*;
    // Unsigned ints, signed ints, then floats. Floats use negative infinity
    // as the lower bound (not f32::MIN / f64::MIN).
    Ok(match self {
        #[cfg(feature = "dtype-u8")]
        UInt8 => Scalar::from(u8::MIN),
        #[cfg(feature = "dtype-u16")]
        UInt16 => Scalar::from(u16::MIN),
        UInt32 => Scalar::from(u32::MIN),
        UInt64 => Scalar::from(u64::MIN),
        #[cfg(feature = "dtype-i8")]
        Int8 => Scalar::from(i8::MIN),
        #[cfg(feature = "dtype-i16")]
        Int16 => Scalar::from(i16::MIN),
        Int32 => Scalar::from(i32::MIN),
        Int64 => Scalar::from(i64::MIN),
        Float32 => Scalar::from(f32::NEG_INFINITY),
        Float64 => Scalar::from(f64::NEG_INFINITY),
        dt => polars_bail!(ComputeError: "cannot determine lower bound for dtype `{}`", dt),
    })
}

/// Convert to an Arrow data type.
#[inline]
pub fn to_arrow(&self, compat_level: CompatLevel) -> ArrowDataType {
Expand Down
26 changes: 24 additions & 2 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,12 +1382,24 @@ impl DataFrame {
self
}

// Note: Schema can be both input or output_schema
fn add_column_by_schema(&mut self, c: Column, schema: &Schema) -> PolarsResult<()> {
let name = c.name();
if let Some((idx, _, _)) = schema.get_full(name.as_str()) {
// schema is incorrect fallback to search
if self.columns.get(idx).map(|s| s.name()) != Some(name) {
self.add_column_by_search(c)?;
// Given schema is output_schema and we can push.
if idx == self.columns.len() {
if self.width() == 0 {
self.height = c.len();
}

self.columns.push(c);
}
// Schema is incorrect fallback to search
else {
debug_assert!(false);
self.add_column_by_search(c)?;
}
} else {
self.replace_column(idx, c)?;
}
Expand All @@ -1401,6 +1413,7 @@ impl DataFrame {
Ok(())
}

// Note: Schema can be both input or output_schema
pub fn _add_series(&mut self, series: Vec<Series>, schema: &Schema) -> PolarsResult<()> {
for (i, s) in series.into_iter().enumerate() {
// we need to branch here
Expand Down Expand Up @@ -1430,6 +1443,8 @@ impl DataFrame {
/// Add a new column to this [`DataFrame`] or replace an existing one.
/// Uses an existing schema to amortize lookups.
/// If the schema is incorrect, we will fallback to linear search.
///
/// Note: Schema can be both input or output_schema
pub fn with_column_and_schema<C: IntoColumn>(
&mut self,
column: C,
Expand Down Expand Up @@ -1974,6 +1989,12 @@ impl DataFrame {
return Ok(out);
}
if let Some((0, k)) = slice {
let desc = if sort_options.descending.len() == 1 {
sort_options.descending[0]
} else {
false
};
sort_options.limit = Some((k as IdxSize, desc));
return self.bottom_k_impl(k, by_column, sort_options);
}

Expand All @@ -1997,6 +2018,7 @@ impl DataFrame {
nulls_last: sort_options.nulls_last[0],
multithreaded: sort_options.multithreaded,
maintain_order: sort_options.maintain_order,
limit: sort_options.limit,
};
// fast path for a frame with a single series
// no need to compute the sort indices and then take by these indices
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-expr/src/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ impl PhysicalExpr for AliasExpr {
Ok(self.finish(series))
}

/// Delegate inline evaluation to the wrapped expression with one less unit
/// of depth budget; a result, if any, is renamed via `finish`.
/// Returns `None` when the depth budget is exhausted (`depth_limit == 0`).
fn evaluate_inline_impl(&self, depth_limit: u8) -> Option<Column> {
    if depth_limit == 0 {
        return None;
    }
    let inner = self.physical_expr.evaluate_inline_impl(depth_limit - 1)?;
    Some(self.finish(inner))
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
&self,
Expand Down
Loading

0 comments on commit ab405cd

Please sign in to comment.