From d15596268917ec0fc0b3451aa06c6cba5f77b9c1 Mon Sep 17 00:00:00 2001 From: "sinu.eth" <65924192+sinui0@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:28:31 -0800 Subject: [PATCH] fix: ideal functionality synchronization (#201) --- crates/mpz-common/Cargo.toml | 2 +- crates/mpz-common/src/ideal.rs | 199 +++--------------- crates/mpz-ole-core/src/ideal.rs | 2 +- crates/mpz-ole/src/ideal.rs | 85 +++++--- crates/mpz-ot-core/src/ideal/cot.rs | 2 +- crates/mpz-ot-core/src/ideal/ot.rs | 2 +- crates/mpz-ot-core/src/ideal/rcot.rs | 2 +- crates/mpz-ot-core/src/ideal/rot.rs | 2 +- crates/mpz-ot/src/ideal/cot.rs | 26 ++- crates/mpz-ot/src/ideal/ot.rs | 26 ++- crates/mpz-ot/src/ideal/rcot.rs | 26 ++- crates/mpz-ot/src/ideal/rot.rs | 26 ++- crates/mpz-ot/src/test.rs | 2 + crates/mpz-share-conversion-core/src/ideal.rs | 8 +- crates/mpz-share-conversion/Cargo.toml | 6 +- crates/mpz-share-conversion/src/ideal.rs | 64 ++++-- crates/mpz-share-conversion/src/lib.rs | 20 +- 17 files changed, 246 insertions(+), 254 deletions(-) diff --git a/crates/mpz-common/Cargo.toml b/crates/mpz-common/Cargo.toml index c50365b5..768d3002 100644 --- a/crates/mpz-common/Cargo.toml +++ b/crates/mpz-common/Cargo.toml @@ -10,7 +10,7 @@ executor = ["cpu", "dep:uid-mux"] sync = ["tokio/sync"] future = [] test-utils = ["dep:uid-mux", "uid-mux/test-utils"] -ideal = [] +ideal = ["tokio/sync"] rayon = ["dep:rayon"] force-st = [] diff --git a/crates/mpz-common/src/ideal.rs b/crates/mpz-common/src/ideal.rs index 804472ef..b42bdf83 100644 --- a/crates/mpz-common/src/ideal.rs +++ b/crates/mpz-common/src/ideal.rs @@ -1,191 +1,58 @@ //! Ideal functionality utilities. -use futures::channel::oneshot; -use std::{ - any::Any, - collections::HashMap, - sync::{Arc, Mutex, MutexGuard}, -}; +use std::sync::Arc; +use tokio::sync::Barrier; -use crate::{Context, ThreadId}; - -type BoxAny = Box; - -#[derive(Debug, Default)] -struct Buffer { - alice: HashMap)>, - bob: HashMap)>, -} - -/// The ideal functionality from the perspective of Alice. -#[derive(Debug)] -pub struct Alice { - f: Arc>, - buffer: Arc>, -} - -impl Clone for Alice { - fn clone(&self) -> Self { - Self { - f: self.f.clone(), - buffer: self.buffer.clone(), - } - } -} - -impl Alice { - /// Returns a lock to the ideal functionality. - pub fn get_mut(&mut self) -> MutexGuard<'_, F> { - self.f.lock().unwrap() - } - - /// Calls the ideal functionality. - pub async fn call(&mut self, ctx: &mut Ctx, input: IA, call: C) -> OA - where - Ctx: Context, - C: FnOnce(&mut F, IA, IB) -> (OA, OB), - IA: Send + 'static, - IB: Send + 'static, - OA: Send + 'static, - OB: Send + 'static, - { - let receiver = { - let mut buffer = self.buffer.lock().unwrap(); - if let Some((input_bob, ret_bob)) = buffer.bob.remove(ctx.id()) { - let input_bob = *input_bob - .downcast() - .expect("alice received correct input type for bob"); - - let (output_alice, output_bob) = - call(&mut self.f.lock().unwrap(), input, input_bob); - - _ = ret_bob.send(Box::new(output_bob)); - - return output_alice; - } - - let (sender, receiver) = oneshot::channel(); - buffer - .alice - .insert(ctx.id().clone(), (Box::new(input), sender)); - receiver - }; - - let output_alice = receiver.await.expect("bob did not drop the channel"); - *output_alice - .downcast() - .expect("bob sent correct output type for alice") - } +/// Creates a new call synchronizer between two parties. +pub fn call_sync() -> (CallSync, CallSync) { + let barrier = Arc::new(Barrier::new(2)); + ( + CallSync { + barrier: Arc::clone(&barrier), + }, + CallSync { barrier }, + ) } -/// The ideal functionality from the perspective of Bob. +/// Synchronizes function calls between two parties. #[derive(Debug)] -pub struct Bob { - f: Arc>, - buffer: Arc>, -} - -impl Clone for Bob { - fn clone(&self) -> Self { - Self { - f: self.f.clone(), - buffer: self.buffer.clone(), - } - } +pub struct CallSync { + barrier: Arc, } -impl Bob { - /// Returns a lock to the ideal functionality. - pub fn get_mut(&mut self) -> MutexGuard<'_, F> { - self.f.lock().unwrap() - } - - /// Calls the ideal functionality. - pub async fn call(&mut self, ctx: &mut Ctx, input: IB, call: C) -> OB - where - Ctx: Context, - C: FnOnce(&mut F, IA, IB) -> (OA, OB), - IA: Send + 'static, - IB: Send + 'static, - OA: Send + 'static, - OB: Send + 'static, - { - let receiver = { - let mut buffer = self.buffer.lock().unwrap(); - if let Some((input_alice, ret_alice)) = buffer.alice.remove(ctx.id()) { - let input_alice = *input_alice - .downcast() - .expect("bob received correct input type for alice"); +impl CallSync { + /// Synchronizes a call. + pub async fn call R, R>(&mut self, mut f: F) -> Option { + // Wait for both parties to call. + let is_leader = self.barrier.wait().await.is_leader(); - let (output_alice, output_bob) = - call(&mut self.f.lock().unwrap(), input_alice, input); + let ret = if is_leader { Some(f()) } else { None }; - _ = ret_alice.send(Box::new(output_alice)); + // Wait for the call to return. + self.barrier.wait().await; - return output_bob; - } - - let (sender, receiver) = oneshot::channel(); - buffer - .bob - .insert(ctx.id().clone(), (Box::new(input), sender)); - receiver - }; - - let output_bob = receiver.await.expect("alice did not drop the channel"); - *output_bob - .downcast() - .expect("alice sent correct output type for bob") + ret } } -/// Creates an ideal functionality, returning the perspectives of Alice and Bob. -pub fn ideal_f2p(f: F) -> (Alice, Bob) { - let f = Arc::new(Mutex::new(f)); - let buffer = Arc::new(Mutex::new(Buffer::default())); - - ( - Alice { - f: f.clone(), - buffer: buffer.clone(), - }, - Bob { f, buffer }, - ) -} - #[cfg(test)] mod test { - use crate::executor::test_st_executor; + use std::sync::Mutex; use super::*; - #[test] - fn test_ideal() { - let (mut alice, mut bob) = ideal_f2p(()); - let (mut ctx_a, mut ctx_b) = test_st_executor(8); + #[tokio::test] + async fn test_call_sync() { + let x = Arc::new(Mutex::new(0)); - let (output_a, output_b) = futures::executor::block_on(async { - futures::join!( - alice.call(&mut ctx_a, 1u8, |&mut (), a: u8, b: u8| (a + b, a + b)), - bob.call(&mut ctx_b, 2u8, |&mut (), a: u8, b: u8| (a + b, a + b)), - ) - }); + let (mut sync_0, mut sync_1) = call_sync(); - assert_eq!(output_a, 3); - assert_eq!(output_b, 3); - } + let add_one = || { + *x.lock().unwrap() += 1; + }; - #[test] - #[should_panic] - fn test_ideal_wrong_input_type() { - let (mut alice, mut bob) = ideal_f2p(()); - let (mut ctx_a, mut ctx_b) = test_st_executor(8); + futures::join!(sync_0.call(add_one.clone()), sync_1.call(add_one)); - futures::executor::block_on(async { - futures::join!( - alice.call(&mut ctx_a, 1u16, |&mut (), a: u16, b: u16| (a + b, a + b)), - bob.call(&mut ctx_b, 2u8, |&mut (), a: u8, b: u8| (a + b, a + b)), - ) - }); + assert_eq!(*x.lock().unwrap(), 1); } } diff --git a/crates/mpz-ole-core/src/ideal.rs b/crates/mpz-ole-core/src/ideal.rs index 21041eac..13cd661d 100644 --- a/crates/mpz-ole-core/src/ideal.rs +++ b/crates/mpz-ole-core/src/ideal.rs @@ -111,7 +111,7 @@ where let sender_count = this.sender_state.alloc; let receiver_count = this.receiver_state.alloc; - sender_count > 0 || receiver_count > 0 && sender_count == receiver_count + sender_count > 0 || receiver_count > 0 } /// Flushes the functionality. diff --git a/crates/mpz-ole/src/ideal.rs b/crates/mpz-ole/src/ideal.rs index f20e6633..d6f0c0dd 100644 --- a/crates/mpz-ole/src/ideal.rs +++ b/crates/mpz-ole/src/ideal.rs @@ -1,37 +1,40 @@ //! Ideal ROLE. use async_trait::async_trait; -use mpz_common::{Context, Flush}; -use mpz_core::Block; +use rand::{rngs::StdRng, Rng, SeedableRng}; + +use mpz_common::{ + ideal::{call_sync, CallSync}, + Context, Flush, +}; use mpz_fields::Field; use mpz_ole_core::{ ideal::{IdealROLE as Core, IdealROLEError}, ROLEReceiver, ROLESender, ROLESenderOutput, }; -/// Ideal ROLE. -#[derive(Debug, Clone)] -pub struct IdealROLE { - core: Core, +/// Returns a new ideal ROLE sender and receiver. +pub fn ideal_role() -> (IdealROLESender, IdealROLEReceiver) { + let mut rng = StdRng::seed_from_u64(0); + let core = Core::new(rng.gen()); + let (sync_0, sync_1) = call_sync(); + ( + IdealROLESender { + core: core.clone(), + sync: sync_0, + }, + IdealROLEReceiver { core, sync: sync_1 }, + ) } -impl IdealROLE -where - F: Field, -{ - /// Create a new ideal ROLE. - /// - /// # Arguments - /// - /// * `seed` - PRG seed. - pub fn new(seed: Block) -> Self { - Self { - core: Core::new(seed), - } - } +/// Ideal ROLE sender. +#[derive(Debug)] +pub struct IdealROLESender { + core: Core, + sync: CallSync, } -impl ROLESender for IdealROLE +impl ROLESender for IdealROLESender where F: Field, { @@ -55,7 +58,38 @@ where } } -impl ROLEReceiver for IdealROLE +#[async_trait] +impl Flush for IdealROLESender +where + Ctx: Context, + F: Field, +{ + type Error = IdealROLEError; + + fn wants_flush(&self) -> bool { + self.core.wants_flush() + } + + async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { + if self.core.wants_flush() { + self.sync + .call(|| self.core.flush().map_err(IdealROLEError::from)) + .await + .transpose()?; + } + + Ok(()) + } +} + +/// Ideal ROLE Receiver. +#[derive(Debug)] +pub struct IdealROLEReceiver { + core: Core, + sync: CallSync, +} + +impl ROLEReceiver for IdealROLEReceiver where F: Field, { @@ -83,7 +117,7 @@ where } #[async_trait] -impl Flush for IdealROLE +impl Flush for IdealROLEReceiver where Ctx: Context, F: Field, @@ -96,7 +130,10 @@ where async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush()?; + self.sync + .call(|| self.core.flush().map_err(IdealROLEError::from)) + .await + .transpose()?; } Ok(()) diff --git a/crates/mpz-ot-core/src/ideal/cot.rs b/crates/mpz-ot-core/src/ideal/cot.rs index 6e1b71b6..101f757b 100644 --- a/crates/mpz-ot-core/src/ideal/cot.rs +++ b/crates/mpz-ot-core/src/ideal/cot.rs @@ -96,7 +96,7 @@ impl IdealCOT { let sender_queue = this.sender_state.queue.len(); let receiver_queue = this.receiver_state.queue.len(); - sender_queue > 0 && receiver_queue > 0 && sender_queue == receiver_queue + sender_queue > 0 || receiver_queue > 0 } /// Flushes the functionality. diff --git a/crates/mpz-ot-core/src/ideal/ot.rs b/crates/mpz-ot-core/src/ideal/ot.rs index 6ab3d411..2b10a4c3 100644 --- a/crates/mpz-ot-core/src/ideal/ot.rs +++ b/crates/mpz-ot-core/src/ideal/ot.rs @@ -52,7 +52,7 @@ impl IdealOT { let sender_queue = this.sender_state.queue.len(); let receiver_queue = this.receiver_state.queue.len(); - sender_queue > 0 && receiver_queue > 0 && sender_queue == receiver_queue + sender_queue > 0 || receiver_queue > 0 } /// Flushes the functionality. diff --git a/crates/mpz-ot-core/src/ideal/rcot.rs b/crates/mpz-ot-core/src/ideal/rcot.rs index e571c937..91e0da46 100644 --- a/crates/mpz-ot-core/src/ideal/rcot.rs +++ b/crates/mpz-ot-core/src/ideal/rcot.rs @@ -93,7 +93,7 @@ impl IdealRCOT { let sender_count = this.sender_state.alloc; let receiver_count = this.receiver_state.alloc; - sender_count > 0 && receiver_count > 0 && sender_count == receiver_count + sender_count > 0 || receiver_count > 0 } /// Flushes pending operations. diff --git a/crates/mpz-ot-core/src/ideal/rot.rs b/crates/mpz-ot-core/src/ideal/rot.rs index de189d50..6d23c09d 100644 --- a/crates/mpz-ot-core/src/ideal/rot.rs +++ b/crates/mpz-ot-core/src/ideal/rot.rs @@ -81,7 +81,7 @@ impl IdealROT { let sender_count = this.sender_state.alloc; let receiver_count = this.receiver_state.alloc; - sender_count > 0 && receiver_count > 0 && sender_count == receiver_count + sender_count > 0 || receiver_count > 0 } /// Flushes the functionality. diff --git a/crates/mpz-ot/src/ideal/cot.rs b/crates/mpz-ot/src/ideal/cot.rs index 58e76b67..69b64d93 100644 --- a/crates/mpz-ot/src/ideal/cot.rs +++ b/crates/mpz-ot/src/ideal/cot.rs @@ -1,7 +1,11 @@ //! Ideal functionality for correlated OT. use async_trait::async_trait; -use mpz_common::Flush; + +use mpz_common::{ + ideal::{call_sync, CallSync}, + Flush, +}; use mpz_core::Block; use mpz_ot_core::{ cot::{COTReceiver, COTSender}, @@ -11,15 +15,20 @@ use mpz_ot_core::{ /// Returns a new ideal COT sender and receiver. pub fn ideal_cot(delta: Block) -> (IdealCOTSender, IdealCOTReceiver) { let core = Core::new(delta); + let (sync_0, sync_1) = call_sync(); ( - IdealCOTSender { core: core.clone() }, - IdealCOTReceiver { core }, + IdealCOTSender { + core: core.clone(), + sync: sync_0, + }, + IdealCOTReceiver { core, sync: sync_1 }, ) } /// Ideal COT sender. pub struct IdealCOTSender { core: Core, + sync: CallSync, } impl COTSender for IdealCOTSender { @@ -53,7 +62,10 @@ impl Flush for IdealCOTSender { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealCOTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealCOTError::from)) + .await + .transpose()?; } Ok(()) @@ -63,6 +75,7 @@ impl Flush for IdealCOTSender { /// Ideal COT receiver. pub struct IdealCOTReceiver { core: Core, + sync: CallSync, } impl COTReceiver for IdealCOTReceiver { @@ -92,7 +105,10 @@ impl Flush for IdealCOTReceiver { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealCOTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealCOTError::from)) + .await + .transpose()?; } Ok(()) diff --git a/crates/mpz-ot/src/ideal/ot.rs b/crates/mpz-ot/src/ideal/ot.rs index 43d46f10..61f6ab05 100644 --- a/crates/mpz-ot/src/ideal/ot.rs +++ b/crates/mpz-ot/src/ideal/ot.rs @@ -1,7 +1,11 @@ //! Ideal functionality for chosen-message oblivious transfer. use async_trait::async_trait; -use mpz_common::Flush; + +use mpz_common::{ + ideal::{call_sync, CallSync}, + Flush, +}; use mpz_core::Block; use mpz_ot_core::{ ideal::ot::{IdealOT as Core, IdealOTError as CoreError}, @@ -11,15 +15,20 @@ use mpz_ot_core::{ /// Returns a new ideal OT sender and receiver. pub fn ideal_ot() -> (IdealOTSender, IdealOTReceiver) { let core = Core::new(); + let (sync_0, sync_1) = call_sync(); ( - IdealOTSender { core: core.clone() }, - IdealOTReceiver { core }, + IdealOTSender { + core: core.clone(), + sync: sync_0, + }, + IdealOTReceiver { core, sync: sync_1 }, ) } /// Ideal OT sender. pub struct IdealOTSender { core: Core, + sync: CallSync, } impl OTSender for IdealOTSender { @@ -45,7 +54,10 @@ impl Flush for IdealOTSender { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealOTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealOTError::from)) + .await + .transpose()?; } Ok(()) @@ -55,6 +67,7 @@ impl Flush for IdealOTSender { /// Ideal OT receiver. pub struct IdealOTReceiver { core: Core, + sync: CallSync, } impl OTReceiver for IdealOTReceiver { @@ -80,7 +93,10 @@ impl Flush for IdealOTReceiver { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealOTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealOTError::from)) + .await + .transpose()?; } Ok(()) diff --git a/crates/mpz-ot/src/ideal/rcot.rs b/crates/mpz-ot/src/ideal/rcot.rs index 7afb46ab..2bc776bf 100644 --- a/crates/mpz-ot/src/ideal/rcot.rs +++ b/crates/mpz-ot/src/ideal/rcot.rs @@ -1,7 +1,11 @@ //! Ideal functionality for random correlated OT. use async_trait::async_trait; -use mpz_common::Flush; + +use mpz_common::{ + ideal::{call_sync, CallSync}, + Flush, +}; use mpz_core::Block; use mpz_ot_core::{ ideal::rcot::{IdealRCOT as Core, IdealRCOTError as CoreError}, @@ -11,15 +15,20 @@ use mpz_ot_core::{ /// Returns a new ideal RCOT sender and receiver. pub fn ideal_rcot(seed: Block, delta: Block) -> (IdealRCOTSender, IdealRCOTReceiver) { let core = Core::new(seed, delta); + let (sync_0, sync_1) = call_sync(); ( - IdealRCOTSender { core: core.clone() }, - IdealRCOTReceiver { core }, + IdealRCOTSender { + core: core.clone(), + sync: sync_0, + }, + IdealRCOTReceiver { core, sync: sync_1 }, ) } /// Ideal RCOT sender. pub struct IdealRCOTSender { core: Core, + sync: CallSync, } impl RCOTSender for IdealRCOTSender { @@ -57,7 +66,10 @@ impl Flush for IdealRCOTSender { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealRCOTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealRCOTError::from)) + .await + .transpose()?; } Ok(()) @@ -67,6 +79,7 @@ impl Flush for IdealRCOTSender { /// Ideal RCOT receiver. pub struct IdealRCOTReceiver { core: Core, + sync: CallSync, } impl RCOTReceiver for IdealRCOTReceiver { @@ -103,7 +116,10 @@ impl Flush for IdealRCOTReceiver { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealRCOTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealRCOTError::from)) + .await + .transpose()?; } Ok(()) diff --git a/crates/mpz-ot/src/ideal/rot.rs b/crates/mpz-ot/src/ideal/rot.rs index 9f1d6805..7fcd566d 100644 --- a/crates/mpz-ot/src/ideal/rot.rs +++ b/crates/mpz-ot/src/ideal/rot.rs @@ -1,7 +1,11 @@ //! Ideal functionality for random correlated OT. use async_trait::async_trait; -use mpz_common::Flush; + +use mpz_common::{ + ideal::{call_sync, CallSync}, + Flush, +}; use mpz_core::Block; use mpz_ot_core::{ ideal::rot::{IdealROT as Core, IdealROTError as CoreError}, @@ -11,15 +15,20 @@ use mpz_ot_core::{ /// Returns a new ideal ROT sender and receiver. pub fn ideal_rot(seed: Block) -> (IdealROTSender, IdealROTReceiver) { let core = Core::new(seed); + let (sync_0, sync_1) = call_sync(); ( - IdealROTSender { core: core.clone() }, - IdealROTReceiver { core }, + IdealROTSender { + core: core.clone(), + sync: sync_0, + }, + IdealROTReceiver { core, sync: sync_1 }, ) } /// Ideal ROT sender. pub struct IdealROTSender { core: Core, + sync: CallSync, } impl ROTSender<[Block; 2]> for IdealROTSender { @@ -53,7 +62,10 @@ impl Flush for IdealROTSender { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealROTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealROTError::from)) + .await + .transpose()?; } Ok(()) @@ -63,6 +75,7 @@ impl Flush for IdealROTSender { /// Ideal OT receiver. pub struct IdealROTReceiver { core: Core, + sync: CallSync, } impl ROTReceiver for IdealROTReceiver { @@ -99,7 +112,10 @@ impl Flush for IdealROTReceiver { async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { if self.core.wants_flush() { - self.core.flush().map_err(IdealROTError::from)?; + self.sync + .call(|| self.core.flush().map_err(IdealROTError::from)) + .await + .transpose()?; } Ok(()) diff --git a/crates/mpz-ot/src/test.rs b/crates/mpz-ot/src/test.rs index e67491c8..4c784711 100644 --- a/crates/mpz-ot/src/test.rs +++ b/crates/mpz-ot/src/test.rs @@ -1,3 +1,5 @@ +//! Test utilities. + use mpz_common::{ executor::{test_st_executor, TestSTExecutor}, Flush, diff --git a/crates/mpz-share-conversion-core/src/ideal.rs b/crates/mpz-share-conversion-core/src/ideal.rs index 174c76bd..885b956d 100644 --- a/crates/mpz-share-conversion-core/src/ideal.rs +++ b/crates/mpz-share-conversion-core/src/ideal.rs @@ -128,12 +128,8 @@ where let receiver_a2m_queue = self.receiver_state.a2m_queue.len(); let receiver_m2a_queue = self.receiver_state.m2a_queue.len(); - let wants_a2m = sender_a2m_queue > 0 - && receiver_a2m_queue > 0 - && sender_a2m_queue == receiver_a2m_queue; - let wants_m2a = sender_m2a_queue > 0 - && receiver_m2a_queue > 0 - && sender_m2a_queue == receiver_m2a_queue; + let wants_a2m = sender_a2m_queue > 0 || receiver_a2m_queue > 0; + let wants_m2a = sender_m2a_queue > 0 || receiver_m2a_queue > 0; wants_a2m || wants_m2a } diff --git a/crates/mpz-share-conversion/Cargo.toml b/crates/mpz-share-conversion/Cargo.toml index a456db6b..1cb6f1b8 100644 --- a/crates/mpz-share-conversion/Cargo.toml +++ b/crates/mpz-share-conversion/Cargo.toml @@ -28,7 +28,11 @@ rand.workspace = true [dev-dependencies] mpz-share-conversion-core = { workspace = true, features = ["test-utils"] } mpz-ole = { workspace = true, features = ["test-utils"] } -mpz-common = { workspace = true, features = ["executor", "test-utils"] } +mpz-common = { workspace = true, features = [ + "executor", + "test-utils", + "ideal", +] } mpz-core.workspace = true tokio = { workspace = true, features = [ "net", diff --git a/crates/mpz-share-conversion/src/ideal.rs b/crates/mpz-share-conversion/src/ideal.rs index 250138a7..5f87945a 100644 --- a/crates/mpz-share-conversion/src/ideal.rs +++ b/crates/mpz-share-conversion/src/ideal.rs @@ -1,8 +1,11 @@ //! Ideal functionalities. -use crate::{AdditiveToMultiplicative, MultiplicativeToAdditive}; use async_trait::async_trait; -use mpz_common::Flush; + +use mpz_common::{ + ideal::{call_sync, CallSync}, + Flush, +}; use mpz_core::Block; use mpz_fields::Field; use mpz_share_conversion_core::ideal::{ @@ -10,20 +13,32 @@ use mpz_share_conversion_core::ideal::{ IdealShareConvertReceiver as CoreReceiver, IdealShareConvertSender as CoreSender, }; +use crate::{AdditiveToMultiplicative, MultiplicativeToAdditive}; + /// Create a pair of ideal share converters. pub fn ideal_share_convert( seed: Block, ) -> (IdealShareConvertSender, IdealShareConvertReceiver) { let (core_sender, core_receiver) = core_ideal_share_convert(seed); + let (sync_0, sync_1) = call_sync(); ( - IdealShareConvertSender(core_sender), - IdealShareConvertReceiver(core_receiver), + IdealShareConvertSender { + core: core_sender, + sync: sync_0, + }, + IdealShareConvertReceiver { + core: core_receiver, + sync: sync_1, + }, ) } /// Ideal share conversion sender. #[derive(Debug)] -pub struct IdealShareConvertSender(CoreSender); +pub struct IdealShareConvertSender { + core: CoreSender, + sync: CallSync, +} impl AdditiveToMultiplicative for IdealShareConvertSender where @@ -33,11 +48,11 @@ where type Future = as AdditiveToMultiplicative>::Future; fn alloc(&mut self, count: usize) -> Result<(), Self::Error> { - AdditiveToMultiplicative::alloc(&mut self.0, count) + AdditiveToMultiplicative::alloc(&mut self.core, count) } fn queue_to_multiplicative(&mut self, inputs: &[F]) -> Result { - self.0.queue_to_multiplicative(inputs) + self.core.queue_to_multiplicative(inputs) } } @@ -49,11 +64,11 @@ where type Future = as MultiplicativeToAdditive>::Future; fn alloc(&mut self, count: usize) -> Result<(), Self::Error> { - MultiplicativeToAdditive::alloc(&mut self.0, count) + MultiplicativeToAdditive::alloc(&mut self.core, count) } fn queue_to_additive(&mut self, inputs: &[F]) -> Result { - self.0.queue_to_additive(inputs) + self.core.queue_to_additive(inputs) } } @@ -65,12 +80,15 @@ where type Error = IdealShareConvertError; fn wants_flush(&self) -> bool { - self.0.wants_flush() + self.core.wants_flush() } async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { - if self.0.wants_flush() { - self.0.flush()?; + if self.core.wants_flush() { + self.sync + .call(|| self.core.flush().map_err(IdealShareConvertError::from)) + .await + .transpose()?; } Ok(()) @@ -79,7 +97,10 @@ where /// Ideal share conversion receiver. #[derive(Debug)] -pub struct IdealShareConvertReceiver(CoreReceiver); +pub struct IdealShareConvertReceiver { + core: CoreReceiver, + sync: CallSync, +} impl AdditiveToMultiplicative for IdealShareConvertReceiver where @@ -89,11 +110,11 @@ where type Future = as AdditiveToMultiplicative>::Future; fn alloc(&mut self, count: usize) -> Result<(), Self::Error> { - AdditiveToMultiplicative::alloc(&mut self.0, count) + AdditiveToMultiplicative::alloc(&mut self.core, count) } fn queue_to_multiplicative(&mut self, inputs: &[F]) -> Result { - self.0.queue_to_multiplicative(inputs) + self.core.queue_to_multiplicative(inputs) } } @@ -105,11 +126,11 @@ where type Future = as MultiplicativeToAdditive>::Future; fn alloc(&mut self, count: usize) -> Result<(), Self::Error> { - MultiplicativeToAdditive::alloc(&mut self.0, count) + MultiplicativeToAdditive::alloc(&mut self.core, count) } fn queue_to_additive(&mut self, inputs: &[F]) -> Result { - self.0.queue_to_additive(inputs) + self.core.queue_to_additive(inputs) } } @@ -121,12 +142,15 @@ where type Error = IdealShareConvertError; fn wants_flush(&self) -> bool { - self.0.wants_flush() + self.core.wants_flush() } async fn flush(&mut self, _ctx: &mut Ctx) -> Result<(), Self::Error> { - if self.0.wants_flush() { - self.0.flush()?; + if self.core.wants_flush() { + self.sync + .call(|| self.core.flush().map_err(IdealShareConvertError::from)) + .await + .transpose()?; } Ok(()) diff --git a/crates/mpz-share-conversion/src/lib.rs b/crates/mpz-share-conversion/src/lib.rs index a511e1f4..7fd31e71 100644 --- a/crates/mpz-share-conversion/src/lib.rs +++ b/crates/mpz-share-conversion/src/lib.rs @@ -22,28 +22,26 @@ pub use sender::{SenderError, ShareConversionSender}; #[cfg(test)] mod tests { use super::*; - use mpz_core::Block; use mpz_fields::{gf2_128::Gf2_128, p256::P256}; - use mpz_ole::ideal::IdealROLE; - use rand::{rngs::StdRng, SeedableRng}; + use mpz_ole::ideal::ideal_role; use test::test_share_convert; #[tokio::test] async fn test_share_convert_p256() { - let mut rng = StdRng::seed_from_u64(0); - let ideal_role = IdealROLE::new(Block::random(&mut rng)); - let sender = ShareConversionSender::<_, P256>::new(ideal_role.clone()); - let receiver = ShareConversionReceiver::<_, P256>::new(ideal_role); + let (role_sender, role_receiver) = ideal_role(); + + let sender = ShareConversionSender::<_, P256>::new(role_sender); + let receiver = ShareConversionReceiver::<_, P256>::new(role_receiver); test_share_convert(sender, receiver, 8).await; } #[tokio::test] async fn test_share_convert_gf2_128() { - let mut rng = StdRng::seed_from_u64(0); - let ideal_role = IdealROLE::new(Block::random(&mut rng)); - let sender = ShareConversionSender::<_, Gf2_128>::new(ideal_role.clone()); - let receiver = ShareConversionReceiver::<_, Gf2_128>::new(ideal_role); + let (role_sender, role_receiver) = ideal_role(); + + let sender = ShareConversionSender::<_, Gf2_128>::new(role_sender); + let receiver = ShareConversionReceiver::<_, Gf2_128>::new(role_receiver); test_share_convert(sender, receiver, 8).await; }