Skip to content

Commit

Permalink
Remove extra module within re_arrow_util
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Feb 11, 2025
1 parent 435ea46 commit 4e30a81
Show file tree
Hide file tree
Showing 17 changed files with 454 additions and 472 deletions.
51 changes: 19 additions & 32 deletions crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_arrow_util::arrow_util;
use re_arrow_util::arrays_to_list_array_opt;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline};
use re_types_core::ComponentDescriptor;
Expand Down Expand Up @@ -749,7 +749,7 @@ impl PendingRow {

let mut per_name = ChunkComponents::default();
for (component_desc, array) in components {
let list_array = arrow_util::arrays_to_list_array_opt(&[Some(&*array as _)]);
let list_array = arrays_to_list_array_opt(&[Some(&*array as _)]);
if let Some(list_array) = list_array {
per_name.insert_descriptor(component_desc, list_array);
}
Expand Down Expand Up @@ -898,8 +898,7 @@ impl PendingRow {
let mut per_name = ChunkComponents::default();
for (component_desc, arrays) in std::mem::take(&mut components)
{
let list_array =
arrow_util::arrays_to_list_array_opt(&arrays);
let list_array = arrays_to_list_array_opt(&arrays);
if let Some(list_array) = list_array {
per_name.insert_descriptor(component_desc, list_array);
}
Expand Down Expand Up @@ -944,7 +943,7 @@ impl PendingRow {
{
let mut per_name = ChunkComponents::default();
for (component_desc, arrays) in components {
let list_array = arrow_util::arrays_to_list_array_opt(&arrays);
let list_array = arrays_to_list_array_opt(&arrays);
if let Some(list_array) = list_array {
per_name.insert_descriptor(component_desc, list_array);
}
Expand Down Expand Up @@ -1106,24 +1105,16 @@ mod tests {
let expected_components = [
(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(
&[&*points1, &*points2, &*points3].map(Some),
)
.unwrap(),
arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
), //
(
MyLabel::descriptor(),
arrow_util::arrays_to_list_array_opt(
&[&*labels1, &*labels2, &*labels3].map(Some),
)
.unwrap(),
arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
), //
(
MyIndex::descriptor(),
arrow_util::arrays_to_list_array_opt(
&[&*indices1, &*indices2, &*indices3].map(Some),
)
.unwrap(),
arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
.unwrap(),
), //
];
let expected_chunk = Chunk::from_native_row_ids(
Expand Down Expand Up @@ -1307,8 +1298,7 @@ mod tests {
let expected_timelines = [];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some))
.unwrap(),
arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
Expand Down Expand Up @@ -1387,7 +1377,7 @@ mod tests {
)];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
Expand All @@ -1411,7 +1401,7 @@ mod tests {
)];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
Expand Down Expand Up @@ -1494,7 +1484,7 @@ mod tests {
)];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
Expand Down Expand Up @@ -1524,7 +1514,7 @@ mod tests {
];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
Expand Down Expand Up @@ -1603,7 +1593,7 @@ mod tests {
)];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
Expand All @@ -1627,7 +1617,7 @@ mod tests {
)];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
Expand Down Expand Up @@ -1730,10 +1720,8 @@ mod tests {
];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(
&[&*points1, &*points2, &*points3, &*points4].map(Some),
)
.unwrap(),
arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
.unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
Expand Down Expand Up @@ -1836,8 +1824,7 @@ mod tests {
];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some))
.unwrap(),
arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[0].id,
Expand Down Expand Up @@ -1867,7 +1854,7 @@ mod tests {
];
let expected_components = [(
MyPoint::descriptor(),
arrow_util::arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
)];
let expected_chunk = Chunk::from_native_row_ids(
chunks[1].id,
Expand Down
7 changes: 3 additions & 4 deletions crates/store/re_chunk/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use arrow::{array::ArrayRef, datatypes::DataType as ArrowDatatype};
use itertools::Itertools;
use nohash_hasher::IntMap;

use re_arrow_util::arrow_util;
use re_log_types::{EntityPath, TimeInt, TimePoint, Timeline};
use re_types_core::{AsComponents, ComponentBatch, ComponentDescriptor, SerializedComponentBatch};

Expand Down Expand Up @@ -280,7 +279,7 @@ impl ChunkBuilder {
.into_iter()
.filter_map(|(component_desc, arrays)| {
let arrays = arrays.iter().map(|array| array.as_deref()).collect_vec();
arrow_util::arrays_to_list_array_opt(&arrays)
re_arrow_util::arrays_to_list_array_opt(&arrays)
.map(|list_array| (component_desc, list_array))
})
{
Expand Down Expand Up @@ -339,10 +338,10 @@ impl ChunkBuilder {
// If we know the datatype in advance, we're able to keep even fully sparse
// columns around.
if let Some(datatype) = datatypes.get(&component_desc) {
arrow_util::arrays_to_list_array(datatype.clone(), &arrays)
re_arrow_util::arrays_to_list_array(datatype.clone(), &arrays)
.map(|list_array| (component_desc, list_array))
} else {
arrow_util::arrays_to_list_array_opt(&arrays)
re_arrow_util::arrays_to_list_array_opt(&arrays)
.map(|list_array| (component_desc, list_array))
}
})
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use arrow::{
};
use itertools::{izip, Either, Itertools};

use re_arrow_util::{arrow_util::offsets_lengths, ArrowArrayDowncastRef as _};
use re_arrow_util::{offsets_lengths, ArrowArrayDowncastRef as _};
use re_log_types::{TimeInt, TimePoint, Timeline};
use re_types_core::{ArrowString, Component, ComponentName};

Expand Down
12 changes: 6 additions & 6 deletions crates/store/re_chunk/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use itertools::{izip, Itertools};
use nohash_hasher::IntMap;

use re_arrow_util::{arrow_util, ArrowArrayDowncastRef as _};
use re_arrow_util::ArrowArrayDowncastRef as _;

use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn};

Expand Down Expand Up @@ -47,7 +47,7 @@ impl Chunk {
let row_ids = {
re_tracing::profile_scope!("row_ids");

let row_ids = arrow_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
let row_ids = re_arrow_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
#[allow(clippy::unwrap_used)]
// concatenating 2 RowId arrays must yield another RowId array
row_ids
Expand Down Expand Up @@ -105,15 +105,15 @@ impl Chunk {
));

let list_array =
arrow_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
re_arrow_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
let list_array = list_array.downcast_array_ref::<ArrowListArray>()?.clone();

Some((component_desc.clone(), list_array))
} else {
re_tracing::profile_scope!("pad");
Some((
component_desc.clone(),
arrow_util::pad_list_array_back(
re_arrow_util::pad_list_array_back(
lhs_list_array,
self.num_rows() + rhs.num_rows(),
),
Expand Down Expand Up @@ -144,15 +144,15 @@ impl Chunk {
));

let list_array =
arrow_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
re_arrow_util::concat_arrays(&[lhs_list_array, rhs_list_array]).ok()?;
let list_array = list_array.downcast_array_ref::<ArrowListArray>()?.clone();

Some((component_desc.clone(), list_array))
} else {
re_tracing::profile_scope!("pad");
Some((
component_desc.clone(),
arrow_util::pad_list_array_front(
re_arrow_util::pad_list_array_front(
rhs_list_array,
self.num_rows() + rhs.num_rows(),
),
Expand Down
5 changes: 3 additions & 2 deletions crates/store/re_chunk/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow::array::{
use itertools::Itertools;
use nohash_hasher::IntMap;

use re_arrow_util::{arrow_util, ArrowArrayDowncastRef as _};
use re_arrow_util::ArrowArrayDowncastRef as _;
use re_types_core::arrow_helpers::as_array_ref;

use crate::Chunk;
Expand Down Expand Up @@ -83,7 +83,8 @@ impl Chunk {
.map(|a| a.as_deref() as Option<&dyn ArrowArray>)
.collect_vec();

if let Some(list_array_patched) = arrow_util::arrays_to_list_array_opt(&arrays)
if let Some(list_array_patched) =
re_arrow_util::arrays_to_list_array_opt(&arrays)
{
*list_array = list_array_patched;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl Chunk {
let offsets =
ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
#[allow(clippy::unwrap_used)] // these are slices of the same outer array
let values = re_arrow_util::arrow_util::concat_arrays(&sorted_arrays).unwrap();
let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
let validity = original
.nulls()
.map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
Expand Down
28 changes: 15 additions & 13 deletions crates/store/re_chunk/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use arrow::array::{
use itertools::Itertools as _;
use nohash_hasher::IntSet;

use re_arrow_util::arrow_util;
use re_log_types::Timeline;
use re_types_core::{ComponentDescriptor, ComponentName};

Expand Down Expand Up @@ -366,15 +365,15 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow_util::filter_array(row_ids, &validity_filter),
row_ids: re_arrow_util::filter_array(row_ids, &validity_filter),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter)))
.collect(),
components: components
.iter_flattened()
.map(|(component_desc, list_array)| {
let filtered = arrow_util::filter_array(list_array, &validity_filter);
let filtered = re_arrow_util::filter_array(list_array, &validity_filter);
let filtered = if component_desc.component_name == component_name_pov {
// Make sure we fully remove the validity bitmap for the densified
// component.
Expand Down Expand Up @@ -537,7 +536,7 @@ impl Chunk {
entity_path: self.entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted: self.is_sorted,
row_ids: arrow_util::take_array(
row_ids: re_arrow_util::take_array(
&self.row_ids,
&arrow::array::Int32Array::from(indices.clone()),
),
Expand All @@ -550,7 +549,7 @@ impl Chunk {
.components
.iter_flattened()
.map(|(component_desc, list_array)| {
let filtered = arrow_util::take_array(list_array, &indices);
let filtered = re_arrow_util::take_array(list_array, &indices);
(component_desc.clone(), filtered)
})
.collect(),
Expand Down Expand Up @@ -613,15 +612,15 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow_util::filter_array(row_ids, filter),
row_ids: re_arrow_util::filter_array(row_ids, filter),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.filtered(filter)))
.collect(),
components: components
.iter_flattened()
.map(|(component_desc, list_array)| {
let filtered = arrow_util::filter_array(list_array, filter);
let filtered = re_arrow_util::filter_array(list_array, filter);
(component_desc.clone(), filtered)
})
.collect(),
Expand Down Expand Up @@ -693,7 +692,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow_util::take_array(
row_ids: re_arrow_util::take_array(
row_ids,
&arrow::array::Int32Array::from(indices.clone()),
),
Expand All @@ -704,7 +703,7 @@ impl Chunk {
components: components
.iter_flattened()
.map(|(component_desc, list_array)| {
let taken = arrow_util::take_array(list_array, indices);
let taken = re_arrow_util::take_array(list_array, indices);
(component_desc.clone(), taken)
})
.collect(),
Expand Down Expand Up @@ -841,9 +840,12 @@ impl TimeColumn {
Self::new(
is_sorted_opt,
*timeline,
arrow_util::filter_array(&arrow::array::Int64Array::new(times.clone(), None), filter)
.into_parts()
.1,
re_arrow_util::filter_array(
&arrow::array::Int64Array::new(times.clone(), None),
filter,
)
.into_parts()
.1,
)
}

Expand All @@ -859,7 +861,7 @@ impl TimeColumn {
time_range: _,
} = self;

let new_times = arrow_util::take_array(
let new_times = re_arrow_util::take_array(
&arrow::array::Int64Array::new(times.clone(), None),
&arrow::array::Int32Array::from(indices.clone()),
)
Expand Down
Loading

0 comments on commit 4e30a81

Please sign in to comment.