From 9760121c6366632f73232be5a873698c68afa88b Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Wed, 18 Dec 2024 16:35:52 -0800 Subject: [PATCH] add flag to encrypt_util for length delimited binary format --- ipa-core/src/cli/crypto/hybrid_decrypt.rs | 22 +++- ipa-core/src/cli/crypto/hybrid_encrypt.rs | 148 ++++++++++++++++++---- ipa-core/src/cli/crypto/mod.rs | 11 +- 3 files changed, 144 insertions(+), 37 deletions(-) diff --git a/ipa-core/src/cli/crypto/hybrid_decrypt.rs b/ipa-core/src/cli/crypto/hybrid_decrypt.rs index be705e062..285df9e7e 100644 --- a/ipa-core/src/cli/crypto/hybrid_decrypt.rs +++ b/ipa-core/src/cli/crypto/hybrid_decrypt.rs @@ -226,9 +226,14 @@ mod tests { let output_dir = tempdir().unwrap(); let network_file = hybrid_sample_data::test_keys().network_config(); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), network_file.path()) - .encrypt() - .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir.path(), + network_file.path(), + false, + ) + .encrypt() + .unwrap(); let decrypt_output = output_dir.path().join("output"); let enc1 = output_dir.path().join("DOES_NOT_EXIST.enc"); @@ -258,9 +263,14 @@ mod tests { let network_file = hybrid_sample_data::test_keys().network_config(); let output_dir = tempdir().unwrap(); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), network_file.path()) - .encrypt() - .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir.path(), + network_file.path(), + false, + ) + .encrypt() + .unwrap(); let decrypt_output = output_dir.path().join("output"); let enc1 = output_dir.path().join("helper1.enc"); diff --git a/ipa-core/src/cli/crypto/hybrid_encrypt.rs b/ipa-core/src/cli/crypto/hybrid_encrypt.rs index 1a60d6944..c90bc16e5 100644 --- a/ipa-core/src/cli/crypto/hybrid_encrypt.rs +++ b/ipa-core/src/cli/crypto/hybrid_encrypt.rs @@ -52,15 +52,38 @@ pub struct HybridEncryptArgs { /// Path to helper network configuration file #[arg(long)] network: PathBuf, + /// a flag to produce length delimited binary instead of newline delimited hex + #[arg(long)] + length_delimited: bool, +} + +#[derive(Copy, Clone)] +enum FileFormat { + LengthDelimitedBinary, + NewlineDelimitedHex, } impl HybridEncryptArgs { #[must_use] - pub fn new(input_file: &Path, output_dir: &Path, network: &Path) -> Self { + pub fn new( + input_file: &Path, + output_dir: &Path, + network: &Path, + length_delimited: bool, + ) -> Self { Self { input_file: input_file.to_path_buf(), output_dir: output_dir.to_path_buf(), network: network.to_path_buf(), + length_delimited, + } + } + + fn file_format(&self) -> FileFormat { + if self.length_delimited { + FileFormat::LengthDelimitedBinary + } else { + FileFormat::NewlineDelimitedHex } } @@ -89,7 +112,8 @@ impl HybridEncryptArgs { panic!("could not load network file") }; - let mut worker_pool = ReportWriter::new(key_registries, &self.output_dir); + let mut worker_pool = + ReportWriter::new(key_registries, &self.output_dir, self.file_format()); for (report_id, record) in input.iter::().enumerate() { worker_pool.submit(report_id, record.share())?; } @@ -118,6 +142,7 @@ impl EncryptorPool { thread_count: usize, file_writer: [SyncSender; 3], key_registries: [KeyRegistry; 3], + file_format: FileFormat, ) -> Self { Self { pool: (0..thread_count) @@ -132,11 +157,23 @@ impl EncryptorPool { .spawn(move || { for (i, helper_id, report) in rx { let key_registry = &key_registries[helper_id]; - let output = report.encrypt( - DEFAULT_KEY_ID, - key_registry, - &mut thread_rng(), - )?; + let mut output = + Vec::with_capacity(usize::from(report.encrypted_len() + 2)); + match file_format { + FileFormat::NewlineDelimitedHex => report.encrypt_to( + DEFAULT_KEY_ID, + key_registry, + &mut thread_rng(), + &mut output, + )?, + FileFormat::LengthDelimitedBinary => report + .delimited_encrypt_to( + DEFAULT_KEY_ID, + key_registry, + &mut thread_rng(), + &mut output, + )?, + } file_writer[helper_id].send((i, output))?; } @@ -178,7 +215,11 @@ struct ReportWriter { } impl ReportWriter { - pub fn new(key_registries: [KeyRegistry; 3], output_dir: &Path) -> Self { + pub fn new( + key_registries: [KeyRegistry; 3], + output_dir: &Path, + file_format: FileFormat, + ) -> Self { // create 3 worker threads to write data into 3 files let workers = array::from_fn(|i| { let output_filename = format!("helper{}.enc", i + 1); @@ -188,12 +229,13 @@ impl ReportWriter { .open(output_dir.join(&output_filename)) .unwrap_or_else(|e| panic!("unable write to {:?}. {}", &output_filename, e)); - FileWriteWorker::new(file) + FileWriteWorker::new(file, file_format) }); let encryptor_pool = EncryptorPool::with_worker_threads( num_cpus::get(), workers.each_ref().map(|x| x.sender.clone()), key_registries, + file_format, ); Self { @@ -239,17 +281,26 @@ struct FileWriteWorker { } impl FileWriteWorker { - pub fn new(file: File) -> Self { + pub fn new(file: File, file_format: FileFormat) -> Self { + fn write_report( + writer: &mut W, + report: &[u8], + file_format: FileFormat, + ) -> Result<(), BoxError> { + match file_format { + FileFormat::LengthDelimitedBinary => { + FileWriteWorker::write_report_length_delimited_binary(writer, report) + } + FileFormat::NewlineDelimitedHex => { + FileWriteWorker::write_report_newline_delimited_hex(writer, report) + } + } + } + let (tx, rx) = std::sync::mpsc::sync_channel(65535); Self { sender: tx, handle: thread::spawn(move || { - fn write_report(writer: &mut W, report: &[u8]) -> Result<(), BoxError> { - let hex_output = hex::encode(report); - writeln!(writer, "{hex_output}")?; - Ok(()) - } - // write low watermark. All reports below this line have been written let mut lw = 0; let mut pending_reports = BTreeMap::new(); @@ -271,7 +322,7 @@ impl FileWriteWorker { "Internal error: received a duplicate report {report_id}" ); while let Some(report) = pending_reports.remove(&lw) { - write_report(&mut writer, &report)?; + write_report(&mut writer, &report, file_format)?; lw += 1; if lw % 1_000_000 == 0 { tracing::info!("Encrypted {}M reports", lw / 1_000_000); @@ -282,6 +333,23 @@ impl FileWriteWorker { }), } } + + fn write_report_newline_delimited_hex( + writer: &mut W, + report: &[u8], + ) -> Result<(), BoxError> { + let hex_output = hex::encode(report); + writeln!(writer, "{hex_output}")?; + Ok(()) + } + + fn write_report_length_delimited_binary( + writer: &mut W, + report: &[u8], + ) -> Result<(), BoxError> { + writer.write_all(report)?; + Ok(()) + } } #[cfg(all(test, unit_test))] @@ -334,12 +402,26 @@ mod tests { } input_file.flush().unwrap(); - let output_dir = tempdir().unwrap(); + let output_dir_1 = tempdir().unwrap(); + let output_dir_2 = tempdir().unwrap(); let network_file = sample_data::test_keys().network_config(); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), network_file.path()) - .encrypt() - .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir_1.path(), + network_file.path(), + false, + ) + .encrypt() + .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir_2.path(), + network_file.path(), + true, + ) + .encrypt() + .unwrap(); } #[test] @@ -350,7 +432,7 @@ mod tests { let output_dir = tempdir().unwrap(); let network_dir = tempdir().unwrap(); let network_file = network_dir.path().join("does_not_exist"); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), &network_file) + HybridEncryptArgs::new(input_file.path(), output_dir.path(), &network_file, true) .encrypt() .unwrap(); } @@ -368,9 +450,14 @@ this is not toml! let mut network_file = NamedTempFile::new().unwrap(); writeln!(network_file.as_file_mut(), "{network_data}").unwrap(); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), network_file.path()) - .encrypt() - .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir.path(), + network_file.path(), + true, + ) + .encrypt() + .unwrap(); } #[test] @@ -392,8 +479,13 @@ public_key = "cfdbaaff16b30aa8a4ab07eaad2cdd80458208a1317aefbb807e46dce596617e" let mut network_file = NamedTempFile::new().unwrap(); writeln!(network_file.as_file_mut(), "{network_data}").unwrap(); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), network_file.path()) - .encrypt() - .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir.path(), + network_file.path(), + true, + ) + .encrypt() + .unwrap(); } } diff --git a/ipa-core/src/cli/crypto/mod.rs b/ipa-core/src/cli/crypto/mod.rs index ac0fd1a06..47180efef 100644 --- a/ipa-core/src/cli/crypto/mod.rs +++ b/ipa-core/src/cli/crypto/mod.rs @@ -345,9 +345,14 @@ mod tests { let input = hybrid_sample_data::test_hybrid_data().take(10); let input_file = hybrid_sample_data::write_csv(input).unwrap(); let network_file = hybrid_sample_data::test_keys().network_config(); - HybridEncryptArgs::new(input_file.path(), output_dir.path(), network_file.path()) - .encrypt() - .unwrap(); + HybridEncryptArgs::new( + input_file.path(), + output_dir.path(), + network_file.path(), + false, + ) + .encrypt() + .unwrap(); let decrypt_output = output_dir.path().join("output"); let enc1 = output_dir.path().join("helper1.enc");