diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f1224df53..81c1a6e17 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -41,13 +41,13 @@ jobs: command: build args: --all-targets ${{ matrix.cargo_flags }} - - name: Run tests + - name: Test uses: actions-rs/cargo@v1 with: command: test args: --all-targets ${{ matrix.cargo_flags }} - - name: Run doc tests + - name: Test docs uses: actions-rs/cargo@v1 with: command: test diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index 7398b6d12..03889ad99 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -14,20 +14,23 @@ use futures::StreamExt; use futures::{channel::oneshot, future, pin_mut}; use log::{debug, error, info}; use notify_rust::Notification; +use presage::libsignal_service::configuration::SignalServers; use presage::libsignal_service::content::Reaction; +use presage::libsignal_service::models::Contact; +use presage::libsignal_service::prelude::phonenumber::PhoneNumber; +use presage::libsignal_service::prelude::Uuid; use presage::libsignal_service::proto::data_message::Quote; use presage::libsignal_service::proto::sync_message::Sent; +use presage::libsignal_service::zkgroup::GroupMasterKeyBytes; use presage::libsignal_service::{groups_v2::Group, prelude::ProfileKey}; -use presage::prelude::proto::EditMessage; -use presage::prelude::SyncMessage; -use presage::ContentTimestamp; +use presage::proto::EditMessage; +use presage::proto::SyncMessage; +use presage::store::ContentExt; use presage::{ - prelude::{ - content::{Content, ContentBody, DataMessage, GroupContextV2}, - Contact, SignalServers, - }, - prelude::{phonenumber::PhoneNumber, Uuid}, - GroupMasterKeyBytes, Manager, Registered, RegistrationOptions, Store, Thread, + libsignal_service::content::{Content, ContentBody, DataMessage, GroupContextV2}, + manager::{Registered, RegistrationOptions}, + store::{Store, Thread}, + Manager, }; use presage_store_sled::MigrationConflictStrategy; use presage_store_sled::SledStore; @@ -204,10 +207,10 @@ async fn main() -> anyhow::Result<()> { run(args.subcommand, config_store).await } -async fn send( +async fn send( msg: &str, uuid: &Uuid, - manager: &mut Manager, + manager: &mut Manager, ) -> anyhow::Result<()> { let timestamp = std::time::SystemTime::now() .duration_since(UNIX_EPOCH) @@ -247,8 +250,8 @@ async fn send( // Note to developers, this is a good example of a function you can use as a source of inspiration // to process incoming messages. -async fn process_incoming_message( - manager: &mut Manager, +async fn process_incoming_message( + manager: &mut Manager, attachments_tmp_dir: &Path, notifications: bool, content: &Content, @@ -286,8 +289,8 @@ async fn process_incoming_message( } } -fn print_message( - manager: &Manager, +fn print_message( + manager: &Manager, notifications: bool, content: &Content, ) { @@ -315,7 +318,7 @@ fn print_message( }), .. } => { - let Ok(Some(message)) = manager.message(thread, *timestamp) else { + let Ok(Some(message)) = manager.store().message(thread, *timestamp) else { log::warn!("no message in {thread} sent at {timestamp}"); return None; }; @@ -338,6 +341,7 @@ fn print_message( let format_contact = |uuid| { manager + .store() .contact_by_id(uuid) .ok() .flatten() @@ -348,6 +352,7 @@ fn print_message( let format_group = |key| { manager + .store() .group(key) .ok() .flatten() @@ -402,20 +407,20 @@ fn print_message( let ts = content.timestamp(); let (prefix, body) = match msg { Msg::Received(Thread::Contact(sender), body) => { - let contact = format_contact(sender); + let contact = format_contact(*sender); (format!("From {contact} @ {ts}: "), body) } Msg::Sent(Thread::Contact(recipient), body) => { - let contact = format_contact(recipient); + let contact = format_contact(*recipient); (format!("To {contact} @ {ts}"), body) } Msg::Received(Thread::Group(key), body) => { - let sender = format_contact(&content.metadata.sender.uuid); - let group = format_group(key); + let sender = format_contact(content.metadata.sender.uuid); + let group = format_group(*key); (format!("From {sender} to group {group} @ {ts}: "), body) } Msg::Sent(Thread::Group(key), body) => { - let group = format_group(key); + let group = format_group(*key); (format!("To group {group} @ {ts}"), body) } }; @@ -435,8 +440,8 @@ fn print_message( } } -async fn receive( - manager: &mut Manager, +async fn receive( + manager: &mut Manager, notifications: bool, ) -> anyhow::Result<()> { let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?; @@ -459,7 +464,7 @@ async fn receive( Ok(()) } -async fn run(subcommand: Cmd, config_store: C) -> anyhow::Result<()> { +async fn run(subcommand: Cmd, config_store: S) -> anyhow::Result<()> { match subcommand { Cmd::Register { servers, @@ -563,6 +568,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re let mut manager = Manager::load_registered(config_store).await?; if profile_key.is_none() { for contact in manager + .store() .contacts()? .filter_map(Result::ok) .filter(|c| c.uuid == uuid) @@ -589,7 +595,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re Cmd::UpdateContact => unimplemented!(), Cmd::ListGroups => { let manager = Manager::load_registered(config_store).await?; - for group in manager.groups()? { + for group in manager.store().groups()? { match group { Ok(( group_master_key, @@ -620,7 +626,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re uuid, phone_number, .. - } in manager.contacts()?.flatten() + } in manager.store().contacts()?.flatten() { println!("{uuid} / {phone_number:?} / {name}"); } @@ -631,7 +637,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } Cmd::GetContact { ref uuid } => { let manager = Manager::load_registered(config_store).await?; - match manager.contact_by_id(uuid)? { + match manager.store().contact_by_id(*uuid)? { Some(contact) => println!("{contact:#?}"), None => eprintln!("Could not find contact for {uuid}"), } @@ -643,6 +649,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } => { let manager = Manager::load_registered(config_store).await?; for contact in manager + .store() .contacts()? .filter_map(Result::ok) .filter(|c| uuid.map_or_else(|| true, |u| c.uuid == u)) diff --git a/presage-store-sled/src/error.rs b/presage-store-sled/src/error.rs index e342f02d1..498bcb780 100644 --- a/presage-store-sled/src/error.rs +++ b/presage-store-sled/src/error.rs @@ -1,4 +1,4 @@ -use presage::{libsignal_service::protocol::SignalProtocolError, StoreError}; +use presage::{libsignal_service::protocol::SignalProtocolError, store::StoreError}; #[derive(Debug, thiserror::Error)] pub enum SledStoreError { diff --git a/presage-store-sled/src/lib.rs b/presage-store-sled/src/lib.rs index 757f5f51e..c6e7547e6 100644 --- a/presage-store-sled/src/lib.rs +++ b/presage-store-sled/src/lib.rs @@ -7,32 +7,31 @@ use std::{ use async_trait::async_trait; use log::{debug, error, trace, warn}; -use presage::{ - libsignal_service::{ - self, - groups_v2::Group, - models::Contact, - prelude::{Content, ProfileKey, Uuid}, - protocol::{ - Direction, GenericSignedPreKey, IdentityKey, IdentityKeyPair, IdentityKeyStore, - KyberPreKeyId, KyberPreKeyRecord, KyberPreKeyStore, PreKeyId, PreKeyRecord, - PreKeyStore, ProtocolAddress, ProtocolStore, SenderKeyRecord, SenderKeyStore, - SessionRecord, SessionStore, SignalProtocolError, SignedPreKeyId, SignedPreKeyRecord, - SignedPreKeyStore, - }, - push_service::DEFAULT_DEVICE_ID, - session_store::SessionStoreExt, - Profile, ServiceAddress, +use presage::libsignal_service::zkgroup::GroupMasterKeyBytes; +use presage::libsignal_service::{ + self, + content::Content, + groups_v2::Group, + models::Contact, + prelude::{ProfileKey, Uuid}, + protocol::{ + Direction, GenericSignedPreKey, IdentityKey, IdentityKeyPair, IdentityKeyStore, + KyberPreKeyId, KyberPreKeyRecord, KyberPreKeyStore, PreKeyId, PreKeyRecord, PreKeyStore, + ProtocolAddress, ProtocolStore, SenderKeyRecord, SenderKeyStore, SessionRecord, + SessionStore, SignalProtocolError, SignedPreKeyId, SignedPreKeyRecord, SignedPreKeyStore, }, - ContentTimestamp, + push_service::DEFAULT_DEVICE_ID, + session_store::SessionStoreExt, + Profile, ServiceAddress, }; +use presage::manager::RegistrationData; +use presage::store::{ContentExt, ContentsStore, PreKeyStoreExt, StateStore, Store, Thread}; use prost::Message; -use protobuf::ContentProto; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use sha2::{Digest, Sha256}; use sled::{Batch, IVec}; -use presage::{GroupMasterKeyBytes, Registered, Store, Thread}; +use crate::protobuf::ContentProto; mod error; mod protobuf; @@ -314,7 +313,7 @@ fn migrate( let state = serde_json::from_slice(&data).map_err(SledStoreError::from)?; // save it the new school way - store.save_state(&state)?; + store.save_registration_data(&state)?; // remove old data let db = store.write(); @@ -364,26 +363,20 @@ fn migrate( impl ProtocolStore for SledStore {} -impl Store for SledStore { - type Error = SledStoreError; - - type ContactsIter = SledContactsIter; - type GroupsIter = SledGroupsIter; - type MessagesIter = SledMessagesIter; - - /// State +impl StateStore for SledStore { + type StateStoreError = SledStoreError; - fn load_state(&self) -> Result, SledStoreError> { + fn load_registration_data(&self) -> Result, SledStoreError> { self.get(SLED_TREE_STATE, SLED_KEY_REGISTRATION) } - fn save_state(&mut self, state: &Registered) -> Result<(), SledStoreError> { + fn save_registration_data(&mut self, state: &RegistrationData) -> Result<(), SledStoreError> { self.insert(SLED_TREE_STATE, SLED_KEY_REGISTRATION, state)?; Ok(()) } fn is_registered(&self) -> bool { - self.load_state().unwrap_or_default().is_some() + self.load_registration_data().unwrap_or_default().is_some() } fn clear_registration(&mut self) -> Result<(), SledStoreError> { @@ -403,63 +396,14 @@ impl Store for SledStore { Ok(()) } +} - fn clear(&mut self) -> Result<(), SledStoreError> { - self.clear_registration()?; - - let db = self.write(); - db.drop_tree(SLED_TREE_CONTACTS)?; - db.drop_tree(SLED_TREE_GROUPS)?; - - for tree in db - .tree_names() - .into_iter() - .filter(|n| n.starts_with(SLED_TREE_THREADS_PREFIX.as_bytes())) - { - db.drop_tree(tree)?; - } - - db.flush()?; - - Ok(()) - } - - /// Pre-keys - - fn pre_keys_offset_id(&self) -> Result { - Ok(self - .get(SLED_TREE_STATE, SLED_KEY_PRE_KEYS_OFFSET_ID)? - .unwrap_or(0)) - } - - fn set_pre_keys_offset_id(&mut self, id: u32) -> Result<(), SledStoreError> { - self.insert(SLED_TREE_STATE, SLED_KEY_PRE_KEYS_OFFSET_ID, id)?; - Ok(()) - } - - fn next_signed_pre_key_id(&self) -> Result { - Ok(self - .get(SLED_TREE_STATE, SLED_KEY_NEXT_SIGNED_PRE_KEY_ID)? - .unwrap_or(0)) - } - - fn set_next_signed_pre_key_id(&mut self, id: u32) -> Result<(), SledStoreError> { - self.insert(SLED_TREE_STATE, SLED_KEY_NEXT_SIGNED_PRE_KEY_ID, id)?; - Ok(()) - } - - fn next_pq_pre_key_id(&self) -> Result { - Ok(self - .get(SLED_TREE_STATE, SLED_KEY_NEXT_PQ_PRE_KEY_ID)? - .unwrap_or(0)) - } - - fn set_next_pq_pre_key_id(&mut self, id: u32) -> Result<(), SledStoreError> { - self.insert(SLED_TREE_STATE, SLED_KEY_NEXT_PQ_PRE_KEY_ID, id)?; - Ok(()) - } +impl ContentsStore for SledStore { + type ContentsStoreError = SledStoreError; - /// Contacts + type ContactsIter = SledContactsIter; + type GroupsIter = SledGroupsIter; + type MessagesIter = SledMessagesIter; fn clear_contacts(&mut self) -> Result<(), SledStoreError> { self.write().drop_tree(SLED_TREE_CONTACTS)?; @@ -646,6 +590,67 @@ impl Store for SledStore { } } +impl PreKeyStoreExt for SledStore { + type PreKeyStoreExtError = SledStoreError; + + fn pre_keys_offset_id(&self) -> Result { + Ok(self + .get(SLED_TREE_STATE, SLED_KEY_PRE_KEYS_OFFSET_ID)? + .unwrap_or(0)) + } + + fn set_pre_keys_offset_id(&mut self, id: u32) -> Result<(), SledStoreError> { + self.insert(SLED_TREE_STATE, SLED_KEY_PRE_KEYS_OFFSET_ID, id)?; + Ok(()) + } + + fn next_signed_pre_key_id(&self) -> Result { + Ok(self + .get(SLED_TREE_STATE, SLED_KEY_NEXT_SIGNED_PRE_KEY_ID)? + .unwrap_or(0)) + } + + fn set_next_signed_pre_key_id(&mut self, id: u32) -> Result<(), SledStoreError> { + self.insert(SLED_TREE_STATE, SLED_KEY_NEXT_SIGNED_PRE_KEY_ID, id)?; + Ok(()) + } + + fn next_pq_pre_key_id(&self) -> Result { + Ok(self + .get(SLED_TREE_STATE, SLED_KEY_NEXT_PQ_PRE_KEY_ID)? + .unwrap_or(0)) + } + + fn set_next_pq_pre_key_id(&mut self, id: u32) -> Result<(), SledStoreError> { + self.insert(SLED_TREE_STATE, SLED_KEY_NEXT_PQ_PRE_KEY_ID, id)?; + Ok(()) + } +} + +impl Store for SledStore { + type Error = SledStoreError; + + fn clear(&mut self) -> Result<(), SledStoreError> { + self.clear_registration()?; + + let db = self.write(); + db.drop_tree(SLED_TREE_CONTACTS)?; + db.drop_tree(SLED_TREE_GROUPS)?; + + for tree in db + .tree_names() + .into_iter() + .filter(|n| n.starts_with(SLED_TREE_THREADS_PREFIX.as_bytes())) + { + db.drop_tree(tree)?; + } + + db.flush()?; + + Ok(()) + } +} + pub struct SledContactsIter { #[cfg(feature = "encryption")] cipher: Option>, @@ -932,28 +937,29 @@ impl SessionStoreExt for SledStore { impl IdentityKeyStore for SledStore { async fn get_identity_key_pair(&self) -> Result { trace!("getting identity_key_pair"); - let state = self - .load_state() + let data = self + .load_registration_data() .map_err(SledStoreError::into_signal_error)? .ok_or(SignalProtocolError::InvalidState( "failed to load identity key pair", "no registration data".into(), ))?; + Ok(IdentityKeyPair::new( - IdentityKey::new(state.aci_public_key), - state.aci_private_key, + IdentityKey::new(data.aci_public_key()), + data.aci_private_key(), )) } async fn get_local_registration_id(&self) -> Result { - let state = self - .load_state() + let data = self + .load_registration_data() .map_err(SledStoreError::into_signal_error)? .ok_or(SignalProtocolError::InvalidState( - "failed to load identity key pair", + "failed to load registration ID", "no registration data".into(), ))?; - Ok(state.registration_id) + Ok(data.registration_id()) } async fn save_identity( @@ -1098,19 +1104,17 @@ impl DoubleEndedIterator for SledMessagesIter { mod tests { use core::fmt; - use presage::{ - libsignal_service::{ - content::{ContentBody, Metadata}, - prelude::Uuid, - proto::DataMessage, - protocol::{ - self, Direction, GenericSignedPreKey, IdentityKeyStore, PreKeyRecord, PreKeyStore, - SessionRecord, SessionStore, SignedPreKeyRecord, SignedPreKeyStore, - }, - ServiceAddress, + use presage::libsignal_service::{ + content::{ContentBody, Metadata}, + prelude::Uuid, + proto::DataMessage, + protocol::{ + self, Direction, GenericSignedPreKey, IdentityKeyStore, PreKeyRecord, PreKeyStore, + SessionRecord, SessionStore, SignedPreKeyRecord, SignedPreKeyStore, }, - Store, + ServiceAddress, }; + use presage::store::ContentsStore; use quickcheck::{Arbitrary, Gen}; use super::SledStore; @@ -1122,10 +1126,10 @@ mod tests { struct KeyPair(protocol::KeyPair); #[derive(Debug, Clone)] - struct Thread(presage::Thread); + struct Thread(presage::store::Thread); #[derive(Debug, Clone)] - struct Content(presage::prelude::Content); + struct Content(presage::libsignal_service::content::Content); impl fmt::Debug for KeyPair { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1170,13 +1174,16 @@ mod tests { timestamp: Some(timestamp), ..Default::default() }); - Self(presage::prelude::Content::from_body(content_body, metadata)) + Self(presage::libsignal_service::content::Content::from_body( + content_body, + metadata, + )) } } impl Arbitrary for Thread { fn arbitrary(g: &mut Gen) -> Self { - Self(presage::Thread::Contact(Uuid::from_u128( + Self(presage::store::Thread::Contact(Uuid::from_u128( Arbitrary::arbitrary(g), ))) } @@ -1247,8 +1254,11 @@ mod tests { == signed_pre_key_record.serialize().unwrap() } - fn content_with_timestamp(content: &Content, ts: u64) -> presage::prelude::Content { - presage::prelude::Content { + fn content_with_timestamp( + content: &Content, + ts: u64, + ) -> presage::libsignal_service::content::Content { + presage::libsignal_service::content::Content { metadata: Metadata { timestamp: ts, ..content.0.metadata.clone() diff --git a/presage-store-sled/src/protobuf.rs b/presage-store-sled/src/protobuf.rs index 5a65340a3..b1502d08a 100644 --- a/presage-store-sled/src/protobuf.rs +++ b/presage-store-sled/src/protobuf.rs @@ -4,15 +4,16 @@ mod textsecure { include!(concat!(env!("OUT_DIR"), "/textsecure.rs")); } +use presage::libsignal_service::content::Content; +use presage::libsignal_service::content::ContentBody; +use presage::libsignal_service::content::Metadata; +use presage::libsignal_service::proto; +use presage::libsignal_service::ServiceAddress; + use crate::SledStoreError; use self::textsecure::AddressProto; use self::textsecure::MetadataProto; -use presage::prelude::content::ContentBody; -use presage::prelude::content::Metadata; -use presage::prelude::proto; -use presage::prelude::Content; -use presage::prelude::ServiceAddress; impl From for AddressProto { fn from(s: ServiceAddress) -> Self { diff --git a/presage/src/errors.rs b/presage/src/errors.rs index 2507b0c2a..3b5d1cc38 100644 --- a/presage/src/errors.rs +++ b/presage/src/errors.rs @@ -6,6 +6,7 @@ use libsignal_service::{ use crate::store::StoreError; +/// The error type of Signal manager #[derive(thiserror::Error, Debug)] pub enum Error { #[error("captcha from https://signalcaptchas.org/registration/generate.html required")] diff --git a/presage/src/lib.rs b/presage/src/lib.rs index 753aa5516..9acd64488 100644 --- a/presage/src/lib.rs +++ b/presage/src/lib.rs @@ -1,38 +1,14 @@ mod cache; mod errors; -mod manager; +pub mod manager; mod serde; -mod store; +pub mod store; -pub use errors::Error; -pub use manager::{ - Confirmation, Linking, Manager, ReceivingMode, Registered, Registration, RegistrationOptions, -}; -pub use store::{ContentTimestamp, Store, StoreError, Thread}; - -#[deprecated(note = "Please help use improve the prelude module instead")] pub use libsignal_service; +/// Protobufs used in Signal protocol and service communication +pub use libsignal_service::proto; -pub mod prelude { - pub use libsignal_service::{ - configuration::SignalServers, - content::{ - self, Content, ContentBody, DataMessage, GroupContext, GroupContextV2, GroupType, - Metadata, SyncMessage, - }, - groups_v2::{AccessControl, Group, GroupChange, PendingMember, RequestingMember, Timer}, - models::Contact, - prelude::{ - phonenumber::{self, PhoneNumber}, - GroupMasterKey, GroupSecretParams, Uuid, - }, - proto, - sender::AttachmentSpec, - ParseServiceAddressError, ServiceAddress, - }; -} +pub use errors::Error; +pub use manager::Manager; const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "-rs-", env!("CARGO_PKG_VERSION")); - -// TODO: open a PR in libsignal and make sure the bytes can be read from `GroupMasterKey` instead of using this type -pub type GroupMasterKeyBytes = [u8; 32]; diff --git a/presage/src/manager.rs b/presage/src/manager.rs deleted file mode 100644 index b03452cc4..000000000 --- a/presage/src/manager.rs +++ /dev/null @@ -1,1565 +0,0 @@ -use std::{ - fmt, - ops::RangeBounds, - sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; - -use futures::{channel::mpsc, channel::oneshot, future, pin_mut, AsyncReadExt, Stream, StreamExt}; -use log::{debug, error, info, trace, warn}; -use parking_lot::Mutex; -use rand::{ - distributions::{Alphanumeric, DistString}, - rngs::StdRng, - RngCore, SeedableRng, -}; -use serde::{Deserialize, Serialize}; -use url::Url; - -use libsignal_service::push_service::{RegistrationMethod, VerificationTransport}; -use libsignal_service::{ - attachment_cipher::decrypt_in_place, - cipher, - configuration::{ServiceConfiguration, SignalServers, SignalingKey}, - content::{ContentBody, DataMessage, DataMessageFlags, Metadata, SyncMessage}, - groups_v2::{decrypt_group, Group, GroupsManager, InMemoryCredentialsCache}, - messagepipe::ServiceCredentials, - models::Contact, - prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid}, - proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage}, - protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate}, - provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning}, - push_service::{ - AccountAttributes, DeviceCapabilities, DeviceId, ServiceError, ServiceIds, WhoAmIResponse, - DEFAULT_DEVICE_ID, - }, - receiver::MessageReceiver, - sender::{AttachmentSpec, AttachmentUploadError}, - unidentified_access::UnidentifiedAccess, - utils::{ - serde_optional_private_key, serde_optional_public_key, serde_private_key, serde_public_key, - serde_signaling_key, - }, - websocket::SignalWebSocket, - AccountManager, Profile, ServiceAddress, -}; -use libsignal_service::{messagepipe::Incoming, proto::EditMessage}; -use libsignal_service_hyper::push_service::HyperPushService; - -use crate::cache::CacheCell; -use crate::{serde::serde_profile_key, Thread}; -use crate::{store::Store, Error}; - -type ServiceCipher = cipher::ServiceCipher; -type MessageSender = libsignal_service::prelude::MessageSender; - -#[derive(Clone)] -pub struct Manager { - /// Implementation of a config-store to give to libsignal - config_store: Store, - /// Part of the manager which is persisted in the store. - state: State, - /// Random number generator - rng: StdRng, -} - -impl fmt::Debug for Manager { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Manager") - .field("state", &self.state) - .finish_non_exhaustive() - } -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct RegistrationOptions<'a> { - pub signal_servers: SignalServers, - pub phone_number: PhoneNumber, - pub use_voice_call: bool, - pub captcha: Option<&'a str>, - pub force: bool, -} - -pub struct Registration; -pub struct Linking; - -pub struct Confirmation { - signal_servers: SignalServers, - phone_number: PhoneNumber, - password: String, - session_id: String, -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct Registered { - #[serde(skip)] - push_service_cache: CacheCell, - #[serde(skip)] - identified_websocket: Arc>>, - #[serde(skip)] - unidentified_websocket: Arc>>, - #[serde(skip)] - unidentified_sender_certificate: Option, - - pub signal_servers: SignalServers, - pub device_name: Option, - pub phone_number: PhoneNumber, - #[serde(flatten)] - pub service_ids: ServiceIds, - password: String, - #[serde(with = "serde_signaling_key")] - signaling_key: SignalingKey, - pub device_id: Option, - pub registration_id: u32, - #[serde(default)] - pub pni_registration_id: Option, - #[serde(with = "serde_private_key", rename = "private_key")] - pub aci_private_key: PrivateKey, - #[serde(with = "serde_public_key", rename = "public_key")] - pub aci_public_key: PublicKey, - #[serde(with = "serde_optional_private_key", default)] - pub pni_private_key: Option, - #[serde(with = "serde_optional_public_key", default)] - pub pni_public_key: Option, - #[serde(with = "serde_profile_key")] - profile_key: ProfileKey, -} - -impl fmt::Debug for Registered { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Registered") - .field("websocket", &self.identified_websocket.lock().is_some()) - .finish_non_exhaustive() - } -} - -impl Registered { - pub fn device_id(&self) -> u32 { - self.device_id.unwrap_or(DEFAULT_DEVICE_ID) - } -} - -impl Manager { - /// Registers a new account with a phone number (and some options). - /// - /// The returned value is a [confirmation manager](Manager::confirm_verification_code) which you then - /// have to use to send the confirmation code. - /// - /// ```no_run - /// use std::str::FromStr; - /// - /// use presage::{ - /// prelude::{phonenumber::PhoneNumber, SignalServers}, - /// Manager, RegistrationOptions, - /// }; - /// use presage_store_sled::{MigrationConflictStrategy, SledStore}; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// - /// let config_store = - /// SledStore::open("/tmp/presage-example", MigrationConflictStrategy::Drop)?; - /// - /// let manager = Manager::register( - /// config_store, - /// RegistrationOptions { - /// signal_servers: SignalServers::Production, - /// phone_number: PhoneNumber::from_str("+16137827274")?, - /// use_voice_call: false, - /// captcha: None, - /// force: false, - /// }, - /// ) - /// .await?; - /// - /// Ok(()) - /// } - /// ``` - pub async fn register( - mut config_store: C, - registration_options: RegistrationOptions<'_>, - ) -> Result, Error> { - let RegistrationOptions { - signal_servers, - phone_number, - use_voice_call, - captcha, - force, - } = registration_options; - - // check if we are already registered - if !force && config_store.is_registered() { - return Err(Error::AlreadyRegisteredError); - } - - config_store.clear_registration()?; - - // generate a random alphanumeric 24 chars password - let mut rng = StdRng::from_entropy(); - let password = Alphanumeric.sample_string(&mut rng, 24); - - let service_configuration: ServiceConfiguration = signal_servers.into(); - let mut push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - - trace!("creating registration verification session"); - - let phone_number_string = phone_number.to_string(); - let mut session = push_service - .create_verification_session(&phone_number_string, None, None, None) - .await?; - - if !session.allowed_to_request_code { - if session.captcha_required() { - trace!("captcha required"); - if captcha.is_none() { - return Err(Error::CaptchaRequired); - } - session = push_service - .patch_verification_session(&session.id, None, None, None, captcha, None) - .await? - } - if session.push_challenge_required() { - return Err(Error::PushChallengeRequired); - } - } - - if !session.allowed_to_request_code { - return Err(Error::RequestingCodeForbidden(session)); - } - - trace!("requesting verification code"); - - session = push_service - .request_verification_code( - &session.id, - crate::USER_AGENT, - if use_voice_call { - VerificationTransport::Voice - } else { - VerificationTransport::Sms - }, - ) - .await?; - - let manager = Manager { - config_store, - state: Confirmation { - signal_servers, - phone_number, - password, - session_id: session.id, - }, - rng, - }; - - Ok(manager) - } -} - -impl Manager { - /// Links this client as a secondary device from the device used to register the account (usually a phone). - /// The URL to present to the user will be sent in the channel given as the argument. - /// - /// ```no_run - /// use futures::{channel::oneshot, future, StreamExt}; - /// use presage::{prelude::SignalServers, Manager}; - /// use presage_store_sled::{MigrationConflictStrategy, SledStore}; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// let config_store = - /// SledStore::open("/tmp/presage-example", MigrationConflictStrategy::Drop)?; - /// - /// let (mut tx, mut rx) = oneshot::channel(); - /// let (manager, err) = future::join( - /// Manager::link_secondary_device( - /// config_store, - /// SignalServers::Production, - /// "my-linked-client".into(), - /// tx, - /// ), - /// async move { - /// match rx.await { - /// Ok(url) => println!("Show URL {} as QR code to user", url), - /// Err(e) => println!("Error linking device: {}", e), - /// } - /// }, - /// ) - /// .await; - /// - /// Ok(()) - /// } - /// ``` - pub async fn link_secondary_device( - mut config_store: C, - signal_servers: SignalServers, - device_name: String, - provisioning_link_channel: oneshot::Sender, - ) -> Result, Error> { - // clear the database: the moment we start the process, old API credentials are invalidated - // and you won't be able to use this client anyways - config_store.clear_registration()?; - - // generate a random alphanumeric 24 chars password - let mut rng = StdRng::from_entropy(); - let password = Alphanumeric.sample_string(&mut rng, 24); - - // generate a 52 bytes signaling key - let mut signaling_key = [0u8; 52]; - rng.fill_bytes(&mut signaling_key); - - let service_configuration: ServiceConfiguration = signal_servers.into(); - let push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - - let mut linking_manager: LinkingManager = - LinkingManager::new(push_service, password.clone()); - - let (tx, mut rx) = mpsc::channel(1); - - let (wait_for_qrcode_scan, registration) = future::join( - linking_manager.provision_secondary_device(&mut rng, signaling_key, tx), - async move { - if let Some(SecondaryDeviceProvisioning::Url(url)) = rx.next().await { - log::info!("generating qrcode from provisioning link: {}", &url); - if provisioning_link_channel.send(url).is_err() { - return Err(Error::LinkError); - } - } else { - return Err(Error::LinkError); - } - - if let Some(SecondaryDeviceProvisioning::NewDeviceRegistration { - phone_number, - device_id: DeviceId { device_id }, - registration_id, - pni_registration_id, - profile_key, - service_ids, - aci_private_key, - aci_public_key, - pni_private_key, - pni_public_key, - }) = rx.next().await - { - log::info!("successfully registered device {}", &service_ids); - Ok(Registered { - push_service_cache: CacheCell::default(), - identified_websocket: Default::default(), - unidentified_websocket: Default::default(), - unidentified_sender_certificate: Default::default(), - signal_servers, - device_name: Some(device_name), - phone_number, - service_ids, - signaling_key, - password, - device_id: Some(device_id), - registration_id, - pni_registration_id: Some(pni_registration_id), - aci_public_key, - aci_private_key, - pni_public_key: Some(pni_public_key), - pni_private_key: Some(pni_private_key), - profile_key: ProfileKey::create( - profile_key.try_into().expect("32 bytes for profile key"), - ), - }) - } else { - Err(Error::NoProvisioningMessageReceived) - } - }, - ) - .await; - - wait_for_qrcode_scan?; - - let mut manager = Manager { - rng, - config_store, - state: registration?, - }; - - manager.config_store.save_state(&manager.state)?; - - match ( - manager.register_pre_keys().await, - manager.set_account_attributes().await, - manager.sync_contacts().await, - ) { - (Err(e), _, _) | (_, Err(e), _) => { - // clear the entire store on any error, there's no possible recovery here - manager.config_store.clear_registration()?; - Err(e) - } - (_, _, Err(e)) => { - warn!("failed to synchronize contacts: {e}"); - Ok(manager) - } - _ => Ok(manager), - } - } -} - -impl Manager { - /// Confirm a newly registered account using the code you - /// received by SMS or phone call. - /// - /// Returns a [registered manager](Manager::load_registered) that you can use - /// to send and receive messages. - pub async fn confirm_verification_code( - self, - confirmation_code: impl AsRef, - ) -> Result, Error> { - trace!("confirming verification code"); - - let registration_id = generate_registration_id(&mut StdRng::from_entropy()); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); - - let Confirmation { - signal_servers, - phone_number, - password, - session_id, - } = self.state; - - let credentials = ServiceCredentials { - uuid: None, - phonenumber: phone_number.clone(), - password: Some(password.clone()), - signaling_key: None, - device_id: None, - }; - - let service_configuration: ServiceConfiguration = signal_servers.into(); - let mut push_service = HyperPushService::new( - service_configuration, - Some(credentials), - crate::USER_AGENT.to_string(), - ); - - let session = push_service - .submit_verification_code(&session_id, confirmation_code.as_ref()) - .await?; - - trace!("verification code submitted"); - - if !session.verified { - return Err(Error::UnverifiedRegistrationSession); - } - - let mut rng = StdRng::from_entropy(); - - // generate a 52 bytes signaling key - let mut signaling_key = [0u8; 52]; - rng.fill_bytes(&mut signaling_key); - - let mut profile_key = [0u8; 32]; - rng.fill_bytes(&mut profile_key); - - let profile_key = ProfileKey::generate(profile_key); - - let skip_device_transfer = false; - let registered = push_service - .submit_registration_request( - RegistrationMethod::SessionId(&session_id), - AccountAttributes { - name: None, - signaling_key: Some(signaling_key.to_vec()), - registration_id, - pni_registration_id, - voice: false, - video: false, - fetches_messages: true, - pin: None, - registration_lock: None, - unidentified_access_key: Some(profile_key.derive_access_key().to_vec()), - unrestricted_unidentified_access: false, // TODO: make this configurable? - discoverable_by_phone_number: true, - capabilities: DeviceCapabilities { - gv2: true, - gv1_migration: true, - ..Default::default() - }, - }, - skip_device_transfer, - ) - .await?; - - let aci_identity_key_pair = KeyPair::generate(&mut rng); - let pni_identity_key_pair = KeyPair::generate(&mut rng); - - trace!("confirmed! (and registered)"); - - let mut manager = Manager { - rng, - config_store: self.config_store, - state: Registered { - push_service_cache: CacheCell::default(), - identified_websocket: Default::default(), - unidentified_websocket: Default::default(), - unidentified_sender_certificate: Default::default(), - signal_servers: self.state.signal_servers, - device_name: None, - phone_number, - service_ids: ServiceIds { - aci: registered.uuid, - pni: registered.pni, - }, - password, - signaling_key, - device_id: None, - registration_id, - pni_registration_id: Some(pni_registration_id), - aci_private_key: aci_identity_key_pair.private_key, - aci_public_key: aci_identity_key_pair.public_key, - pni_private_key: Some(pni_identity_key_pair.private_key), - pni_public_key: Some(pni_identity_key_pair.public_key), - profile_key, - }, - }; - - manager.config_store.save_state(&manager.state)?; - - if let Err(e) = manager.register_pre_keys().await { - // clear the entire store on any error, there's no possible recovery here - manager.config_store.clear_registration()?; - Err(e) - } else { - Ok(manager) - } - } -} - -impl Manager { - /// Loads a previously registered account from the implemented [Store]. - /// - /// Returns a instance of [Manager] you can use to send & receive messages. - pub async fn load_registered(config_store: C) -> Result> { - let state = config_store - .load_state()? - .ok_or(Error::NotYetRegisteredError)?; - - let mut manager = Self { - rng: StdRng::from_entropy(), - config_store, - state, - }; - - if manager.state.pni_registration_id.is_none() { - manager.set_account_attributes().await?; - } - - Ok(manager) - } - - async fn register_pre_keys(&mut self) -> Result<(), Error> { - trace!("registering pre keys"); - let mut account_manager = - AccountManager::new(self.push_service()?, Some(self.state.profile_key)); - - let (pre_keys_offset_id, next_signed_pre_key_id, next_pq_pre_key_id) = account_manager - .update_pre_key_bundle( - &mut self.config_store.clone(), - &mut self.rng, - self.config_store.pre_keys_offset_id()?, - self.config_store.next_signed_pre_key_id()?, - self.config_store.next_pq_pre_key_id()?, - true, - ) - .await?; - - self.config_store - .set_pre_keys_offset_id(pre_keys_offset_id)?; - self.config_store - .set_next_signed_pre_key_id(next_signed_pre_key_id)?; - self.config_store - .set_next_pq_pre_key_id(next_pq_pre_key_id)?; - - trace!("registered pre keys"); - Ok(()) - } - - async fn set_account_attributes(&mut self) -> Result<(), Error> { - trace!("setting account attributes"); - let mut account_manager = - AccountManager::new(self.push_service()?, Some(self.state.profile_key)); - - let pni_registration_id = if let Some(pni_registration_id) = self.state.pni_registration_id - { - pni_registration_id - } else { - info!("migrating to PNI"); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); - self.state.pni_registration_id = Some(pni_registration_id); - self.config_store.save_state(&self.state)?; - pni_registration_id - }; - - account_manager - .set_account_attributes(AccountAttributes { - name: self.state.device_name.clone(), - registration_id: self.state.registration_id, - pni_registration_id, - signaling_key: None, - voice: false, - video: false, - fetches_messages: true, - pin: None, - registration_lock: None, - unidentified_access_key: Some(self.state.profile_key.derive_access_key().to_vec()), - unrestricted_unidentified_access: false, - discoverable_by_phone_number: true, - capabilities: DeviceCapabilities { - gv2: true, - gv1_migration: true, - ..Default::default() - }, - }) - .await?; - - if self.state.pni_registration_id.is_none() { - debug!("fetching PNI UUID and updating state"); - let whoami = self.whoami().await?; - self.state.service_ids.pni = whoami.pni; - self.config_store.save_state(&self.state)?; - } - - trace!("done setting account attributes"); - Ok(()) - } - - async fn wait_for_contacts_sync( - &mut self, - mut messages: impl Stream + Unpin, - ) -> Result<(), Error> { - let mut message_receiver = MessageReceiver::new(self.push_service()?); - while let Some(Content { body, .. }) = messages.next().await { - if let ContentBody::SynchronizeMessage(SyncMessage { - contacts: Some(contacts), - .. - }) = body - { - let contacts = message_receiver.retrieve_contacts(&contacts).await?; - let _ = self.config_store.clear_contacts(); - self.config_store - .save_contacts(contacts.filter_map(Result::ok))?; - info!("saved contacts"); - return Ok(()); - } - } - Ok(()) - } - - async fn sync_contacts(&mut self) -> Result<(), Error> { - let messages = self - .receive_messages_stream(ReceivingMode::WaitForContacts) - .await?; - self.request_contacts_sync().await?; - - info!("waiting for contacts sync for up to 60 seconds"); - - pin_mut!(messages); - tokio::time::timeout( - Duration::from_secs(60), - self.wait_for_contacts_sync(messages), - ) - .await - .map_err(Error::from)??; - - Ok(()) - } - - /// Request that the primary device to encrypt & send all of its contacts as a message to ourselves - /// which can be then received, decrypted and stored in the message receiving loop. - /// - /// **Note**: If successful, the contacts are not yet received and stored, but will only be - /// processed when they're received using the `MessageReceiver`. - pub async fn request_contacts_sync(&mut self) -> Result<(), Error> { - trace!("requesting contacts sync"); - let sync_message = SyncMessage { - request: Some(sync_message::Request { - r#type: Some(sync_message::request::Type::Contacts as i32), - }), - ..Default::default() - }; - - let timestamp = std::time::SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; - - self.send_message(self.state.service_ids.aci, sync_message, timestamp) - .await?; - - Ok(()) - } - - async fn sender_certificate(&mut self) -> Result> { - let needs_renewal = |sender_certificate: Option<&SenderCertificate>| -> bool { - if sender_certificate.is_none() { - return true; - } - - let seconds_since_epoch = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs(); - - if let Some(expiration) = sender_certificate.and_then(|s| s.expiration().ok()) { - expiration >= seconds_since_epoch - 600 - } else { - true - } - }; - - if needs_renewal(self.state.unidentified_sender_certificate.as_ref()) { - let sender_certificate = self - .push_service()? - .get_uuid_only_sender_certificate() - .await?; - - self.state - .unidentified_sender_certificate - .replace(sender_certificate); - } - - Ok(self - .state - .unidentified_sender_certificate - .clone() - .expect("logic error")) - } - - pub async fn submit_recaptcha_challenge( - &self, - token: &str, - captcha: &str, - ) -> Result<(), Error> { - let mut account_manager = AccountManager::new(self.push_service()?, None); - account_manager - .submit_recaptcha_challenge(token, captcha) - .await?; - Ok(()) - } - - /// Returns a handle on the registered state - pub fn state(&self) -> &Registered { - &self.state - } - - /// Fetches basic information on the registered device. - pub async fn whoami(&self) -> Result> { - Ok(self.push_service()?.whoami().await?) - } - - /// Fetches the profile (name, about, status emoji) of the registered user. - pub async fn retrieve_profile(&mut self) -> Result> { - self.retrieve_profile_by_uuid(self.state.service_ids.aci, self.state.profile_key) - .await - } - - /// Fetches the profile of the provided user by UUID and profile key. - pub async fn retrieve_profile_by_uuid( - &mut self, - uuid: Uuid, - profile_key: ProfileKey, - ) -> Result> { - // Check if profile is cached. - if let Some(profile) = self.config_store.profile(uuid, profile_key).ok().flatten() { - return Ok(profile); - } - - let mut account_manager = AccountManager::new(self.push_service()?, Some(profile_key)); - - let profile = account_manager.retrieve_profile(uuid.into()).await?; - - let _ = self - .config_store - .save_profile(uuid, profile_key, profile.clone()); - Ok(profile) - } - - /// Get a single contact by its UUID - /// - /// Note: this only currently works when linked as secondary device (the contacts are sent by the primary device at linking time) - pub fn contact_by_id(&self, id: &Uuid) -> Result, Error> { - Ok(self.config_store.contact_by_id(*id)?) - } - - /// Returns an iterator on contacts stored in the [Store]. - pub fn contacts( - &self, - ) -> Result>>, Error> { - let iter = self.config_store.contacts()?; - Ok(iter.map(|r| r.map_err(Into::into))) - } - - /// Get a group (either from the local cache, or fetch it remotely) using its master key - pub fn group(&self, master_key_bytes: &[u8]) -> Result, Error> { - Ok(self.config_store.group(master_key_bytes.try_into()?)?) - } - - /// Returns an iterator on groups stored in the [Store]. - pub fn groups(&self) -> Result> { - Ok(self.config_store.groups()?) - } - - /// Get a single message in a thread (identified by its server-side sent timestamp) - pub fn message( - &self, - thread: &Thread, - timestamp: u64, - ) -> Result, Error> { - Ok(self.config_store.message(thread, timestamp)?) - } - - /// Get an iterator of messages in a thread, optionally starting from a point in time. - pub fn messages( - &self, - thread: &Thread, - range: impl RangeBounds, - ) -> Result> { - Ok(self.config_store.messages(thread, range)?) - } - - async fn receive_messages_encrypted( - &mut self, - ) -> Result>, Error> { - let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?; - let allow_stories = false; - let pipe = MessageReceiver::new(self.push_service()?) - .create_message_pipe(credentials, allow_stories) - .await?; - - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let mut unidentified_push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - let unidentified_ws = unidentified_push_service - .ws("/v1/websocket/", &[], None, false) - .await?; - self.state.identified_websocket.lock().replace(pipe.ws()); - self.state - .unidentified_websocket - .lock() - .replace(unidentified_ws); - - Ok(pipe.stream()) - } - - /// Starts receiving and storing messages. - /// - /// Returns a [futures::Stream] of messages to consume. Messages will also be stored by the implementation of the [Store]. - pub async fn receive_messages( - &mut self, - ) -> Result, Error> { - self.receive_messages_stream(ReceivingMode::Forever).await - } - - pub async fn receive_messages_with_mode( - &mut self, - mode: ReceivingMode, - ) -> Result, Error> { - self.receive_messages_stream(mode).await - } - - fn groups_manager( - &self, - ) -> Result, Error> { - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let server_public_params = service_configuration.zkgroup_server_public_params; - - let groups_credentials_cache = InMemoryCredentialsCache::default(); - let groups_manager = GroupsManager::new( - self.state.service_ids.clone(), - self.push_service()?, - groups_credentials_cache, - server_public_params, - ); - - Ok(groups_manager) - } - - async fn receive_messages_stream( - &mut self, - mode: ReceivingMode, - ) -> Result, Error> { - struct StreamState { - encrypted_messages: S, - message_receiver: MessageReceiver, - service_cipher: ServiceCipher, - config_store: C, - groups_manager: GroupsManager, - mode: ReceivingMode, - } - - let init = StreamState { - encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), - message_receiver: MessageReceiver::new(self.push_service()?), - service_cipher: self.new_service_cipher()?, - config_store: self.config_store.clone(), - groups_manager: self.groups_manager()?, - mode, - }; - - Ok(futures::stream::unfold(init, |mut state| async move { - loop { - match state.encrypted_messages.next().await { - Some(Ok(Incoming::Envelope(envelope))) => { - match state.service_cipher.open_envelope(envelope).await { - Ok(Some(content)) => { - // contacts synchronization sent from the primary device (happens after linking, or on demand) - if let ContentBody::SynchronizeMessage(SyncMessage { - contacts: Some(contacts), - .. - }) = &content.body - { - match state.message_receiver.retrieve_contacts(contacts).await { - Ok(contacts) => { - let _ = state.config_store.clear_contacts(); - match state - .config_store - .save_contacts(contacts.filter_map(Result::ok)) - { - Ok(()) => { - info!("saved contacts"); - } - Err(e) => { - warn!("failed to save contacts: {e}"); - } - } - } - Err(e) => { - warn!("failed to retrieve contacts: {e}"); - } - } - - if let ReceivingMode::WaitForContacts = state.mode { - return None; - } - } - - if let ContentBody::DataMessage(DataMessage { - group_v2: - Some(GroupContextV2 { - master_key: Some(master_key_bytes), - revision: Some(revision), - .. - }), - .. - }) - | ContentBody::SynchronizeMessage(SyncMessage { - sent: - Some(sync_message::Sent { - message: - Some(DataMessage { - group_v2: - Some(GroupContextV2 { - master_key: Some(master_key_bytes), - revision: Some(revision), - .. - }), - .. - }), - .. - }), - .. - }) = &content.body - { - // there's two things to implement: the group metadata (fetched from HTTP API) - // and the group changes, which are part of the protobuf messages - // this means we kinda need our own internal representation of groups inside of presage? - if let Ok(Some(group)) = upsert_group( - &state.config_store, - &mut state.groups_manager, - master_key_bytes, - revision, - ) - .await - { - log::trace!("{group:?}"); - } - } - - if let Err(e) = - save_message(&mut state.config_store, content.clone()) - { - log::error!("Error saving message to store: {}", e); - } - - return Some((content, state)); - } - Ok(None) => { - debug!("Empty envelope..., message will be skipped!") - } - Err(e) => { - error!("Error opening envelope: {:?}, message will be skipped!", e); - } - } - } - Some(Ok(Incoming::QueueEmpty)) => { - debug!("empty queue"); - if let ReceivingMode::InitialSync = state.mode { - return None; - } - } - Some(Err(e)) => error!("Error: {}", e), - None => return None, - } - } - })) - } - - /// Sends a messages to the provided [ServiceAddress]. - /// The timestamp should be set to now and is used by Signal mobile apps - /// to order messages later, and apply reactions. - /// - /// This method will automatically update the [DataMessage::expire_timer] if it is set to - /// [None] such that the chat will keep the current expire timer. - pub async fn send_message( - &mut self, - recipient_addr: impl Into, - message: impl Into, - timestamp: u64, - ) -> Result<(), Error> { - let mut sender = self.new_message_sender().await?; - - let online_only = false; - let recipient = recipient_addr.into(); - let mut content_body: ContentBody = message.into(); - - // Only update the expiration timer if it is not set. - match content_body { - ContentBody::DataMessage(DataMessage { - expire_timer: ref mut timer, - .. - }) if timer.is_none() => { - // Set the expire timer to None for errors. - let store_expire_timer = self - .config_store - .expire_timer(&Thread::Contact(recipient.uuid)) - .unwrap_or_default(); - - *timer = store_expire_timer; - } - _ => {} - } - - let sender_certificate = self.sender_certificate().await?; - let unidentified_access = - self.config_store - .profile_key(&recipient.uuid)? - .map(|profile_key| UnidentifiedAccess { - key: profile_key.derive_access_key().to_vec(), - certificate: sender_certificate.clone(), - }); - - sender - .send_message( - &recipient, - unidentified_access, - content_body.clone(), - timestamp, - online_only, - ) - .await?; - - // save the message - let content = Content { - metadata: Metadata { - sender: self.state.service_ids.aci.into(), - sender_device: self.state.device_id(), - timestamp, - needs_receipt: false, - unidentified_sender: false, - }, - body: content_body, - }; - - save_message(&mut self.config_store, content)?; - - Ok(()) - } - - /// Uploads attachments prior to linking them in a message. - pub async fn upload_attachments( - &self, - attachments: Vec<(AttachmentSpec, Vec)>, - ) -> Result>, Error> { - if attachments.is_empty() { - return Ok(Vec::new()); - } - let sender = self.new_message_sender().await?; - let upload = future::join_all(attachments.into_iter().map(move |(spec, contents)| { - let mut sender = sender.clone(); - async move { sender.upload_attachment(spec, contents).await } - })); - Ok(upload.await) - } - - /// Sends one message in a group (v2). The `master_key_bytes` is required to have 32 elements. - /// - /// This method will automatically update the [DataMessage::expire_timer] if it is set to - /// [None] such that the chat will keep the current expire timer. - pub async fn send_message_to_group( - &mut self, - master_key_bytes: &[u8], - mut message: DataMessage, - timestamp: u64, - ) -> Result<(), Error> { - // Only update the expiration timer if it is not set. - match message { - DataMessage { - expire_timer: ref mut timer, - .. - } if timer.is_none() => { - // Set the expire timer to None for errors. - let store_expire_timer = self - .config_store - .expire_timer(&Thread::Group( - master_key_bytes - .try_into() - .expect("Master key bytes to be of size 32."), - )) - .unwrap_or_default(); - - *timer = store_expire_timer; - } - _ => {} - } - let mut sender = self.new_message_sender().await?; - - let mut groups_manager = self.groups_manager()?; - let Some(group) = upsert_group( - &self.config_store, - &mut groups_manager, - master_key_bytes, - &0, - ) - .await? - else { - return Err(Error::UnknownGroup); - }; - - let sender_certificate = self.sender_certificate().await?; - let mut recipients = Vec::new(); - for member in group - .members - .into_iter() - .filter(|m| m.uuid != self.state.service_ids.aci) - { - let unidentified_access = - self.config_store - .profile_key(&member.uuid)? - .map(|profile_key| UnidentifiedAccess { - key: profile_key.derive_access_key().to_vec(), - certificate: sender_certificate.clone(), - }); - recipients.push((member.uuid.into(), unidentified_access)); - } - - let online_only = false; - let results = sender - .send_message_to_group(recipients, message.clone(), timestamp, online_only) - .await; - - // return first error if any - results.into_iter().find(|res| res.is_err()).transpose()?; - - let content = Content { - metadata: Metadata { - sender: self.state.service_ids.aci.into(), - sender_device: self.state.device_id(), - timestamp, - needs_receipt: false, // TODO: this is just wrong - unidentified_sender: false, - }, - body: message.into(), - }; - - save_message(&mut self.config_store, content)?; - - Ok(()) - } - - /// Clears all sessions established wiht [recipient](ServiceAddress). - pub async fn clear_sessions(&self, recipient: &ServiceAddress) -> Result<(), Error> { - self.config_store.delete_all_sessions(recipient).await?; - Ok(()) - } - - /// Downloads and decrypts a single attachment. - pub async fn get_attachment( - &self, - attachment_pointer: &AttachmentPointer, - ) -> Result, Error> { - let mut service = self.push_service()?; - let mut attachment_stream = service.get_attachment(attachment_pointer).await?; - - // We need the whole file for the crypto to check out - let mut ciphertext = Vec::new(); - let len = attachment_stream.read_to_end(&mut ciphertext).await?; - - trace!("downloaded encrypted attachment of {} bytes", len); - - let key: [u8; 64] = attachment_pointer.key().try_into()?; - decrypt_in_place(key, &mut ciphertext)?; - - Ok(ciphertext) - } - - pub async fn send_session_reset( - &mut self, - recipient: &ServiceAddress, - timestamp: u64, - ) -> Result<(), Error> { - log::trace!("Resetting session for address: {}", recipient.uuid); - let message = DataMessage { - flags: Some(DataMessageFlags::EndSession as u32), - ..Default::default() - }; - - self.send_message(*recipient, message, timestamp).await?; - - Ok(()) - } - - fn credentials(&self) -> Result, Error> { - Ok(Some(ServiceCredentials { - uuid: Some(self.state.service_ids.aci), - phonenumber: self.state.phone_number.clone(), - password: Some(self.state.password.clone()), - signaling_key: Some(self.state.signaling_key), - device_id: self.state.device_id, - })) - } - - /// Returns a clone of a cached push service. - /// - /// If no service is yet cached, it will create and cache one. - fn push_service(&self) -> Result> { - self.state.push_service_cache.get(|| { - let credentials = self.credentials()?; - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - - Ok(HyperPushService::new( - service_configuration, - credentials, - crate::USER_AGENT.to_string(), - )) - }) - } - - /// Creates a new message sender. - async fn new_message_sender(&self) -> Result, Error> { - let local_addr = ServiceAddress { - uuid: self.state.service_ids.aci, - }; - - let identified_websocket = self - .state - .identified_websocket - .lock() - .clone() - .ok_or(Error::MessagePipeNotStarted)?; - - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let mut unidentified_push_service = - HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); - let unidentified_websocket = unidentified_push_service - .ws("/v1/websocket/", &[], None, false) - .await?; - - Ok(MessageSender::new( - identified_websocket, - unidentified_websocket, - self.push_service()?, - self.new_service_cipher()?, - self.rng.clone(), - self.config_store.clone(), - local_addr, - self.state.device_id.unwrap_or(DEFAULT_DEVICE_ID).into(), - )) - } - - /// Creates a new service cipher. - fn new_service_cipher(&self) -> Result, Error> { - let service_configuration: ServiceConfiguration = self.state.signal_servers.into(); - let service_cipher = ServiceCipher::new( - self.config_store.clone(), - self.rng.clone(), - service_configuration.unidentified_sender_trust_root, - self.state.service_ids.aci, - self.state.device_id.unwrap_or(DEFAULT_DEVICE_ID), - ); - - Ok(service_cipher) - } - - /// Returns the title of a thread (contact or group). - pub async fn thread_title(&self, thread: &Thread) -> Result> { - match thread { - Thread::Contact(uuid) => { - let contact = match self.contact_by_id(uuid) { - Ok(contact) => contact, - Err(e) => { - log::info!("Error getting contact by id: {}, {:?}", e, uuid); - None - } - }; - Ok(match contact { - Some(contact) => contact.name, - None => uuid.to_string(), - }) - } - Thread::Group(id) => match self.group(id)? { - Some(group) => Ok(group.title), - None => Ok("".to_string()), - }, - } - } - - #[deprecated = "use Manager::contact_by_id"] - pub fn get_contacts( - &self, - ) -> Result>>, Error> { - self.contacts() - } - - #[deprecated = "use Manager::contact_by_id"] - pub fn get_contact_by_id(&self, id: Uuid) -> Result, Error> { - self.contact_by_id(&id) - } - - #[deprecated = "use Manager::groups"] - pub fn get_groups(&self) -> Result> { - self.groups() - } - - #[deprecated = "use Manager::group"] - pub fn get_group(&self, master_key_bytes: &[u8]) -> Result, Error> { - self.group(master_key_bytes) - } -} - -/// The mode receiving messages stream -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] -pub enum ReceivingMode { - /// Don't stop the stream - #[default] - Forever, - /// Stop the stream after the initial sync - /// - /// That is, when the Signal's message queue becomes empty. - InitialSync, - /// Stop the stream after contacts are synced - WaitForContacts, -} - -async fn upsert_group( - config_store: &C, - groups_manager: &mut GroupsManager, - master_key_bytes: &[u8], - revision: &u32, -) -> Result, Error> { - let upsert_group = match config_store.group(master_key_bytes.try_into()?) { - Ok(Some(group)) => { - log::debug!("loaded group from local db {}", group.title); - group.revision < *revision - } - Ok(None) => true, - Err(e) => { - log::warn!("failed to retrieve group from local db {}", e); - true - } - }; - - if upsert_group { - log::debug!("fetching and saving group"); - match groups_manager.fetch_encrypted_group(master_key_bytes).await { - Ok(encrypted_group) => { - let group = decrypt_group(master_key_bytes, encrypted_group)?; - if let Err(e) = config_store.save_group(master_key_bytes.try_into()?, &group) { - log::error!("failed to save group {master_key_bytes:?}: {e}",); - } - } - Err(e) => { - log::warn!("failed to fetch encrypted group: {e}") - } - } - } - - Ok(config_store.group(master_key_bytes.try_into()?)?) -} - -fn save_message(config_store: &mut C, message: Content) -> Result<(), Error> { - // derive the thread from the message type - let thread = Thread::try_from(&message)?; - - // update recipient profile keys - if let ContentBody::DataMessage(DataMessage { - profile_key: Some(profile_key_bytes), - .. - }) = &message.body - { - if let Ok(profile_key_bytes) = profile_key_bytes.clone().try_into() { - let sender_uuid = message.metadata.sender.uuid; - let profile_key = ProfileKey::create(profile_key_bytes); - log::debug!("inserting profile key for {sender_uuid}"); - config_store.upsert_profile_key(&sender_uuid, profile_key)?; - } - } - - // only save DataMessage and SynchronizeMessage (sent) - let message = match message.body { - ContentBody::NullMessage(_) => Some(message), - ContentBody::DataMessage(ref data_message) - | ContentBody::SynchronizeMessage(SyncMessage { - sent: - Some(sync_message::Sent { - message: Some(ref data_message), - .. - }), - .. - }) => match data_message { - DataMessage { - delete: - Some(Delete { - target_sent_timestamp: Some(ts), - }), - .. - } => { - // replace an existing message by an empty NullMessage - if let Some(mut existing_msg) = config_store.message(&thread, *ts)? { - existing_msg.metadata.sender.uuid = Uuid::nil(); - existing_msg.body = NullMessage::default().into(); - config_store.save_message(&thread, existing_msg)?; - debug!("message in thread {thread} @ {ts} deleted"); - None - } else { - warn!("could not find message to delete in thread {thread} @ {ts}"); - None - } - } - _ => Some(message), - }, - ContentBody::EditMessage(EditMessage { - target_sent_timestamp: Some(ts), - data_message: Some(data_message), - }) - | ContentBody::SynchronizeMessage(SyncMessage { - sent: - Some(sync_message::Sent { - edit_message: - Some(EditMessage { - target_sent_timestamp: Some(ts), - data_message: Some(data_message), - }), - .. - }), - .. - }) => { - if let Some(mut existing_msg) = config_store.message(&thread, ts)? { - existing_msg.metadata = message.metadata; - existing_msg.body = ContentBody::DataMessage(data_message); - // TODO: find a way to mark the message as edited (so that it's visible in a client) - trace!("message in thread {thread} @ {ts} edited"); - Some(existing_msg) - } else { - warn!("could not find edited message {thread} @ {ts}"); - None - } - } - ContentBody::CallMessage(_) - | ContentBody::SynchronizeMessage(SyncMessage { - call_event: Some(_), - .. - }) => Some(message), - ContentBody::SynchronizeMessage(s) => { - debug!("skipping saving sync message without interesting fields: {s:?}"); - None - } - ContentBody::ReceiptMessage(_) => { - debug!("skipping saving receipt message"); - None - } - ContentBody::TypingMessage(_) => { - debug!("skipping saving typing message"); - None - } - ContentBody::StoryMessage(_) => { - debug!("skipping story message"); - None - } - ContentBody::PniSignatureMessage(_) => { - debug!("skipping PNI signature message"); - None - } - ContentBody::EditMessage(_) => { - debug!("invalid edited"); - None - } - }; - - if let Some(message) = message { - config_store.save_message(&thread, message)?; - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use base64::engine::general_purpose; - use base64::Engine; - use libsignal_service::prelude::ProfileKey; - use libsignal_service::protocol::KeyPair; - use rand::RngCore; - use serde_json::json; - - use crate::Registered; - - #[test] - fn test_state_before_pni() { - let mut rng = rand::thread_rng(); - let key_pair = KeyPair::generate(&mut rng); - let mut profile_key = [0u8; 32]; - rng.fill_bytes(&mut profile_key); - let profile_key = ProfileKey::generate(profile_key); - let mut signaling_key = [0u8; 52]; - rng.fill_bytes(&mut signaling_key); - - // this is before public_key and private_key were renamed to aci_public_key and aci_private_key - // and pni_public_key + pni_private_key were added - let previous_state = json!({ - "signal_servers": "Production", - "device_name": "Test", - "phone_number": { - "code": { - "value": 1, - "source": "plus" - }, - "national": { - "value": 5550199, - "zeros": 0 - }, - "extension": null, - "carrier": null - }, - "uuid": "ff9a89d9-8052-4af0-a91d-2a0dfa0c6b95", - "password": "HelloWorldOfPasswords", - "signaling_key": general_purpose::STANDARD.encode(signaling_key), - "device_id": 42, - "registration_id": 64, - "private_key": general_purpose::STANDARD.encode(key_pair.private_key.serialize()), - "public_key": general_purpose::STANDARD.encode(key_pair.public_key.serialize()), - "profile_key": general_purpose::STANDARD.encode(profile_key.get_bytes()), - }); - - let state: Registered = serde_json::from_value(previous_state).expect("should deserialize"); - assert_eq!(state.aci_public_key, key_pair.public_key); - assert!(state.aci_private_key == key_pair.private_key); - assert!(state.pni_public_key.is_none()); - } -} diff --git a/presage/src/manager/confirmation.rs b/presage/src/manager/confirmation.rs new file mode 100644 index 000000000..d5eb27848 --- /dev/null +++ b/presage/src/manager/confirmation.rs @@ -0,0 +1,162 @@ +use libsignal_service::configuration::{ServiceConfiguration, SignalServers}; +use libsignal_service::messagepipe::ServiceCredentials; +use libsignal_service::prelude::phonenumber::PhoneNumber; +use libsignal_service::protocol::KeyPair; +use libsignal_service::provisioning::generate_registration_id; +use libsignal_service::push_service::{ + AccountAttributes, DeviceCapabilities, PushService, RegistrationMethod, ServiceIds, +}; +use libsignal_service::zkgroup::profiles::ProfileKey; +use libsignal_service_hyper::push_service::HyperPushService; +use log::trace; +use rand::rngs::StdRng; +use rand::{RngCore, SeedableRng}; + +use crate::cache::CacheCell; +use crate::manager::registered::RegistrationData; +use crate::store::Store; +use crate::{Error, Manager}; + +use super::Registered; + +/// Manager state after a successful registration of new main device +/// +/// In this state, the user has to confirm the new registration via a validation code. +pub struct Confirmation { + pub(crate) signal_servers: SignalServers, + pub(crate) phone_number: PhoneNumber, + pub(crate) password: String, + pub(crate) session_id: String, +} + +impl Manager { + /// Confirm a newly registered account using the code you + /// received by SMS or phone call. + /// + /// Returns a [registered manager](Manager::load_registered) that you can use + /// to send and receive messages. + pub async fn confirm_verification_code( + self, + confirmation_code: impl AsRef, + ) -> Result, Error> { + trace!("confirming verification code"); + + let registration_id = generate_registration_id(&mut StdRng::from_entropy()); + let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + + let Confirmation { + signal_servers, + phone_number, + password, + session_id, + } = self.state; + + let credentials = ServiceCredentials { + uuid: None, + phonenumber: phone_number.clone(), + password: Some(password.clone()), + signaling_key: None, + device_id: None, + }; + + let service_configuration: ServiceConfiguration = signal_servers.into(); + let mut push_service = HyperPushService::new( + service_configuration, + Some(credentials), + crate::USER_AGENT.to_string(), + ); + + let session = push_service + .submit_verification_code(&session_id, confirmation_code.as_ref()) + .await?; + + trace!("verification code submitted"); + + if !session.verified { + return Err(Error::UnverifiedRegistrationSession); + } + + let mut rng = StdRng::from_entropy(); + + // generate a 52 bytes signaling key + let mut signaling_key = [0u8; 52]; + rng.fill_bytes(&mut signaling_key); + + let mut profile_key = [0u8; 32]; + rng.fill_bytes(&mut profile_key); + + let profile_key = ProfileKey::generate(profile_key); + + let skip_device_transfer = false; + let registered = push_service + .submit_registration_request( + RegistrationMethod::SessionId(&session_id), + AccountAttributes { + name: None, + signaling_key: Some(signaling_key.to_vec()), + registration_id, + pni_registration_id, + voice: false, + video: false, + fetches_messages: true, + pin: None, + registration_lock: None, + unidentified_access_key: Some(profile_key.derive_access_key().to_vec()), + unrestricted_unidentified_access: false, // TODO: make this configurable? + discoverable_by_phone_number: true, + capabilities: DeviceCapabilities { + gv2: true, + gv1_migration: true, + ..Default::default() + }, + }, + skip_device_transfer, + ) + .await?; + + let aci_identity_key_pair = KeyPair::generate(&mut rng); + let pni_identity_key_pair = KeyPair::generate(&mut rng); + + trace!("confirmed! (and registered)"); + + let mut manager = Manager { + rng, + store: self.store, + state: Registered { + push_service_cache: CacheCell::default(), + identified_websocket: Default::default(), + unidentified_websocket: Default::default(), + unidentified_sender_certificate: Default::default(), + data: RegistrationData { + signal_servers: self.state.signal_servers, + device_name: None, + phone_number, + service_ids: ServiceIds { + aci: registered.uuid, + pni: registered.pni, + }, + password, + signaling_key, + device_id: None, + registration_id, + pni_registration_id: Some(pni_registration_id), + aci_private_key: aci_identity_key_pair.private_key, + aci_public_key: aci_identity_key_pair.public_key, + pni_private_key: Some(pni_identity_key_pair.private_key), + pni_public_key: Some(pni_identity_key_pair.public_key), + profile_key, + }, + }, + }; + + manager.store.save_registration_data(&manager.state.data)?; + + if let Err(e) = manager.register_pre_keys().await { + // clear the entire store on any error, there's no possible recovery here + manager.store.clear_registration()?; + Err(e) + } else { + Ok(manager) + } + } +} diff --git a/presage/src/manager/linking.rs b/presage/src/manager/linking.rs new file mode 100644 index 000000000..b19a6ba50 --- /dev/null +++ b/presage/src/manager/linking.rs @@ -0,0 +1,163 @@ +use futures::channel::{mpsc, oneshot}; +use futures::{future, StreamExt}; +use libsignal_service::configuration::{ServiceConfiguration, SignalServers}; +use libsignal_service::provisioning::{LinkingManager, SecondaryDeviceProvisioning}; +use libsignal_service::push_service::DeviceId; +use libsignal_service::zkgroup::profiles::ProfileKey; +use libsignal_service_hyper::push_service::HyperPushService; +use log::{info, warn}; +use rand::distributions::{Alphanumeric, DistString}; +use rand::rngs::StdRng; +use rand::{RngCore, SeedableRng}; +use url::Url; + +use crate::manager::registered::RegistrationData; +use crate::store::Store; +use crate::{Error, Manager}; + +use super::Registered; + +/// Manager state where it is possible to link a new secondary device +pub struct Linking; + +impl Manager { + /// Links this client as a secondary device from the device used to register the account (usually a phone). + /// The URL to present to the user will be sent in the channel given as the argument. + /// + /// ```no_run + /// use futures::{channel::oneshot, future, StreamExt}; + /// use presage::libsignal_service::configuration::SignalServers; + /// use presage::Manager; + /// use presage_store_sled::{MigrationConflictStrategy, SledStore}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let store = + /// SledStore::open("/tmp/presage-example", MigrationConflictStrategy::Drop)?; + /// + /// let (mut tx, mut rx) = oneshot::channel(); + /// let (manager, err) = future::join( + /// Manager::link_secondary_device( + /// store, + /// SignalServers::Production, + /// "my-linked-client".into(), + /// tx, + /// ), + /// async move { + /// match rx.await { + /// Ok(url) => println!("Show URL {} as QR code to user", url), + /// Err(e) => println!("Error linking device: {}", e), + /// } + /// }, + /// ) + /// .await; + /// + /// Ok(()) + /// } + /// ``` + pub async fn link_secondary_device( + mut store: S, + signal_servers: SignalServers, + device_name: String, + provisioning_link_channel: oneshot::Sender, + ) -> Result, Error> { + // clear the database: the moment we start the process, old API credentials are invalidated + // and you won't be able to use this client anyways + store.clear_registration()?; + + // generate a random alphanumeric 24 chars password + let mut rng = StdRng::from_entropy(); + let password = Alphanumeric.sample_string(&mut rng, 24); + + // generate a 52 bytes signaling key + let mut signaling_key = [0u8; 52]; + rng.fill_bytes(&mut signaling_key); + + let service_configuration: ServiceConfiguration = signal_servers.into(); + let push_service = + HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); + + let mut linking_manager: LinkingManager = + LinkingManager::new(push_service, password.clone()); + + let (tx, mut rx) = mpsc::channel(1); + + let (wait_for_qrcode_scan, registration) = future::join( + linking_manager.provision_secondary_device(&mut rng, signaling_key, tx), + async move { + if let Some(SecondaryDeviceProvisioning::Url(url)) = rx.next().await { + info!("generating qrcode from provisioning link: {}", &url); + if provisioning_link_channel.send(url).is_err() { + return Err(Error::LinkError); + } + } else { + return Err(Error::LinkError); + } + + if let Some(SecondaryDeviceProvisioning::NewDeviceRegistration { + phone_number, + device_id: DeviceId { device_id }, + registration_id, + pni_registration_id, + profile_key, + service_ids, + aci_private_key, + aci_public_key, + pni_private_key, + pni_public_key, + }) = rx.next().await + { + info!("successfully registered device {}", &service_ids); + Ok(Registered::with_data(RegistrationData { + signal_servers, + device_name: Some(device_name), + phone_number, + service_ids, + signaling_key, + password, + device_id: Some(device_id), + registration_id, + pni_registration_id: Some(pni_registration_id), + aci_public_key, + aci_private_key, + pni_public_key: Some(pni_public_key), + pni_private_key: Some(pni_private_key), + profile_key: ProfileKey::create( + profile_key.try_into().expect("32 bytes for profile key"), + ), + })) + } else { + Err(Error::NoProvisioningMessageReceived) + } + }, + ) + .await; + + wait_for_qrcode_scan?; + + let mut manager = Manager { + rng, + store, + state: registration?, + }; + + manager.store.save_registration_data(&manager.state.data)?; + + match ( + manager.register_pre_keys().await, + manager.set_account_attributes().await, + manager.sync_contacts().await, + ) { + (Err(e), _, _) | (_, Err(e), _) => { + // clear the entire store on any error, there's no possible recovery here + manager.store.clear_registration()?; + Err(e) + } + (_, _, Err(e)) => { + warn!("failed to synchronize contacts: {e}"); + Ok(manager) + } + _ => Ok(manager), + } + } +} diff --git a/presage/src/manager/mod.rs b/presage/src/manager/mod.rs new file mode 100644 index 000000000..a17558672 --- /dev/null +++ b/presage/src/manager/mod.rs @@ -0,0 +1,96 @@ +//! Signal manager and its states + +mod confirmation; +mod linking; +mod registered; +mod registration; + +use std::fmt; + +use rand::rngs::StdRng; + +pub use self::confirmation::Confirmation; +pub use self::linking::Linking; +pub use self::registered::{ReceivingMode, Registered, RegistrationData}; +pub use self::registration::{Registration, RegistrationOptions}; + +/// Signal manager +/// +/// The manager is parametrized over the [`crate::store::Store`] which stores the configuration, keys and +/// optionally messages; and over the type state which describes what is the current state of the +/// manager: in registration, linking, TODO +/// +/// Depending on the state specific methods are available or not. +#[derive(Clone)] +pub struct Manager { + /// Implementation of a metadata and messages store + store: Store, + /// Part of the manager which is persisted in the store. + state: State, + /// Random number generator + rng: StdRng, +} + +impl fmt::Debug for Manager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Manager") + .field("state", &self.state) + .finish_non_exhaustive() + } +} + +#[cfg(test)] +mod tests { + use base64::engine::general_purpose; + use base64::Engine; + use libsignal_service::prelude::ProfileKey; + use libsignal_service::protocol::KeyPair; + use rand::RngCore; + use serde_json::json; + + use crate::manager::RegistrationData; + + #[test] + fn test_state_before_pni() { + let mut rng = rand::thread_rng(); + let key_pair = KeyPair::generate(&mut rng); + let mut profile_key = [0u8; 32]; + rng.fill_bytes(&mut profile_key); + let profile_key = ProfileKey::generate(profile_key); + let mut signaling_key = [0u8; 52]; + rng.fill_bytes(&mut signaling_key); + + // this is before public_key and private_key were renamed to aci_public_key and aci_private_key + // and pni_public_key + pni_private_key were added + let previous_state = json!({ + "signal_servers": "Production", + "device_name": "Test", + "phone_number": { + "code": { + "value": 1, + "source": "plus" + }, + "national": { + "value": 5550199, + "zeros": 0 + }, + "extension": null, + "carrier": null + }, + "uuid": "ff9a89d9-8052-4af0-a91d-2a0dfa0c6b95", + "password": "HelloWorldOfPasswords", + "signaling_key": general_purpose::STANDARD.encode(signaling_key), + "device_id": 42, + "registration_id": 64, + "private_key": general_purpose::STANDARD.encode(key_pair.private_key.serialize()), + "public_key": general_purpose::STANDARD.encode(key_pair.public_key.serialize()), + "profile_key": general_purpose::STANDARD.encode(profile_key.get_bytes()), + }); + + let data: RegistrationData = + serde_json::from_value(previous_state).expect("should deserialize"); + assert_eq!(data.aci_public_key, key_pair.public_key); + assert!(data.aci_private_key == key_pair.private_key); + assert!(data.pni_public_key.is_none()); + } +} diff --git a/presage/src/manager/registered.rs b/presage/src/manager/registered.rs new file mode 100644 index 000000000..460674ced --- /dev/null +++ b/presage/src/manager/registered.rs @@ -0,0 +1,1098 @@ +use std::fmt; +use std::ops::RangeBounds; +use std::pin::pin; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use futures::{future, AsyncReadExt, Stream, StreamExt}; +use libsignal_service::attachment_cipher::decrypt_in_place; +use libsignal_service::configuration::{ServiceConfiguration, SignalServers, SignalingKey}; +use libsignal_service::content::{Content, ContentBody, DataMessageFlags, Metadata}; +use libsignal_service::groups_v2::{decrypt_group, Group, GroupsManager, InMemoryCredentialsCache}; +use libsignal_service::messagepipe::{Incoming, ServiceCredentials}; +use libsignal_service::models::Contact; +use libsignal_service::prelude::phonenumber::PhoneNumber; +use libsignal_service::prelude::Uuid; +use libsignal_service::proto::data_message::Delete; +use libsignal_service::proto::{ + sync_message, AttachmentPointer, DataMessage, EditMessage, GroupContextV2, NullMessage, + SyncMessage, +}; +use libsignal_service::protocol::SenderCertificate; +use libsignal_service::protocol::{PrivateKey, PublicKey}; +use libsignal_service::provisioning::generate_registration_id; +use libsignal_service::push_service::{ + AccountAttributes, DeviceCapabilities, PushService, ServiceError, ServiceIds, WhoAmIResponse, + DEFAULT_DEVICE_ID, +}; +use libsignal_service::receiver::MessageReceiver; +use libsignal_service::sender::{AttachmentSpec, AttachmentUploadError}; +use libsignal_service::unidentified_access::UnidentifiedAccess; +use libsignal_service::utils::{ + serde_optional_private_key, serde_optional_public_key, serde_private_key, serde_public_key, + serde_signaling_key, +}; +use libsignal_service::websocket::SignalWebSocket; +use libsignal_service::zkgroup::profiles::ProfileKey; +use libsignal_service::{cipher, AccountManager, Profile, ServiceAddress}; +use libsignal_service_hyper::push_service::HyperPushService; +use log::{debug, error, info, trace, warn}; +use parking_lot::Mutex; +use rand::rngs::StdRng; +use rand::SeedableRng; +use serde::{Deserialize, Serialize}; + +use crate::cache::CacheCell; +use crate::serde::serde_profile_key; +use crate::store::{Store, Thread}; +use crate::{Error, Manager}; + +type ServiceCipher = cipher::ServiceCipher; +type MessageSender = libsignal_service::prelude::MessageSender; + +/// Manager state when the client is registered and can send and receive messages from Signal +#[derive(Clone)] +pub struct Registered { + pub(crate) push_service_cache: CacheCell, + pub(crate) identified_websocket: Arc>>, + pub(crate) unidentified_websocket: Arc>>, + pub(crate) unidentified_sender_certificate: Option, + + pub(crate) data: RegistrationData, +} + +impl fmt::Debug for Registered { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Registered").finish_non_exhaustive() + } +} + +impl Registered { + pub(crate) fn with_data(data: RegistrationData) -> Self { + Self { + push_service_cache: CacheCell::default(), + identified_websocket: Default::default(), + unidentified_websocket: Default::default(), + unidentified_sender_certificate: Default::default(), + data, + } + } + + fn service_configuration(&self) -> ServiceConfiguration { + self.data.signal_servers.into() + } + + pub fn device_id(&self) -> u32 { + self.data.device_id.unwrap_or(DEFAULT_DEVICE_ID) + } +} + +/// Registration data like device name, and credentials to connect to Signal +#[derive(Serialize, Deserialize, Clone)] +pub struct RegistrationData { + pub signal_servers: SignalServers, + pub device_name: Option, + pub phone_number: PhoneNumber, + #[serde(flatten)] + pub service_ids: ServiceIds, + pub(crate) password: String, + #[serde(with = "serde_signaling_key")] + pub(crate) signaling_key: SignalingKey, + pub device_id: Option, + pub registration_id: u32, + #[serde(default)] + pub pni_registration_id: Option, + #[serde(with = "serde_private_key", rename = "private_key")] + pub(crate) aci_private_key: PrivateKey, + #[serde(with = "serde_public_key", rename = "public_key")] + pub(crate) aci_public_key: PublicKey, + #[serde(with = "serde_optional_private_key", default)] + pub(crate) pni_private_key: Option, + #[serde(with = "serde_optional_public_key", default)] + pub(crate) pni_public_key: Option, + #[serde(with = "serde_profile_key")] + pub(crate) profile_key: ProfileKey, +} + +impl RegistrationData { + pub fn registration_id(&self) -> u32 { + self.registration_id + } + + pub fn service_ids(&self) -> &ServiceIds { + &self.service_ids + } + + pub fn profile_key(&self) -> ProfileKey { + self.profile_key + } + + pub fn device_name(&self) -> Option<&String> { + self.device_name.as_ref() + } + + pub fn aci_public_key(&self) -> PublicKey { + self.aci_public_key + } + + pub fn aci_private_key(&self) -> PrivateKey { + self.aci_private_key + } +} + +impl Manager { + /// Loads a previously registered account from the implemented [Store]. + /// + /// Returns a instance of [Manager] you can use to send & receive messages. + pub async fn load_registered(store: S) -> Result> { + let registration_data = store + .load_registration_data()? + .ok_or(Error::NotYetRegisteredError)?; + + let mut manager = Self { + rng: StdRng::from_entropy(), + store, + state: Registered::with_data(registration_data), + }; + + if manager.state.data.pni_registration_id.is_none() { + manager.set_account_attributes().await?; + } + + Ok(manager) + } + + /// Returns a handle to the [Store] implementation. + pub fn store(&self) -> &S { + &self.store + } + + /// Returns a handle on the [RegistrationData]. + pub fn registration_data(&self) -> &RegistrationData { + &self.state.data + } + + pub(crate) async fn register_pre_keys(&mut self) -> Result<(), Error> { + trace!("registering pre keys"); + let mut account_manager = + AccountManager::new(self.push_service()?, Some(self.state.data.profile_key)); + + let (pre_keys_offset_id, next_signed_pre_key_id, next_pq_pre_key_id) = account_manager + .update_pre_key_bundle( + &mut self.store.clone(), + &mut self.rng, + self.store.pre_keys_offset_id()?, + self.store.next_signed_pre_key_id()?, + self.store.next_pq_pre_key_id()?, + true, + ) + .await?; + + self.store.set_pre_keys_offset_id(pre_keys_offset_id)?; + self.store + .set_next_signed_pre_key_id(next_signed_pre_key_id)?; + self.store.set_next_pq_pre_key_id(next_pq_pre_key_id)?; + + trace!("registered pre keys"); + Ok(()) + } + + pub(crate) async fn set_account_attributes(&mut self) -> Result<(), Error> { + trace!("setting account attributes"); + let mut account_manager = + AccountManager::new(self.push_service()?, Some(self.state.data.profile_key)); + + let pni_registration_id = + if let Some(pni_registration_id) = self.state.data.pni_registration_id { + pni_registration_id + } else { + info!("migrating to PNI"); + let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + self.store.save_registration_data(&self.state.data)?; + pni_registration_id + }; + + account_manager + .set_account_attributes(AccountAttributes { + name: self.state.data.device_name().cloned(), + registration_id: self.state.data.registration_id, + pni_registration_id, + signaling_key: None, + voice: false, + video: false, + fetches_messages: true, + pin: None, + registration_lock: None, + unidentified_access_key: Some( + self.state.data.profile_key.derive_access_key().to_vec(), + ), + unrestricted_unidentified_access: false, + discoverable_by_phone_number: true, + capabilities: DeviceCapabilities { + gv2: true, + gv1_migration: true, + ..Default::default() + }, + }) + .await?; + + if self.state.data.pni_registration_id.is_none() { + debug!("fetching PNI UUID and updating state"); + let whoami = self.whoami().await?; + self.state.data.service_ids.pni = whoami.pni; + self.store.save_registration_data(&self.state.data)?; + } + + trace!("done setting account attributes"); + Ok(()) + } + + async fn wait_for_contacts_sync( + &mut self, + mut messages: impl Stream + Unpin, + ) -> Result<(), Error> { + let mut message_receiver = MessageReceiver::new(self.push_service()?); + while let Some(Content { body, .. }) = messages.next().await { + if let ContentBody::SynchronizeMessage(SyncMessage { + contacts: Some(contacts), + .. + }) = body + { + let contacts = message_receiver.retrieve_contacts(&contacts).await?; + let _ = self.store.clear_contacts(); + self.store.save_contacts(contacts.filter_map(Result::ok))?; + info!("saved contacts"); + return Ok(()); + } + } + Ok(()) + } + + pub(crate) async fn sync_contacts(&mut self) -> Result<(), Error> { + let messages = pin!( + self.receive_messages_stream(ReceivingMode::WaitForContacts) + .await? + ); + self.request_contacts_sync().await?; + + info!("waiting for contacts sync for up to 60 seconds"); + + tokio::time::timeout( + Duration::from_secs(60), + self.wait_for_contacts_sync(messages), + ) + .await + .map_err(Error::from)??; + + Ok(()) + } + + /// Request that the primary device to encrypt & send all of its contacts as a message to ourselves + /// which can be then received, decrypted and stored in the message receiving loop. + /// + /// **Note**: If successful, the contacts are not yet received and stored, but will only be + /// processed when they're received using the `MessageReceiver`. + pub async fn request_contacts_sync(&mut self) -> Result<(), Error> { + trace!("requesting contacts sync"); + let var_name = sync_message::request::Type::Contacts as i32; + let sync_message = SyncMessage { + request: Some(sync_message::Request { + r#type: Some(var_name), + }), + ..Default::default() + }; + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + + self.send_message(self.state.data.service_ids.aci, sync_message, timestamp) + .await?; + + Ok(()) + } + + async fn sender_certificate(&mut self) -> Result> { + let needs_renewal = |sender_certificate: Option<&SenderCertificate>| -> bool { + if sender_certificate.is_none() { + return true; + } + + let seconds_since_epoch = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + + if let Some(expiration) = sender_certificate.and_then(|s| s.expiration().ok()) { + expiration >= seconds_since_epoch - 600 + } else { + true + } + }; + + if needs_renewal(self.state.unidentified_sender_certificate.as_ref()) { + let sender_certificate = self + .push_service()? + .get_uuid_only_sender_certificate() + .await?; + + self.state + .unidentified_sender_certificate + .replace(sender_certificate); + } + + Ok(self + .state + .unidentified_sender_certificate + .clone() + .expect("logic error")) + } + + pub async fn submit_recaptcha_challenge( + &self, + token: &str, + captcha: &str, + ) -> Result<(), Error> { + let mut account_manager = AccountManager::new(self.push_service()?, None); + account_manager + .submit_recaptcha_challenge(token, captcha) + .await?; + Ok(()) + } + + /// Fetches basic information on the registered device. + pub async fn whoami(&self) -> Result> { + Ok(self.push_service()?.whoami().await?) + } + + /// Fetches the profile (name, about, status emoji) of the registered user. + pub async fn retrieve_profile(&mut self) -> Result> { + self.retrieve_profile_by_uuid(self.state.data.service_ids.aci, self.state.data.profile_key) + .await + } + + /// Fetches the profile of the provided user by UUID and profile key. + pub async fn retrieve_profile_by_uuid( + &mut self, + uuid: Uuid, + profile_key: ProfileKey, + ) -> Result> { + // Check if profile is cached. + if let Some(profile) = self.store.profile(uuid, profile_key).ok().flatten() { + return Ok(profile); + } + + let mut account_manager = AccountManager::new(self.push_service()?, Some(profile_key)); + + let profile = account_manager.retrieve_profile(uuid.into()).await?; + + let _ = self.store.save_profile(uuid, profile_key, profile.clone()); + Ok(profile) + } + + /// Get an iterator of messages in a thread, optionally starting from a point in time. + pub fn messages( + &self, + thread: &Thread, + range: impl RangeBounds, + ) -> Result> { + Ok(self.store.messages(thread, range)?) + } + + async fn receive_messages_encrypted( + &mut self, + ) -> Result>, Error> { + let credentials = self.credentials().ok_or(Error::NotYetRegisteredError)?; + let allow_stories = false; + let pipe = MessageReceiver::new(self.push_service()?) + .create_message_pipe(credentials, allow_stories) + .await?; + + let service_configuration = self.state.service_configuration(); + let mut unidentified_push_service = + HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); + let unidentified_ws = unidentified_push_service + .ws("/v1/websocket/", &[], None, false) + .await?; + self.state.identified_websocket.lock().replace(pipe.ws()); + self.state + .unidentified_websocket + .lock() + .replace(unidentified_ws); + + Ok(pipe.stream()) + } + + /// Starts receiving and storing messages. + /// + /// Returns a [futures::Stream] of messages to consume. Messages will also be stored by the implementation of the [Store]. + pub async fn receive_messages( + &mut self, + ) -> Result, Error> { + self.receive_messages_stream(ReceivingMode::Forever).await + } + + pub async fn receive_messages_with_mode( + &mut self, + mode: ReceivingMode, + ) -> Result, Error> { + self.receive_messages_stream(mode).await + } + + fn groups_manager( + &self, + ) -> Result, Error> { + let service_configuration = self.state.service_configuration(); + let server_public_params = service_configuration.zkgroup_server_public_params; + + let groups_credentials_cache = InMemoryCredentialsCache::default(); + let groups_manager = GroupsManager::new( + self.state.data.service_ids.clone(), + self.push_service()?, + groups_credentials_cache, + server_public_params, + ); + + Ok(groups_manager) + } + + async fn receive_messages_stream( + &mut self, + mode: ReceivingMode, + ) -> Result, Error> { + struct StreamState { + encrypted_messages: S, + message_receiver: MessageReceiver, + service_cipher: ServiceCipher, + store: C, + groups_manager: GroupsManager, + mode: ReceivingMode, + } + + let init = StreamState { + encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), + message_receiver: MessageReceiver::new(self.push_service()?), + service_cipher: self.new_service_cipher()?, + store: self.store.clone(), + groups_manager: self.groups_manager()?, + mode, + }; + + Ok(futures::stream::unfold(init, |mut state| async move { + loop { + match state.encrypted_messages.next().await { + Some(Ok(Incoming::Envelope(envelope))) => { + match state.service_cipher.open_envelope(envelope).await { + Ok(Some(content)) => { + // contacts synchronization sent from the primary device (happens after linking, or on demand) + if let ContentBody::SynchronizeMessage(SyncMessage { + contacts: Some(contacts), + .. + }) = &content.body + { + match state.message_receiver.retrieve_contacts(contacts).await { + Ok(contacts) => { + let _ = state.store.clear_contacts(); + match state + .store + .save_contacts(contacts.filter_map(Result::ok)) + { + Ok(()) => { + info!("saved contacts"); + } + Err(e) => { + warn!("failed to save contacts: {e}"); + } + } + } + Err(e) => { + warn!("failed to retrieve contacts: {e}"); + } + } + + if let ReceivingMode::WaitForContacts = state.mode { + return None; + } + } + + if let ContentBody::DataMessage(DataMessage { + group_v2: + Some(GroupContextV2 { + master_key: Some(master_key_bytes), + revision: Some(revision), + .. + }), + .. + }) + | ContentBody::SynchronizeMessage(SyncMessage { + sent: + Some(sync_message::Sent { + message: + Some(DataMessage { + group_v2: + Some(GroupContextV2 { + master_key: Some(master_key_bytes), + revision: Some(revision), + .. + }), + .. + }), + .. + }), + .. + }) = &content.body + { + // there's two things to implement: the group metadata (fetched from HTTP API) + // and the group changes, which are part of the protobuf messages + // this means we kinda need our own internal representation of groups inside of presage? + if let Ok(Some(group)) = upsert_group( + &state.store, + &mut state.groups_manager, + master_key_bytes, + revision, + ) + .await + { + trace!("{group:?}"); + } + } + + if let Err(e) = save_message(&mut state.store, content.clone()) { + error!("Error saving message to store: {}", e); + } + + return Some((content, state)); + } + Ok(None) => { + debug!("Empty envelope..., message will be skipped!") + } + Err(e) => { + error!("Error opening envelope: {:?}, message will be skipped!", e); + } + } + } + Some(Ok(Incoming::QueueEmpty)) => { + debug!("empty queue"); + if let ReceivingMode::InitialSync = state.mode { + return None; + } + } + Some(Err(e)) => error!("Error: {}", e), + None => return None, + } + } + })) + } + + /// Sends a messages to the provided [ServiceAddress]. + /// The timestamp should be set to now and is used by Signal mobile apps + /// to order messages later, and apply reactions. + /// + /// This method will automatically update the [DataMessage::expire_timer] if it is set to + /// [None] such that the chat will keep the current expire timer. + pub async fn send_message( + &mut self, + recipient_addr: impl Into, + message: impl Into, + timestamp: u64, + ) -> Result<(), Error> { + let mut sender = self.new_message_sender().await?; + + let online_only = false; + let recipient = recipient_addr.into(); + let mut content_body: ContentBody = message.into(); + + // Only update the expiration timer if it is not set. + match content_body { + ContentBody::DataMessage(DataMessage { + expire_timer: ref mut timer, + .. + }) if timer.is_none() => { + // Set the expire timer to None for errors. + let store_expire_timer = self + .store + .expire_timer(&Thread::Contact(recipient.uuid)) + .unwrap_or_default(); + + *timer = store_expire_timer; + } + _ => {} + } + + let sender_certificate = self.sender_certificate().await?; + let unidentified_access = + self.store + .profile_key(&recipient.uuid)? + .map(|profile_key| UnidentifiedAccess { + key: profile_key.derive_access_key().to_vec(), + certificate: sender_certificate.clone(), + }); + + sender + .send_message( + &recipient, + unidentified_access, + content_body.clone(), + timestamp, + online_only, + ) + .await?; + + // save the message + let content = Content { + metadata: Metadata { + sender: self.state.data.service_ids.aci.into(), + sender_device: self.state.device_id(), + timestamp, + needs_receipt: false, + unidentified_sender: false, + }, + body: content_body, + }; + + save_message(&mut self.store, content)?; + + Ok(()) + } + + /// Uploads attachments prior to linking them in a message. + pub async fn upload_attachments( + &self, + attachments: Vec<(AttachmentSpec, Vec)>, + ) -> Result>, Error> { + if attachments.is_empty() { + return Ok(Vec::new()); + } + let sender = self.new_message_sender().await?; + let upload = future::join_all(attachments.into_iter().map(move |(spec, contents)| { + let mut sender = sender.clone(); + async move { sender.upload_attachment(spec, contents).await } + })); + Ok(upload.await) + } + + /// Sends one message in a group (v2). The `master_key_bytes` is required to have 32 elements. + /// + /// This method will automatically update the [DataMessage::expire_timer] if it is set to + /// [None] such that the chat will keep the current expire timer. + pub async fn send_message_to_group( + &mut self, + master_key_bytes: &[u8], + mut message: DataMessage, + timestamp: u64, + ) -> Result<(), Error> { + // Only update the expiration timer if it is not set. + match message { + DataMessage { + expire_timer: ref mut timer, + .. + } if timer.is_none() => { + // Set the expire timer to None for errors. + let store_expire_timer = self + .store + .expire_timer(&Thread::Group( + master_key_bytes + .try_into() + .expect("Master key bytes to be of size 32."), + )) + .unwrap_or_default(); + + *timer = store_expire_timer; + } + _ => {} + } + let mut sender = self.new_message_sender().await?; + + let mut groups_manager = self.groups_manager()?; + let Some(group) = + upsert_group(&self.store, &mut groups_manager, master_key_bytes, &0).await? + else { + return Err(Error::UnknownGroup); + }; + + let sender_certificate = self.sender_certificate().await?; + let mut recipients = Vec::new(); + for member in group + .members + .into_iter() + .filter(|m| m.uuid != self.state.data.service_ids.aci) + { + let unidentified_access = + self.store + .profile_key(&member.uuid)? + .map(|profile_key| UnidentifiedAccess { + key: profile_key.derive_access_key().to_vec(), + certificate: sender_certificate.clone(), + }); + recipients.push((member.uuid.into(), unidentified_access)); + } + + let online_only = false; + let results = sender + .send_message_to_group(recipients, message.clone(), timestamp, online_only) + .await; + + // return first error if any + results.into_iter().find(|res| res.is_err()).transpose()?; + + let content = Content { + metadata: Metadata { + sender: self.state.data.service_ids.aci.into(), + sender_device: self.state.device_id(), + timestamp, + needs_receipt: false, // TODO: this is just wrong + unidentified_sender: false, + }, + body: message.into(), + }; + + save_message(&mut self.store, content)?; + + Ok(()) + } + + /// Clears all sessions established wiht [recipient](ServiceAddress). + pub async fn clear_sessions(&self, recipient: &ServiceAddress) -> Result<(), Error> { + self.store.delete_all_sessions(recipient).await?; + Ok(()) + } + + /// Downloads and decrypts a single attachment. + pub async fn get_attachment( + &self, + attachment_pointer: &AttachmentPointer, + ) -> Result, Error> { + let mut service = self.push_service()?; + let mut attachment_stream = service.get_attachment(attachment_pointer).await?; + + // We need the whole file for the crypto to check out + let mut ciphertext = Vec::new(); + let len = attachment_stream.read_to_end(&mut ciphertext).await?; + + trace!("downloaded encrypted attachment of {} bytes", len); + + let key: [u8; 64] = attachment_pointer.key().try_into()?; + decrypt_in_place(key, &mut ciphertext)?; + + Ok(ciphertext) + } + + pub async fn send_session_reset( + &mut self, + recipient: &ServiceAddress, + timestamp: u64, + ) -> Result<(), Error> { + trace!("Resetting session for address: {}", recipient.uuid); + let message = DataMessage { + flags: Some(DataMessageFlags::EndSession as u32), + ..Default::default() + }; + + self.send_message(*recipient, message, timestamp).await?; + + Ok(()) + } + + fn credentials(&self) -> Option { + Some(ServiceCredentials { + uuid: Some(self.state.data.service_ids.aci), + phonenumber: self.state.data.phone_number.clone(), + password: Some(self.state.data.password.clone()), + signaling_key: Some(self.state.data.signaling_key), + device_id: self.state.data.device_id, + }) + } + + /// Returns a clone of a cached push service. + /// + /// If no service is yet cached, it will create and cache one. + fn push_service(&self) -> Result> { + self.state.push_service_cache.get(|| { + Ok(HyperPushService::new( + self.state.service_configuration(), + self.credentials(), + crate::USER_AGENT.to_string(), + )) + }) + } + + /// Creates a new message sender. + async fn new_message_sender(&self) -> Result, Error> { + let local_addr = ServiceAddress { + uuid: self.state.data.service_ids.aci, + }; + + let identified_websocket = self + .state + .identified_websocket + .lock() + .clone() + .ok_or(Error::MessagePipeNotStarted)?; + + let mut unidentified_push_service = HyperPushService::new( + self.state.service_configuration(), + None, + crate::USER_AGENT.to_string(), + ); + let unidentified_websocket = unidentified_push_service + .ws("/v1/websocket/", &[], None, false) + .await?; + + Ok(MessageSender::new( + identified_websocket, + unidentified_websocket, + self.push_service()?, + self.new_service_cipher()?, + self.rng.clone(), + self.store.clone(), + local_addr, + self.state.device_id().into(), + )) + } + + /// Creates a new service cipher. + fn new_service_cipher(&self) -> Result, Error> { + let service_cipher = ServiceCipher::new( + self.store.clone(), + self.rng.clone(), + self.state + .service_configuration() + .unidentified_sender_trust_root, + self.state.data.service_ids.aci, + self.state.device_id(), + ); + + Ok(service_cipher) + } + + /// Returns the title of a thread (contact or group). + pub async fn thread_title(&self, thread: &Thread) -> Result> { + match thread { + Thread::Contact(uuid) => { + let contact = match self.store.contact_by_id(*uuid) { + Ok(contact) => contact, + Err(e) => { + info!("Error getting contact by id: {}, {:?}", e, uuid); + None + } + }; + Ok(match contact { + Some(contact) => contact.name, + None => uuid.to_string(), + }) + } + Thread::Group(id) => match self.store.group(*id)? { + Some(group) => Ok(group.title), + None => Ok("".to_string()), + }, + } + } + + /// Deprecated methods + + /// Get a single contact by its UUID + /// + /// Note: this only currently works when linked as secondary device (the contacts are sent by the primary device at linking time) + #[deprecated = "use the store handle directly"] + pub fn contact_by_id(&self, id: &Uuid) -> Result, Error> { + Ok(self.store.contact_by_id(*id)?) + } + + /// Returns an iterator on contacts stored in the [Store]. + #[deprecated = "use the store handle directly"] + pub fn contacts( + &self, + ) -> Result>>, Error> { + let iter = self.store.contacts()?; + Ok(iter.map(|r| r.map_err(Into::into))) + } + + /// Get a group (either from the local cache, or fetch it remotely) using its master key + #[deprecated = "use the store handle directly"] + pub fn group(&self, master_key_bytes: &[u8]) -> Result, Error> { + Ok(self.store.group(master_key_bytes.try_into()?)?) + } + + /// Returns an iterator on groups stored in the [Store]. + #[deprecated = "use the store handle directly"] + pub fn groups(&self) -> Result> { + Ok(self.store.groups()?) + } + + /// Get a single message in a thread (identified by its server-side sent timestamp) + #[deprecated = "use the store handle directly"] + pub fn message( + &self, + thread: &Thread, + timestamp: u64, + ) -> Result, Error> { + Ok(self.store.message(thread, timestamp)?) + } +} + +/// The mode receiving messages stream +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub enum ReceivingMode { + /// Don't stop the stream + #[default] + Forever, + /// Stop the stream after the initial sync + /// + /// That is, when the Signal's message queue becomes empty. + InitialSync, + /// Stop the stream after contacts are synced + WaitForContacts, +} + +async fn upsert_group( + store: &S, + groups_manager: &mut GroupsManager, + master_key_bytes: &[u8], + revision: &u32, +) -> Result, Error> { + let upsert_group = match store.group(master_key_bytes.try_into()?) { + Ok(Some(group)) => { + debug!("loaded group from local db {}", group.title); + group.revision < *revision + } + Ok(None) => true, + Err(e) => { + warn!("failed to retrieve group from local db {}", e); + true + } + }; + + if upsert_group { + debug!("fetching and saving group"); + match groups_manager.fetch_encrypted_group(master_key_bytes).await { + Ok(encrypted_group) => { + let group = decrypt_group(master_key_bytes, encrypted_group)?; + if let Err(e) = store.save_group(master_key_bytes.try_into()?, &group) { + error!("failed to save group {master_key_bytes:?}: {e}",); + } + } + Err(e) => { + warn!("failed to fetch encrypted group: {e}") + } + } + } + + Ok(store.group(master_key_bytes.try_into()?)?) +} + +fn save_message(store: &mut S, message: Content) -> Result<(), Error> { + // derive the thread from the message type + let thread = Thread::try_from(&message)?; + + // update recipient profile keys + if let ContentBody::DataMessage(DataMessage { + profile_key: Some(profile_key_bytes), + .. + }) = &message.body + { + if let Ok(profile_key_bytes) = profile_key_bytes.clone().try_into() { + let sender_uuid = message.metadata.sender.uuid; + let profile_key = ProfileKey::create(profile_key_bytes); + debug!("inserting profile key for {sender_uuid}"); + store.upsert_profile_key(&sender_uuid, profile_key)?; + } + } + + // only save DataMessage and SynchronizeMessage (sent) + let message = match message.body { + ContentBody::NullMessage(_) => Some(message), + ContentBody::DataMessage(ref data_message) + | ContentBody::SynchronizeMessage(SyncMessage { + sent: + Some(sync_message::Sent { + message: Some(ref data_message), + .. + }), + .. + }) => match data_message { + DataMessage { + delete: + Some(Delete { + target_sent_timestamp: Some(ts), + }), + .. + } => { + // replace an existing message by an empty NullMessage + if let Some(mut existing_msg) = store.message(&thread, *ts)? { + existing_msg.metadata.sender.uuid = Uuid::nil(); + existing_msg.body = NullMessage::default().into(); + store.save_message(&thread, existing_msg)?; + debug!("message in thread {thread} @ {ts} deleted"); + None + } else { + warn!("could not find message to delete in thread {thread} @ {ts}"); + None + } + } + _ => Some(message), + }, + ContentBody::EditMessage(EditMessage { + target_sent_timestamp: Some(ts), + data_message: Some(data_message), + }) + | ContentBody::SynchronizeMessage(SyncMessage { + sent: + Some(sync_message::Sent { + edit_message: + Some(EditMessage { + target_sent_timestamp: Some(ts), + data_message: Some(data_message), + }), + .. + }), + .. + }) => { + if let Some(mut existing_msg) = store.message(&thread, ts)? { + existing_msg.metadata = message.metadata; + existing_msg.body = ContentBody::DataMessage(data_message); + // TODO: find a way to mark the message as edited (so that it's visible in a client) + trace!("message in thread {thread} @ {ts} edited"); + Some(existing_msg) + } else { + warn!("could not find edited message {thread} @ {ts}"); + None + } + } + ContentBody::CallMessage(_) + | ContentBody::SynchronizeMessage(SyncMessage { + call_event: Some(_), + .. + }) => Some(message), + ContentBody::SynchronizeMessage(s) => { + debug!("skipping saving sync message without interesting fields: {s:?}"); + None + } + ContentBody::ReceiptMessage(_) => { + debug!("skipping saving receipt message"); + None + } + ContentBody::TypingMessage(_) => { + debug!("skipping saving typing message"); + None + } + ContentBody::StoryMessage(_) => { + debug!("skipping story message"); + None + } + ContentBody::PniSignatureMessage(_) => { + debug!("skipping PNI signature message"); + None + } + ContentBody::EditMessage(_) => { + debug!("invalid edited"); + None + } + }; + + if let Some(message) = message { + store.save_message(&thread, message)?; + } + + Ok(()) +} diff --git a/presage/src/manager/registration.rs b/presage/src/manager/registration.rs new file mode 100644 index 000000000..1bfaf5a2d --- /dev/null +++ b/presage/src/manager/registration.rs @@ -0,0 +1,144 @@ +use libsignal_service::configuration::{ServiceConfiguration, SignalServers}; +use libsignal_service::prelude::phonenumber::PhoneNumber; +use libsignal_service::push_service::{PushService, VerificationTransport}; +use libsignal_service_hyper::push_service::HyperPushService; +use log::trace; +use rand::distributions::{Alphanumeric, DistString}; +use rand::rngs::StdRng; +use rand::SeedableRng; + +use crate::store::Store; +use crate::{Error, Manager}; + +use super::Confirmation; + +/// Options when registering a new main device +#[derive(Debug)] +pub struct RegistrationOptions<'a> { + pub signal_servers: SignalServers, + pub phone_number: PhoneNumber, + pub use_voice_call: bool, + pub captcha: Option<&'a str>, + pub force: bool, +} + +/// Manager state where it is possible to register a new main device +pub struct Registration; + +impl Manager { + /// Registers a new account with a phone number (and some options). + /// + /// The returned value is a [confirmation manager](Manager::confirm_verification_code) which you then + /// have to use to send the confirmation code. + /// + /// ```no_run + /// use std::str::FromStr; + /// + /// use presage::libsignal_service::{ + /// configuration::SignalServers, prelude::phonenumber::PhoneNumber, + /// }; + /// use presage::manager::RegistrationOptions; + /// use presage::Manager; + /// use presage_store_sled::{MigrationConflictStrategy, SledStore}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let store = + /// SledStore::open("/tmp/presage-example", MigrationConflictStrategy::Drop)?; + /// + /// let manager = Manager::register( + /// store, + /// RegistrationOptions { + /// signal_servers: SignalServers::Production, + /// phone_number: PhoneNumber::from_str("+16137827274")?, + /// use_voice_call: false, + /// captcha: None, + /// force: false, + /// }, + /// ) + /// .await?; + /// + /// Ok(()) + /// } + /// ``` + pub async fn register( + mut store: S, + registration_options: RegistrationOptions<'_>, + ) -> Result, Error> { + let RegistrationOptions { + signal_servers, + phone_number, + use_voice_call, + captcha, + force, + } = registration_options; + + // check if we are already registered + if !force && store.is_registered() { + return Err(Error::AlreadyRegisteredError); + } + + store.clear_registration()?; + + // generate a random alphanumeric 24 chars password + let mut rng = StdRng::from_entropy(); + let password = Alphanumeric.sample_string(&mut rng, 24); + + let service_configuration: ServiceConfiguration = signal_servers.into(); + let mut push_service = + HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()); + + trace!("creating registration verification session"); + + let phone_number_string = phone_number.to_string(); + let mut session = push_service + .create_verification_session(&phone_number_string, None, None, None) + .await?; + + if !session.allowed_to_request_code { + if session.captcha_required() { + trace!("captcha required"); + if captcha.is_none() { + return Err(Error::CaptchaRequired); + } + session = push_service + .patch_verification_session(&session.id, None, None, None, captcha, None) + .await? + } + if session.push_challenge_required() { + return Err(Error::PushChallengeRequired); + } + } + + if !session.allowed_to_request_code { + return Err(Error::RequestingCodeForbidden(session)); + } + + trace!("requesting verification code"); + + session = push_service + .request_verification_code( + &session.id, + crate::USER_AGENT, + if use_voice_call { + VerificationTransport::Voice + } else { + VerificationTransport::Sms + }, + ) + .await?; + + let manager = Manager { + store, + state: Confirmation { + signal_servers, + phone_number, + password, + session_id: session.id, + }, + rng, + }; + + Ok(manager) + } +} diff --git a/presage/src/store.rs b/presage/src/store.rs index 211094bea..86be60857 100644 --- a/presage/src/store.rs +++ b/presage/src/store.rs @@ -1,6 +1,7 @@ +//! Traits that are used by the manager for storing the data. + use std::{fmt, ops::RangeBounds}; -use crate::{manager::Registered, GroupMasterKeyBytes}; use libsignal_service::{ content::ContentBody, groups_v2::Group, @@ -12,78 +13,110 @@ use libsignal_service::{ }, protocol::{ProtocolStore, SenderKeyStore}, session_store::SessionStoreExt, + zkgroup::GroupMasterKeyBytes, Profile, }; use serde::{Deserialize, Serialize}; -pub trait StoreError: std::error::Error + Sync + Send + 'static {} - -pub trait Store: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone { - type Error: StoreError; +use crate::manager::RegistrationData; - type ContactsIter: Iterator>; - type GroupsIter: Iterator>; - type MessagesIter: Iterator>; +/// An error trait implemented by store error types +pub trait StoreError: std::error::Error + Sync + Send + 'static {} - /// State +/// Stores the registered state of the manager +pub trait StateStore { + type StateStoreError: StoreError; /// Load registered (or linked) state - fn load_state(&self) -> Result, Self::Error>; + fn load_registration_data(&self) -> Result, Self::StateStoreError>; /// Save registered (or linked) state - fn save_state(&mut self, state: &Registered) -> Result<(), Self::Error>; + fn save_registration_data( + &mut self, + state: &RegistrationData, + ) -> Result<(), Self::StateStoreError>; /// Returns whether this store contains registration data or not fn is_registered(&self) -> bool; /// Clear registration data (including keys), but keep received messages, groups and contacts. - fn clear_registration(&mut self) -> Result<(), Self::Error>; + fn clear_registration(&mut self) -> Result<(), Self::StateStoreError>; +} - /// Clear the entire store: this can be useful when resetting an existing client. - fn clear(&mut self) -> Result<(), Self::Error>; +/// Stores the keys published ahead of time, pre-keys +/// +/// +pub trait PreKeyStoreExt { + type PreKeyStoreExtError: StoreError; - /// Pre-keys + fn pre_keys_offset_id(&self) -> Result; - fn pre_keys_offset_id(&self) -> Result; + fn set_pre_keys_offset_id(&mut self, id: u32) -> Result<(), Self::PreKeyStoreExtError>; - fn set_pre_keys_offset_id(&mut self, id: u32) -> Result<(), Self::Error>; + fn next_signed_pre_key_id(&self) -> Result; - fn next_signed_pre_key_id(&self) -> Result; + fn next_pq_pre_key_id(&self) -> Result; - fn next_pq_pre_key_id(&self) -> Result; + fn set_next_signed_pre_key_id(&mut self, id: u32) -> Result<(), Self::PreKeyStoreExtError>; - fn set_next_signed_pre_key_id(&mut self, id: u32) -> Result<(), Self::Error>; + fn set_next_pq_pre_key_id(&mut self, id: u32) -> Result<(), Self::PreKeyStoreExtError>; +} - fn set_next_pq_pre_key_id(&mut self, id: u32) -> Result<(), Self::Error>; +/// Stores messages, contacts, groups and profiles +pub trait ContentsStore { + type ContentsStoreError: StoreError; - /// Messages + /// Iterator over the contacts + type ContactsIter: Iterator>; - // Clear all stored messages. - fn clear_messages(&mut self) -> Result<(), Self::Error>; + /// Iterator over all stored groups + /// + /// Each items is a tuple consisting of the group master key and its corresponding data. + type GroupsIter: Iterator>; - // Clear the messages in a thread. - fn clear_thread(&mut self, thread: &Thread) -> Result<(), Self::Error>; + /// Iterator over all stored messages + type MessagesIter: Iterator>; + + // Messages + + /// Clear all stored messages. + fn clear_messages(&mut self) -> Result<(), Self::ContentsStoreError>; + + /// Clear the messages in a thread. + fn clear_thread(&mut self, thread: &Thread) -> Result<(), Self::ContentsStoreError>; /// Save a message in a [Thread] identified by a timestamp. - fn save_message(&mut self, thread: &Thread, message: Content) -> Result<(), Self::Error>; + fn save_message( + &mut self, + thread: &Thread, + message: Content, + ) -> Result<(), Self::ContentsStoreError>; /// Delete a single message, identified by its received timestamp from a thread. - #[deprecated = "message deletion is now handled internally"] - fn delete_message(&mut self, thread: &Thread, timestamp: u64) -> Result; + /// Useful when you want to delete a message locally only. + fn delete_message( + &mut self, + thread: &Thread, + timestamp: u64, + ) -> Result; /// Retrieve a message from a [Thread] by its timestamp. - fn message(&self, thread: &Thread, timestamp: u64) -> Result, Self::Error>; + fn message( + &self, + thread: &Thread, + timestamp: u64, + ) -> Result, Self::ContentsStoreError>; /// Retrieve all messages from a [Thread] within a range in time fn messages( &self, thread: &Thread, range: impl RangeBounds, - ) -> Result; + ) -> Result; /// Get the expire timer from a [Thread], which corresponds to either [Contact::expire_timer] /// or [Group::disappearing_messages_timer]. - fn expire_timer(&self, thread: &Thread) -> Result, Self::Error> { + fn expire_timer(&self, thread: &Thread) -> Result, Self::ContentsStoreError> { match thread { Thread::Contact(uuid) => Ok(self.contact_by_id(*uuid)?.map(|c| c.expire_timer)), Thread::Group(key) => Ok(self @@ -93,41 +126,53 @@ pub trait Store: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone } } - /// Contacts + // Contacts /// Clear all saved synchronized contact data - fn clear_contacts(&mut self) -> Result<(), Self::Error>; + fn clear_contacts(&mut self) -> Result<(), Self::ContentsStoreError>; /// Replace all contact data - fn save_contacts(&mut self, contacts: impl Iterator) - -> Result<(), Self::Error>; + fn save_contacts( + &mut self, + contacts: impl Iterator, + ) -> Result<(), Self::ContentsStoreError>; /// Get an iterator on all stored (synchronized) contacts - fn contacts(&self) -> Result; + fn contacts(&self) -> Result; /// Get contact data for a single user by its [Uuid]. - fn contact_by_id(&self, id: Uuid) -> Result, Self::Error>; + fn contact_by_id(&self, id: Uuid) -> Result, Self::ContentsStoreError>; /// Delete all cached group data - fn clear_groups(&mut self) -> Result<(), Self::Error>; + fn clear_groups(&mut self) -> Result<(), Self::ContentsStoreError>; /// Save a group in the cache - fn save_group(&self, master_key: GroupMasterKeyBytes, group: &Group) - -> Result<(), Self::Error>; + fn save_group( + &self, + master_key: GroupMasterKeyBytes, + group: &Group, + ) -> Result<(), Self::ContentsStoreError>; /// Get an iterator on all cached groups - fn groups(&self) -> Result; + fn groups(&self) -> Result; /// Retrieve a single unencrypted group indexed by its `[GroupMasterKeyBytes]` - fn group(&self, master_key: GroupMasterKeyBytes) -> Result, Self::Error>; + fn group( + &self, + master_key: GroupMasterKeyBytes, + ) -> Result, Self::ContentsStoreError>; - /// Profiles + // Profiles /// Insert or update the profile key of a contact - fn upsert_profile_key(&mut self, uuid: &Uuid, key: ProfileKey) -> Result; + fn upsert_profile_key( + &mut self, + uuid: &Uuid, + key: ProfileKey, + ) -> Result; /// Get the profile key for a contact - fn profile_key(&self, uuid: &Uuid) -> Result, Self::Error>; + fn profile_key(&self, uuid: &Uuid) -> Result, Self::ContentsStoreError>; /// Save a profile by [Uuid] and [ProfileKey]. fn save_profile( @@ -135,10 +180,33 @@ pub trait Store: ProtocolStore + SenderKeyStore + SessionStoreExt + Sync + Clone uuid: Uuid, key: ProfileKey, profile: Profile, - ) -> Result<(), Self::Error>; + ) -> Result<(), Self::ContentsStoreError>; /// Retrieve a profile by [Uuid] and [ProfileKey]. - fn profile(&self, uuid: Uuid, key: ProfileKey) -> Result, Self::Error>; + fn profile( + &self, + uuid: Uuid, + key: ProfileKey, + ) -> Result, Self::ContentsStoreError>; +} + +/// The manager store trait combining all other stores into a single one +pub trait Store: + StateStore + + PreKeyStoreExt + + ContentsStore + + ProtocolStore + + SenderKeyStore + + SessionStoreExt + + Sync + + Clone +{ + type Error: StoreError; + + /// Clear the entire store + /// + /// This can be useful when resetting an existing client. + fn clear(&mut self) -> Result<(), ::StateStoreError>; } /// A thread specifies where a message was sent, either to or from a contact or in a group. @@ -147,7 +215,7 @@ pub enum Thread { /// The message was sent inside a contact-chat. Contact(Uuid), // Cannot use GroupMasterKey as unable to extract the bytes. - /// The message was sent inside a groups-chat with the [GroupMasterKey](crate::prelude::GroupMasterKey) (specified as bytes). + /// The message was sent inside a groups-chat with the [`GroupMasterKeyBytes`] (specified as bytes). Group(GroupMasterKeyBytes), } @@ -246,11 +314,12 @@ impl TryFrom<&Content> for Thread { } } -pub trait ContentTimestamp { +/// Extension trait of [`Content`] +pub trait ContentExt { fn timestamp(&self) -> u64; } -impl ContentTimestamp for Content { +impl ContentExt for Content { /// The original timestamp of the message. fn timestamp(&self) -> u64 { match self.body {