diff --git a/examples/columnar.rs b/examples/columnar.rs index 2f7b71909..81c39eec8 100644 --- a/examples/columnar.rs +++ b/examples/columnar.rs @@ -43,8 +43,8 @@ fn main() { let data = data_input.to_stream(scope); let keys = keys_input.to_stream(scope); - let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.len() as u64); - let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.len() as u64); + let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::() as u64); + let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().sum::() as u64); let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); @@ -447,7 +447,9 @@ pub mod batcher { } } - self.ready.push_back(std::mem::take(&mut self.empty)); + if !self.empty.is_empty() { + self.ready.push_back(std::mem::take(&mut self.empty)); + } } } }