From e4fc21e9fceef60cc149c362857fed952407142c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Thu, 1 Jun 2023 00:54:18 +0200 Subject: [PATCH 1/5] Initialize websockets in Manager and enable keep-alive on unidentified websocket --- presage-cli/src/main.rs | 29 ++++----- presage/src/manager.rs | 132 +++++++++++++++++++++++----------------- 2 files changed, 86 insertions(+), 75 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 222f18eac..180555282 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -220,25 +220,18 @@ async fn send( let local = task::LocalSet::new(); - local - .run_until(async move { - let mut receiving_manager = manager.clone(); - task::spawn_local(async move { - if let Err(e) = receive(&mut receiving_manager, false).await { - error!("error while receiving stuff: {e}"); - } - }); - - sleep(Duration::from_secs(5)).await; - - manager - .send_message(*uuid, message, timestamp) - .await - .unwrap(); + local.run_until(async move { + let mut receiving_manager = manager.clone(); + task::spawn_local(async move { + if let Err(e) = receive(&mut receiving_manager, false).await { + error!("error while receiving stuff: {e}"); + } + }); - sleep(Duration::from_secs(5)).await; - }) - .await; + if let Err(error) = manager.send_message(*uuid, message, timestamp).await { + error!("failed to send message: {error}"); + } + }).await; Ok(()) } diff --git a/presage/src/manager.rs b/presage/src/manager.rs index d425e28bd..fa3fd1029 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -1,13 +1,12 @@ use std::{ + cell::{RefCell}, fmt, ops::RangeBounds, - sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; use futures::{channel::mpsc, channel::oneshot, future, pin_mut, AsyncReadExt, Stream, StreamExt}; use log::{debug, error, info, trace, warn}; -use parking_lot::Mutex; use rand::{distributions::Alphanumeric, rngs::StdRng, Rng, RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; use url::Url; @@ -18,7 +17,7 @@ use libsignal_service::{ configuration::{ServiceConfiguration, SignalServers, SignalingKey}, content::{ContentBody, DataMessage, DataMessageFlags, Metadata, SyncMessage}, groups_v2::{Group, GroupsManager, InMemoryCredentialsCache}, - messagepipe::ServiceCredentials, + messagepipe::{MessagePipe, ServiceCredentials}, models::Contact, prelude::{ phonenumber::PhoneNumber, @@ -94,11 +93,13 @@ pub struct Confirmation { #[derive(Clone, Serialize, Deserialize)] pub struct Registered { #[serde(skip)] - push_service_cache: CacheCell, + identified_push_service: CacheCell, #[serde(skip)] - identified_websocket: Arc>>, + unidentified_push_service: CacheCell, #[serde(skip)] - unidentified_websocket: Arc>>, + identified_websocket: RefCell>, + #[serde(skip)] + unidentified_websocket: RefCell>, #[serde(skip)] unidentified_sender_certificate: Option, @@ -129,7 +130,8 @@ pub struct Registered { impl fmt::Debug for Registered { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Registered") - .field("websocket", &self.identified_websocket.lock().is_some()) + .field("signal_servers", &self.signal_servers) + .field("phone_number", &self.phone_number) .finish_non_exhaustive() } } @@ -318,7 +320,8 @@ impl Manager { { log::info!("successfully registered device {}", &service_ids); Ok(Registered { - push_service_cache: CacheCell::default(), + identified_push_service: CacheCell::default(), + unidentified_push_service: CacheCell::default(), identified_websocket: Default::default(), unidentified_websocket: Default::default(), unidentified_sender_certificate: Default::default(), @@ -460,7 +463,8 @@ impl Manager { rng, config_store: self.config_store, state: Registered { - push_service_cache: CacheCell::default(), + identified_push_service: CacheCell::default(), + unidentified_push_service: CacheCell::default(), identified_websocket: Default::default(), unidentified_websocket: Default::default(), unidentified_sender_certificate: Default::default(), @@ -520,8 +524,10 @@ impl Manager { async fn register_pre_keys(&mut self) -> Result<(), Error> { trace!("registering pre keys"); - let mut account_manager = - AccountManager::new(self.push_service()?, Some(self.state.profile_key)); + let mut account_manager = AccountManager::new( + self.identified_push_service()?, + Some(self.state.profile_key), + ); let (pre_keys_offset_id, next_signed_pre_key_id) = account_manager .update_pre_key_bundle( @@ -544,8 +550,10 @@ impl Manager { async fn set_account_attributes(&mut self) -> Result<(), Error> { trace!("setting account attributes"); - let mut account_manager = - AccountManager::new(self.push_service()?, Some(self.state.profile_key)); + let mut account_manager = AccountManager::new( + self.identified_push_service()?, + Some(self.state.profile_key), + ); let pni_registration_id = if let Some(pni_registration_id) = self.state.pni_registration_id { @@ -595,7 +603,7 @@ impl Manager { &mut self, mut messages: impl Stream + Unpin, ) -> Result<(), Error> { - let mut message_receiver = MessageReceiver::new(self.push_service()?); + let mut message_receiver = MessageReceiver::new(self.identified_push_service()?); while let Some(Content { body, .. }) = messages.next().await { if let ContentBody::SynchronizeMessage(SyncMessage { contacts: Some(contacts), @@ -677,7 +685,7 @@ impl Manager { if needs_renewal(self.state.unidentified_sender_certificate.as_ref()) { let sender_certificate = self - .push_service()? + .identified_push_service()? .get_uuid_only_sender_certificate() .await?; @@ -698,7 +706,7 @@ impl Manager { token: &str, captcha: &str, ) -> Result<(), Error> { - let mut account_manager = AccountManager::new(self.push_service()?, None); + let mut account_manager = AccountManager::new(self.identified_push_service()?, None); account_manager .submit_recaptcha_challenge(token, captcha) .await?; @@ -712,7 +720,7 @@ impl Manager { /// Fetches basic information on the registered device. pub async fn whoami(&self) -> Result> { - Ok(self.push_service()?.whoami().await?) + Ok(self.identified_push_service()?.whoami().await?) } /// Fetches the profile (name, about, status emoji) of the registered user. @@ -732,7 +740,8 @@ impl Manager { return Ok(profile); } - let mut account_manager = AccountManager::new(self.push_service()?, Some(profile_key)); + let mut account_manager = + AccountManager::new(self.identified_push_service()?, Some(profile_key)); let profile = account_manager.retrieve_profile(uuid.into()).await?; @@ -789,22 +798,8 @@ impl Manager { &mut self, ) -> Result>, Error> { let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?; - let pipe = MessageReceiver::new(self.push_service()?) - .create_message_pipe(credentials) - .await?; - - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let mut unidentified_push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - let unidentified_ws = unidentified_push_service - .ws("/v1/websocket/", None, false) - .await?; - self.state.identified_websocket.lock().replace(pipe.ws()); - self.state - .unidentified_websocket - .lock() - .replace(unidentified_ws); - + let ws = self.identified_websocket().await?; + let pipe = MessagePipe::from_socket(ws, credentials); Ok(pipe.stream()) } @@ -826,7 +821,7 @@ impl Manager { let groups_credentials_cache = InMemoryCredentialsCache::default(); let groups_manager = GroupsManager::new( self.state.service_ids.clone(), - self.push_service()?, + self.identified_push_service()?, groups_credentials_cache, server_public_params, ); @@ -1069,7 +1064,7 @@ impl Manager { &self, attachment_pointer: &AttachmentPointer, ) -> Result, Error> { - let mut service = self.push_service()?; + let mut service = self.identified_push_service()?; let mut attachment_stream = service.get_attachment(attachment_pointer).await?; // We need the whole file for the crypto to check out @@ -1110,14 +1105,13 @@ impl Manager { })) } - /// Returns a clone of a cached push service. + /// Return a clone of a cached push service. /// /// If no service is yet cached, it will create and cache one. - fn push_service(&self) -> Result> { - self.state.push_service_cache.get(|| { + fn identified_push_service(&self) -> Result> { + self.state.identified_push_service.get(|| { let credentials = self.credentials()?; let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - Ok(HyperPushService::new( service_configuration, credentials, @@ -1126,30 +1120,54 @@ impl Manager { }) } + /// Return a clone of a cached _unidentified_ push service. + fn unidentified_push_service(&self) -> Result> { + self.state.unidentified_push_service.get(|| { + let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); + Ok(HyperPushService::new( + service_configuration, + None, + crate::USER_AGENT.to_string(), + )) + }) + } + + async fn identified_websocket(&self) -> Result> { + if let Some(ws) = self.state.identified_websocket.borrow().as_ref() { + return Ok(ws.clone()); + } + + let ws = self + .identified_push_service()? + .ws("/v1/websocket/", self.credentials()?, true) + .await?; + self.state.identified_websocket.replace(Some(ws.clone())); + Ok(ws) + } + + async fn unidentified_websocket(&self) -> Result> { + if let Some(ws) = self.state.unidentified_websocket.borrow().as_ref() { + Ok(ws.clone()) + } else { + let ws = self + .unidentified_push_service()? + .ws("/v1/websocket/", None, true) + .await?; + self.state.identified_websocket.replace(Some(ws.clone())); + Ok(ws) + } + } + /// Creates a new message sender. async fn new_message_sender(&self) -> Result, Error> { let local_addr = ServiceAddress { uuid: self.state.service_ids.aci, }; - let identified_websocket = self - .state - .identified_websocket - .lock() - .clone() - .ok_or(Error::MessagePipeNotStarted)?; - - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let mut unidentified_push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - let unidentified_websocket = unidentified_push_service - .ws("/v1/websocket/", None, false) - .await?; - Ok(MessageSender::new( - identified_websocket, - unidentified_websocket, - self.push_service()?, + self.identified_websocket().await?, + self.unidentified_websocket().await?, + self.identified_push_service()?, self.new_service_cipher()?, self.rng.clone(), self.config_store.clone(), From 919da9923ebc3042ed05b0937ced7a07483be70c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Thu, 1 Jun 2023 00:58:55 +0200 Subject: [PATCH 2/5] Format --- presage-cli/src/main.rs | 24 +++++++++++++----------- presage/src/manager.rs | 2 +- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 180555282..211efbb42 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -220,18 +220,20 @@ async fn send( let local = task::LocalSet::new(); - local.run_until(async move { - let mut receiving_manager = manager.clone(); - task::spawn_local(async move { - if let Err(e) = receive(&mut receiving_manager, false).await { - error!("error while receiving stuff: {e}"); - } - }); + local + .run_until(async move { + let mut receiving_manager = manager.clone(); + task::spawn_local(async move { + if let Err(e) = receive(&mut receiving_manager, false).await { + error!("error while receiving stuff: {e}"); + } + }); - if let Err(error) = manager.send_message(*uuid, message, timestamp).await { - error!("failed to send message: {error}"); - } - }).await; + if let Err(error) = manager.send_message(*uuid, message, timestamp).await { + error!("failed to send message: {error}"); + } + }) + .await; Ok(()) } diff --git a/presage/src/manager.rs b/presage/src/manager.rs index fa3fd1029..cb5b488b5 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -1,5 +1,5 @@ use std::{ - cell::{RefCell}, + cell::RefCell, fmt, ops::RangeBounds, time::{Duration, SystemTime, UNIX_EPOCH}, From 32c7c5b4f62b1dcad69631fee6504d2b8a81de87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Thu, 1 Jun 2023 01:04:44 +0200 Subject: [PATCH 3/5] Stop receiving when sending in CLI --- presage-cli/src/main.rs | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 211efbb42..329b48ba0 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -2,7 +2,6 @@ use core::fmt; use std::convert::TryInto; use std::path::Path; use std::path::PathBuf; -use std::time::Duration; use std::time::UNIX_EPOCH; use anyhow::{anyhow, bail, Context as _}; @@ -12,6 +11,7 @@ use directories::ProjectDirs; use env_logger::Env; use futures::StreamExt; use futures::{channel::oneshot, future, pin_mut}; +use log::warn; use log::{debug, error, info}; use notify_rust::Notification; use presage::libsignal_service::content::Reaction; @@ -30,8 +30,6 @@ use presage::{ use presage_store_sled::MigrationConflictStrategy; use presage_store_sled::SledStore; use tempfile::Builder; -use tokio::task; -use tokio::time::sleep; use tokio::{ fs, io::{self, AsyncBufReadExt, BufReader}, @@ -218,22 +216,9 @@ async fn send( ..Default::default() }); - let local = task::LocalSet::new(); - - local - .run_until(async move { - let mut receiving_manager = manager.clone(); - task::spawn_local(async move { - if let Err(e) = receive(&mut receiving_manager, false).await { - error!("error while receiving stuff: {e}"); - } - }); - - if let Err(error) = manager.send_message(*uuid, message, timestamp).await { - error!("failed to send message: {error}"); - } - }) - .await; + if let Err(error) = manager.send_message(*uuid, message, timestamp).await { + warn!("possible failure when sending message: {error}"); + } Ok(()) } From c6b970a5171312be339a6331d1dc604917a862f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Tue, 6 Jun 2023 09:32:30 +0200 Subject: [PATCH 4/5] Fix typo --- presage/src/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/presage/src/manager.rs b/presage/src/manager.rs index cb5b488b5..31a67f927 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -1153,7 +1153,7 @@ impl Manager { .unidentified_push_service()? .ws("/v1/websocket/", None, true) .await?; - self.state.identified_websocket.replace(Some(ws.clone())); + self.state.unidentified_websocket.replace(Some(ws.clone())); Ok(ws) } } From 62584674829a912859eaceb7c4ab42a2965cd8e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Tue, 6 Jun 2023 09:34:21 +0200 Subject: [PATCH 5/5] Fix clippy warning --- presage/src/manager.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/presage/src/manager.rs b/presage/src/manager.rs index 31a67f927..2ae723c40 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -1133,8 +1133,9 @@ impl Manager { } async fn identified_websocket(&self) -> Result> { - if let Some(ws) = self.state.identified_websocket.borrow().as_ref() { - return Ok(ws.clone()); + let socket = self.state.identified_websocket.borrow().clone(); + if let Some(ws) = socket { + return Ok(ws); } let ws = self @@ -1146,8 +1147,9 @@ impl Manager { } async fn unidentified_websocket(&self) -> Result> { - if let Some(ws) = self.state.unidentified_websocket.borrow().as_ref() { - Ok(ws.clone()) + let socket = self.state.unidentified_websocket.borrow().clone(); + if let Some(ws) = socket { + Ok(ws) } else { let ws = self .unidentified_push_service()?