From dee04255909dbf5c7aef6484a0fee33eb312d701 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20F=C3=A9ron?= Date: Tue, 14 Nov 2023 10:40:32 +0100 Subject: [PATCH] WIP --- presage-cli/Cargo.toml | 1 + presage-cli/src/main.rs | 79 ++++++++++++++++----- presage/src/manager.rs | 152 ++++++++++++++++++++-------------------- 3 files changed, 137 insertions(+), 95 deletions(-) diff --git a/presage-cli/Cargo.toml b/presage-cli/Cargo.toml index 10fbcb189..278ea120b 100644 --- a/presage-cli/Cargo.toml +++ b/presage-cli/Cargo.toml @@ -9,6 +9,7 @@ presage = { path = "../presage" } presage-store-sled = { path = "../presage-store-sled" } anyhow = "1.0" +axum = "0.6" base64 = "0.12" chrono = { version = "0.4", default-features = false, features = ["serde", "clock"] } clap = { version = ">=4.2.4", features = ["derive"] } diff --git a/presage-cli/src/main.rs b/presage-cli/src/main.rs index a09e04adb..8f7d2ea1c 100644 --- a/presage-cli/src/main.rs +++ b/presage-cli/src/main.rs @@ -1,11 +1,13 @@ use core::fmt; use std::convert::TryInto; -use std::path::Path; use std::path::PathBuf; use std::time::Duration; use std::time::UNIX_EPOCH; use anyhow::{anyhow, bail, Context as _}; +use axum::extract::Path; +use axum::routing::post; +use axum::Router; use chrono::Local; use clap::{ArgGroup, Parser, Subcommand}; use directories::ProjectDirs; @@ -119,6 +121,9 @@ enum Cmd { Receive { #[clap(long = "notifications", short = 'n')] notifications: bool, + /// Start a webserver to be able to send messages, useful for testing + #[clap(long)] + webserver: bool, }, #[clap(about = "List groups")] ListGroups, @@ -181,7 +186,7 @@ fn parse_group_master_key(value: &str) -> anyhow::Result { .map_err(|_| anyhow::format_err!("master key should be 32 bytes long")) } -#[tokio::main(flavor = "multi_thread")] +#[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { env_logger::from_env( Env::default().default_filter_or(format!("{}=warn", env!("CARGO_PKG_NAME"))), @@ -232,11 +237,30 @@ async fn send( Ok(()) } +async fn process_incoming_messages( + mut manager: Manager>, + attachments_tmp_dir: &std::path::Path, + notifications: bool, +) -> anyhow::Result<()> { + let messages = manager + .receive_messages(ReceivingMode::Forever) + .await + .context("failed to initialize messages stream")?; + + pin_mut!(messages); + + while let Some(content) = messages.next().await { + process_incoming_message(&mut manager, attachments_tmp_dir, notifications, &content).await; + } + + Ok(()) +} + // Note to developers, this is a good example of a function you can use as a source of inspiration // to process incoming messages. async fn process_incoming_message( manager: &mut Manager>, - attachments_tmp_dir: &Path, + attachments_tmp_dir: &std::path::Path, notifications: bool, content: &Content, ) { @@ -423,8 +447,9 @@ fn print_message( } async fn receive( - manager: &mut Manager>, + manager: Manager>, notifications: bool, + webserver: bool, ) -> anyhow::Result<()> { let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?; info!( @@ -432,20 +457,35 @@ async fn receive( attachments_tmp_dir.path().display() ); - let messages = manager - .receive_messages(ReceivingMode::Forever) - .await - .context("failed to initialize messages stream")?; - pin_mut!(messages); + if webserver { + let app = Router::new() + .with_state(manager) + .route("/message", post(web_send_message)); - while let Some(content) = messages.next().await { - process_incoming_message(manager, attachments_tmp_dir.path(), notifications, &content) - .await; + // run our app with hyper, listening globally on port 3000 + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + let webserver = + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()).serve(app.into_make_service()); + + future::join( + webserver, + process_incoming_messages(manager, attachments_tmp_dir.path(), notifications), + ) + .await; + } else { + process_incoming_messages(manager, attachments_tmp_dir.path(), notifications).await; } Ok(()) } +async fn web_send_message( + manager: Manager>, + Path(recipient): Path, + Path(message): Path, +) { +} + async fn run(subcommand: Cmd, config_store: C) -> anyhow::Result<()> { match subcommand { Cmd::Register { @@ -499,7 +539,7 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re .await; match manager { - (Ok(manager), _) => { + (Ok(mut manager), _) => { let uuid = manager.whoami().await.unwrap().uuid; println!("{uuid:?}"); } @@ -508,9 +548,12 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } } } - Cmd::Receive { notifications } => { - let mut manager = Manager::load_registered(config_store).await?; - receive(&mut manager, notifications).await?; + Cmd::Receive { + notifications, + webserver, + } => { + let manager = Manager::load_registered(config_store).await?; + receive(manager, notifications, webserver).await?; } Cmd::Send { uuid, message } => { let mut manager = Manager::load_registered(config_store).await?; @@ -622,8 +665,8 @@ async fn run(subcommand: Cmd, config_store: C) -> anyhow::Re } } Cmd::Whoami => { - let manager = Manager::load_registered(config_store).await?; - println!("{:?}", &manager.whoami().await?); + let mut manager = Manager::load_registered(config_store).await?; + println!("{:?}", manager.whoami().await?); } Cmd::GetContact { ref uuid } => { let manager = Manager::load_registered(config_store).await?; diff --git a/presage/src/manager.rs b/presage/src/manager.rs index b63aeb053..1b32d6f91 100644 --- a/presage/src/manager.rs +++ b/presage/src/manager.rs @@ -10,7 +10,7 @@ use log::{debug, error, info, trace, warn}; use parking_lot::Mutex; use rand::{ distributions::{Alphanumeric, DistString}, - rngs::StdRng, + rngs::{StdRng, ThreadRng}, RngCore, SeedableRng, }; use serde::{Deserialize, Serialize}; @@ -49,7 +49,6 @@ use libsignal_service::{ }; use libsignal_service_hyper::push_service::HyperPushService; -use crate::cache::CacheCell; use crate::{serde::serde_profile_key, Thread}; use crate::{store::Store, Error}; @@ -62,8 +61,6 @@ pub struct Manager { config_store: Store, /// Part of the manager which is persisted in the store. state: State, - /// Random number generator - rng: StdRng, } impl fmt::Debug for Manager { @@ -95,8 +92,8 @@ pub struct Confirmation { #[derive(Clone)] pub struct Registered { - identified_push_service_cache: CacheCell, - unidentified_push_service_cache: CacheCell, + identified_push_service: HyperPushService, + unidentified_push_service: HyperPushService, identified_websocket: Arc>>, message_sender: Arc>>>, unidentified_sender_certificate: Option, @@ -139,6 +136,33 @@ impl fmt::Debug for Registered { } } +impl RegisteredData { + fn credentials(&self) -> ServiceCredentials { + ServiceCredentials { + uuid: Some(self.service_ids.aci), + phonenumber: self.phone_number.clone(), + password: Some(self.password.clone()), + signaling_key: Some(self.signaling_key), + device_id: self.device_id, + } + } + + fn identified_push_service(&self) -> HyperPushService { + let credentials = self.credentials(); + let service_configuration: ServiceConfiguration = self.signal_servers.into(); + HyperPushService::new( + service_configuration, + Some(credentials), + crate::USER_AGENT.to_string(), + ) + } + + fn unidentified_push_service(&self) -> HyperPushService { + let service_configuration: ServiceConfiguration = self.signal_servers.into(); + HyperPushService::new(service_configuration, None, crate::USER_AGENT.to_string()) + } +} + #[derive(PartialEq, Eq)] pub enum ReceivingMode { InitialSync, @@ -149,8 +173,8 @@ pub enum ReceivingMode { impl Registered { fn with_data(inner: RegisteredData) -> Self { Self { - identified_push_service_cache: CacheCell::default(), - unidentified_push_service_cache: CacheCell::default(), + identified_push_service: inner.identified_push_service(), + unidentified_push_service: inner.unidentified_push_service(), identified_websocket: Default::default(), message_sender: Default::default(), unidentified_sender_certificate: Default::default(), @@ -219,7 +243,7 @@ impl Manager { config_store.clear_registration()?; // generate a random alphanumeric 24 chars password - let mut rng = StdRng::from_entropy(); + let mut rng = rand::thread_rng(); let password = Alphanumeric.sample_string(&mut rng, 24); let service_configuration: ServiceConfiguration = signal_servers.into(); @@ -326,7 +350,7 @@ impl Manager { config_store.clear_registration()?; // generate a random alphanumeric 24 chars password - let mut rng = StdRng::from_entropy(); + let mut rng = rand::thread_rng(); let password = Alphanumeric.sample_string(&mut rng, 24); // generate a 52 bytes signaling key @@ -434,8 +458,8 @@ impl Manager { ) -> Result>, Error> { trace!("confirming verification code"); - let registration_id = generate_registration_id(&mut StdRng::from_entropy()); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + let registration_id = generate_registration_id(&mut rand::thread_rng()); + let pni_registration_id = generate_registration_id(&mut rand::thread_rng()); let Confirmation { signal_servers, @@ -469,7 +493,7 @@ impl Manager { return Err(Error::UnverifiedRegistrationSession); } - let mut rng = StdRng::from_entropy(); + let mut rng = rand::thread_rng(); // generate a 52 bytes signaling key let mut signaling_key = [0u8; 52]; @@ -556,13 +580,14 @@ impl Manager> { let inner = config_store .load_state()? .ok_or(Error::NotYetRegisteredError)?; + let identified_push_service = inner.identified_push_service(); + let unidentified_push_service = inner.unidentified_push_service(); let mut manager = Self { - rng: StdRng::from_entropy(), config_store, state: Registered { inner, - identified_push_service_cache: Default::default(), - unidentified_push_service_cache: Default::default(), + identified_push_service, + unidentified_push_service, identified_websocket: Default::default(), message_sender: Default::default(), unidentified_sender_certificate: Default::default(), @@ -573,10 +598,11 @@ impl Manager> { manager.set_account_attributes().await?; } - let credentials = manager.credentials()?; + let credentials = manager.state.inner.credentials(); manager.state.identified_websocket.lock().replace( manager - .identified_push_service()? + .state + .identified_push_service .ws("/v1/websocket/", &[], Some(credentials), true) .await?, ); @@ -587,7 +613,7 @@ impl Manager> { async fn register_pre_keys(&mut self) -> Result<(), Error> { trace!("registering pre keys"); let mut account_manager = AccountManager::new( - self.identified_push_service()?, + self.state.identified_push_service.clone(), Some(self.state.inner.profile_key), ); @@ -616,7 +642,7 @@ impl Manager> { async fn set_account_attributes(&mut self) -> Result<(), Error> { trace!("setting account attributes"); let mut account_manager = AccountManager::new( - self.identified_push_service()?, + self.state.identified_push_service.clone(), Some(self.state.inner.profile_key), ); @@ -625,7 +651,7 @@ impl Manager> { pni_registration_id } else { info!("migrating to PNI"); - let pni_registration_id = generate_registration_id(&mut StdRng::from_entropy()); + let pni_registration_id = generate_registration_id(&mut rand::thread_rng()); self.state.inner.pni_registration_id = Some(pni_registration_id); self.config_store.save_state(&self.state.inner)?; pni_registration_id @@ -667,13 +693,16 @@ impl Manager> { } async fn request_initial_sync(&mut self) -> Result<(), Error> { - self.request_keys_sync().await?; - self.request_block_sync().await?; - self.request_contacts_sync().await?; + // self.request_keys_sync().await?; + // // here, wait for answer first? + // self.request_block_sync().await?; + + // self.request_contacts_sync().await?; let messages = self .receive_messages_stream(ReceivingMode::WaitForContacts) .await?; + pin_mut!(messages); while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await @@ -740,7 +769,8 @@ impl Manager> { if needs_renewal(self.state.unidentified_sender_certificate.as_ref()) { let sender_certificate = self - .identified_push_service()? + .state + .identified_push_service .get_uuid_only_sender_certificate() .await?; @@ -761,7 +791,8 @@ impl Manager> { token: &str, captcha: &str, ) -> Result<(), Error> { - let mut account_manager = AccountManager::new(self.identified_push_service()?, None); + let mut account_manager = + AccountManager::new(self.state.identified_push_service.clone(), None); account_manager .submit_recaptcha_challenge(token, captcha) .await?; @@ -774,8 +805,8 @@ impl Manager> { } /// Fetches basic information on the registered device. - pub async fn whoami(&self) -> Result> { - Ok(self.identified_push_service()?.whoami().await?) + pub async fn whoami(&mut self) -> Result> { + Ok(self.state.identified_push_service.whoami().await?) } /// Fetches the profile (name, about, status emoji) of the registered user. @@ -798,8 +829,10 @@ impl Manager> { return Ok(profile); } - let mut account_manager = - AccountManager::new(self.identified_push_service()?, Some(profile_key)); + let mut account_manager = AccountManager::new( + self.state.identified_push_service.clone(), + Some(profile_key), + ); let profile = account_manager.retrieve_profile(uuid.into()).await?; @@ -855,7 +888,7 @@ impl Manager> { async fn receive_messages_encrypted( &mut self, ) -> Result>, Error> { - let credentials: ServiceCredentials = self.credentials()?; + let credentials: ServiceCredentials = self.state.inner.credentials(); let identified_ws = self.identified_websocket().await?; Ok(MessagePipe::from_socket(identified_ws, credentials).stream()) } @@ -882,7 +915,7 @@ impl Manager> { let groups_credentials_cache = InMemoryCredentialsCache::default(); let groups_manager = GroupsManager::new( self.state.inner.service_ids.clone(), - self.identified_push_service()?, + self.state.identified_push_service.clone(), groups_credentials_cache, server_public_params, ); @@ -905,7 +938,7 @@ impl Manager> { let init = StreamState { encrypted_messages: Box::pin(self.receive_messages_encrypted().await?), - message_receiver: MessageReceiver::new(self.identified_push_service()?), + message_receiver: MessageReceiver::new(self.state.identified_push_service.clone()), service_cipher: self.new_service_cipher()?, config_store: self.config_store.clone(), groups_manager: self.groups_manager()?, @@ -1209,7 +1242,7 @@ impl Manager> { &self, attachment_pointer: &AttachmentPointer, ) -> Result, Error> { - let mut service = self.identified_push_service()?; + let mut service = self.state.identified_push_service.clone(); let mut attachment_stream = service.get_attachment(attachment_pointer).await?; // We need the whole file for the crypto to check out @@ -1239,51 +1272,15 @@ impl Manager> { Ok(()) } - fn credentials(&self) -> Result> { - Ok(ServiceCredentials { - uuid: Some(self.state.inner.service_ids.aci), - phonenumber: self.state.inner.phone_number.clone(), - password: Some(self.state.inner.password.clone()), - signaling_key: Some(self.state.inner.signaling_key), - device_id: self.state.inner.device_id, - }) - } - - /// Create or return a clone of a cached identified (with credentials) push service. - fn identified_push_service(&self) -> Result> { - self.state.identified_push_service_cache.get(|| { - let credentials = self.credentials()?; - let service_configuration: ServiceConfiguration = - self.state.inner.signal_servers.into(); - Ok(HyperPushService::new( - service_configuration, - Some(credentials), - crate::USER_AGENT.to_string(), - )) - }) - } - - /// Create or return a clone of a cached unidentified (no credentials) push service. - fn unidentified_push_service(&self) -> Result> { - self.state.unidentified_push_service_cache.get(|| { - let service_configuration: ServiceConfiguration = - self.state.inner.signal_servers.into(); - Ok(HyperPushService::new( - service_configuration, - None, - crate::USER_AGENT.to_string(), - )) - }) - } - async fn identified_websocket(&mut self) -> Result> { let mut lock = self.state.identified_websocket.lock(); if let Some(identified_ws) = lock.as_ref() { Ok(identified_ws.clone()) } else { - let credentials = self.credentials()?; + let credentials = self.state.inner.credentials(); let ws = self - .identified_push_service()? + .state + .identified_push_service .ws("/v1/websocket/", &[], Some(credentials), true) .await?; lock.replace(ws.clone()); @@ -1302,16 +1299,17 @@ impl Manager> { }; let unidentified_ws: SignalWebSocket = self - .unidentified_push_service()? + .state + .unidentified_push_service .ws("/v1/websocket/", &[], None, true) .await?; let message_sender = MessageSender::new( self.identified_websocket().await?, unidentified_ws, - self.identified_push_service()?, + self.state.identified_push_service.clone(), self.new_service_cipher()?, - self.rng.clone(), + StdRng::from_entropy(), self.config_store.clone(), local_addr, self.state @@ -1334,7 +1332,7 @@ impl Manager> { let service_configuration: ServiceConfiguration = self.state.inner.signal_servers.into(); let service_cipher = ServiceCipher::new( self.config_store.clone(), - self.rng.clone(), + StdRng::from_entropy(), service_configuration.unidentified_sender_trust_root, self.state.inner.service_ids.aci, self.state.inner.device_id.unwrap_or(DEFAULT_DEVICE_ID),