Skip to content

Commit

Permalink
Make function_terminate read 0 to N inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
arnodb committed Dec 20, 2023
1 parent e2a97c6 commit e9d7bb5
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 74 deletions.
4 changes: 2 additions & 2 deletions datapet/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl<'a> Chain<'a> {
})
}

pub fn get_thread_id_and_module_by_source(
pub fn get_thread_by_source(
&mut self,
input: &NodeStream,
new_thread_name: &FullyQualifiedName,
Expand Down Expand Up @@ -508,7 +508,7 @@ impl<'a> Chain<'a> {
) {
let name = node.name();

let thread = self.get_thread_id_and_module_by_source(input, name, Some(output));
let thread = self.get_thread_by_source(input, name, Some(output));

let record = self.stream_definition_fragments(output).record();

Expand Down
16 changes: 8 additions & 8 deletions datapet/src/filter/fork/extract_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,19 @@ impl DynNode for ExtractFields {

let thread_body = quote! {
move || {
let rx = thread_control.input_0.take().expect("input 0");
let tx_0 = thread_control.output_0.take().expect("output 0");
let tx_1 = thread_control.output_1.take().expect("output 1");
while let Some(record) = rx.recv()? {
let input_0 = thread_control.input_0.take().expect("input 0");
let output_0 = thread_control.output_0.take().expect("output 0");
let output_1 = thread_control.output_1.take().expect("output 1");
while let Some(record) = input_0.recv()? {
#(#datum_clones)*
let record_1 = #output_record_1::new(
#output_unpacked_record_1 { #(#fields),* }
);
tx_0.send(Some(record))?;
tx_1.send(Some(record_1))?;
output_0.send(Some(record))?;
output_1.send(Some(record_1))?;
}
tx_0.send(None)?;
tx_1.send(None)?;
output_0.send(None)?;
output_1.send(None)?;
Ok(())
}
};
Expand Down
65 changes: 43 additions & 22 deletions datapet/src/filter/function/terminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ pub struct FunctionTerminateParams<'a> {
}

#[derive(Getters)]
pub struct FunctionTerminate {
pub struct FunctionTerminate<const N: usize> {
name: FullyQualifiedName,
#[getset(get = "pub")]
inputs: [NodeStream; 1],
inputs: [NodeStream; N],
#[getset(get = "pub")]
outputs: [NodeStream; 0],
body: TokenStream,
}

impl FunctionTerminate {
impl<const N: usize> FunctionTerminate<N> {
fn new<R: TypeResolver + Copy>(
_graph: &mut GraphBuilder<R>,
name: FullyQualifiedName,
inputs: [NodeStream; 1],
inputs: [NodeStream; N],
params: FunctionTerminateParams,
trace: Trace,
) -> ChainResult<Self> {
Expand All @@ -49,7 +49,7 @@ impl FunctionTerminate {
}
}

impl DynNode for FunctionTerminate {
impl<const N: usize> DynNode for FunctionTerminate<N> {
fn name(&self) -> &FullyQualifiedName {
&self.name
}
Expand All @@ -63,43 +63,64 @@ impl DynNode for FunctionTerminate {
}

fn gen_chain(&self, graph: &Graph, chain: &mut Chain) {
let thread = chain.get_thread_id_and_module_by_source(
self.inputs.single(),
&self.name,
self.outputs.none(),
);

let input = thread.format_input(
self.inputs.single().source(),
graph.chain_customizer(),
true,
);
let (thread_id, inputs) = if self.inputs.len() == 1 {
let thread =
chain.get_thread_by_source(&self.inputs[0], &self.name, self.outputs.none());

let input =
thread.format_input(self.inputs[0].source(), graph.chain_customizer(), true);

(thread.thread_id, vec![input])
} else {
let thread_id = chain.pipe_inputs(&self.name, &self.inputs, &self.outputs);

let inputs = (0..self.inputs.len())
.map(|input_index| {
let input = format_ident!("input_{}", input_index);
let expect = format!("input {}", input_index);
quote! { let #input = thread_control.#input.take().expect(#expect); }
})
.collect::<Vec<_>>();

(thread_id, inputs)
};

let drop_thread_control = if inputs.is_empty() {
Some(quote! { drop(thread_control); })
} else {
None
};

let body = &self.body;

let thread_body = quote! {
#input
#(
#inputs
)*

#drop_thread_control

move || {
#body
}
};

chain.implement_node_thread(self, thread.thread_id, &thread_body);
chain.implement_node_thread(self, thread_id, &thread_body);

chain.set_thread_main(thread.thread_id, self.name.clone());
chain.set_thread_main(thread_id, self.name.clone());
}

fn all_nodes(&self) -> Box<dyn Iterator<Item = &dyn DynNode> + '_> {
Box::new(Some(self as &dyn DynNode).into_iter())
}
}

pub fn function_terminate<R: TypeResolver + Copy>(
pub fn function_terminate<const N: usize, R: TypeResolver + Copy>(
graph: &mut GraphBuilder<R>,
name: FullyQualifiedName,
inputs: [NodeStream; 1],
inputs: [NodeStream; N],
params: FunctionTerminateParams,
trace: Trace,
) -> ChainResult<FunctionTerminate> {
) -> ChainResult<FunctionTerminate<N>> {
FunctionTerminate::new(graph, name, inputs, params, trace)
}
1 change: 1 addition & 0 deletions datapet_support/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2021"
rust-version = "1.65.0"

[dependencies]
derive_more = "0.99"
derive-new = "0.5"
fallible-iterator = "0.2"
lazy_static = "1"
Expand Down
4 changes: 3 additions & 1 deletion datapet_support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#[macro_use]
extern crate assert_matches;
#[macro_use]
extern crate derive_more;
#[macro_use]
extern crate derive_new;
#[macro_use]
extern crate lazy_static;
Expand Down Expand Up @@ -43,5 +45,5 @@ impl<T> From<SendError<T>> for DatapetError {
}
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, new)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Display, Deref, new)]
pub struct AnchorId<const TABLE_ID: usize>(usize);
116 changes: 75 additions & 41 deletions examples/datapet_example_wordlist/build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use datapet::dtpt;
use datapet::prelude::*;
use datapet_support::AnchorId;
use std::path::Path;

use datapet::{dtpt, prelude::*};
use datapet_support::AnchorId;
use truc::record::type_resolver::{StaticTypeResolver, TypeResolver};

fn main() {
Expand Down Expand Up @@ -51,47 +51,81 @@ use datapet::{
ci_anchor_field: "ci_anchor",
ci_refs_field: "ci_refs",
) [s2, s3, s4]
- function_terminate#term_1(
body: r#"
while let Some(record) = input.next()? {
println!("term_1 {} (id = {:?})", record.token(), record.anchor());
}
Ok(())
"#,
)
)
( < s2
- function_terminate#term_2(
body: r#"
while let Some(record) = input.next()? {
println!("term_2 {} (id = {:?})", record.token(), record.anchor());
}
Ok(())
"#,
)
)
( < s3
- function_terminate#term_3(
- [s2, s3, s4] function_terminate#dot(
body: r#"
while let Some(record) = input.next()? {
println!("term_3 {} (ci id = {:?}) == {}", record.token(), record.ci_anchor(), record.ci_refs().len());
for r in record.ci_refs().iter() {
println!(" {:?}", r.anchor());
println!("digraph wordlist \{{");
let mut input_0 = Some(input_0);
let mut input_1 = Some(input_1);
let mut input_2 = Some(input_2);
let mut input_3 = Some(input_3);
while input_0.is_some() || input_1.is_some() || input_2.is_some() || input_3.is_some() {
let mut read = 0;
if let Some(input) = &input_0 {
let mut closed = false;
for record in input.try_iter() {
if let Some(record) = record {
println!("word_{} [label=\"{}\"]", record.anchor(), record.token());
read += 1;
} else {
closed = true;
}
}
if closed {
input_0 = None;
}
}
if let Some(input) = &input_1 {
let mut closed = false;
for record in input.try_iter() {
if let Some(record) = record {
println!("rev_word_{} [label=\"{}\"]", record.anchor(), record.token());
println!("word_{} -> rev_word_{}", record.anchor(), record.anchor());
read += 1;
} else {
closed = true;
}
}
if closed {
input_1 = None;
}
}
if let Some(input) = &input_2 {
let mut closed = false;
for record in input.try_iter() {
if let Some(record) = record {
println!("ci_word_{} [label=\"{}\"]", record.ci_anchor(), record.token());
for ref_record in record.ci_refs() {
println!("word_{} -> ci_word_{}", ref_record.anchor(), record.ci_anchor());
}
read += 1;
} else {
closed = true;
}
}
if closed {
input_2 = None;
}
}
if let Some(input) = &input_3 {
let mut closed = false;
for record in input.try_iter() {
if let Some(record) = record {
println!("rev_ci_word_{} [label=\"{}\"]", record.ci_anchor(), record.token());
println!("ci_word_{} -> rev_ci_word_{}", record.ci_anchor(), record.ci_anchor());
read += 1;
} else {
closed = true;
}
}
if closed {
input_3 = None;
}
}
if read == 0 {
std::thread::sleep(std::time::Duration::from_millis(42));
}
}
Ok(())
"#,
)
)
( < s4
- function_terminate#term_4(
body: r#"
while let Some(record) = input.next()? {
println!("term_4 {} (ci id = {:?})", record.token(), record.ci_anchor());
}
println!("}}");
Ok(())
"#,
)
Expand Down

0 comments on commit e9d7bb5

Please sign in to comment.