Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some issues around SyncMessage #210

Merged
merged 1 commit into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 40 additions & 26 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ enum Cmd {
#[clap(long, short = 'k', help = "Master Key of the V2 group (hex string)", value_parser = parse_group_master_key)]
master_key: GroupMasterKeyBytes,
},
RequestSyncContacts,
RequestContactsSync,
}

enum Recipient {
Contact(Uuid),
Group(GroupMasterKeyBytes),
}

fn parse_group_master_key(value: &str) -> anyhow::Result<GroupMasterKeyBytes> {
Expand Down Expand Up @@ -209,22 +214,21 @@ async fn main() -> anyhow::Result<()> {
}

async fn send<S: Store + 'static>(
msg: &str,
uuid: &Uuid,
manager: &mut Manager<S, Registered>,
recipient: Recipient,
msg: impl Into<ContentBody>,
) -> anyhow::Result<()> {
let local = task::LocalSet::new();

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let message = ContentBody::DataMessage(DataMessage {
body: Some(msg.to_string()),
timestamp: Some(timestamp),
..Default::default()
});

let local = task::LocalSet::new();
let mut content_body = msg.into();
if let ContentBody::DataMessage(d) = &mut content_body {
d.timestamp = Some(timestamp);
}

local
.run_until(async move {
Expand All @@ -235,10 +239,22 @@ async fn send<S: Store + 'static>(
}
});

manager
.send_message(*uuid, message, timestamp)
.await
.unwrap();
match recipient {
Recipient::Contact(uuid) => {
info!("sending message to contact");
manager
.send_message(uuid, content_body, timestamp)
.await
.expect("failed to send message");
}
Recipient::Group(master_key) => {
info!("sending message to group");
manager
.send_message_to_group(&master_key, content_body, timestamp)
.await
.expect("failed to send message");
}
}
})
.await;

Expand Down Expand Up @@ -529,22 +545,22 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
}
Cmd::Send { uuid, message } => {
let mut manager = Manager::load_registered(config_store).await?;
send(&message, &uuid, &mut manager).await?;

let data_message = DataMessage {
body: Some(message),
..Default::default()
};

send(&mut manager, Recipient::Contact(uuid), data_message).await?;
}
Cmd::SendToGroup {
message,
master_key,
} => {
let mut manager = Manager::load_registered(config_store).await?;

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let data_message = DataMessage {
body: Some(message),
timestamp: Some(timestamp),
group_v2: Some(GroupContextV2 {
master_key: Some(master_key.to_vec()),
revision: Some(0),
Expand All @@ -553,9 +569,7 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
..Default::default()
};

manager
.send_message_to_group(&master_key, data_message, timestamp)
.await?;
send(&mut manager, Recipient::Group(master_key), data_message).await?;
}
Cmd::Unregister => unimplemented!(),
Cmd::RetrieveProfile {
Expand Down Expand Up @@ -656,9 +670,9 @@ async fn run<S: Store + 'static>(subcommand: Cmd, config_store: S) -> anyhow::Re
println!("{contact:#?}");
}
}
Cmd::RequestSyncContacts => {
Cmd::RequestContactsSync => {
let mut manager = Manager::load_registered(config_store).await?;
manager.request_contacts_sync().await?;
manager.sync_contacts().await?;
}
Cmd::ListMessages {
group_master_key,
Expand Down
2 changes: 1 addition & 1 deletion presage-store-sled/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ impl IdentityKeyStore for SledStore {
"failed to load registration ID",
"no registration data".into(),
))?;
Ok(data.registration_id())
Ok(data.registration_id)
}

async fn save_identity(
Expand Down
4 changes: 2 additions & 2 deletions presage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ authors = ["Gabriel Féron <[email protected]>"]
edition = "2021"

[dependencies]
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "afb5114" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "afb5114" }
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "6fc62c8" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "6fc62c8" }

base64 = "0.21"
futures = "0.3"
Expand Down
72 changes: 51 additions & 21 deletions presage/src/manager/registered.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::fmt;
use std::ops::RangeBounds;
use std::pin::pin;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures::{future, AsyncReadExt, Stream, StreamExt};
use libsignal_service::attachment_cipher::decrypt_in_place;
Expand Down Expand Up @@ -116,26 +117,32 @@ pub struct RegistrationData {
}

impl RegistrationData {
pub fn registration_id(&self) -> u32 {
self.registration_id
/// Account identity
pub fn aci(&self) -> Uuid {
self.service_ids.aci
}

pub fn service_ids(&self) -> &ServiceIds {
&self.service_ids
/// Phone number identity
pub fn pni(&self) -> Uuid {
self.service_ids.pni
}

/// Our own profile key
pub fn profile_key(&self) -> ProfileKey {
self.profile_key
}

pub fn device_name(&self) -> Option<&String> {
self.device_name.as_ref()
/// The name of the device (if linked as secondary)
pub fn device_name(&self) -> Option<&str> {
self.device_name.as_deref()
}

/// Account identity public key
pub fn aci_public_key(&self) -> PublicKey {
self.aci_public_key
}

/// Account identity private key
pub fn aci_private_key(&self) -> PrivateKey {
self.aci_private_key
}
Expand Down Expand Up @@ -283,7 +290,7 @@ impl<S: Store> Manager<S, Registered> {

account_manager
.set_account_attributes(AccountAttributes {
name: self.state.data.device_name().cloned(),
name: self.state.data.device_name().map(|d| d.to_string()),
registration_id: self.state.data.registration_id,
pni_registration_id,
signaling_key: None,
Expand Down Expand Up @@ -316,19 +323,40 @@ impl<S: Store> Manager<S, Registered> {
Ok(())
}

/// Request that the primary device to encrypt & send all of its contacts as a message to ourselves
/// which can be then received, decrypted and stored in the message receiving loop.
/// Requests contacts synchronization and waits until the primary device sends them
///
/// Note: DO NOT call this function if you're already running a receiving loop
pub async fn sync_contacts(&mut self) -> Result<(), Error<S::Error>> {
debug!("synchronizing contacts");

let mut messages = pin!(
self.receive_messages_with_mode(ReceivingMode::WaitForContacts)
.await?
);

self.request_contacts().await?;

tokio::time::timeout(Duration::from_secs(60), async move {
while let Some(msg) = messages.next().await {
log::trace!("got message while waiting for contacts sync: {msg:?}");
}
})
.await?;

Ok(())
}

/// Request the primary device to encrypt & send all of its contacts.
///
/// **Note**: If successful, the contacts are not yet received and stored, but will only be
/// processed when they're received using the `MessageReceiver`.
pub async fn request_contacts_sync(&mut self) -> Result<(), Error<S::Error>> {
/// processed when they're received after polling on the
pub async fn request_contacts(&mut self) -> Result<(), Error<S::Error>> {
trace!("requesting contacts sync");
let var_name = sync_message::request::Type::Contacts as i32;
let sync_message = SyncMessage {
request: Some(sync_message::Request {
r#type: Some(var_name),
r#type: Some(sync_message::request::Type::Contacts.into()),
}),
..Default::default()
..SyncMessage::with_padding()
};

let timestamp = SystemTime::now()
Expand Down Expand Up @@ -704,15 +732,17 @@ impl<S: Store> Manager<S, Registered> {
pub async fn send_message_to_group(
&mut self,
master_key_bytes: &[u8],
mut message: DataMessage,
message: impl Into<ContentBody>,
timestamp: u64,
) -> Result<(), Error<S::Error>> {
let mut content_body = message.into();

// Only update the expiration timer if it is not set.
match message {
DataMessage {
match content_body {
ContentBody::DataMessage(DataMessage {
expire_timer: ref mut timer,
..
} if timer.is_none() => {
}) if timer.is_none() => {
// Set the expire timer to None for errors.
let store_expire_timer = self
.store
Expand Down Expand Up @@ -755,7 +785,7 @@ impl<S: Store> Manager<S, Registered> {

let online_only = false;
let results = sender
.send_message_to_group(recipients, message.clone(), timestamp, online_only)
.send_message_to_group(recipients, content_body.clone(), timestamp, online_only)
.await;

// return first error if any
Expand All @@ -769,7 +799,7 @@ impl<S: Store> Manager<S, Registered> {
needs_receipt: false, // TODO: this is just wrong
unidentified_sender: false,
},
body: message.into(),
body: content_body,
};

save_message(&mut self.store, content)?;
Expand Down
Loading