Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add receiving mode to the message stream #202

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions presage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ authors = ["Gabriel Féron <[email protected]>"]
edition = "2021"

[dependencies]
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" }
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "a36d43d62" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "a36d43d62" }

base64 = "0.21"
futures = "0.3"
Expand Down
4 changes: 3 additions & 1 deletion presage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ mod serde;
mod store;

pub use errors::Error;
pub use manager::{Confirmation, Linking, Manager, Registered, Registration, RegistrationOptions};
pub use manager::{
Confirmation, Linking, Manager, ReceivingMode, Registered, Registration, RegistrationOptions,
};
pub use store::{ContentTimestamp, Store, StoreError, Thread};

#[deprecated(note = "Please help use improve the prelude module instead")]
Expand Down
84 changes: 64 additions & 20 deletions presage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use rand::{
use serde::{Deserialize, Serialize};
use url::Url;

use libsignal_service::proto::EditMessage;
use libsignal_service::push_service::{RegistrationMethod, VerificationTransport};
use libsignal_service::{
attachment_cipher::decrypt_in_place,
Expand All @@ -27,10 +26,7 @@ use libsignal_service::{
messagepipe::ServiceCredentials,
models::Contact,
prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid},
proto::{
data_message::Delete, sync_message, AttachmentPointer, Envelope, GroupContextV2,
NullMessage,
},
proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage},
protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate},
provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning},
push_service::{
Expand All @@ -47,6 +43,7 @@ use libsignal_service::{
websocket::SignalWebSocket,
AccountManager, Profile, ServiceAddress,
};
use libsignal_service::{messagepipe::Incoming, proto::EditMessage};
use libsignal_service_hyper::push_service::HyperPushService;

use crate::cache::CacheCell;
Expand Down Expand Up @@ -656,13 +653,14 @@ impl<C: Store> Manager<C, Registered> {
}

async fn sync_contacts(&mut self) -> Result<(), Error<C::Error>> {
let messages = self.receive_messages_stream(true).await?;
pin_mut!(messages);

let messages = self
.receive_messages_stream(ReceivingMode::WaitForContacts)
.await?;
self.request_contacts_sync().await?;

info!("waiting for contacts sync for up to 60 seconds");

pin_mut!(messages);
tokio::time::timeout(
Duration::from_secs(60),
self.wait_for_contacts_sync(messages),
Expand Down Expand Up @@ -692,7 +690,6 @@ impl<C: Store> Manager<C, Registered> {
.expect("Time went backwards")
.as_millis() as u64;

// first request the sync
self.send_message(self.state.service_ids.aci, sync_message, timestamp)
.await?;

Expand Down Expand Up @@ -829,7 +826,7 @@ impl<C: Store> Manager<C, Registered> {

async fn receive_messages_encrypted(
&mut self,
) -> Result<impl Stream<Item = Result<Envelope, ServiceError>>, Error<C::Error>> {
) -> Result<impl Stream<Item = Result<Incoming, ServiceError>>, Error<C::Error>> {
let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?;
let allow_stories = false;
let pipe = MessageReceiver::new(self.push_service()?)
Expand Down Expand Up @@ -857,7 +854,14 @@ impl<C: Store> Manager<C, Registered> {
pub async fn receive_messages(
&mut self,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
self.receive_messages_stream(false).await
self.receive_messages_stream(ReceivingMode::Forever).await
}

pub async fn receive_messages_with_mode(
&mut self,
mode: ReceivingMode,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
self.receive_messages_stream(mode).await
}

fn groups_manager(
Expand All @@ -879,40 +883,60 @@ impl<C: Store> Manager<C, Registered> {

async fn receive_messages_stream(
&mut self,
include_internal_events: bool,
mode: ReceivingMode,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
struct StreamState<S, C> {
encrypted_messages: S,
message_receiver: MessageReceiver<HyperPushService>,
service_cipher: ServiceCipher<C>,
config_store: C,
groups_manager: GroupsManager<HyperPushService, InMemoryCredentialsCache>,
include_internal_events: bool,
mode: ReceivingMode,
}

let init = StreamState {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
message_receiver: MessageReceiver::new(self.push_service()?),
service_cipher: self.new_service_cipher()?,
config_store: self.config_store.clone(),
groups_manager: self.groups_manager()?,
include_internal_events,
mode,
};

Ok(futures::stream::unfold(init, |mut state| async move {
loop {
match state.encrypted_messages.next().await {
Some(Ok(envelope)) => {
Some(Ok(Incoming::Envelope(envelope))) => {
match state.service_cipher.open_envelope(envelope).await {
Ok(Some(content)) => {
// contacts synchronization sent from the primary device (happens after linking, or on demand)
if let ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(_),
contacts: Some(contacts),
..
}) = &content.body
{
if state.include_internal_events {
return Some((content, state));
} else {
continue;
match state.message_receiver.retrieve_contacts(contacts).await {
Ok(contacts) => {
let _ = state.config_store.clear_contacts();
match state
.config_store
.save_contacts(contacts.filter_map(Result::ok))
{
Ok(()) => {
info!("saved contacts");
}
Err(e) => {
warn!("failed to save contacts: {e}");
}
}
}
Err(e) => {
warn!("failed to retrieve contacts: {e}");
}
}

if let ReceivingMode::WaitForContacts = state.mode {
return None;
}
}

Expand Down Expand Up @@ -974,6 +998,12 @@ impl<C: Store> Manager<C, Registered> {
}
}
}
Some(Ok(Incoming::QueueEmpty)) => {
debug!("empty queue");
if let ReceivingMode::InitialSync = state.mode {
return None;
}
}
Some(Err(e)) => error!("Error: {}", e),
None => return None,
}
Expand Down Expand Up @@ -1313,6 +1343,20 @@ impl<C: Store> Manager<C, Registered> {
}
}

/// The mode receiving messages stream
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ReceivingMode {
/// Don't stop the stream
#[default]
Forever,
/// Stop the stream after the initial sync
///
/// That is, when the Signal's message queue becomes empty.
InitialSync,
/// Stop the stream after contacts are synced
WaitForContacts,
}

async fn upsert_group<C: Store>(
config_store: &C,
groups_manager: &mut GroupsManager<HyperPushService, InMemoryCredentialsCache>,
Expand Down