Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WASM group by 32bit/64bit conversion bugfix #17793

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
6 changes: 4 additions & 2 deletions crates/polars-arrow/src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ type OffsetType = i8;
// chunks so that we don't overflow the offset u32.
fn truncate_buffer(buf: &Buffer<u8>) -> Buffer<u8> {
// * 2, as it must be able to hold u32::MAX offset + u32::MAX len.
buf.clone()
.sliced(0, std::cmp::min(buf.len(), OffsetType::MAX as usize * 2))
buf.clone().sliced(
0,
std::cmp::min(buf.len(), ((OffsetType::MAX as u64) * 2) as usize),
)
}

pub fn binary_to_binview<O: Offset>(arr: &BinaryArray<O>) -> BinaryViewArray {
Expand Down
50 changes: 24 additions & 26 deletions crates/polars-row/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ fn allocate_rows_buf(
columns: &mut [Encoder],
fields: &[EncodingField],
values: &mut Vec<u8>,
offsets: &mut Vec<usize>,
offsets: &mut Vec<u64>,
) -> usize {
let has_variable = columns.iter().any(|enc| enc.is_variable());

Expand Down Expand Up @@ -372,13 +372,13 @@ fn allocate_rows_buf(
for opt_val in iter {
unsafe {
lengths.push_unchecked(
row_size_fixed + crate::variable::encoded_len(opt_val, &field),
(row_size_fixed + crate::variable::encoded_len(opt_val, &field)) as u64,
);
}
}
} else {
for (opt_val, row_length) in iter.zip(lengths.iter_mut()) {
*row_length += crate::variable::encoded_len(opt_val, &field)
*row_length += crate::variable::encoded_len(opt_val, &field) as u64;
}
}
processed_count += 1;
Expand All @@ -389,18 +389,18 @@ fn allocate_rows_buf(
let array = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();
if processed_count == 0 {
for opt_val in array.into_iter() {
let next_length = row_size_fixed
+ crate::variable::encoded_len(opt_val, enc_field);
unsafe {
lengths.push_unchecked(
row_size_fixed
+ crate::variable::encoded_len(opt_val, enc_field),
);
lengths.push_unchecked(next_length as u64);
}
}
} else {
for (opt_val, row_length) in
array.into_iter().zip(lengths.iter_mut())
{
*row_length += crate::variable::encoded_len(opt_val, enc_field)
*row_length +=
crate::variable::encoded_len(opt_val, enc_field) as u64
}
}
processed_count += 1;
Expand All @@ -409,18 +409,18 @@ fn allocate_rows_buf(
let array = array.as_any().downcast_ref::<BinaryArray<i64>>().unwrap();
if processed_count == 0 {
for opt_val in array.into_iter() {
let next_length = row_size_fixed
+ crate::variable::encoded_len(opt_val, enc_field);
unsafe {
lengths.push_unchecked(
row_size_fixed
+ crate::variable::encoded_len(opt_val, enc_field),
);
lengths.push_unchecked(next_length as u64);
}
}
} else {
for (opt_val, row_length) in
array.into_iter().zip(lengths.iter_mut())
{
*row_length += crate::variable::encoded_len(opt_val, enc_field)
*row_length +=
crate::variable::encoded_len(opt_val, enc_field) as u64
}
}
processed_count += 1;
Expand All @@ -436,16 +436,14 @@ fn allocate_rows_buf(
.map(|opt_s| opt_s.map(|s| s.as_bytes()));
if processed_count == 0 {
for opt_val in iter {
unsafe {
lengths.push_unchecked(
row_size_fixed
+ crate::variable::encoded_len(opt_val, enc_field),
)
}
let next_length = row_size_fixed
+ crate::variable::encoded_len(opt_val, enc_field);
unsafe { lengths.push_unchecked(next_length as u64) }
}
} else {
for (opt_val, row_length) in iter.zip(lengths.iter_mut()) {
*row_length += crate::variable::encoded_len(opt_val, enc_field)
*row_length +=
crate::variable::encoded_len(opt_val, enc_field) as u64
}
}
processed_count += 1;
Expand All @@ -466,12 +464,12 @@ fn allocate_rows_buf(
for length in offsets.iter_mut() {
let to_write = lagged_offset;
lagged_offset = current_offset;
current_offset += *length;
current_offset += *length as usize;

*length = to_write;
*length = to_write as u64;
}
// ensure we have len + 1 offsets
offsets.push(lagged_offset);
offsets.push(lagged_offset as u64);

// Only reserve. The init will be done later
values.reserve(current_offset);
Expand All @@ -496,10 +494,10 @@ fn allocate_rows_buf(
// 0, 2, 4, 6
offsets.clear();
offsets.reserve(num_rows + 1);
let mut current_offset = 0;
offsets.push(current_offset);
let mut current_offset = 0_usize;
offsets.push(current_offset as u64);
for _ in 0..num_rows {
offsets.push(current_offset);
offsets.push(current_offset as u64);
current_offset += row_size;
}
n_bytes
Expand Down
19 changes: 11 additions & 8 deletions crates/polars-row/src/fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ impl FixedLengthEncoding for f64 {
#[inline]
fn encode_value<T: FixedLengthEncoding>(
value: &T,
offset: &mut usize,
offset: &mut u64,
descending: bool,
buf: &mut [MaybeUninit<u8>],
) {
let end_offset = *offset + T::ENCODED_LEN;
let dst = unsafe { buf.get_unchecked_mut(*offset..end_offset) };
let usize_offset = *offset as usize;
let end_offset = usize_offset + T::ENCODED_LEN;
let dst = unsafe { buf.get_unchecked_mut(usize_offset..end_offset) };
// set valid
dst[0] = MaybeUninit::new(1);
let mut encoded = value.encode();
Expand All @@ -162,7 +163,7 @@ fn encode_value<T: FixedLengthEncoding>(
}

dst[1..].copy_from_slice(encoded.as_ref().as_uninit());
*offset = end_offset;
*offset = end_offset as u64;
}

pub(crate) unsafe fn encode_slice<T: FixedLengthEncoding>(
Expand Down Expand Up @@ -197,16 +198,18 @@ pub(crate) unsafe fn encode_iter<I: Iterator<Item = Option<T>>, T: FixedLengthEn
if let Some(value) = opt_value {
encode_value(&value, offset, field.descending, values);
} else {
let usize_offset = *offset as usize;
unsafe {
*values.get_unchecked_mut(*offset) = MaybeUninit::new(get_null_sentinel(field))
*values.get_unchecked_mut(usize_offset) = MaybeUninit::new(get_null_sentinel(field))
};
let end_offset = *offset + T::ENCODED_LEN;

let end_offset = usize_offset + T::ENCODED_LEN;

// initialize remaining bytes
let remainder = values.get_unchecked_mut(*offset + 1..end_offset);
let remainder = values.get_unchecked_mut(usize_offset + 1..end_offset);
remainder.fill(MaybeUninit::new(0));

*offset = end_offset;
*offset = end_offset as u64;
}
}
}
Expand Down
33 changes: 15 additions & 18 deletions crates/polars-row/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,23 @@ impl EncodingField {
#[derive(Default, Clone)]
pub struct RowsEncoded {
pub(crate) values: Vec<u8>,
pub(crate) offsets: Vec<usize>,

// Conceptually, this vector holds `usize` offsets.
// However, since the vec is eventually passed to arrow as `i64` values,
// we need to make sure each element reserves the right number of bytes (8).
// A `usize` takes 4 bytes of memory on 32-bit systems, whereas an `i64` takes 8 bytes.
pub(crate) offsets: Vec<u64>,
}

fn checks(offsets: &[usize]) {
assert_eq!(
size_of::<usize>(),
size_of::<i64>(),
"only supported on 64bit arch"
);
assert!(
(*offsets.last().unwrap() as u64) < i64::MAX as u64,
"overflow"
);
fn checks(offsets: &[u64]) {
assert!(*offsets.last().unwrap() < i64::MAX as u64, "overflow");
}

unsafe fn rows_to_array(buf: Vec<u8>, offsets: Vec<usize>) -> BinaryArray<i64> {
unsafe fn rows_to_array(buf: Vec<u8>, offsets: Vec<u64>) -> BinaryArray<i64> {
checks(&offsets);

// SAFETY: we checked overflow
let offsets = bytemuck::cast_vec::<usize, i64>(offsets);
let offsets = bytemuck::cast_vec::<u64, i64>(offsets);

// SAFETY: monotonically increasing
let offsets = Offsets::new_unchecked(offsets);
Expand All @@ -63,13 +60,13 @@ unsafe fn rows_to_array(buf: Vec<u8>, offsets: Vec<usize>) -> BinaryArray<i64> {
}

impl RowsEncoded {
pub(crate) fn new(values: Vec<u8>, offsets: Vec<usize>) -> Self {
pub(crate) fn new(values: Vec<u8>, offsets: Vec<u64>) -> Self {
RowsEncoded { values, offsets }
}

pub fn iter(&self) -> RowsEncodedIter {
let iter = self.offsets[1..].iter();
let offset = self.offsets[0];
let offset = self.offsets[0] as usize;
RowsEncodedIter {
offset,
end: iter,
Expand All @@ -87,7 +84,7 @@ impl RowsEncoded {

unsafe {
let (_, values, _) = mmap::slice(&self.values).into_inner();
let offsets = bytemuck::cast_slice::<usize, i64>(self.offsets.as_slice());
let offsets = bytemuck::cast_slice::<u64, i64>(self.offsets.as_slice());
let (_, offsets, _) = mmap::slice(offsets).into_inner();
let offsets = OffsetsBuffer::new_unchecked(offsets);

Expand Down Expand Up @@ -115,15 +112,15 @@ impl RowsEncoded {

pub struct RowsEncodedIter<'a> {
offset: usize,
end: std::slice::Iter<'a, usize>,
end: std::slice::Iter<'a, u64>,
values: &'a [u8],
}

impl<'a> Iterator for RowsEncodedIter<'a> {
type Item = &'a [u8];

fn next(&mut self) -> Option<Self::Item> {
let new_offset = *self.end.next()?;
let new_offset = *self.end.next()? as usize;
let payload = unsafe { self.values.get_unchecked(self.offset..new_offset) };
self.offset = new_offset;
Some(payload)
Expand Down
14 changes: 8 additions & 6 deletions crates/polars-row/src/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,21 @@ pub(crate) unsafe fn encode_iter<'a, I: Iterator<Item = Option<&'a [u8]>>>(

if field.no_order {
for (offset, opt_value) in out.offsets.iter_mut().skip(1).zip(input) {
let dst = values.get_unchecked_mut(*offset..);
let dst: &mut [MaybeUninit<u8>] =
values.get_unchecked_mut((*offset as usize)..);
let written_len = encode_one_no_order(dst, opt_value.map(|v| v.as_uninit()), field);
*offset += written_len;
*offset += written_len as u64;
}
} else {
for (offset, opt_value) in out.offsets.iter_mut().skip(1).zip(input) {
let dst = values.get_unchecked_mut(*offset..);
let dst: &mut [MaybeUninit<u8>] =
values.get_unchecked_mut((*offset as usize)..);
let written_len = encode_one(dst, opt_value.map(|v| v.as_uninit()), field);
*offset += written_len;
*offset += written_len as u64;
}
}
let offset = out.offsets.last().unwrap();
let dst = values.get_unchecked_mut(*offset..);
let offset = *out.offsets.last().unwrap() as usize;
let dst: &mut [MaybeUninit<u8>] = values.get_unchecked_mut(offset..);
// write remainder as zeros
dst.fill(MaybeUninit::new(0));
out.values.set_len(out.values.capacity())
Expand Down
Loading