From d5ecc8ac85fc8ed601eaa19df7c7623678780052 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Mon, 9 Oct 2023 10:54:53 +0200 Subject: [PATCH 01/13] Attempt to fix contacts sync by saving them every time we receive them --- presage/src/manager.rs | 66 +++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/presage/src/manager.rs b/presage/src/manager.rs index f06bdc529..d58c96ace 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -630,28 +630,6 @@ impl Manager { Ok(()) } - async fn wait_for_contacts_sync( - &mut self, - mut messages: impl Stream + Unpin, - ) -> Result<(), Error> { - let mut message_receiver = MessageReceiver::new(self.push_service()?); - while let Some(Content { body, .. }) = messages.next().await { - if let ContentBody::SynchronizeMessage(SyncMessage { - contacts: Some(contacts), - .. - }) = body - { - let contacts = message_receiver.retrieve_contacts(&contacts).await?; - let _ = self.config_store.clear_contacts(); - self.config_store - .save_contacts(contacts.filter_map(Result::ok))?; - info!("saved contacts"); - return Ok(()); - } - } - Ok(()) - } - async fn sync_contacts(&mut self) -> Result<(), Error> { let messages = self.receive_messages_stream(true).await?; pin_mut!(messages); @@ -660,12 +638,11 @@ impl Manager { info!("waiting for contacts sync for up to 60 seconds"); - tokio::time::timeout( - Duration::from_secs(60), - self.wait_for_contacts_sync(messages), - ) + tokio::time::timeout(Duration::from_secs(60), async move { + while messages.next().await.is_some() {} + }) .await - .map_err(Error::from)??; + .map_err(Error::from)?; Ok(()) } @@ -876,22 +853,24 @@ impl Manager { async fn receive_messages_stream( &mut self, - include_internal_events: bool, + stop_on_contacts_sync: bool, ) -> Result, Error> { struct StreamState { encrypted_messages: S, + message_receiver: MessageReceiver, service_cipher: ServiceCipher, config_store: C, groups_manager: GroupsManager, - include_internal_events: bool, + stop_on_contacts_sync: bool, } let init = StreamState { encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), + message_receiver: MessageReceiver::new(self.push_service()?), service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, - include_internal_events, + stop_on_contacts_sync, }; Ok(futures::stream::unfold(init, |mut state| async move { @@ -902,12 +881,33 @@ impl Manager { Ok(Some(content)) => { // contacts synchronization sent from the primary device (happens after linking, or on demand) if let ContentBody::SynchronizeMessage(SyncMessage { - contacts: Some(_), + contacts: Some(contacts), .. }) = &content.body { - if state.include_internal_events { - return Some((content, state)); + match state.message_receiver.retrieve_contacts(&contacts).await + { + Ok(contacts) => { + let _ = state.config_store.clear_contacts(); + match state + .config_store + .save_contacts(contacts.filter_map(Result::ok)) + { + Ok(()) => { + info!("saved contacts"); + } + Err(e) => { + warn!("failed to save contacts: {e}"); + } + } + } + Err(e) => { + warn!("failed to retrieve contacts: {e}"); + } + } + + if state.stop_on_contacts_sync { + return None; } else { continue; } From 3e39eb81d1f66ec14b42428832003c675b7dfe13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Mon, 9 Oct 2023 10:57:04 +0200 Subject: [PATCH 02/13] Fix quirks feature of CLI --- presage-cli/src/main.rs | 42 +++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 7398b6d12..9cae6f43e 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -205,21 +205,11 @@ async fn main() -> anyhow::Result<()> { } async fn send( - msg: &str, - uuid: &Uuid, manager: &mut Manager, + uuid: &Uuid, + content_body: impl Into, + timestamp: u64, ) -> anyhow::Result<()> { - let timestamp = std::time::SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; - - let message = ContentBody::DataMessage(DataMessage { - body: Some(msg.to_string()), - timestamp: Some(timestamp), - ..Default::default() - }); - let local = task::LocalSet::new(); local @@ -234,7 +224,7 @@ async fn send( sleep(Duration::from_secs(5)).await; manager - .send_message(*uuid, message, timestamp) + .send_message(*uuid, content_body, timestamp) .await .unwrap(); @@ -527,7 +517,16 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } Cmd::Send { uuid, message } => { let mut manager = Manager::load_registered(config_store).await?; - send(&message, &uuid, &mut manager).await?; + let timestamp = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + let message = DataMessage { + body: Some(message.to_string()), + timestamp: Some(timestamp), + ..Default::default() + }; + send(&mut manager, &uuid, message, timestamp).await?; } Cmd::SendToGroup { message, @@ -655,7 +654,18 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re #[cfg(feature = "quirks")] Cmd::RequestSyncContacts => { let mut manager = Manager::load_registered(config_store).await?; - manager.request_contacts_sync().await?; + let uuid = manager.state().service_ids.aci; + let timestamp = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + let sync_message = SyncMessage { + request: Some(sync_message::Request { + r#type: Some(sync_message::request::Type::Contacts as i32), + }), + ..Default::default() + }; + send(&mut manager, &uuid, sync_message, timestamp).await?; } Cmd::ListMessages { group_master_key, From a867199680efeaf2b2605e666296e0622c60d980 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Mon, 9 Oct 2023 11:03:47 +0200 Subject: [PATCH 03/13] Fix compilation --- presage-cli/src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 9cae6f43e..a3b29456c 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -653,6 +653,8 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } #[cfg(feature = "quirks")] Cmd::RequestSyncContacts => { + use presage::prelude::proto::sync_message; + let mut manager = Manager::load_registered(config_store).await?; let uuid = manager.state().service_ids.aci; let timestamp = std::time::SystemTime::now() From 8b6e547e42615d54cc4c5afe82c3f2cbe712130a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Mon, 9 Oct 2023 11:05:44 +0200 Subject: [PATCH 04/13] Skip timeout --- presage/src/manager.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/presage/src/manager.rs b/presage/src/manager.rs index d58c96ace..5d3cc7ce9 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -631,19 +631,8 @@ impl Manager { } async fn sync_contacts(&mut self) -> Result<(), Error> { - let messages = self.receive_messages_stream(true).await?; - pin_mut!(messages); - + let _messages = self.receive_messages_stream(true).await?; self.request_contacts_sync().await?; - - info!("waiting for contacts sync for up to 60 seconds"); - - tokio::time::timeout(Duration::from_secs(60), async move { - while messages.next().await.is_some() {} - }) - .await - .map_err(Error::from)?; - Ok(()) } From 30a8755f650fde6b2291116e153f9d4386697926 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Sun, 15 Oct 2023 13:56:42 +0200 Subject: [PATCH 05/13] Use /v1/queue/empty to do contacts sync stuff --- presage/Cargo.toml | 4 ++-- presage/src/manager.rs | 39 ++++++++++++++++++++++++++------------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/presage/Cargo.toml b/presage/Cargo.toml index f7dbc1f9b..ae22f2cb2 100644 --- a/presage/Cargo.toml +++ b/presage/Cargo.toml @@ -6,8 +6,8 @@ authors = ["Gabriel Féron "] edition = "2021" [dependencies] -libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" } -libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" } +libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "16695d0" } +libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "16695d0" } base64 = "0.12" futures = "0.3" diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 5d3cc7ce9..2abe4509f 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -16,7 +16,7 @@ use rand::{ use serde::{Deserialize, Serialize}; use url::Url; -use libsignal_service::proto::EditMessage; +use libsignal_service::{proto::EditMessage, messagepipe::Incoming}; use libsignal_service::push_service::{RegistrationMethod, VerificationTransport}; use libsignal_service::{ attachment_cipher::decrypt_in_place, @@ -28,7 +28,7 @@ use libsignal_service::{ models::Contact, prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, proto::{ - data_message::Delete, sync_message, AttachmentPointer, Envelope, GroupContextV2, + data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage, }, protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, @@ -630,9 +630,22 @@ impl Manager { Ok(()) } + async fn wait_for_initial_sync(&mut self) -> Result<(), Error> { + let messages = self.receive_messages_stream(true).await?; + pin_mut!(messages); + while let Some(msg) = messages.next().await { + debug!("{msg:?}"); + } + Ok(()) + } + async fn sync_contacts(&mut self) -> Result<(), Error> { - let _messages = self.receive_messages_stream(true).await?; + self.wait_for_initial_sync().await?; self.request_contacts_sync().await?; + + // XXX: this will not work, too fast for the primary phone to send contacts + self.wait_for_initial_sync().await?; + Ok(()) } @@ -792,7 +805,7 @@ impl Manager { async fn receive_messages_encrypted( &mut self, - ) -> Result>, Error> { + ) -> Result>, Error> { let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?; let allow_stories = false; let pipe = MessageReceiver::new(self.push_service()?) @@ -842,7 +855,7 @@ impl Manager { async fn receive_messages_stream( &mut self, - stop_on_contacts_sync: bool, + initial_sync: bool, ) -> Result, Error> { struct StreamState { encrypted_messages: S, @@ -850,7 +863,7 @@ impl Manager { service_cipher: ServiceCipher, config_store: C, groups_manager: GroupsManager, - stop_on_contacts_sync: bool, + initial_sync: bool, } let init = StreamState { @@ -859,13 +872,13 @@ impl Manager { service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, - stop_on_contacts_sync, + initial_sync, }; Ok(futures::stream::unfold(init, |mut state| async move { loop { match state.encrypted_messages.next().await { - Some(Ok(envelope)) => { + Some(Ok(Incoming::Envelope(envelope))) => { match state.service_cipher.open_envelope(envelope).await { Ok(Some(content)) => { // contacts synchronization sent from the primary device (happens after linking, or on demand) @@ -895,11 +908,7 @@ impl Manager { } } - if state.stop_on_contacts_sync { - return None; - } else { - continue; - } + continue; } if let ContentBody::DataMessage(DataMessage { @@ -960,6 +969,10 @@ impl Manager { } } } + Some(Ok(Incoming::QueueEmpty)) => { + debug!("empty queue"); + if state.initial_sync { return None; } + } Some(Err(e)) => error!("Error: {}", e), None => return None, } From 5706fa117943aabd9f9f8c31231b38d224c4b4c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Sun, 15 Oct 2023 14:04:30 +0200 Subject: [PATCH 06/13] Add ReceivingMode enum --- presage/src/manager.rs | 43 ++++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 2abe4509f..db260b886 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -136,6 +136,13 @@ impl fmt::Debug for Registered { } } +#[derive(PartialEq, Eq)] +enum ReceivingMode { + InitialSync, + WaitForContacts, + Forever +} + impl Registered { pub fn device_id(&self) -> u32 { self.device_id.unwrap_or(DEFAULT_DEVICE_ID) @@ -630,21 +637,21 @@ impl Manager { Ok(()) } - async fn wait_for_initial_sync(&mut self) -> Result<(), Error> { - let messages = self.receive_messages_stream(true).await?; + + async fn sync_contacts(&mut self) -> Result<(), Error> { + let messages = self.receive_messages_stream(ReceivingMode::InitialSync).await?; pin_mut!(messages); - while let Some(msg) = messages.next().await { - debug!("{msg:?}"); + while let Some(_msg) = messages.next().await { + // debug!("{msg:?}"); } - Ok(()) - } - async fn sync_contacts(&mut self) -> Result<(), Error> { - self.wait_for_initial_sync().await?; self.request_contacts_sync().await?; - // XXX: this will not work, too fast for the primary phone to send contacts - self.wait_for_initial_sync().await?; + let messages = self.receive_messages_stream(ReceivingMode::WaitForContacts).await?; + pin_mut!(messages); + while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await { + // debug!("{msg:?}"); + } Ok(()) } @@ -833,7 +840,7 @@ impl Manager { pub async fn receive_messages( &mut self, ) -> Result, Error> { - self.receive_messages_stream(false).await + self.receive_messages_stream(ReceivingMode::Forever).await } fn groups_manager( @@ -855,7 +862,7 @@ impl Manager { async fn receive_messages_stream( &mut self, - initial_sync: bool, + mode: ReceivingMode, ) -> Result, Error> { struct StreamState { encrypted_messages: S, @@ -863,7 +870,7 @@ impl Manager { service_cipher: ServiceCipher, config_store: C, groups_manager: GroupsManager, - initial_sync: bool, + mode: ReceivingMode, } let init = StreamState { @@ -872,7 +879,7 @@ impl Manager { service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, - initial_sync, + mode, }; Ok(futures::stream::unfold(init, |mut state| async move { @@ -908,7 +915,11 @@ impl Manager { } } - continue; + if state.mode == ReceivingMode::WaitForContacts { + return None; + } else { + continue; + } } if let ContentBody::DataMessage(DataMessage { @@ -971,7 +982,7 @@ impl Manager { } Some(Ok(Incoming::QueueEmpty)) => { debug!("empty queue"); - if state.initial_sync { return None; } + if state.mode == ReceivingMode::InitialSync { return None; } } Some(Err(e)) => error!("Error: {}", e), None => return None, From dcc57e1f725996ed363291eba8539782bc68b54a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Mon, 16 Oct 2023 10:53:17 +0200 Subject: [PATCH 07/13] Progress trying to improve things --- presage-cli/src/main.rs | 5 ++- presage/src/manager.rs | 73 ++++++++++++++++++++++++++++++----------- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index a3b29456c..7f5da33a1 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -208,7 +208,6 @@ async fn send( manager: &mut Manager, uuid: &Uuid, content_body: impl Into, - timestamp: u64, ) -> anyhow::Result<()> { let local = task::LocalSet::new(); @@ -224,7 +223,7 @@ async fn send( sleep(Duration::from_secs(5)).await; manager - .send_message(*uuid, content_body, timestamp) + .send_message(*uuid, content_body) .await .unwrap(); @@ -526,7 +525,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re timestamp: Some(timestamp), ..Default::default() }; - send(&mut manager, &uuid, message, timestamp).await?; + send(&mut manager, &uuid, message).await?; } Cmd::SendToGroup { message, diff --git a/presage/src/manager.rs b/presage/src/manager.rs index db260b886..40dbb4cef 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -16,7 +16,6 @@ use rand::{ use serde::{Deserialize, Serialize}; use url::Url; -use libsignal_service::{proto::EditMessage, messagepipe::Incoming}; use libsignal_service::push_service::{RegistrationMethod, VerificationTransport}; use libsignal_service::{ attachment_cipher::decrypt_in_place, @@ -27,10 +26,7 @@ use libsignal_service::{ messagepipe::ServiceCredentials, models::Contact, prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, - proto::{ - data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, - NullMessage, - }, + proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage}, protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning}, push_service::{ @@ -47,6 +43,7 @@ use libsignal_service::{ websocket::SignalWebSocket, AccountManager, Profile, ServiceAddress, }; +use libsignal_service::{messagepipe::Incoming, proto::EditMessage}; use libsignal_service_hyper::push_service::HyperPushService; use crate::cache::CacheCell; @@ -140,7 +137,7 @@ impl fmt::Debug for Registered { enum ReceivingMode { InitialSync, WaitForContacts, - Forever + Forever, } impl Registered { @@ -637,19 +634,26 @@ impl Manager { Ok(()) } - async fn sync_contacts(&mut self) -> Result<(), Error> { - let messages = self.receive_messages_stream(ReceivingMode::InitialSync).await?; + let messages = self + .receive_messages_stream(ReceivingMode::InitialSync) + .await?; pin_mut!(messages); while let Some(_msg) = messages.next().await { // debug!("{msg:?}"); } + self.request_configuration_sync().await?; + self.request_block_sync().await?; self.request_contacts_sync().await?; - let messages = self.receive_messages_stream(ReceivingMode::WaitForContacts).await?; + let messages = self + .receive_messages_stream(ReceivingMode::WaitForContacts) + .await?; pin_mut!(messages); - while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await { + while let Ok(Some(_msg)) = + tokio::time::timeout(Duration::from_secs(60), messages.next()).await + { // debug!("{msg:?}"); } @@ -670,13 +674,40 @@ impl Manager { ..Default::default() }; - let timestamp = std::time::SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; + // first request the sync + self.send_message(self.state.service_ids.aci, sync_message) + .await?; + + Ok(()) + } + + async fn request_block_sync(&mut self) -> Result<(), Error> { + trace!("requesting blocked sync"); + let sync_message = SyncMessage { + request: Some(sync_message::Request { + r#type: Some(sync_message::request::Type::Blocked as i32), + }), + ..Default::default() + }; // first request the sync - self.send_message(self.state.service_ids.aci, sync_message, timestamp) + self.send_message(self.state.service_ids.aci, sync_message) + .await?; + + Ok(()) + } + + async fn request_configuration_sync(&mut self) -> Result<(), Error> { + trace!("requesting configuration sync"); + let sync_message = SyncMessage { + request: Some(sync_message::Request { + r#type: Some(sync_message::request::Type::Configuration as i32), + }), + ..Default::default() + }; + + // first request the sync + self.send_message(self.state.service_ids.aci, sync_message) .await?; Ok(()) @@ -982,7 +1013,9 @@ impl Manager { } Some(Ok(Incoming::QueueEmpty)) => { debug!("empty queue"); - if state.mode == ReceivingMode::InitialSync { return None; } + if state.mode == ReceivingMode::InitialSync { + return None; + } } Some(Err(e)) => error!("Error: {}", e), None => return None, @@ -1001,10 +1034,13 @@ impl Manager { &mut self, recipient_addr: impl Into, message: impl Into, - timestamp: u64, ) -> Result<(), Error> { let mut sender = self.new_message_sender().await?; + let timestamp = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; let online_only = false; let recipient = recipient_addr.into(); let mut content_body: ContentBody = message.into(); @@ -1192,7 +1228,6 @@ impl Manager { pub async fn send_session_reset( &mut self, recipient: &ServiceAddress, - timestamp: u64, ) -> Result<(), Error> { log::trace!("Resetting session for address: {}", recipient.uuid); let message = DataMessage { @@ -1200,7 +1235,7 @@ impl Manager { ..Default::default() }; - self.send_message(*recipient, message, timestamp).await?; + self.send_message(*recipient, message).await?; Ok(()) } From 540271e3e7f8b96b382bb15125d633dea0faee4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Thu, 26 Oct 2023 22:51:58 +0200 Subject: [PATCH 08/13] Final touches --- presage-cli/src/main.rs | 14 ++++---------- presage/Cargo.toml | 2 +- presage/src/lib.rs | 4 +++- presage/src/manager.rs | 19 ++++++++----------- 4 files changed, 16 insertions(+), 23 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 7f5da33a1..f5386033c 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -21,6 +21,7 @@ use presage::libsignal_service::{groups_v2::Group, prelude::ProfileKey}; use presage::prelude::proto::EditMessage; use presage::prelude::SyncMessage; use presage::ContentTimestamp; +use presage::ReceivingMode; use presage::{ prelude::{ content::{Content, ContentBody, DataMessage, GroupContextV2}, @@ -222,10 +223,7 @@ async fn send( sleep(Duration::from_secs(5)).await; - manager - .send_message(*uuid, content_body) - .await - .unwrap(); + manager.send_message(*uuid, content_body).await.unwrap(); sleep(Duration::from_secs(5)).await; }) @@ -435,7 +433,7 @@ async fn receive( ); let messages = manager - .receive_messages() + .receive_messages(ReceivingMode::Forever) .await .context("failed to initialize messages stream")?; pin_mut!(messages); @@ -656,17 +654,13 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re let mut manager = Manager::load_registered(config_store).await?; let uuid = manager.state().service_ids.aci; - let timestamp = std::time::SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; let sync_message = SyncMessage { request: Some(sync_message::Request { r#type: Some(sync_message::request::Type::Contacts as i32), }), ..Default::default() }; - send(&mut manager, &uuid, sync_message, timestamp).await?; + send(&mut manager, &uuid, sync_message).await?; } Cmd::ListMessages { group_master_key, diff --git a/presage/Cargo.toml b/presage/Cargo.toml index bea3bc85e..97032a03f 100644 --- a/presage/Cargo.toml +++ b/presage/Cargo.toml @@ -1,7 +1,7 @@ [package] # be a sign or warning of (an imminent event, typically an unwelcome one). name = "presage" -version = "0.6.0-dev" +version = "0.7.0-dev" authors = ["Gabriel Féron "] edition = "2021" diff --git a/presage/src/lib.rs b/presage/src/lib.rs index 60a9b29c4..753aa5516 100644 --- a/presage/src/lib.rs +++ b/presage/src/lib.rs @@ -5,7 +5,9 @@ mod serde; mod store; pub use errors::Error; -pub use manager::{Confirmation, Linking, Manager, Registered, Registration, RegistrationOptions}; +pub use manager::{ + Confirmation, Linking, Manager, ReceivingMode, Registered, Registration, RegistrationOptions, +}; pub use store::{ContentTimestamp, Store, StoreError, Thread}; #[deprecated(note = "Please help use improve the prelude module instead")] diff --git a/presage/src/manager.rs b/presage/src/manager.rs index b60ce1c8f..8d19cbbc7 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -134,7 +134,7 @@ impl fmt::Debug for Registered { } #[derive(PartialEq, Eq)] -enum ReceivingMode { +pub enum ReceivingMode { InitialSync, WaitForContacts, Forever, @@ -642,9 +642,7 @@ impl Manager { .receive_messages_stream(ReceivingMode::InitialSync) .await?; pin_mut!(messages); - while let Some(_msg) = messages.next().await { - // debug!("{msg:?}"); - } + while let Some(_msg) = messages.next().await {} self.request_configuration_sync().await?; self.request_block_sync().await?; @@ -656,9 +654,7 @@ impl Manager { pin_mut!(messages); while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await - { - // debug!("{msg:?}"); - } + {} Ok(()) } @@ -677,7 +673,6 @@ impl Manager { ..Default::default() }; - // first request the sync self.send_message(self.state.service_ids.aci, sync_message) .await?; @@ -693,7 +688,6 @@ impl Manager { ..Default::default() }; - // first request the sync self.send_message(self.state.service_ids.aci, sync_message) .await?; @@ -709,7 +703,6 @@ impl Manager { ..Default::default() }; - // first request the sync self.send_message(self.state.service_ids.aci, sync_message) .await?; @@ -870,11 +863,15 @@ impl Manager { /// Starts receiving and storing messages. /// + /// * `stop_on_initial_sync` [unstable API] - receive messages until the initial sync is over, or forever. + /// It is essential to synchronize the client once before you try to send, to make sure you have all the updated keys and sessions. + /// /// Returns a [futures::Stream] of messages to consume. Messages will also be stored by the implementation of the [Store]. pub async fn receive_messages( &mut self, + receiving_mode: ReceivingMode, ) -> Result, Error> { - self.receive_messages_stream(ReceivingMode::Forever).await + self.receive_messages_stream(receiving_mode).await } fn groups_manager( From d2049acb5569ad9d5ba244939bcce50d536002c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Thu, 26 Oct 2023 23:53:20 +0200 Subject: [PATCH 09/13] More refactoring to avoid small mistakes --- Cargo.toml | 6 +- presage/src/manager.rs | 182 ++++++++++++++++++++--------------------- 2 files changed, 90 insertions(+), 98 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 19c49fa58..8e49fdad3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,6 @@ resolver = "2" [patch.crates-io] curve25519-dalek = { git = 'https://github.com/signalapp/curve25519-dalek', tag = 'signal-curve25519-4.0.0' } -# [patch."https://github.com/whisperfish/libsignal-service-rs.git"] -# libsignal-service = { path = "../libsignal-service-rs/libsignal-service" } -# libsignal-service-hyper = { path = "../libsignal-service-rs/libsignal-service-hyper" } +[patch."https://github.com/whisperfish/libsignal-service-rs.git"] +libsignal-service = { path = "../libsignal-service-rs/libsignal-service" } +libsignal-service-hyper = { path = "../libsignal-service-rs/libsignal-service-hyper" } diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 8d19cbbc7..81e6c52a9 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -16,7 +16,6 @@ use rand::{ use serde::{Deserialize, Serialize}; use url::Url; -use libsignal_service::push_service::{RegistrationMethod, VerificationTransport}; use libsignal_service::{ attachment_cipher::decrypt_in_place, cipher, @@ -44,6 +43,10 @@ use libsignal_service::{ AccountManager, Profile, ServiceAddress, }; use libsignal_service::{messagepipe::Incoming, proto::EditMessage}; +use libsignal_service::{ + messagepipe::MessagePipe, + push_service::{RegistrationMethod, VerificationTransport}, +}; use libsignal_service_hyper::push_service::HyperPushService; use crate::cache::CacheCell; @@ -391,9 +394,9 @@ impl Manager { manager.config_store.save_state(&manager.state)?; match ( - manager.register_pre_keys().await, manager.set_account_attributes().await, - manager.sync_contacts().await, + manager.register_pre_keys().await, + manager.request_initial_sync().await, ) { (Err(e), _, _) | (_, Err(e), _) => { // clear the entire store on any error, there's no possible recovery here @@ -558,13 +561,23 @@ impl Manager { manager.set_account_attributes().await?; } + let credentials = manager.credentials()?; + manager.state.identified_websocket.lock().replace( + manager + .identified_push_service()? + .ws("/v1/websocket/", &[], Some(credentials), true) + .await?, + ); + Ok(manager) } async fn register_pre_keys(&mut self) -> Result<(), Error> { trace!("registering pre keys"); - let mut account_manager = - AccountManager::new(self.push_service()?, Some(self.state.profile_key)); + let mut account_manager = AccountManager::new( + self.identified_push_service()?, + Some(self.state.profile_key), + ); let (pre_keys_offset_id, next_signed_pre_key_id, next_pq_pre_key_id) = account_manager .update_pre_key_bundle( @@ -590,8 +603,10 @@ impl Manager { async fn set_account_attributes(&mut self) -> Result<(), Error> { trace!("setting account attributes"); - let mut account_manager = - AccountManager::new(self.push_service()?, Some(self.state.profile_key)); + let mut account_manager = AccountManager::new( + self.identified_push_service()?, + Some(self.state.profile_key), + ); let pni_registration_id = if let Some(pni_registration_id) = self.state.pni_registration_id { @@ -637,14 +652,8 @@ impl Manager { Ok(()) } - async fn sync_contacts(&mut self) -> Result<(), Error> { - let messages = self - .receive_messages_stream(ReceivingMode::InitialSync) - .await?; - pin_mut!(messages); - while let Some(_msg) = messages.next().await {} - - self.request_configuration_sync().await?; + async fn request_initial_sync(&mut self) -> Result<(), Error> { + self.request_keys_sync().await?; self.request_block_sync().await?; self.request_contacts_sync().await?; @@ -666,44 +675,23 @@ impl Manager { /// processed when they're received using the `MessageReceiver`. pub async fn request_contacts_sync(&mut self) -> Result<(), Error> { trace!("requesting contacts sync"); - let sync_message = SyncMessage { - request: Some(sync_message::Request { - r#type: Some(sync_message::request::Type::Contacts as i32), - }), - ..Default::default() - }; - - self.send_message(self.state.service_ids.aci, sync_message) + self.send_message(self.state.service_ids.aci, SyncMessage::request_contacts()) .await?; Ok(()) } - async fn request_block_sync(&mut self) -> Result<(), Error> { - trace!("requesting blocked sync"); - let sync_message = SyncMessage { - request: Some(sync_message::Request { - r#type: Some(sync_message::request::Type::Blocked as i32), - }), - ..Default::default() - }; - - self.send_message(self.state.service_ids.aci, sync_message) + async fn request_keys_sync(&mut self) -> Result<(), Error> { + trace!("requesting keys sync"); + self.send_message(self.state.service_ids.aci, SyncMessage::request_keys()) .await?; Ok(()) } - async fn request_configuration_sync(&mut self) -> Result<(), Error> { - trace!("requesting configuration sync"); - let sync_message = SyncMessage { - request: Some(sync_message::Request { - r#type: Some(sync_message::request::Type::Configuration as i32), - }), - ..Default::default() - }; - - self.send_message(self.state.service_ids.aci, sync_message) + async fn request_block_sync(&mut self) -> Result<(), Error> { + trace!("requesting blocked sync"); + self.send_message(self.state.service_ids.aci, SyncMessage::request_blocked()) .await?; Ok(()) @@ -729,7 +717,7 @@ impl Manager { if needs_renewal(self.state.unidentified_sender_certificate.as_ref()) { let sender_certificate = self - .push_service()? + .identified_push_service()? .get_uuid_only_sender_certificate() .await?; @@ -750,7 +738,7 @@ impl Manager { token: &str, captcha: &str, ) -> Result<(), Error> { - let mut account_manager = AccountManager::new(self.push_service()?, None); + let mut account_manager = AccountManager::new(self.identified_push_service()?, None); account_manager .submit_recaptcha_challenge(token, captcha) .await?; @@ -764,7 +752,7 @@ impl Manager { /// Fetches basic information on the registered device. pub async fn whoami(&self) -> Result> { - Ok(self.push_service()?.whoami().await?) + Ok(self.identified_push_service()?.whoami().await?) } /// Fetches the profile (name, about, status emoji) of the registered user. @@ -784,7 +772,8 @@ impl Manager { return Ok(profile); } - let mut account_manager = AccountManager::new(self.push_service()?, Some(profile_key)); + let mut account_manager = + AccountManager::new(self.identified_push_service()?, Some(profile_key)); let profile = account_manager.retrieve_profile(uuid.into()).await?; @@ -840,30 +829,14 @@ impl Manager { async fn receive_messages_encrypted( &mut self, ) -> Result>, Error> { - let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?; - let allow_stories = false; - let pipe = MessageReceiver::new(self.push_service()?) - .create_message_pipe(credentials, allow_stories) - .await?; - - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let mut unidentified_push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - let unidentified_ws = unidentified_push_service - .ws("/v1/websocket/", &[], None, false) - .await?; - self.state.identified_websocket.lock().replace(pipe.ws()); - self.state - .unidentified_websocket - .lock() - .replace(unidentified_ws); - - Ok(pipe.stream()) + let credentials: ServiceCredentials = self.credentials()?; + let identified_ws = self.identified_websocket().await?; + Ok(MessagePipe::from_socket(identified_ws, credentials).stream()) } /// Starts receiving and storing messages. /// - /// * `stop_on_initial_sync` [unstable API] - receive messages until the initial sync is over, or forever. + /// * `stop_on_initial_sync` [unstable API] - receive messages until the initial sync is over, or forever /// It is essential to synchronize the client once before you try to send, to make sure you have all the updated keys and sessions. /// /// Returns a [futures::Stream] of messages to consume. Messages will also be stored by the implementation of the [Store]. @@ -883,7 +856,7 @@ impl Manager { let groups_credentials_cache = InMemoryCredentialsCache::default(); let groups_manager = GroupsManager::new( self.state.service_ids.clone(), - self.push_service()?, + self.identified_push_service()?, groups_credentials_cache, server_public_params, ); @@ -906,7 +879,7 @@ impl Manager { let init = StreamState { encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), - message_receiver: MessageReceiver::new(self.push_service()?), + message_receiver: MessageReceiver::new(self.identified_push_service()?), service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, @@ -1100,7 +1073,7 @@ impl Manager { /// Uploads attachments prior to linking them in a message. pub async fn upload_attachments( - &self, + &mut self, attachments: Vec<(AttachmentSpec, Vec)>, ) -> Result>, Error> { if attachments.is_empty() { @@ -1210,7 +1183,7 @@ impl Manager { &self, attachment_pointer: &AttachmentPointer, ) -> Result, Error> { - let mut service = self.push_service()?; + let mut service = self.identified_push_service()?; let mut attachment_stream = service.get_attachment(attachment_pointer).await?; // We need the whole file for the crypto to check out @@ -1240,56 +1213,75 @@ impl Manager { Ok(()) } - fn credentials(&self) -> Result, Error> { - Ok(Some(ServiceCredentials { + fn credentials(&self) -> Result> { + Ok(ServiceCredentials { uuid: Some(self.state.service_ids.aci), phonenumber: self.state.phone_number.clone(), password: Some(self.state.password.clone()), signaling_key: Some(self.state.signaling_key), device_id: self.state.device_id, - })) + }) } /// Returns a clone of a cached push service. /// /// If no service is yet cached, it will create and cache one. - fn push_service(&self) -> Result> { + fn identified_push_service(&self) -> Result> { self.state.push_service_cache.get(|| { let credentials = self.credentials()?; let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - Ok(HyperPushService::new( service_configuration, - credentials, + Some(credentials), crate::USER_AGENT.to_string(), )) }) } + fn unidentified_push_service(&self) -> HyperPushService { + let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()) + } + + async fn identified_websocket(&mut self) -> Result> { + let mut lock = self.state.identified_websocket.lock(); + if let Some(identified_ws) = lock.as_ref() { + Ok(identified_ws.clone()) + } else { + let credentials = self.credentials()?; + let ws = self + .identified_push_service()? + .ws("/v1/websocket/", &[], Some(credentials), true) + .await?; + lock.replace(ws.clone()); + Ok(ws) + } + } + + async fn unidentified_websocket(&mut self) -> Result> { + let mut lock = self.state.unidentified_websocket.lock(); + if let Some(unidentified_ws) = lock.as_ref() { + Ok(unidentified_ws.clone()) + } else { + let ws = self + .unidentified_push_service() + .ws("/v1/websocket/", &[], None, true) + .await?; + lock.replace(ws.clone()); + Ok(ws) + } + } + /// Creates a new message sender. - async fn new_message_sender(&self) -> Result, Error> { + async fn new_message_sender(&mut self) -> Result, Error> { let local_addr = ServiceAddress { uuid: self.state.service_ids.aci, }; - let identified_websocket = self - .state - .identified_websocket - .lock() - .clone() - .ok_or(Error::MessagePipeNotStarted)?; - - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let mut unidentified_push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - let unidentified_websocket = unidentified_push_service - .ws("/v1/websocket/", &[], None, false) - .await?; - Ok(MessageSender::new( - identified_websocket, - unidentified_websocket, - self.push_service()?, + self.identified_websocket().await?, + self.unidentified_websocket().await?, + self.identified_push_service()?, self.new_service_cipher()?, self.rng.clone(), self.config_store.clone(), From 2083802323f7bbf2dfec31ead79820ed0421dea8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Thu, 26 Oct 2023 23:58:02 +0200 Subject: [PATCH 10/13] Also cache the unidentified_push_service :o --- presage/src/manager.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 81e6c52a9..57bbff664 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -96,7 +96,9 @@ pub struct Confirmation { #[derive(Clone, Serialize, Deserialize)] pub struct Registered { #[serde(skip)] - push_service_cache: CacheCell, + identified_push_service_cache: CacheCell, + #[serde(skip)] + unidentified_push_service_cache: CacheCell, #[serde(skip)] identified_websocket: Arc>>, #[serde(skip)] @@ -355,7 +357,8 @@ impl Manager { { log::info!("successfully registered device {}", &service_ids); Ok(Registered { - push_service_cache: CacheCell::default(), + identified_push_service_cache: CacheCell::default(), + unidentified_push_service_cache: CacheCell::default(), identified_websocket: Default::default(), unidentified_websocket: Default::default(), unidentified_sender_certificate: Default::default(), @@ -506,7 +509,8 @@ impl Manager { rng, config_store: self.config_store, state: Registered { - push_service_cache: CacheCell::default(), + identified_push_service_cache: CacheCell::default(), + unidentified_push_service_cache: CacheCell::default(), identified_websocket: Default::default(), unidentified_websocket: Default::default(), unidentified_sender_certificate: Default::default(), @@ -1223,11 +1227,9 @@ impl Manager { }) } - /// Returns a clone of a cached push service. - /// - /// If no service is yet cached, it will create and cache one. + /// Create or return a clone of a cached identified (with credentials) push service. fn identified_push_service(&self) -> Result> { - self.state.push_service_cache.get(|| { + self.state.identified_push_service_cache.get(|| { let credentials = self.credentials()?; let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); Ok(HyperPushService::new( @@ -1238,9 +1240,16 @@ impl Manager { }) } - fn unidentified_push_service(&self) -> HyperPushService { - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()) + /// Create or return a clone of a cached unidentified (no credentials) push service. + fn unidentified_push_service(&self) -> Result> { + self.state.unidentified_push_service_cache.get(|| { + let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + Ok(HyperPushService::new( + service_configuration, + None, + crate::USER_AGENT.to_string(), + )) + }) } async fn identified_websocket(&mut self) -> Result> { @@ -1263,8 +1272,8 @@ impl Manager { if let Some(unidentified_ws) = lock.as_ref() { Ok(unidentified_ws.clone()) } else { - let ws = self - .unidentified_push_service() + let ws: SignalWebSocket = self + .unidentified_push_service()? .ws("/v1/websocket/", &[], None, true) .await?; lock.replace(ws.clone()); From 3f66901b0c704d2e385886e24b888f62a3837c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Fri, 27 Oct 2023 00:32:05 +0200 Subject: [PATCH 11/13] WIP --- presage-store-sled/src/lib.rs | 4 +- presage/src/manager.rs | 131 ++++++++++++++++++---------------- presage/src/store.rs | 9 ++- 3 files changed, 78 insertions(+), 66 deletions(-) diff --git a/presage-store-sled/src/lib.rs b/presage-store-sled/src/lib.rs index 757f5f51e..a5415c170 100644 --- a/presage-store-sled/src/lib.rs +++ b/presage-store-sled/src/lib.rs @@ -373,11 +373,11 @@ impl Store for SledStore { /// State - fn load_state(&self) -> Result, SledStoreError> { + fn load_state(&self) -> Result>, SledStoreError> { self.get(SLED_TREE_STATE, SLED_KEY_REGISTRATION) } - fn save_state(&mut self, state: &Registered) -> Result<(), SledStoreError> { + fn save_state(&mut self, state: &Registered) -> Result<(), SledStoreError> { self.insert(SLED_TREE_STATE, SLED_KEY_REGISTRATION, state)?; Ok(()) } diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 57bbff664..548d58113 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -24,7 +24,10 @@ use libsignal_service::{ groups_v2::{decrypt_group, Group, GroupsManager, InMemoryCredentialsCache}, messagepipe::ServiceCredentials, models::Contact, - prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, + prelude::{ + phonenumber::{country::Id::DE, PhoneNumber}, + Content, ProfileKey, PushService, Uuid, + }, proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage}, protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning}, @@ -93,19 +96,19 @@ pub struct Confirmation { session_id: String, } -#[derive(Clone, Serialize, Deserialize)] -pub struct Registered { - #[serde(skip)] +#[derive(Clone)] +pub struct Registered { identified_push_service_cache: CacheCell, - #[serde(skip)] unidentified_push_service_cache: CacheCell, - #[serde(skip)] identified_websocket: Arc>>, - #[serde(skip)] - unidentified_websocket: Arc>>, - #[serde(skip)] + message_sender: Arc>>>, unidentified_sender_certificate: Option, + inner: RegisteredData, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct RegisteredData { pub signal_servers: SignalServers, pub device_name: Option, pub phone_number: PhoneNumber, @@ -130,10 +133,11 @@ pub struct Registered { profile_key: ProfileKey, } -impl fmt::Debug for Registered { +impl fmt::Debug for Registered { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Registered") .field("websocket", &self.identified_websocket.lock().is_some()) + .field("message_sender", &self.message_sender.lock().is_some()) .finish_non_exhaustive() } } @@ -145,7 +149,7 @@ pub enum ReceivingMode { Forever, } -impl Registered { +impl Registered { pub fn device_id(&self) -> u32 { self.device_id.unwrap_or(DEFAULT_DEVICE_ID) } @@ -308,7 +312,7 @@ impl Manager { signal_servers: SignalServers, device_name: String, provisioning_link_channel: oneshot::Sender, - ) -> Result, Error> { + ) -> Result>, Error> { // clear the database: the moment we start the process, old API credentials are invalidated // and you won't be able to use this client anyways config_store.clear_registration()?; @@ -357,11 +361,6 @@ impl Manager { { log::info!("successfully registered device {}", &service_ids); Ok(Registered { - identified_push_service_cache: CacheCell::default(), - unidentified_push_service_cache: CacheCell::default(), - identified_websocket: Default::default(), - unidentified_websocket: Default::default(), - unidentified_sender_certificate: Default::default(), signal_servers, device_name: Some(device_name), phone_number, @@ -424,7 +423,7 @@ impl Manager { pub async fn confirm_verification_code( self, confirmation_code: impl AsRef, - ) -> Result, Error> { + ) -> Result>, Error> { trace!("confirming verification code"); let registration_id = generate_registration_id(&mut StdRng::from_entropy()); @@ -512,25 +511,27 @@ impl Manager { identified_push_service_cache: CacheCell::default(), unidentified_push_service_cache: CacheCell::default(), identified_websocket: Default::default(), - unidentified_websocket: Default::default(), + message_sender: Default::default(), unidentified_sender_certificate: Default::default(), - signal_servers: self.state.signal_servers, - device_name: None, - phone_number, - service_ids: ServiceIds { - aci: registered.uuid, - pni: registered.pni, + inner: RegisteredData { + signal_servers: self.state.signal_servers, + device_name: None, + phone_number, + service_ids: ServiceIds { + aci: registered.uuid, + pni: registered.pni, + }, + password, + signaling_key, + device_id: None, + registration_id, + pni_registration_id: Some(pni_registration_id), + aci_private_key: aci_identity_key_pair.private_key, + aci_public_key: aci_identity_key_pair.public_key, + pni_private_key: Some(pni_identity_key_pair.private_key), + pni_public_key: Some(pni_identity_key_pair.public_key), + profile_key, }, - password, - signaling_key, - device_id: None, - registration_id, - pni_registration_id: Some(pni_registration_id), - aci_private_key: aci_identity_key_pair.private_key, - aci_public_key: aci_identity_key_pair.public_key, - pni_private_key: Some(pni_identity_key_pair.private_key), - pni_public_key: Some(pni_identity_key_pair.public_key), - profile_key, }, }; @@ -546,22 +547,28 @@ impl Manager { } } -impl Manager { +impl Manager> { /// Loads a previously registered account from the implemented [Store]. /// /// Returns a instance of [Manager] you can use to send & receive messages. pub async fn load_registered(config_store: C) -> Result> { - let state = config_store + let inner = config_store .load_state()? .ok_or(Error::NotYetRegisteredError)?; - let mut manager = Self { rng: StdRng::from_entropy(), config_store, - state, + state: Registered { + inner, + identified_push_service_cache: Default::default(), + unidentified_push_service_cache: Default::default(), + identified_websocket: Default::default(), + message_sender: Default::default(), + unidentified_sender_certificate: Default::default(), + }, }; - if manager.state.pni_registration_id.is_none() { + if manager.state.inner.pni_registration_id.is_none() { manager.set_account_attributes().await?; } @@ -750,7 +757,7 @@ impl Manager { } /// Returns a handle on the registered state - pub fn state(&self) -> &Registered { + pub fn state(&self) -> &RegisteredData { &self.state } @@ -1012,7 +1019,7 @@ impl Manager { recipient_addr: impl Into, message: impl Into, ) -> Result<(), Error> { - let mut sender = self.new_message_sender().await?; + let mut sender = self.message_sender().await?; let timestamp = std::time::SystemTime::now() .duration_since(UNIX_EPOCH) @@ -1083,7 +1090,7 @@ impl Manager { if attachments.is_empty() { return Ok(Vec::new()); } - let sender = self.new_message_sender().await?; + let sender = self.message_sender().await?; let upload = future::join_all(attachments.into_iter().map(move |(spec, contents)| { let mut sender = sender.clone(); async move { sender.upload_attachment(spec, contents).await } @@ -1121,7 +1128,7 @@ impl Manager { } _ => {} } - let mut sender = self.new_message_sender().await?; + let mut sender = self.message_sender().await?; let mut groups_manager = self.groups_manager()?; let Some(group) = upsert_group( @@ -1267,36 +1274,38 @@ impl Manager { } } - async fn unidentified_websocket(&mut self) -> Result> { - let mut lock = self.state.unidentified_websocket.lock(); - if let Some(unidentified_ws) = lock.as_ref() { - Ok(unidentified_ws.clone()) - } else { - let ws: SignalWebSocket = self - .unidentified_push_service()? - .ws("/v1/websocket/", &[], None, true) - .await?; - lock.replace(ws.clone()); - Ok(ws) + /// Creates a new message sender. + async fn message_sender(&mut self) -> Result, Error> { + if let Some(message_sender) = self.state.message_sender.lock().as_ref() { + return Ok(message_sender.clone()); } - } - /// Creates a new message sender. - async fn new_message_sender(&mut self) -> Result, Error> { let local_addr = ServiceAddress { uuid: self.state.service_ids.aci, }; - Ok(MessageSender::new( + let unidentified_ws: SignalWebSocket = self + .unidentified_push_service()? + .ws("/v1/websocket/", &[], None, true) + .await?; + + let message_sender = MessageSender::new( self.identified_websocket().await?, - self.unidentified_websocket().await?, + unidentified_ws, self.identified_push_service()?, self.new_service_cipher()?, self.rng.clone(), self.config_store.clone(), local_addr, self.state.device_id.unwrap_or(DEFAULT_DEVICE_ID).into(), - )) + ); + + self.state + .message_sender + .lock() + .replace(message_sender.clone()); + + Ok(message_sender) } /// Creates a new service cipher. diff --git a/presage/src/store.rs b/presage/src/store.rs index 211094bea..210b9795a 100644 --- a/presage/src/store.rs +++ b/presage/src/store.rs @@ -1,6 +1,9 @@ use std::{fmt, ops::RangeBounds}; -use crate::{manager::Registered, GroupMasterKeyBytes}; +use crate::{ + manager::{Registered, RegisteredData}, + GroupMasterKeyBytes, +}; use libsignal_service::{ content::ContentBody, groups_v2::Group, @@ -28,10 +31,10 @@ pub trait Store: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone /// State /// Load registered (or linked) state - fn load_state(&self) -> Result, Self::Error>; + fn load_state(&self) -> Result, Self::Error>; /// Save registered (or linked) state - fn save_state(&mut self, state: &Registered) -> Result<(), Self::Error>; + fn save_state(&mut self, state: &RegisteredData) -> Result<(), Self::Error>; /// Returns whether this store contains registration data or not fn is_registered(&self) -> bool; From e95bc309aab57e9aeb375909e316616a053d5974 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Fri, 27 Oct 2023 00:41:01 +0200 Subject: [PATCH 12/13] Cache --- presage-cli/src/main.rs | 8 +- presage-store-sled/src/lib.rs | 8 +- presage/src/lib.rs | 3 +- presage/src/manager.rs | 177 +++++++++++++++++++--------------- presage/src/store.rs | 5 +- 5 files changed, 109 insertions(+), 92 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index f5386033c..a09e04adb 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -206,7 +206,7 @@ async fn main() -> anyhow::Result<()> { } async fn send( - manager: &mut Manager, + manager: &mut Manager>, uuid: &Uuid, content_body: impl Into, ) -> anyhow::Result<()> { @@ -235,7 +235,7 @@ async fn send( // Note to developers, this is a good example of a function you can use as a source of inspiration // to process incoming messages. async fn process_incoming_message( - manager: &mut Manager, + manager: &mut Manager>, attachments_tmp_dir: &Path, notifications: bool, content: &Content, @@ -274,7 +274,7 @@ async fn process_incoming_message( } fn print_message( - manager: &Manager, + manager: &Manager>, notifications: bool, content: &Content, ) { @@ -423,7 +423,7 @@ fn print_message( } async fn receive( - manager: &mut Manager, + manager: &mut Manager>, notifications: bool, ) -> anyhow::Result<()> { let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?; diff --git a/presage-store-sled/src/lib.rs b/presage-store-sled/src/lib.rs index a5415c170..7747f686b 100644 --- a/presage-store-sled/src/lib.rs +++ b/presage-store-sled/src/lib.rs @@ -24,7 +24,7 @@ use presage::{ session_store::SessionStoreExt, Profile, ServiceAddress, }, - ContentTimestamp, + ContentTimestamp, GroupMasterKeyBytes, RegisteredData, Store, Thread, }; use prost::Message; use protobuf::ContentProto; @@ -32,8 +32,6 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sha2::{Digest, Sha256}; use sled::{Batch, IVec}; -use presage::{GroupMasterKeyBytes, Registered, Store, Thread}; - mod error; mod protobuf; @@ -373,11 +371,11 @@ impl Store for SledStore { /// State - fn load_state(&self) -> Result>, SledStoreError> { + fn load_state(&self) -> Result, SledStoreError> { self.get(SLED_TREE_STATE, SLED_KEY_REGISTRATION) } - fn save_state(&mut self, state: &Registered) -> Result<(), SledStoreError> { + fn save_state(&mut self, state: &RegisteredData) -> Result<(), SledStoreError> { self.insert(SLED_TREE_STATE, SLED_KEY_REGISTRATION, state)?; Ok(()) } diff --git a/presage/src/lib.rs b/presage/src/lib.rs index 753aa5516..e9db4ff30 100644 --- a/presage/src/lib.rs +++ b/presage/src/lib.rs @@ -6,7 +6,8 @@ mod store; pub use errors::Error; pub use manager::{ - Confirmation, Linking, Manager, ReceivingMode, Registered, Registration, RegistrationOptions, + Confirmation, Linking, Manager, ReceivingMode, Registered, RegisteredData, Registration, + RegistrationOptions, }; pub use store::{ContentTimestamp, Store, StoreError, Thread}; diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 548d58113..b63aeb053 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -24,10 +24,7 @@ use libsignal_service::{ groups_v2::{decrypt_group, Group, GroupsManager, InMemoryCredentialsCache}, messagepipe::ServiceCredentials, models::Contact, - prelude::{ - phonenumber::{country::Id::DE, PhoneNumber}, - Content, ProfileKey, PushService, Uuid, - }, + prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage}, protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning}, @@ -150,8 +147,19 @@ pub enum ReceivingMode { } impl Registered { + fn with_data(inner: RegisteredData) -> Self { + Self { + identified_push_service_cache: CacheCell::default(), + unidentified_push_service_cache: CacheCell::default(), + identified_websocket: Default::default(), + message_sender: Default::default(), + unidentified_sender_certificate: Default::default(), + inner, + } + } + pub fn device_id(&self) -> u32 { - self.device_id.unwrap_or(DEFAULT_DEVICE_ID) + self.inner.device_id.unwrap_or(DEFAULT_DEVICE_ID) } } @@ -360,7 +368,7 @@ impl Manager { }) = rx.next().await { log::info!("successfully registered device {}", &service_ids); - Ok(Registered { + Ok(Registered::with_data(RegisteredData { signal_servers, device_name: Some(device_name), phone_number, @@ -377,7 +385,7 @@ impl Manager { profile_key: ProfileKey::create( profile_key.try_into().expect("32 bytes for profile key"), ), - }) + })) } else { Err(Error::NoProvisioningMessageReceived) } @@ -393,7 +401,7 @@ impl Manager { state: registration?, }; - manager.config_store.save_state(&manager.state)?; + manager.config_store.save_state(&manager.state.inner)?; match ( manager.set_account_attributes().await, @@ -507,35 +515,28 @@ impl Manager { let mut manager = Manager { rng, config_store: self.config_store, - state: Registered { - identified_push_service_cache: CacheCell::default(), - unidentified_push_service_cache: CacheCell::default(), - identified_websocket: Default::default(), - message_sender: Default::default(), - unidentified_sender_certificate: Default::default(), - inner: RegisteredData { - signal_servers: self.state.signal_servers, - device_name: None, - phone_number, - service_ids: ServiceIds { - aci: registered.uuid, - pni: registered.pni, - }, - password, - signaling_key, - device_id: None, - registration_id, - pni_registration_id: Some(pni_registration_id), - aci_private_key: aci_identity_key_pair.private_key, - aci_public_key: aci_identity_key_pair.public_key, - pni_private_key: Some(pni_identity_key_pair.private_key), - pni_public_key: Some(pni_identity_key_pair.public_key), - profile_key, + state: Registered::with_data(RegisteredData { + signal_servers: self.state.signal_servers, + device_name: None, + phone_number, + service_ids: ServiceIds { + aci: registered.uuid, + pni: registered.pni, }, - }, + password, + signaling_key, + device_id: None, + registration_id, + pni_registration_id: Some(pni_registration_id), + aci_private_key: aci_identity_key_pair.private_key, + aci_public_key: aci_identity_key_pair.public_key, + pni_private_key: Some(pni_identity_key_pair.private_key), + pni_public_key: Some(pni_identity_key_pair.public_key), + profile_key, + }), }; - manager.config_store.save_state(&manager.state)?; + manager.config_store.save_state(&manager.state.inner)?; if let Err(e) = manager.register_pre_keys().await { // clear the entire store on any error, there's no possible recovery here @@ -587,7 +588,7 @@ impl Manager> { trace!("registering pre keys"); let mut account_manager = AccountManager::new( self.identified_push_service()?, - Some(self.state.profile_key), + Some(self.state.inner.profile_key), ); let (pre_keys_offset_id, next_signed_pre_key_id, next_pq_pre_key_id) = account_manager @@ -616,24 +617,24 @@ impl Manager> { trace!("setting account attributes"); let mut account_manager = AccountManager::new( self.identified_push_service()?, - Some(self.state.profile_key), + Some(self.state.inner.profile_key), ); - let pni_registration_id = if let Some(pni_registration_id) = self.state.pni_registration_id - { - pni_registration_id - } else { - info!("migrating to PNI"); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); - self.state.pni_registration_id = Some(pni_registration_id); - self.config_store.save_state(&self.state)?; - pni_registration_id - }; + let pni_registration_id = + if let Some(pni_registration_id) = self.state.inner.pni_registration_id { + pni_registration_id + } else { + info!("migrating to PNI"); + let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + self.state.inner.pni_registration_id = Some(pni_registration_id); + self.config_store.save_state(&self.state.inner)?; + pni_registration_id + }; account_manager .set_account_attributes(AccountAttributes { - name: self.state.device_name.clone(), - registration_id: self.state.registration_id, + name: self.state.inner.device_name.clone(), + registration_id: self.state.inner.registration_id, pni_registration_id, signaling_key: None, voice: false, @@ -641,7 +642,9 @@ impl Manager> { fetches_messages: true, pin: None, registration_lock: None, - unidentified_access_key: Some(self.state.profile_key.derive_access_key().to_vec()), + unidentified_access_key: Some( + self.state.inner.profile_key.derive_access_key().to_vec(), + ), unrestricted_unidentified_access: false, discoverable_by_phone_number: true, capabilities: DeviceCapabilities { @@ -652,11 +655,11 @@ impl Manager> { }) .await?; - if self.state.pni_registration_id.is_none() { + if self.state.inner.pni_registration_id.is_none() { debug!("fetching PNI UUID and updating state"); let whoami = self.whoami().await?; - self.state.service_ids.pni = whoami.pni; - self.config_store.save_state(&self.state)?; + self.state.inner.service_ids.pni = whoami.pni; + self.config_store.save_state(&self.state.inner)?; } trace!("done setting account attributes"); @@ -686,24 +689,33 @@ impl Manager> { /// processed when they're received using the `MessageReceiver`. pub async fn request_contacts_sync(&mut self) -> Result<(), Error> { trace!("requesting contacts sync"); - self.send_message(self.state.service_ids.aci, SyncMessage::request_contacts()) - .await?; + self.send_message( + self.state.inner.service_ids.aci, + SyncMessage::request_contacts(), + ) + .await?; Ok(()) } async fn request_keys_sync(&mut self) -> Result<(), Error> { trace!("requesting keys sync"); - self.send_message(self.state.service_ids.aci, SyncMessage::request_keys()) - .await?; + self.send_message( + self.state.inner.service_ids.aci, + SyncMessage::request_keys(), + ) + .await?; Ok(()) } async fn request_block_sync(&mut self) -> Result<(), Error> { trace!("requesting blocked sync"); - self.send_message(self.state.service_ids.aci, SyncMessage::request_blocked()) - .await?; + self.send_message( + self.state.inner.service_ids.aci, + SyncMessage::request_blocked(), + ) + .await?; Ok(()) } @@ -758,7 +770,7 @@ impl Manager> { /// Returns a handle on the registered state pub fn state(&self) -> &RegisteredData { - &self.state + &self.state.inner } /// Fetches basic information on the registered device. @@ -768,8 +780,11 @@ impl Manager> { /// Fetches the profile (name, about, status emoji) of the registered user. pub async fn retrieve_profile(&mut self) -> Result> { - self.retrieve_profile_by_uuid(self.state.service_ids.aci, self.state.profile_key) - .await + self.retrieve_profile_by_uuid( + self.state.inner.service_ids.aci, + self.state.inner.profile_key, + ) + .await } /// Fetches the profile of the provided user by UUID and profile key. @@ -861,12 +876,12 @@ impl Manager> { fn groups_manager( &self, ) -> Result, Error> { - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + let service_configuration: ServiceConfiguration = self.state.inner.signal_servers.into(); let server_public_params = service_configuration.zkgroup_server_public_params; let groups_credentials_cache = InMemoryCredentialsCache::default(); let groups_manager = GroupsManager::new( - self.state.service_ids.clone(), + self.state.inner.service_ids.clone(), self.identified_push_service()?, groups_credentials_cache, server_public_params, @@ -1068,7 +1083,7 @@ impl Manager> { // save the message let content = Content { metadata: Metadata { - sender: self.state.service_ids.aci.into(), + sender: self.state.inner.service_ids.aci.into(), sender_device: self.state.device_id(), timestamp, needs_receipt: false, @@ -1147,7 +1162,7 @@ impl Manager> { for member in group .members .into_iter() - .filter(|m| m.uuid != self.state.service_ids.aci) + .filter(|m| m.uuid != self.state.inner.service_ids.aci) { let unidentified_access = self.config_store @@ -1169,7 +1184,7 @@ impl Manager> { let content = Content { metadata: Metadata { - sender: self.state.service_ids.aci.into(), + sender: self.state.inner.service_ids.aci.into(), sender_device: self.state.device_id(), timestamp, needs_receipt: false, // TODO: this is just wrong @@ -1226,11 +1241,11 @@ impl Manager> { fn credentials(&self) -> Result> { Ok(ServiceCredentials { - uuid: Some(self.state.service_ids.aci), - phonenumber: self.state.phone_number.clone(), - password: Some(self.state.password.clone()), - signaling_key: Some(self.state.signaling_key), - device_id: self.state.device_id, + uuid: Some(self.state.inner.service_ids.aci), + phonenumber: self.state.inner.phone_number.clone(), + password: Some(self.state.inner.password.clone()), + signaling_key: Some(self.state.inner.signaling_key), + device_id: self.state.inner.device_id, }) } @@ -1238,7 +1253,8 @@ impl Manager> { fn identified_push_service(&self) -> Result> { self.state.identified_push_service_cache.get(|| { let credentials = self.credentials()?; - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + let service_configuration: ServiceConfiguration = + self.state.inner.signal_servers.into(); Ok(HyperPushService::new( service_configuration, Some(credentials), @@ -1250,7 +1266,8 @@ impl Manager> { /// Create or return a clone of a cached unidentified (no credentials) push service. fn unidentified_push_service(&self) -> Result> { self.state.unidentified_push_service_cache.get(|| { - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + let service_configuration: ServiceConfiguration = + self.state.inner.signal_servers.into(); Ok(HyperPushService::new( service_configuration, None, @@ -1281,7 +1298,7 @@ impl Manager> { } let local_addr = ServiceAddress { - uuid: self.state.service_ids.aci, + uuid: self.state.inner.service_ids.aci, }; let unidentified_ws: SignalWebSocket = self @@ -1297,7 +1314,11 @@ impl Manager> { self.rng.clone(), self.config_store.clone(), local_addr, - self.state.device_id.unwrap_or(DEFAULT_DEVICE_ID).into(), + self.state + .inner + .device_id + .unwrap_or(DEFAULT_DEVICE_ID) + .into(), ); self.state @@ -1310,13 +1331,13 @@ impl Manager> { /// Creates a new service cipher. fn new_service_cipher(&self) -> Result, Error> { - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + let service_configuration: ServiceConfiguration = self.state.inner.signal_servers.into(); let service_cipher = ServiceCipher::new( self.config_store.clone(), self.rng.clone(), service_configuration.unidentified_sender_trust_root, - self.state.service_ids.aci, - self.state.device_id.unwrap_or(DEFAULT_DEVICE_ID), + self.state.inner.service_ids.aci, + self.state.inner.device_id.unwrap_or(DEFAULT_DEVICE_ID), ); Ok(service_cipher) diff --git a/presage/src/store.rs b/presage/src/store.rs index 210b9795a..7e957cc49 100644 --- a/presage/src/store.rs +++ b/presage/src/store.rs @@ -1,9 +1,6 @@ use std::{fmt, ops::RangeBounds}; -use crate::{ - manager::{Registered, RegisteredData}, - GroupMasterKeyBytes, -}; +use crate::{manager::RegisteredData, GroupMasterKeyBytes}; use libsignal_service::{ content::ContentBody, groups_v2::Group, From dee04255909dbf5c7aef6484a0fee33eb312d701 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Tue, 14 Nov 2023 10:40:32 +0100 Subject: [PATCH 13/13] WIP --- presage-cli/Cargo.toml | 1 + presage-cli/src/main.rs | 79 ++++++++++++++++----- presage/src/manager.rs | 152 ++++++++++++++++++++-------------------- 3 files changed, 137 insertions(+), 95 deletions(-) diff --git a/presage-cli/Cargo.toml b/presage-cli/Cargo.toml index 10fbcb189..278ea120b 100644 --- a/presage-cli/Cargo.toml +++ b/presage-cli/Cargo.toml @@ -9,6 +9,7 @@ presage = { path = "../presage" } presage-store-sled = { path = "../presage-store-sled" } anyhow = "1.0" +axum = "0.6" base64 = "0.12" chrono = { version = "0.4", default-features = false, features = ["serde", "clock"] } clap = { version = ">=4.2.4", features = ["derive"] } diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index a09e04adb..8f7d2ea1c 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -1,11 +1,13 @@ use core::fmt; use std::convert::TryInto; -use std::path::Path; use std::path::PathBuf; use std::time::Duration; use std::time::UNIX_EPOCH; use anyhow::{anyhow, bail, Context as _}; +use axum::extract::Path; +use axum::routing::post; +use axum::Router; use chrono::Local; use clap::{ArgGroup, Parser, Subcommand}; use directories::ProjectDirs; @@ -119,6 +121,9 @@ enum Cmd { Receive { #[clap(long = "notifications", short = 'n')] notifications: bool, + /// Start a webserver to be able to send messages, useful for testing + #[clap(long)] + webserver: bool, }, #[clap(about = "List groups")] ListGroups, @@ -181,7 +186,7 @@ fn parse_group_master_key(value: &str) -> anyhow::Result { .map_err(|_| anyhow::format_err!("master key should be 32 bytes long")) } -#[tokio::main(flavor = "multi_thread")] +#[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { env_logger::from_env( Env::default().default_filter_or(format!("{}=warn", env!("CARGO_PKG_NAME"))), @@ -232,11 +237,30 @@ async fn send( Ok(()) } +async fn process_incoming_messages( + mut manager: Manager>, + attachments_tmp_dir: &std::path::Path, + notifications: bool, +) -> anyhow::Result<()> { + let messages = manager + .receive_messages(ReceivingMode::Forever) + .await + .context("failed to initialize messages stream")?; + + pin_mut!(messages); + + while let Some(content) = messages.next().await { + process_incoming_message(&mut manager, attachments_tmp_dir, notifications, &content).await; + } + + Ok(()) +} + // Note to developers, this is a good example of a function you can use as a source of inspiration // to process incoming messages. async fn process_incoming_message( manager: &mut Manager>, - attachments_tmp_dir: &Path, + attachments_tmp_dir: &std::path::Path, notifications: bool, content: &Content, ) { @@ -423,8 +447,9 @@ fn print_message( } async fn receive( - manager: &mut Manager>, + manager: Manager>, notifications: bool, + webserver: bool, ) -> anyhow::Result<()> { let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?; info!( @@ -432,20 +457,35 @@ async fn receive( attachments_tmp_dir.path().display() ); - let messages = manager - .receive_messages(ReceivingMode::Forever) - .await - .context("failed to initialize messages stream")?; - pin_mut!(messages); + if webserver { + let app = Router::new() + .with_state(manager) + .route("/message", post(web_send_message)); - while let Some(content) = messages.next().await { - process_incoming_message(manager, attachments_tmp_dir.path(), notifications, &content) - .await; + // run our app with hyper, listening globally on port 3000 + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + let webserver = + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()).serve(app.into_make_service()); + + future::join( + webserver, + process_incoming_messages(manager, attachments_tmp_dir.path(), notifications), + ) + .await; + } else { + process_incoming_messages(manager, attachments_tmp_dir.path(), notifications).await; } Ok(()) } +async fn web_send_message( + manager: Manager>, + Path(recipient): Path, + Path(message): Path, +) { +} + async fn run(subcommand: Cmd, config_store: C) -> anyhow::Result<()> { match subcommand { Cmd::Register { @@ -499,7 +539,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re .await; match manager { - (Ok(manager), _) => { + (Ok(mut manager), _) => { let uuid = manager.whoami().await.unwrap().uuid; println!("{uuid:?}"); } @@ -508,9 +548,12 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } } } - Cmd::Receive { notifications } => { - let mut manager = Manager::load_registered(config_store).await?; - receive(&mut manager, notifications).await?; + Cmd::Receive { + notifications, + webserver, + } => { + let manager = Manager::load_registered(config_store).await?; + receive(manager, notifications, webserver).await?; } Cmd::Send { uuid, message } => { let mut manager = Manager::load_registered(config_store).await?; @@ -622,8 +665,8 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } } Cmd::Whoami => { - let manager = Manager::load_registered(config_store).await?; - println!("{:?}", &manager.whoami().await?); + let mut manager = Manager::load_registered(config_store).await?; + println!("{:?}", manager.whoami().await?); } Cmd::GetContact { ref uuid } => { let manager = Manager::load_registered(config_store).await?; diff --git a/presage/src/manager.rs b/presage/src/manager.rs index b63aeb053..1b32d6f91 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -10,7 +10,7 @@ use log::{debug, error, info, trace, warn}; use parking_lot::Mutex; use rand::{ distributions::{Alphanumeric, DistString}, - rngs::StdRng, + rngs::{StdRng, ThreadRng}, RngCore, SeedableRng, }; use serde::{Deserialize, Serialize}; @@ -49,7 +49,6 @@ use libsignal_service::{ }; use libsignal_service_hyper::push_service::HyperPushService; -use crate::cache::CacheCell; use crate::{serde::serde_profile_key, Thread}; use crate::{store::Store, Error}; @@ -62,8 +61,6 @@ pub struct Manager { config_store: Store, /// Part of the manager which is persisted in the store. state: State, - /// Random number generator - rng: StdRng, } impl fmt::Debug for Manager { @@ -95,8 +92,8 @@ pub struct Confirmation { #[derive(Clone)] pub struct Registered { - identified_push_service_cache: CacheCell, - unidentified_push_service_cache: CacheCell, + identified_push_service: HyperPushService, + unidentified_push_service: HyperPushService, identified_websocket: Arc>>, message_sender: Arc>>>, unidentified_sender_certificate: Option, @@ -139,6 +136,33 @@ impl fmt::Debug for Registered { } } +impl RegisteredData { + fn credentials(&self) -> ServiceCredentials { + ServiceCredentials { + uuid: Some(self.service_ids.aci), + phonenumber: self.phone_number.clone(), + password: Some(self.password.clone()), + signaling_key: Some(self.signaling_key), + device_id: self.device_id, + } + } + + fn identified_push_service(&self) -> HyperPushService { + let credentials = self.credentials(); + let service_configuration: ServiceConfiguration = self.signal_servers.into(); + HyperPushService::new( + service_configuration, + Some(credentials), + crate::USER_AGENT.to_string(), + ) + } + + fn unidentified_push_service(&self) -> HyperPushService { + let service_configuration: ServiceConfiguration = self.signal_servers.into(); + HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()) + } +} + #[derive(PartialEq, Eq)] pub enum ReceivingMode { InitialSync, @@ -149,8 +173,8 @@ pub enum ReceivingMode { impl Registered { fn with_data(inner: RegisteredData) -> Self { Self { - identified_push_service_cache: CacheCell::default(), - unidentified_push_service_cache: CacheCell::default(), + identified_push_service: inner.identified_push_service(), + unidentified_push_service: inner.unidentified_push_service(), identified_websocket: Default::default(), message_sender: Default::default(), unidentified_sender_certificate: Default::default(), @@ -219,7 +243,7 @@ impl Manager { config_store.clear_registration()?; // generate a random alphanumeric 24 chars password - let mut rng = StdRng::from_entropy(); + let mut rng = rand::thread_rng(); let password = Alphanumeric.sample_string(&mut rng, 24); let service_configuration: ServiceConfiguration = signal_servers.into(); @@ -326,7 +350,7 @@ impl Manager { config_store.clear_registration()?; // generate a random alphanumeric 24 chars password - let mut rng = StdRng::from_entropy(); + let mut rng = rand::thread_rng(); let password = Alphanumeric.sample_string(&mut rng, 24); // generate a 52 bytes signaling key @@ -434,8 +458,8 @@ impl Manager { ) -> Result>, Error> { trace!("confirming verification code"); - let registration_id = generate_registration_id(&mut StdRng::from_entropy()); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + let registration_id = generate_registration_id(&mut rand::thread_rng()); + let pni_registration_id = generate_registration_id(&mut rand::thread_rng()); let Confirmation { signal_servers, @@ -469,7 +493,7 @@ impl Manager { return Err(Error::UnverifiedRegistrationSession); } - let mut rng = StdRng::from_entropy(); + let mut rng = rand::thread_rng(); // generate a 52 bytes signaling key let mut signaling_key = [0u8; 52]; @@ -556,13 +580,14 @@ impl Manager> { let inner = config_store .load_state()? .ok_or(Error::NotYetRegisteredError)?; + let identified_push_service = inner.identified_push_service(); + let unidentified_push_service = inner.unidentified_push_service(); let mut manager = Self { - rng: StdRng::from_entropy(), config_store, state: Registered { inner, - identified_push_service_cache: Default::default(), - unidentified_push_service_cache: Default::default(), + identified_push_service, + unidentified_push_service, identified_websocket: Default::default(), message_sender: Default::default(), unidentified_sender_certificate: Default::default(), @@ -573,10 +598,11 @@ impl Manager> { manager.set_account_attributes().await?; } - let credentials = manager.credentials()?; + let credentials = manager.state.inner.credentials(); manager.state.identified_websocket.lock().replace( manager - .identified_push_service()? + .state + .identified_push_service .ws("/v1/websocket/", &[], Some(credentials), true) .await?, ); @@ -587,7 +613,7 @@ impl Manager> { async fn register_pre_keys(&mut self) -> Result<(), Error> { trace!("registering pre keys"); let mut account_manager = AccountManager::new( - self.identified_push_service()?, + self.state.identified_push_service.clone(), Some(self.state.inner.profile_key), ); @@ -616,7 +642,7 @@ impl Manager> { async fn set_account_attributes(&mut self) -> Result<(), Error> { trace!("setting account attributes"); let mut account_manager = AccountManager::new( - self.identified_push_service()?, + self.state.identified_push_service.clone(), Some(self.state.inner.profile_key), ); @@ -625,7 +651,7 @@ impl Manager> { pni_registration_id } else { info!("migrating to PNI"); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + let pni_registration_id = generate_registration_id(&mut rand::thread_rng()); self.state.inner.pni_registration_id = Some(pni_registration_id); self.config_store.save_state(&self.state.inner)?; pni_registration_id @@ -667,13 +693,16 @@ impl Manager> { } async fn request_initial_sync(&mut self) -> Result<(), Error> { - self.request_keys_sync().await?; - self.request_block_sync().await?; - self.request_contacts_sync().await?; + // self.request_keys_sync().await?; + // // here, wait for answer first? + // self.request_block_sync().await?; + + // self.request_contacts_sync().await?; let messages = self .receive_messages_stream(ReceivingMode::WaitForContacts) .await?; + pin_mut!(messages); while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await @@ -740,7 +769,8 @@ impl Manager> { if needs_renewal(self.state.unidentified_sender_certificate.as_ref()) { let sender_certificate = self - .identified_push_service()? + .state + .identified_push_service .get_uuid_only_sender_certificate() .await?; @@ -761,7 +791,8 @@ impl Manager> { token: &str, captcha: &str, ) -> Result<(), Error> { - let mut account_manager = AccountManager::new(self.identified_push_service()?, None); + let mut account_manager = + AccountManager::new(self.state.identified_push_service.clone(), None); account_manager .submit_recaptcha_challenge(token, captcha) .await?; @@ -774,8 +805,8 @@ impl Manager> { } /// Fetches basic information on the registered device. - pub async fn whoami(&self) -> Result> { - Ok(self.identified_push_service()?.whoami().await?) + pub async fn whoami(&mut self) -> Result> { + Ok(self.state.identified_push_service.whoami().await?) } /// Fetches the profile (name, about, status emoji) of the registered user. @@ -798,8 +829,10 @@ impl Manager> { return Ok(profile); } - let mut account_manager = - AccountManager::new(self.identified_push_service()?, Some(profile_key)); + let mut account_manager = AccountManager::new( + self.state.identified_push_service.clone(), + Some(profile_key), + ); let profile = account_manager.retrieve_profile(uuid.into()).await?; @@ -855,7 +888,7 @@ impl Manager> { async fn receive_messages_encrypted( &mut self, ) -> Result>, Error> { - let credentials: ServiceCredentials = self.credentials()?; + let credentials: ServiceCredentials = self.state.inner.credentials(); let identified_ws = self.identified_websocket().await?; Ok(MessagePipe::from_socket(identified_ws, credentials).stream()) } @@ -882,7 +915,7 @@ impl Manager> { let groups_credentials_cache = InMemoryCredentialsCache::default(); let groups_manager = GroupsManager::new( self.state.inner.service_ids.clone(), - self.identified_push_service()?, + self.state.identified_push_service.clone(), groups_credentials_cache, server_public_params, ); @@ -905,7 +938,7 @@ impl Manager> { let init = StreamState { encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), - message_receiver: MessageReceiver::new(self.identified_push_service()?), + message_receiver: MessageReceiver::new(self.state.identified_push_service.clone()), service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, @@ -1209,7 +1242,7 @@ impl Manager> { &self, attachment_pointer: &AttachmentPointer, ) -> Result, Error> { - let mut service = self.identified_push_service()?; + let mut service = self.state.identified_push_service.clone(); let mut attachment_stream = service.get_attachment(attachment_pointer).await?; // We need the whole file for the crypto to check out @@ -1239,51 +1272,15 @@ impl Manager> { Ok(()) } - fn credentials(&self) -> Result> { - Ok(ServiceCredentials { - uuid: Some(self.state.inner.service_ids.aci), - phonenumber: self.state.inner.phone_number.clone(), - password: Some(self.state.inner.password.clone()), - signaling_key: Some(self.state.inner.signaling_key), - device_id: self.state.inner.device_id, - }) - } - - /// Create or return a clone of a cached identified (with credentials) push service. - fn identified_push_service(&self) -> Result> { - self.state.identified_push_service_cache.get(|| { - let credentials = self.credentials()?; - let service_configuration: ServiceConfiguration = - self.state.inner.signal_servers.into(); - Ok(HyperPushService::new( - service_configuration, - Some(credentials), - crate::USER_AGENT.to_string(), - )) - }) - } - - /// Create or return a clone of a cached unidentified (no credentials) push service. - fn unidentified_push_service(&self) -> Result> { - self.state.unidentified_push_service_cache.get(|| { - let service_configuration: ServiceConfiguration = - self.state.inner.signal_servers.into(); - Ok(HyperPushService::new( - service_configuration, - None, - crate::USER_AGENT.to_string(), - )) - }) - } - async fn identified_websocket(&mut self) -> Result> { let mut lock = self.state.identified_websocket.lock(); if let Some(identified_ws) = lock.as_ref() { Ok(identified_ws.clone()) } else { - let credentials = self.credentials()?; + let credentials = self.state.inner.credentials(); let ws = self - .identified_push_service()? + .state + .identified_push_service .ws("/v1/websocket/", &[], Some(credentials), true) .await?; lock.replace(ws.clone()); @@ -1302,16 +1299,17 @@ impl Manager> { }; let unidentified_ws: SignalWebSocket = self - .unidentified_push_service()? + .state + .unidentified_push_service .ws("/v1/websocket/", &[], None, true) .await?; let message_sender = MessageSender::new( self.identified_websocket().await?, unidentified_ws, - self.identified_push_service()?, + self.state.identified_push_service.clone(), self.new_service_cipher()?, - self.rng.clone(), + StdRng::from_entropy(), self.config_store.clone(), local_addr, self.state @@ -1334,7 +1332,7 @@ impl Manager> { let service_configuration: ServiceConfiguration = self.state.inner.signal_servers.into(); let service_cipher = ServiceCipher::new( self.config_store.clone(), - self.rng.clone(), + StdRng::from_entropy(), service_configuration.unidentified_sender_trust_root, self.state.inner.service_ids.aci, self.state.inner.device_id.unwrap_or(DEFAULT_DEVICE_ID),