diff --git a/ipa-core/src/query/runner/hybrid.rs b/ipa-core/src/query/runner/hybrid.rs index 7ed56c69b..6afaf3f45 100644 --- a/ipa-core/src/query/runner/hybrid.rs +++ b/ipa-core/src/query/runner/hybrid.rs @@ -46,6 +46,7 @@ use crate::{ replicated::semi_honest::AdditiveShare as Replicated, BitDecomposed, TransposeFrom, Vectorizable, }, + seq_join::seq_join, sharding::{ShardConfiguration, Sharded}, }; @@ -130,10 +131,12 @@ where })) }) .try_flatten() - .take(sz); + .take(sz) + .map(|v| async move { v }); + let (decrypted_reports, resharded_tags) = reshard_aad( ctx.narrow(&HybridStep::ReshardByTag), - stream, + seq_join(ctx.active_work(), stream), |ctx, _, tag| tag.shard_picker(ctx.shard_count()), ) .await?; @@ -344,7 +347,7 @@ mod tests { } // cannot test for Err directly because join3v calls unwrap. This should be sufficient. - #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 3)] #[should_panic(expected = "DuplicateBytes")] async fn duplicate_encrypted_hybrid_reports() { const SHARDS: usize = 2;