Skip to content

Commit

Permalink
Assert the output of write_csv
Browse files Browse the repository at this point in the history
  • Loading branch information
arnodb committed Feb 8, 2025
1 parent 6f7d56a commit ae9d34e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
43 changes: 40 additions & 3 deletions quirky_binder/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,19 +398,58 @@ impl<'a> Chain<'a> {
pub interrupt: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
}),
};
let outer_interrupt_impl = match thread.thread_type {
ChainThreadType::Regular => None,
ChainThreadType::Background => Some(quote! {
pub fn interrupt(&self) {
let mut is_interrupted = self.interrupt.0.lock().unwrap();
*is_interrupted = true;
self.interrupt.1.notify_all();
}
}),
};
let interrupt_impl = match thread.thread_type {
ChainThreadType::Regular => None,
ChainThreadType::Background => Some(quote! {
pub fn wait_until_interrupted(&self) {
let is_interrupted = self.interrupt.1.wait_while(
self.interrupt.0.lock().unwrap(),
|is_interrupted| !*is_interrupted,
).unwrap();
debug_assert!(*is_interrupted);
}

pub fn wait_timeout_until_interrupted(&self, dur: std::time::Duration) -> bool {
let is_interrupted = self.interrupt.1.wait_timeout_while(
self.interrupt.0.lock().unwrap(),
dur,
|is_interrupted| !*is_interrupted,
).unwrap().0;
*is_interrupted
}
}),
};
let struct_def = quote! {

pub struct ThreadOuterControl {
#interrupt
}

impl ThreadOuterControl {
#outer_interrupt_impl
}

pub struct ThreadControl {
pub chain_configuration: Arc<ChainConfiguration>,
#interrupt
#(pub #inputs: Option<Receiver<Option<#input_types>>>,)*
#(pub #outputs: Option<SyncSender<Option<#output_types>>>,)*
}

impl ThreadControl {
#interrupt_impl
}

};
module.fragment(struct_def.to_string());
}
Expand Down Expand Up @@ -530,9 +569,7 @@ impl<'a> Chain<'a> {
.map(|thread| {
let thread_outer_control = format_ident!("thread_outer_control_{}", thread.id);
quote! {{
let mut is_interrupted = #thread_outer_control.interrupt.0.lock().unwrap();
*is_interrupted = true;
#thread_outer_control.interrupt.1.notify_all();
#thread_outer_control.interrupt();
}}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use quirky_binder::{
filter::{
function::produce::function_produce,
function::{
execute::function_execute,
produce::function_produce,
},
},
};
use quirky_binder_csv::write_csv;
Expand All @@ -21,4 +24,21 @@ use quirky_binder_csv::write_csv;
has_headers: true,
)
)

(
function_execute#assert_output(
thread_type: Background,
body: r###"
// Wait until the end of the execution to read the output and assert its content
thread_control.wait_until_interrupted();

let actual = std::fs::read_to_string("output/hello_universe.csv").expect("actual");
assert_eq!(actual, r#"hello,universe
world,42
"#);

Ok(())
"###,
)
)
}

0 comments on commit ae9d34e

Please sign in to comment.