Skip to content

Commit

Permalink
feat: Add receiving mode to the message stream
Browse files Browse the repository at this point in the history
Add a new method `Manager::receive_messages_with_mode` allowing to
configure how to handle specific sentinel messages. It is supported to
stop the stream after the initial sync (empty Signal message queue) or
after contact sync.
  • Loading branch information
boxdot committed Nov 6, 2023
1 parent 0a76c0f commit 15aa4c2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 23 deletions.
4 changes: 2 additions & 2 deletions presage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ authors = ["Gabriel Féron <[email protected]>"]
edition = "2021"

[dependencies]
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" }
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "a36d43d62" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "a36d43d62" }

base64 = "0.21"
futures = "0.3"
Expand Down
4 changes: 3 additions & 1 deletion presage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ mod serde;
mod store;

pub use errors::Error;
pub use manager::{Confirmation, Linking, Manager, Registered, Registration, RegistrationOptions};
pub use manager::{
Confirmation, Linking, Manager, ReceivingMode, Registered, Registration, RegistrationOptions,
};
pub use store::{ContentTimestamp, Store, StoreError, Thread};

#[deprecated(note = "Please help use improve the prelude module instead")]
Expand Down
84 changes: 64 additions & 20 deletions presage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use rand::{
use serde::{Deserialize, Serialize};
use url::Url;

use libsignal_service::proto::EditMessage;
use libsignal_service::push_service::{RegistrationMethod, VerificationTransport};
use libsignal_service::{
attachment_cipher::decrypt_in_place,
Expand All @@ -27,10 +26,7 @@ use libsignal_service::{
messagepipe::ServiceCredentials,
models::Contact,
prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid},
proto::{
data_message::Delete, sync_message, AttachmentPointer, Envelope, GroupContextV2,
NullMessage,
},
proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage},
protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate},
provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning},
push_service::{
Expand All @@ -47,6 +43,7 @@ use libsignal_service::{
websocket::SignalWebSocket,
AccountManager, Profile, ServiceAddress,
};
use libsignal_service::{messagepipe::Incoming, proto::EditMessage};
use libsignal_service_hyper::push_service::HyperPushService;

use crate::cache::CacheCell;
Expand Down Expand Up @@ -656,13 +653,14 @@ impl<C: Store> Manager<C, Registered> {
}

async fn sync_contacts(&mut self) -> Result<(), Error<C::Error>> {
let messages = self.receive_messages_stream(true).await?;
pin_mut!(messages);

let messages = self
.receive_messages_stream(ReceivingMode::WaitForContacts)
.await?;
self.request_contacts_sync().await?;

info!("waiting for contacts sync for up to 60 seconds");

pin_mut!(messages);
tokio::time::timeout(
Duration::from_secs(60),
self.wait_for_contacts_sync(messages),
Expand Down Expand Up @@ -692,7 +690,6 @@ impl<C: Store> Manager<C, Registered> {
.expect("Time went backwards")
.as_millis() as u64;

// first request the sync
self.send_message(self.state.service_ids.aci, sync_message, timestamp)
.await?;

Expand Down Expand Up @@ -829,7 +826,7 @@ impl<C: Store> Manager<C, Registered> {

async fn receive_messages_encrypted(
&mut self,
) -> Result<impl Stream<Item = Result<Envelope, ServiceError>>, Error<C::Error>> {
) -> Result<impl Stream<Item = Result<Incoming, ServiceError>>, Error<C::Error>> {
let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?;
let allow_stories = false;
let pipe = MessageReceiver::new(self.push_service()?)
Expand Down Expand Up @@ -857,7 +854,14 @@ impl<C: Store> Manager<C, Registered> {
pub async fn receive_messages(
&mut self,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
self.receive_messages_stream(false).await
self.receive_messages_stream(ReceivingMode::Forever).await
}

pub async fn receive_messages_with_mode(
&mut self,
mode: ReceivingMode,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
self.receive_messages_stream(mode).await
}

fn groups_manager(
Expand All @@ -879,40 +883,60 @@ impl<C: Store> Manager<C, Registered> {

async fn receive_messages_stream(
&mut self,
include_internal_events: bool,
mode: ReceivingMode,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
struct StreamState<S, C> {
encrypted_messages: S,
message_receiver: MessageReceiver<HyperPushService>,
service_cipher: ServiceCipher<C>,
config_store: C,
groups_manager: GroupsManager<HyperPushService, InMemoryCredentialsCache>,
include_internal_events: bool,
mode: ReceivingMode,
}

let init = StreamState {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
message_receiver: MessageReceiver::new(self.push_service()?),
service_cipher: self.new_service_cipher()?,
config_store: self.config_store.clone(),
groups_manager: self.groups_manager()?,
include_internal_events,
mode,
};

Ok(futures::stream::unfold(init, |mut state| async move {
loop {
match state.encrypted_messages.next().await {
Some(Ok(envelope)) => {
Some(Ok(Incoming::Envelope(envelope))) => {
match state.service_cipher.open_envelope(envelope).await {
Ok(Some(content)) => {
// contacts synchronization sent from the primary device (happens after linking, or on demand)
if let ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(_),
contacts: Some(contacts),
..
}) = &content.body
{
if state.include_internal_events {
return Some((content, state));
} else {
continue;
match state.message_receiver.retrieve_contacts(contacts).await {
Ok(contacts) => {
let _ = state.config_store.clear_contacts();
match state
.config_store
.save_contacts(contacts.filter_map(Result::ok))
{
Ok(()) => {
info!("saved contacts");
}
Err(e) => {
warn!("failed to save contacts: {e}");
}
}
}
Err(e) => {
warn!("failed to retrieve contacts: {e}");
}
}

if let ReceivingMode::WaitForContacts = state.mode {
return None;
}
}

Expand Down Expand Up @@ -974,6 +998,12 @@ impl<C: Store> Manager<C, Registered> {
}
}
}
Some(Ok(Incoming::QueueEmpty)) => {
debug!("empty queue");
if let ReceivingMode::InitialSync = state.mode {
return None;
}
}
Some(Err(e)) => error!("Error: {}", e),
None => return None,
}
Expand Down Expand Up @@ -1313,6 +1343,20 @@ impl<C: Store> Manager<C, Registered> {
}
}

/// The mode receiving messages stream
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ReceivingMode {
/// Don't stop the stream
#[default]
Forever,
/// Stop the stream after the initial sync
///
/// That is, when the Signal's message queue becomes empty.
InitialSync,
/// Stop the stream after contacts are synced
WaitForContacts,
}

async fn upsert_group<C: Store>(
config_store: &C,
groups_manager: &mut GroupsManager<HyperPushService, InMemoryCredentialsCache>,
Expand Down

0 comments on commit 15aa4c2

Please sign in to comment.