diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index a3b29456c..7f5da33a1 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -208,7 +208,6 @@ async fn send( manager: &mut Manager, uuid: &Uuid, content_body: impl Into, - timestamp: u64, ) -> anyhow::Result<()> { let local = task::LocalSet::new(); @@ -224,7 +223,7 @@ async fn send( sleep(Duration::from_secs(5)).await; manager - .send_message(*uuid, content_body, timestamp) + .send_message(*uuid, content_body) .await .unwrap(); @@ -526,7 +525,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re timestamp: Some(timestamp), ..Default::default() }; - send(&mut manager, &uuid, message, timestamp).await?; + send(&mut manager, &uuid, message).await?; } Cmd::SendToGroup { message, diff --git a/presage/src/manager.rs b/presage/src/manager.rs index db260b886..40dbb4cef 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -16,7 +16,6 @@ use rand::{ use serde::{Deserialize, Serialize}; use url::Url; -use libsignal_service::{proto::EditMessage, messagepipe::Incoming}; use libsignal_service::push_service::{RegistrationMethod, VerificationTransport}; use libsignal_service::{ attachment_cipher::decrypt_in_place, @@ -27,10 +26,7 @@ use libsignal_service::{ messagepipe::ServiceCredentials, models::Contact, prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, - proto::{ - data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, - NullMessage, - }, + proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage}, protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning}, push_service::{ @@ -47,6 +43,7 @@ use libsignal_service::{ websocket::SignalWebSocket, AccountManager, Profile, ServiceAddress, }; +use libsignal_service::{messagepipe::Incoming, proto::EditMessage}; use libsignal_service_hyper::push_service::HyperPushService; use crate::cache::CacheCell; @@ -140,7 +137,7 @@ impl fmt::Debug for Registered { enum ReceivingMode { InitialSync, WaitForContacts, - Forever + Forever, } impl Registered { @@ -637,19 +634,26 @@ impl Manager { Ok(()) } - async fn sync_contacts(&mut self) -> Result<(), Error> { - let messages = self.receive_messages_stream(ReceivingMode::InitialSync).await?; + let messages = self + .receive_messages_stream(ReceivingMode::InitialSync) + .await?; pin_mut!(messages); while let Some(_msg) = messages.next().await { // debug!("{msg:?}"); } + self.request_configuration_sync().await?; + self.request_block_sync().await?; self.request_contacts_sync().await?; - let messages = self.receive_messages_stream(ReceivingMode::WaitForContacts).await?; + let messages = self + .receive_messages_stream(ReceivingMode::WaitForContacts) + .await?; pin_mut!(messages); - while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await { + while let Ok(Some(_msg)) = + tokio::time::timeout(Duration::from_secs(60), messages.next()).await + { // debug!("{msg:?}"); } @@ -670,13 +674,40 @@ impl Manager { ..Default::default() }; - let timestamp = std::time::SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; + // first request the sync + self.send_message(self.state.service_ids.aci, sync_message) + .await?; + + Ok(()) + } + + async fn request_block_sync(&mut self) -> Result<(), Error> { + trace!("requesting blocked sync"); + let sync_message = SyncMessage { + request: Some(sync_message::Request { + r#type: Some(sync_message::request::Type::Blocked as i32), + }), + ..Default::default() + }; // first request the sync - self.send_message(self.state.service_ids.aci, sync_message, timestamp) + self.send_message(self.state.service_ids.aci, sync_message) + .await?; + + Ok(()) + } + + async fn request_configuration_sync(&mut self) -> Result<(), Error> { + trace!("requesting configuration sync"); + let sync_message = SyncMessage { + request: Some(sync_message::Request { + r#type: Some(sync_message::request::Type::Configuration as i32), + }), + ..Default::default() + }; + + // first request the sync + self.send_message(self.state.service_ids.aci, sync_message) .await?; Ok(()) @@ -982,7 +1013,9 @@ impl Manager { } Some(Ok(Incoming::QueueEmpty)) => { debug!("empty queue"); - if state.mode == ReceivingMode::InitialSync { return None; } + if state.mode == ReceivingMode::InitialSync { + return None; + } } Some(Err(e)) => error!("Error: {}", e), None => return None, @@ -1001,10 +1034,13 @@ impl Manager { &mut self, recipient_addr: impl Into, message: impl Into, - timestamp: u64, ) -> Result<(), Error> { let mut sender = self.new_message_sender().await?; + let timestamp = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; let online_only = false; let recipient = recipient_addr.into(); let mut content_body: ContentBody = message.into(); @@ -1192,7 +1228,6 @@ impl Manager { pub async fn send_session_reset( &mut self, recipient: &ServiceAddress, - timestamp: u64, ) -> Result<(), Error> { log::trace!("Resetting session for address: {}", recipient.uuid); let message = DataMessage { @@ -1200,7 +1235,7 @@ impl Manager { ..Default::default() }; - self.send_message(*recipient, message, timestamp).await?; + self.send_message(*recipient, message).await?; Ok(()) }