diff --git a/Cargo.lock b/Cargo.lock index cd3efe3ae2d0..434521018422 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -356,9 +356,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4caf25cdc4a985f91df42ed9e9308e1adbcd341a31a72605c697033fcef163e3" +checksum = "eaf3437355979f1e93ba84ba108c38be5767713051f3c8ffbf07c094e2e61f9f" dependencies = [ "arrow-arith", "arrow-array", @@ -376,9 +376,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a" +checksum = "31dce77d2985522288edae7206bffd5fc4996491841dda01a13a58415867e681" dependencies = [ "arrow-array", "arrow-buffer", @@ -391,9 +391,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8" +checksum = "2d45fe6d3faed0435b7313e59a02583b14c6c6339fa7729e94c32a20af319a79" dependencies = [ "ahash", "arrow-buffer", @@ -401,15 +401,15 @@ dependencies = [ "arrow-schema", "chrono", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "num", ] [[package]] name = "arrow-buffer" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec" +checksum = "2b02656a35cc103f28084bc80a0159668e0a680d919cef127bd7e0aaccb06ec1" dependencies = [ "bytes", "half", @@ -418,9 +418,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1" +checksum = "c73c6233c5b5d635a56f6010e6eb1ab9e30e94707db21cea03da317f67d84cf3" dependencies = [ "arrow-array", "arrow-buffer", @@ -438,9 +438,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5" +checksum = "b7f2861ffa86f107b8ab577d86cff7c7a490243eabe961ba1e1af4f27542bb79" dependencies = [ "arrow-buffer", "arrow-schema", @@ -460,9 +460,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee" +checksum = "0270dc511f11bb5fa98a25020ad51a99ca5b08d8a8dfbd17503bb9dba0388f0b" dependencies = [ "arrow-array", "arrow-buffer", @@ -474,9 +474,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52" +checksum = "c6f202a879d287099139ff0d121e7f55ae5e0efe634b8cf2106ebc27a8715dee" dependencies = [ "arrow-array", "arrow-buffer", @@ -489,9 +489,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "552907e8e587a6fde4f8843fd7a27a576a260f65dab6c065741ea79f633fc5be" +checksum = "a8f936954991c360ba762dff23f5dda16300774fafd722353d9683abd97630ae" dependencies = [ "ahash", "arrow-array", @@ -503,18 +503,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1" +checksum = "9579b9d8bce47aa41389fe344f2c6758279983b7c0ebb4013e283e3e91bb450e" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "arrow-select" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125" +checksum = "7471ba126d0b0aaa24b50a36bc6c25e4e74869a1fd1a5553357027a0b1c8d1f1" dependencies = [ "ahash", "arrow-array", @@ -526,9 +526,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.2.0" +version = "53.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c" +checksum = "72993b01cb62507b06f1fb49648d7286c8989ecfabdb7b77a750fcb54410731b" dependencies = [ "arrow-array", "arrow-buffer", @@ -6099,7 +6099,6 @@ dependencies = [ "lz4_flex", "mimalloc", "parking_lot", - "re_arrow2", "re_build_info", "re_chunk", "re_log", diff --git a/Cargo.toml b/Cargo.toml index a454ba22a3b0..a8f73da6202a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,7 +153,7 @@ ahash = "0.8" anyhow = { version = "1.0", default-features = false } argh = "0.1.12" array-init = "2.1" -arrow = { version = "53.1", default-features = false } +arrow = { version = "53.4", default-features = false } arrow2 = { package = "re_arrow2", version = "0.18", features = ["arrow"] } async-executor = "1.0" backtrace = "0.3" diff --git a/clippy.toml b/clippy.toml index 7a3595366c79..8f26830095c4 100644 --- a/clippy.toml +++ b/clippy.toml @@ -74,7 +74,6 @@ disallowed-names = [] # https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_types disallowed-types = [ - { path = "arrow::ipc::writer::StreamWriter", reason = "Wait until https://github.com/apache/arrow-rs/pull/6805 has been released" }, { path = "egui::Checkbox", reason = "Use `re_checkbox` from `re_ui::UiEx" }, { path = "ring::digest::SHA1_FOR_LEGACY_USE_ONLY", reason = "SHA1 is cryptographically broken" }, { path = "std::sync::Condvar", reason = "Use parking_lot instead" }, diff --git a/crates/store/re_log_encoding/Cargo.toml b/crates/store/re_log_encoding/Cargo.toml index f8bab7461412..2ec1d7f27695 100644 --- a/crates/store/re_log_encoding/Cargo.toml +++ b/crates/store/re_log_encoding/Cargo.toml @@ -59,7 +59,6 @@ re_tracing.workspace = true # External: arrow = { workspace = true, features = ["ipc"] } -arrow2.workspace = true parking_lot.workspace = true thiserror.workspace = true diff --git a/crates/store/re_log_encoding/src/codec/arrow.rs b/crates/store/re_log_encoding/src/codec/arrow.rs index 84a50a086ab6..f2bed9b50396 100644 --- a/crates/store/re_log_encoding/src/codec/arrow.rs +++ b/crates/store/re_log_encoding/src/codec/arrow.rs @@ -2,42 +2,16 @@ use super::CodecError; use arrow::array::RecordBatch as ArrowRecordBatch; -/// TODO(#3741): switch to arrow1 once is released -const SERIALIZE_WITH_ARROW_1: bool = false; // I _think_ we can use arrow1 here, because we don't encounter the above bug in this context -const DESERIALIZE_WITH_ARROW_1: bool = true; // Both arrow1 and arrow2 should be working fine - /// Helper function that serializes given arrow schema and record batch into bytes /// using Arrow IPC format. pub(crate) fn write_arrow_to_bytes( writer: &mut W, batch: &ArrowRecordBatch, ) -> Result<(), CodecError> { - if SERIALIZE_WITH_ARROW_1 { - #[allow(clippy::disallowed_types)] // it's behind a disabled feature flag - let mut sw = arrow::ipc::writer::StreamWriter::try_new(writer, batch.schema_ref()) - .map_err(CodecError::ArrowSerialization)?; - sw.write(batch).map_err(CodecError::ArrowSerialization)?; - sw.finish().map_err(CodecError::ArrowSerialization)?; - } else { - let schema = arrow2::datatypes::Schema::from(batch.schema()); - let chunk = arrow2::chunk::Chunk::new( - batch - .columns() - .iter() - .map(|c| -> Box { c.clone().into() }) - .collect(), - ); - - let mut writer = arrow2::io::ipc::write::StreamWriter::new(writer, Default::default()); - writer - .start(&schema, None) - .map_err(CodecError::Arrow2Serialization)?; - writer - .write(&chunk, None) - .map_err(CodecError::Arrow2Serialization)?; - writer.finish().map_err(CodecError::Arrow2Serialization)?; - } - + let mut sw = arrow::ipc::writer::StreamWriter::try_new(writer, batch.schema_ref()) + .map_err(CodecError::ArrowSerialization)?; + sw.write(batch).map_err(CodecError::ArrowSerialization)?; + sw.finish().map_err(CodecError::ArrowSerialization)?; Ok(()) } @@ -48,40 +22,13 @@ pub(crate) fn write_arrow_to_bytes( pub(crate) fn read_arrow_from_bytes( reader: &mut R, ) -> Result { - if DESERIALIZE_WITH_ARROW_1 { - let mut stream = arrow::ipc::reader::StreamReader::try_new(reader, None) - .map_err(CodecError::ArrowDeserialization)?; - - stream - .next() - .ok_or(CodecError::MissingRecordBatch)? - .map_err(CodecError::ArrowDeserialization) - } else { - use arrow2::io::ipc; + let mut stream = arrow::ipc::reader::StreamReader::try_new(reader, None) + .map_err(CodecError::ArrowDeserialization)?; - let metadata = - ipc::read::read_stream_metadata(reader).map_err(CodecError::Arrow2Serialization)?; - let mut stream = ipc::read::StreamReader::new(reader, metadata, None); - - let schema = stream.schema().clone(); - // there should be at least one record batch in the stream - let stream_state = stream - .next() - .ok_or(CodecError::MissingRecordBatch)? - .map_err(CodecError::Arrow2Serialization)?; - - match stream_state { - ipc::read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState), - ipc::read::StreamState::Some(chunk) => { - let batch = ArrowRecordBatch::try_new( - schema.into(), - chunk.columns().iter().map(|c| c.clone().into()).collect(), - ) - .map_err(CodecError::ArrowDeserialization)?; - Ok(batch) - } - } - } + stream + .next() + .ok_or(CodecError::MissingRecordBatch)? + .map_err(CodecError::ArrowDeserialization) } #[cfg(feature = "encoder")] diff --git a/crates/store/re_log_encoding/src/codec/mod.rs b/crates/store/re_log_encoding/src/codec/mod.rs index 6f143be06875..ee140f408882 100644 --- a/crates/store/re_log_encoding/src/codec/mod.rs +++ b/crates/store/re_log_encoding/src/codec/mod.rs @@ -7,9 +7,6 @@ pub enum CodecError { #[error("Arrow IPC serialization error: {0}")] ArrowSerialization(::arrow::error::ArrowError), - #[error("Arrow2 IPC serialization error: {0}")] - Arrow2Serialization(::arrow2::error::Error), - #[error("Invalid Chunk: {0}")] InvalidChunk(::arrow::error::ArrowError), diff --git a/crates/store/re_log_types/src/arrow_msg.rs b/crates/store/re_log_types/src/arrow_msg.rs index 77259a7f9fe5..30a582645627 100644 --- a/crates/store/re_log_types/src/arrow_msg.rs +++ b/crates/store/re_log_types/src/arrow_msg.rs @@ -9,10 +9,6 @@ use arrow::array::RecordBatch as ArrowRecordBatch; use crate::TimePoint; -// TODO(#3741): Remove once is released -const SERIALIZE_WITH_ARROW_1: bool = false; -const DESERIALIZE_WITH_ARROW_1: bool = true; // Both arrow1 and arrow2 should be working fine - /// An arbitrary callback to be run when an [`ArrowMsg`], and more specifically the /// [`ArrowRecordBatch`] within it, goes out of scope. /// @@ -107,39 +103,15 @@ impl serde::Serialize for ArrowMsg { let mut ipc_bytes = Vec::::new(); - if SERIALIZE_WITH_ARROW_1 { - #[allow(clippy::disallowed_types)] // it's behind a disabled feature flag - let mut writer = - arrow::ipc::writer::StreamWriter::try_new(&mut ipc_bytes, self.batch.schema_ref()) - .map_err(|err| serde::ser::Error::custom(err.to_string()))?; - writer - .write(&self.batch) - .map_err(|err| serde::ser::Error::custom(err.to_string()))?; - writer - .finish() - .map_err(|err| serde::ser::Error::custom(err.to_string()))?; - } else { - let schema = arrow2::datatypes::Schema::from(self.batch.schema()); - let chunk = arrow2::chunk::Chunk::new( - self.batch - .columns() - .iter() - .map(|c| -> Box { c.clone().into() }) - .collect(), - ); - - let mut writer = - arrow2::io::ipc::write::StreamWriter::new(&mut ipc_bytes, Default::default()); - writer - .start(&schema, None) + let mut writer = + arrow::ipc::writer::StreamWriter::try_new(&mut ipc_bytes, self.batch.schema_ref()) .map_err(|err| serde::ser::Error::custom(err.to_string()))?; - writer - .write(&chunk, None) - .map_err(|err| serde::ser::Error::custom(err.to_string()))?; - writer - .finish() - .map_err(|err| serde::ser::Error::custom(err.to_string()))?; - } + writer + .write(&self.batch) + .map_err(|err| serde::ser::Error::custom(err.to_string()))?; + writer + .finish() + .map_err(|err| serde::ser::Error::custom(err.to_string()))?; let mut inner = serializer.serialize_tuple(3)?; inner.serialize_element(&self.chunk_id)?; @@ -177,92 +149,35 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { if let (Some(chunk_id), Some(timepoint_max), Some(buf)) = (table_id, timepoint_max, ipc_bytes) { - let mut cursor = std::io::Cursor::new(buf); - - if DESERIALIZE_WITH_ARROW_1 { - use arrow::ipc::reader::StreamReader; - - let stream = StreamReader::try_new(cursor, None).map_err(|err| { - serde::de::Error::custom(format!("Arrow error: {err}")) - })?; - let batches: Result, _> = stream.collect(); - - let batches = batches.map_err(|err| { - serde::de::Error::custom(format!("Arrow error: {err}")) - })?; - - if batches.is_empty() { - return Err(serde::de::Error::custom("No RecordBatch in stream")); - } - if batches.len() > 1 { - return Err(serde::de::Error::custom(format!( - "Found {} batches in stream - expected just one.", - batches.len() - ))); - } - #[allow(clippy::unwrap_used)] // is_empty check above - let batch = batches.into_iter().next().unwrap(); - - Ok(ArrowMsg { - chunk_id, - timepoint_max, - batch, - on_release: None, - }) - } else { - use arrow2::io::ipc::read::{ - read_stream_metadata, StreamReader, StreamState, - }; - - let metadata = match read_stream_metadata(&mut cursor) { - Ok(metadata) => metadata, - Err(err) => { - return Err(serde::de::Error::custom(format!( - "Failed to read stream metadata: {err}" - ))) - } - }; - let schema = metadata.schema.clone(); - let stream = StreamReader::new(cursor, metadata, None); - let chunks: Result, _> = stream - .map(|state| match state { - Ok(StreamState::Some(chunk)) => Ok(chunk), - Ok(StreamState::Waiting) => { - unreachable!("cannot be waiting on a fixed buffer") - } - Err(err) => Err(err), - }) - .collect(); - - let chunks = chunks.map_err(|err| { - serde::de::Error::custom(format!("Arrow error: {err}")) - })?; - - if chunks.is_empty() { - return Err(serde::de::Error::custom("No chunks found in stream")); - } - if chunks.len() > 1 { - return Err(serde::de::Error::custom(format!( - "Found {} chunks in stream - expected just one.", - chunks.len() - ))); - } - #[allow(clippy::unwrap_used)] // is_empty check above - let chunk = chunks.into_iter().next().unwrap(); - - let batch = ArrowRecordBatch::try_new( - schema.into(), - chunk.columns().iter().map(|c| c.clone().into()).collect(), - ) + let cursor = std::io::Cursor::new(buf); + + use arrow::ipc::reader::StreamReader; + + let stream = StreamReader::try_new(cursor, None) .map_err(|err| serde::de::Error::custom(format!("Arrow error: {err}")))?; + let batches: Result, _> = stream.collect(); - Ok(ArrowMsg { - chunk_id, - timepoint_max, - batch, - on_release: None, - }) + let batches = batches + .map_err(|err| serde::de::Error::custom(format!("Arrow error: {err}")))?; + + if batches.is_empty() { + return Err(serde::de::Error::custom("No RecordBatch in stream")); + } + if batches.len() > 1 { + return Err(serde::de::Error::custom(format!( + "Found {} batches in stream - expected just one.", + batches.len() + ))); } + #[allow(clippy::unwrap_used)] // is_empty check above + let batch = batches.into_iter().next().unwrap(); + + Ok(ArrowMsg { + chunk_id, + timepoint_max, + batch, + on_release: None, + }) } else { Err(serde::de::Error::custom( "Expected (table_id, timepoint, buf)",