Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
douglas-raillard-arm committed Jan 5, 2024
1 parent d3c0180 commit 59256dd
Showing 1 changed file with 103 additions and 77 deletions.
180 changes: 103 additions & 77 deletions tools/analysis/trace-tools/src/lib/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub enum MainError {
#[error("Arrow data type not handled: {0:?}")]
ArrowDataTypeNotHandled(Box<DataType>),

#[error("Runtime data cannot be used according to the column storage schema")]
DataMismatchingSchema,

#[error("Expected field {field_name} to be available in event {event_name}")]
MissingField {
event_name: String,
Expand Down Expand Up @@ -148,6 +151,18 @@ where
// end up with a state per-event and per-buffer, which is not what we want.
type StateMap<'scope, 'scopeenv> = BTreeMap<EventId, SharedState<'scope, 'scopeenv>>;

// Evaluate a list of "arm" expressions in order until one of them handles the
// value being appended.
//
// The arms are expanded as statements inside a `loop`. An arm that handles the
// value is expected to `break` out of that loop with its own result, e.g. the
// `break Ok(xs.len())` arms produced inside `make_macros!`. The whole macro
// invocation therefore evaluates to the value carried by whichever `break` is
// reached first.
macro_rules! chunk_append {
($($arms:expr),*) => {
loop {
$(
$arms;
)*

// None of the arms broke out of the loop: report the mismatch as an
// error value.
break Err(MainError::DataMismatchingSchema);
}
}
}

macro_rules! make_macros {
($scrutinee:expr) => {
let scrutinee = $scrutinee;
Expand All @@ -156,7 +171,7 @@ where
($pat:pat => $expr:expr) => {
if let $pat = scrutinee {
let xs = $expr;
break xs.len();
break Ok(xs.len());
}
}
}
Expand All @@ -167,29 +182,18 @@ where
match x {
$val_ctor(x) => {
#[allow(clippy::redundant_closure_call)]
xs.push(Some($f(x)))
xs.push(Some($f(x)));
break Ok(xs.len());
}
_ => xs.push(None)
_ => break Err(MainError::DataMismatchingSchema)
}
break xs.len();
}
}
}

}
}

// Evaluate a list of "arm" expressions in order until one of them handles the
// value being appended.
//
// The arms are expanded as statements inside a `loop`, so an arm that handles
// the value is expected to `break` out of that loop with its own result.
macro_rules! chunk_append {
($($arms:expr),*) => {
loop {
$(
$arms;
)*
// Reached only when no arm broke out of the loop; this aborts with a panic
// rather than returning an error value.
panic!("Unhandled combination of column type and value type");
}
}
}

scope(move |scope| -> Result<_, MainError> {
eprintln!("SPAWNED");

Expand Down Expand Up @@ -223,7 +227,7 @@ where
let cpu = buf_id.cpu;
let len: Cell<usize> = Cell::new(0);

let mut table_state = state.with_fields(
let (mut table_state, res) = state.with_fields(
&visitor,
|field_name, col, val| -> Result<(), MainError> {
let val = val?;
Expand Down Expand Up @@ -272,11 +276,6 @@ where
xs.push(Some(x != 0));
xs
}),
basic!((FieldArray::Bool(xs), _) => {
xs.push(None);
xs
}),

// Expected Binary
basic!((FieldArray::Binary(xs), Value::U8Array(x)) => {
xs.push(Some(x));
Expand Down Expand Up @@ -325,42 +324,6 @@ where
xs
}),

basic!((FieldArray::ListU8(xs), _) => {
xs.push_null();
xs
}),
basic!((FieldArray::ListI8(xs), _) => {
xs.push_null();
xs
}),

basic!((FieldArray::ListU16(xs), _) => {
xs.push_null();
xs
}),
basic!((FieldArray::ListI16(xs), _) => {
xs.push_null();
xs
}),

basic!((FieldArray::ListU32(xs), _) => {
xs.push_null();
xs
}),
basic!((FieldArray::ListI32(xs), _) => {
xs.push_null();
xs
}),

basic!((FieldArray::ListU64(xs), _) => {
xs.push_null();
xs
}),
basic!((FieldArray::ListI64(xs), _) => {
xs.push_null();
xs
}),

// Unexpected Binary
basic!((FieldArray::Binary(xs), Value::U16Array(x)) => {
xs.push(Some(bytemuck::cast_slice(&x)));
Expand All @@ -385,12 +348,8 @@ where
basic!((FieldArray::Binary(xs), Value::I64Array(x)) => {
xs.push(Some(bytemuck::cast_slice(&x)));
xs
}),
basic!((FieldArray::Binary(xs), _) => {
xs.push_null();
xs
})
});
}?);
Ok(())
}
)?;
Expand All @@ -402,7 +361,7 @@ where
let chunk = table_state.extract_chunk()?;
table_state.sender.send(chunk).unwrap();
}
Ok(())
res
}
}
}
Expand Down Expand Up @@ -524,6 +483,34 @@ impl FieldArray {
FieldArray::ListI64(xs) => xs.into_arc(),
}
}

fn push_null(&mut self) {
match self {
FieldArray::U8(xs) => xs.push_null(),
FieldArray::U16(xs) => xs.push_null(),
FieldArray::U32(xs) => xs.push_null(),
FieldArray::U64(xs) => xs.push_null(),

FieldArray::I8(xs) => xs.push_null(),
FieldArray::I16(xs) => xs.push_null(),
FieldArray::I32(xs) => xs.push_null(),
FieldArray::I64(xs) => xs.push_null(),

FieldArray::Bool(xs) => xs.push_null(),
FieldArray::Str(xs) => xs.push_null(),
FieldArray::Binary(xs) => xs.push_null(),

FieldArray::ListU8(xs) => xs.push_null(),
FieldArray::ListU16(xs) => xs.push_null(),
FieldArray::ListU32(xs) => xs.push_null(),
FieldArray::ListU64(xs) => xs.push_null(),

FieldArray::ListI8(xs) => xs.push_null(),
FieldArray::ListI16(xs) => xs.push_null(),
FieldArray::ListI32(xs) => xs.push_null(),
FieldArray::ListI64(xs) => xs.push_null(),
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -633,16 +620,22 @@ where
&'ret mut self,
visitor: &'ret EventVisitor<'i, 'h, 'edm, InitDescF, T>,
mut f: F,
) -> Result<impl DerefMut<Target=TableState<'scope>> + 'ret, MainError>
) -> Result<
(
impl DerefMut<Target=TableState<'scope>> + 'ret,
Result<(), MainError>
),
MainError
>
where
'i: 'ret,
'h: 'ret,
'scope: 'ret,
InitDescF: 'h + FnMut(&'h Header, &'h EventDesc) -> T,
F: for<'a> FnMut(
&'a str,
&'a mut FieldArray,
Result<Value<'a>, BufferError>,
F: FnMut(
&str,
&mut FieldArray,
Result<Value<'_>, BufferError>,
) -> Result<(), MainError>,
{
enum DerefMutWrapper<'a, T> {
Expand All @@ -669,18 +662,48 @@ where
}
}

// Wrap the user-provided callback `f` so that a failure still leaves an entry
// in the column: when `f` returns an error, a null is pushed to `col` before
// the error is handed back to the caller.
let mut handle_error = |name, col: &mut _, val| {
let res = f(name, col, val);
match res {
Err(err) => {
// `f` failed for this field: append a null to the column, then propagate
// the error unchanged.
col.push_null();
Err(err)
}
// Success: nothing extra to append here.
_ => Ok(())
}
};

// We want to go through all the columns so that we have a chance to append None values in
// places we had an error, and when we are done we return the last error. This way, all
// columns should have the same length and we will still be able to dump to parquet.
// Drain `iter` completely and return the last error it yielded, or `Ok(())`
// if it yielded none. Unlike collecting into a `Result`, this does not stop
// at the first error: every item is pulled from the iterator.
fn last_err<E, I: Iterator<Item=Result<(), E>>>(iter: I) -> Result<(), E> {
    iter.fold(Ok(()), |latest, item| match item {
        // A newer error replaces whatever was recorded so far.
        Err(err) => Err(err),
        // A success leaves the previously recorded outcome untouched.
        Ok(()) => latest,
    })
}

macro_rules! generic_iter {
($table_state:expr, $visitor:expr) => {{
let table_state = $table_state;
let visitor = $visitor;

visitor
let res = last_err(visitor
.fields()?
.into_iter()
.zip(table_state.field_cols.iter_mut())
.map(|((fmt, val), col)| f(fmt.declaration.identifier.deref(), col, val))
.collect::<Result<(), MainError>>()
.map(|_| DerefMutWrapper::RefMut(table_state))
.map(|((fmt, val), col)| handle_error(fmt.declaration.identifier.deref(), col, val))
);
Ok((
DerefMutWrapper::RefMut(table_state),
res
))
}};
}

Expand All @@ -697,14 +720,17 @@ where
let mut table_state: RefMut<'ret, _> = RefCell::borrow_mut(table_state);
let mut _table_state = table_state.deref_mut();

visitor
let res = last_err(visitor
.vbin_fields(print_fmt, &array)
.zip(_table_state.field_cols.iter_mut())
.map(|(res, col)| {
f(&_table_state.name, col, res.map(|print_arg| print_arg.value))
})
.collect::<Result<(), MainError>>()
.map(|_| DerefMutWrapper::RcRefMut(table_state))
handle_error(&_table_state.name, col, res.map(|print_arg| print_arg.value))
}));

Ok((
DerefMutWrapper::RcRefMut(table_state),
res
))
}
_ => Err(MainError::EvalError(EvalError::IllegalType)),
}
Expand Down

0 comments on commit 59256dd

Please sign in to comment.