Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FOR bug, also fix bench to compile #341

Merged
merged 16 commits into from
Jun 11, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { workspace = true }
uuid = { workspace = true }
vortex-alp = { path = "../vortex-alp" }
vortex-array = { path = "../vortex-array" }
vortex-buffer = { path = "../vortex-buffer" }
vortex-datetime-parts = { path = "../vortex-datetime-parts" }
vortex-dict = { path = "../vortex-dict" }
vortex-dtype = { path = "../vortex-dtype" }
Expand Down
1 change: 1 addition & 0 deletions bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fn random_access(c: &mut Criterion) {
});

let dataset = BenchmarkDatasets::PBI(Medicare1);
dataset.write_as_parquet();
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
dataset.write_as_lance();
// NB: our parquet benchmarks read from a single file, and we (currently) write each
// file to an individual lance dataset for comparison parity.
Expand Down
22 changes: 11 additions & 11 deletions bench-vortex/src/bin/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@ use std::path::PathBuf;
use bench_vortex::data_downloads::BenchmarkDataset;
use bench_vortex::public_bi_data::BenchmarkDatasets::PBI;
use bench_vortex::public_bi_data::PBIDataset;
use bench_vortex::reader::{open_vortex, rewrite_parquet_as_vortex};
use bench_vortex::reader::{open_vortex_async, rewrite_parquet_as_vortex};
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::{setup_logger, IdempotentPath};
use futures::executor::block_on;
use log::{info, LevelFilter};
use tokio::fs::File;
use vortex_error::VortexResult;

pub fn main() {
#[tokio::main]
pub async fn main() {
setup_logger(LevelFilter::Info);
// compress_pbi(PBIDataset::Medicare1);
compress_taxi();
compress_taxi().await.unwrap();
}

fn compress_taxi() {
async fn compress_taxi() -> VortexResult<()> {
let path: PathBuf = "taxi_data.vortex".to_data_path();
block_on(async {
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await
})
.unwrap();
let output_file = File::create(&path).await?;
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;

let taxi_vortex = open_vortex(&path).unwrap();
let taxi_vortex = open_vortex_async(&path).await?;
info!("{}", taxi_vortex.tree_display());

let pq_size = taxi_data_parquet().metadata().unwrap().size();
let vx_size = taxi_vortex.nbytes();

info!("Parquet size: {}, Vortex size: {}", pq_size, vx_size);
info!("Compression ratio: {}", vx_size as f32 / pq_size as f32);

Ok(())
}

#[allow(dead_code)]
Expand Down
13 changes: 12 additions & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ pub fn open_vortex(path: &Path) -> VortexResult<Array> {
.map(|a| a.into_array())
}

pub async fn open_vortex_async(path: &Path) -> VortexResult<Array> {
robert3005 marked this conversation as resolved.
Show resolved Hide resolved
let file = tokio::fs::File::open(path).await.unwrap();
let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
msgs.array_stream_from_messages(&CTX)
.await
.unwrap()
.collect_chunked()
.await
.map(|a| a.into_array())
}

pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
parquet_path: PathBuf,
write: W,
Expand Down Expand Up @@ -103,7 +114,7 @@ pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<Array> {
let array = open_vortex(path)?;
let taken = take(&array, &indices.to_vec().into_array())?;
// For equivalence.... we flatten to make sure we're not cheating too much.
taken.flatten().map(|x| x.into_array())
Ok(taken.flatten()?.into_array())
}

pub fn take_parquet(path: &Path, indices: &[u64]) -> VortexResult<RecordBatch> {
Expand Down
31 changes: 29 additions & 2 deletions bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::future::{ready, Future};
use std::io::Write;
use std::path::PathBuf;

use futures::executor::block_on;
use tokio::fs::File;
use vortex_buffer::io_buf::IoBuf;
use vortex_error::VortexError;
use vortex_ipc::io::VortexWrite;

use crate::data_downloads::{data_vortex_uncompressed, download_data, parquet_to_lance};
use crate::reader::rewrite_parquet_as_vortex;
Expand Down Expand Up @@ -33,10 +36,34 @@ pub fn taxi_data_vortex_uncompressed() -> PathBuf {
pub fn taxi_data_vortex() -> PathBuf {
idempotent("taxi.vortex", |output_fname| {
block_on(async {
let output_file = File::create(output_fname).await?;
let output_file = std::fs::File::create(output_fname)?;
let output_file = StdFile(output_file);
rewrite_parquet_as_vortex(taxi_data_parquet(), output_file).await?;
Ok::<PathBuf, VortexError>(output_fname.to_path_buf())
})
})
.unwrap()
}

//
// Test code uses futures_executor with a local pool, and nothing in VortexWrite ties us to Tokio,
// so this is a simple bridge to allow us to use a `std::fs::File` as a `VortexWrite`.
//

struct StdFile(std::fs::File);

impl VortexWrite for StdFile {
async fn write_all<B: IoBuf>(&mut self, buffer: B) -> std::io::Result<B> {
self.0.write_all(buffer.as_slice())?;
Ok(buffer)
}

async fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()?;
Ok(())
}

fn shutdown(&mut self) -> impl Future<Output = std::io::Result<()>> {
ready(Ok(()))
}
}
39 changes: 38 additions & 1 deletion vortex-alp/src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use vortex::compute::as_contiguous::AsContiguousFn;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::slice::{slice, SliceFn};
use vortex::compute::take::{take, TakeFn};
use vortex::compute::ArrayCompute;
use vortex::{Array, ArrayDType, IntoArray};
use vortex::{impl_default_as_contiguous_fn, Array, ArrayDType, IntoArray};
use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::{match_each_alp_float_ptype, ALPArray};

impl_default_as_contiguous_fn!(ALPArray);

impl ArrayCompute for ALPArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
Expand All @@ -20,6 +23,10 @@ impl ArrayCompute for ALPArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}
}

impl ScalarAtFn for ALPArray {
Expand Down Expand Up @@ -61,3 +68,33 @@ impl SliceFn for ALPArray {
.into_array())
}
}

#[cfg(test)]
mod test {
use vortex::array::primitive::PrimitiveArray;
use vortex::compute::as_contiguous::AsContiguousFn;
use vortex::compute::scalar_at::scalar_at;
use vortex::validity::Validity;
use vortex::IntoArray;

use crate::ALPArray;

#[test]
fn test_as_contiguous() {
let values = vec![1.0, 2.0, 3.0];
let primitives = PrimitiveArray::from_vec(values, Validity::NonNullable);
let encoded = ALPArray::encode(primitives.into_array()).unwrap();
let alp = ALPArray::try_from(&encoded).unwrap();

let flat = alp.as_contiguous(&[encoded]).unwrap();

let a: f64 = scalar_at(&flat, 0).unwrap().try_into().unwrap();
let b: f64 = scalar_at(&flat, 1).unwrap().try_into().unwrap();

let c: f64 = scalar_at(&flat, 2).unwrap().try_into().unwrap();

assert_eq!(a, 1.0);
assert_eq!(b, 2.0);
assert_eq!(c, 3.0);
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/compute/as_contiguous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl AsContiguousFn for BoolArray {
let mut bools = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum());
for buffer in arrays
.iter()
.map(|a| Self::try_from(a.clone()).unwrap().boolean_buffer())
.map(|a| Self::try_from(a).unwrap().boolean_buffer())
{
bools.extend(buffer.iter())
}
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/constant/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl ArrayFlatten for ConstantArray {
if let Ok(ptype) = PType::try_from(self.scalar().dtype()) {
return match_each_native_ptype!(ptype, |$P| {
Ok(Flattened::Primitive(PrimitiveArray::from_vec::<$P>(
vec![$P::try_from(self.scalar())?; self.len()],
vec![$P::try_from(self.scalar()).unwrap_or_else(|_| $P::default()); self.len()],
validity,
)))
});
Expand Down
22 changes: 21 additions & 1 deletion vortex-array/src/array/datetime/localdatetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ impl LocalDateTimeArray {
}
}

impl TryFrom<LocalDateTimeArray> for ExtensionArray {
a10y marked this conversation as resolved.
Show resolved Hide resolved
type Error = VortexError;

fn try_from(value: LocalDateTimeArray) -> Result<Self, Self::Error> {
Self::try_from(&value)
}
}

impl TryFrom<&LocalDateTimeArray> for ExtensionArray {
type Error = VortexError;

fn try_from(value: &LocalDateTimeArray) -> Result<Self, Self::Error> {
let DType::Extension(ext_dtype, _) = value.dtype().clone() else {
vortex_bail!(ComputeError: "expected dtype to be Extension variant");
};

Ok(Self::new(ext_dtype, value.ext.storage()))
}
}

impl TryFrom<&ExtensionArray> for LocalDateTimeArray {
type Error = VortexError;

Expand Down Expand Up @@ -93,7 +113,7 @@ impl IntoArrayData for LocalDateTimeArray {
}
}

fn try_parse_time_unit(ext_dtype: &ExtDType) -> VortexResult<TimeUnit> {
pub fn try_parse_time_unit(ext_dtype: &ExtDType) -> VortexResult<TimeUnit> {
let byte: [u8; 1] = ext_dtype
.metadata()
.ok_or_else(|| vortex_err!("Missing metadata"))?
Expand Down
20 changes: 17 additions & 3 deletions vortex-array/src/array/struct/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::compute::slice::{slice, SliceFn};
use crate::compute::take::{take, TakeFn};
use crate::compute::ArrayCompute;
use crate::validity::Validity;
use crate::ArrayTrait;
use crate::{Array, ArrayDType, IntoArray};

impl ArrayCompute for StructArray {
Expand Down Expand Up @@ -82,6 +81,11 @@ impl AsContiguousFn for StructArray {
}
}

let fields_len = fields
.first()
.map(|field| field.iter().map(|a| a.len()).sum())
.unwrap_or_default();

let validity = if self.dtype().is_nullable() {
Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity())))
} else {
Expand All @@ -92,9 +96,19 @@ impl AsContiguousFn for StructArray {
self.names().clone(),
fields
.iter()
.map(|field_arrays| as_contiguous(field_arrays))
.map(|field_arrays| {
// Currently, as_contiguous cannot handle sub-arrays with differing encodings.
// So, first flatten each constituent array, then as_contiguous them back into
// a single array.
let flattened = field_arrays
.iter()
.cloned()
.map(|array| array.flatten().unwrap().into_array())
.collect::<Vec<_>>();
as_contiguous(flattened.as_slice())
a10y marked this conversation as resolved.
Show resolved Hide resolved
})
.try_collect()?,
self.len(),
fields_len,
validity,
)
.map(|a| a.into_array())
Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/array/struct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ impl ArrayTrait for StructArray {
}

impl ArrayValidity for StructArray {
fn is_valid(&self, _index: usize) -> bool {
todo!()
fn is_valid(&self, index: usize) -> bool {
self.validity().is_valid(index)
}

fn logical_validity(&self) -> LogicalValidity {
todo!()
self.validity().to_logical(self.len())
}
}

Expand Down
53 changes: 37 additions & 16 deletions vortex-array/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,10 @@ impl<'a> Compressor<'a> {
.map(|c| c.compress(arr, Some(l), self.for_encoding(l.encoding().compression())))
{
let compressed = compressed?;
if compressed.dtype() != arr.dtype() {
panic!(
"Compression changed dtype: {:?} -> {:?} for {}",
arr.dtype(),
compressed.dtype(),
compressed.tree_display(),
);
}

check_validity_unchanged(arr, &compressed);
check_dtype_unchanged(arr, &compressed);

return Ok(compressed);
} else {
warn!(
Expand All @@ -165,14 +161,6 @@ impl<'a> Compressor<'a> {

// Otherwise, attempt to compress the array
let compressed = self.compress_array(arr)?;
if compressed.dtype() != arr.dtype() {
panic!(
"Compression changed dtype: {:?} -> {:?} for {}",
arr.dtype(),
compressed.dtype(),
compressed.tree_display(),
);
}
Ok(compressed)
}

Expand Down Expand Up @@ -226,6 +214,39 @@ impl<'a> Compressor<'a> {
}
}

/// Check that compression did not alter the length of the validity array.
fn check_validity_unchanged(arr: &Array, compressed: &Array) {
let _ = arr;
let _ = compressed;
a10y marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(debug_assertions)]
{
let old_validity = arr.with_dyn(|a| a.logical_validity().len());
let new_validity = compressed.with_dyn(|a| a.logical_validity().len());

debug_assert!(
old_validity == new_validity,
"validity length changed after compression: {old_validity} -> {new_validity} for {}",
compressed.tree_display()
);
}
}

/// Check that compression did not alter the dtype.
fn check_dtype_unchanged(arr: &Array, compressed: &Array) {
let _ = arr;
let _ = compressed;
#[cfg(debug_assertions)]
{
debug_assert!(
arr.dtype() == compressed.dtype(),
"Compression changed dtype: {:?} -> {:?} for {}",
arr.dtype(),
compressed.dtype(),
compressed.tree_display(),
);
}
}

pub fn sampled_compression(array: &Array, compressor: &Compressor) -> VortexResult<Option<Array>> {
// First, we try constant compression and shortcut any sampling.
if !array.is_empty() && array.statistics().compute_is_constant().unwrap_or(false) {
Expand Down
Loading