Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to wait for initial messages sync on startup #197

Closed
wants to merge 14 commits into from
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ resolver = "2"
[patch.crates-io]
curve25519-dalek = { git = 'https://github.com/signalapp/curve25519-dalek', tag = 'signal-curve25519-4.0.0' }

# [patch."https://github.com/whisperfish/libsignal-service-rs.git"]
# libsignal-service = { path = "../libsignal-service-rs/libsignal-service" }
# libsignal-service-hyper = { path = "../libsignal-service-rs/libsignal-service-hyper" }
[patch."https://github.com/whisperfish/libsignal-service-rs.git"]
libsignal-service = { path = "../libsignal-service-rs/libsignal-service" }
libsignal-service-hyper = { path = "../libsignal-service-rs/libsignal-service-hyper" }
1 change: 1 addition & 0 deletions presage-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ presage = { path = "../presage" }
presage-store-sled = { path = "../presage-store-sled" }

anyhow = "1.0"
axum = "0.6"
base64 = "0.12"
chrono = { version = "0.4", default-features = false, features = ["serde", "clock"] }
clap = { version = ">=4.2.4", features = ["derive"] }
Expand Down
126 changes: 87 additions & 39 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use core::fmt;
use std::convert::TryInto;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use anyhow::{anyhow, bail, Context as _};
use axum::extract::Path;
use axum::routing::post;
use axum::Router;
use chrono::Local;
use clap::{ArgGroup, Parser, Subcommand};
use directories::ProjectDirs;
Expand All @@ -21,6 +23,7 @@ use presage::libsignal_service::{groups_v2::Group, prelude::ProfileKey};
use presage::prelude::proto::EditMessage;
use presage::prelude::SyncMessage;
use presage::ContentTimestamp;
use presage::ReceivingMode;
use presage::{
prelude::{
content::{Content, ContentBody, DataMessage, GroupContextV2},
Expand Down Expand Up @@ -118,6 +121,9 @@ enum Cmd {
Receive {
#[clap(long = "notifications", short = 'n')]
notifications: bool,
/// Start a webserver to be able to send messages, useful for testing
#[clap(long)]
webserver: bool,
},
#[clap(about = "List groups")]
ListGroups,
Expand Down Expand Up @@ -180,7 +186,7 @@ fn parse_group_master_key(value: &str) -> anyhow::Result<GroupMasterKeyBytes> {
.map_err(|_| anyhow::format_err!("master key should be 32 bytes long"))
}

#[tokio::main(flavor = "multi_thread")]
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
env_logger::from_env(
Env::default().default_filter_or(format!("{}=warn", env!("CARGO_PKG_NAME"))),
Expand All @@ -205,21 +211,10 @@ async fn main() -> anyhow::Result<()> {
}

async fn send<C: Store + 'static>(
msg: &str,
manager: &mut Manager<C, Registered<C>>,
uuid: &Uuid,
manager: &mut Manager<C, Registered>,
content_body: impl Into<ContentBody>,
) -> anyhow::Result<()> {
let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let message = ContentBody::DataMessage(DataMessage {
body: Some(msg.to_string()),
timestamp: Some(timestamp),
..Default::default()
});

let local = task::LocalSet::new();

local
Expand All @@ -233,10 +228,7 @@ async fn send<C: Store + 'static>(

sleep(Duration::from_secs(5)).await;

manager
.send_message(*uuid, message, timestamp)
.await
.unwrap();
manager.send_message(*uuid, content_body).await.unwrap();

sleep(Duration::from_secs(5)).await;
})
Expand All @@ -245,11 +237,30 @@ async fn send<C: Store + 'static>(
Ok(())
}

async fn process_incoming_messages<C: Store>(
mut manager: Manager<C, Registered<C>>,
attachments_tmp_dir: &std::path::Path,
notifications: bool,
) -> anyhow::Result<()> {
let messages = manager
.receive_messages(ReceivingMode::Forever)
.await
.context("failed to initialize messages stream")?;

pin_mut!(messages);

while let Some(content) = messages.next().await {
process_incoming_message(&mut manager, attachments_tmp_dir, notifications, &content).await;
}

Ok(())
}

// Note to developers, this is a good example of a function you can use as a source of inspiration
// to process incoming messages.
async fn process_incoming_message<C: Store>(
manager: &mut Manager<C, Registered>,
attachments_tmp_dir: &Path,
manager: &mut Manager<C, Registered<C>>,
attachments_tmp_dir: &std::path::Path,
notifications: bool,
content: &Content,
) {
Expand Down Expand Up @@ -287,7 +298,7 @@ async fn process_incoming_message<C: Store>(
}

fn print_message<C: Store>(
manager: &Manager<C, Registered>,
manager: &Manager<C, Registered<C>>,
notifications: bool,
content: &Content,
) {
Expand Down Expand Up @@ -436,29 +447,45 @@ fn print_message<C: Store>(
}

async fn receive<C: Store>(
manager: &mut Manager<C, Registered>,
manager: Manager<C, Registered<C>>,
notifications: bool,
webserver: bool,
) -> anyhow::Result<()> {
let attachments_tmp_dir = Builder::new().prefix("presage-attachments").tempdir()?;
info!(
"attachments will be stored in {}",
attachments_tmp_dir.path().display()
);

let messages = manager
.receive_messages()
.await
.context("failed to initialize messages stream")?;
pin_mut!(messages);
if webserver {
let app = Router::new()
.with_state(manager)
.route("/message", post(web_send_message));

while let Some(content) = messages.next().await {
process_incoming_message(manager, attachments_tmp_dir.path(), notifications, &content)
.await;
// run our app with hyper, listening globally on port 3000
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
let webserver =
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()).serve(app.into_make_service());

future::join(
webserver,
process_incoming_messages(manager, attachments_tmp_dir.path(), notifications),
)
.await;
} else {
process_incoming_messages(manager, attachments_tmp_dir.path(), notifications).await;
}

Ok(())
}

async fn web_send_message<C: Store>(
manager: Manager<C, Registered<C>>,
Path(recipient): Path<Uuid>,
Path(message): Path<String>,
) {
}

async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Result<()> {
match subcommand {
Cmd::Register {
Expand Down Expand Up @@ -512,7 +539,7 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
.await;

match manager {
(Ok(manager), _) => {
(Ok(mut manager), _) => {
let uuid = manager.whoami().await.unwrap().uuid;
println!("{uuid:?}");
}
Expand All @@ -521,13 +548,25 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
}
}
}
Cmd::Receive { notifications } => {
let mut manager = Manager::load_registered(config_store).await?;
receive(&mut manager, notifications).await?;
Cmd::Receive {
notifications,
webserver,
} => {
let manager = Manager::load_registered(config_store).await?;
receive(manager, notifications, webserver).await?;
}
Cmd::Send { uuid, message } => {
let mut manager = Manager::load_registered(config_store).await?;
send(&message, &uuid, &mut manager).await?;
let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let message = DataMessage {
body: Some(message.to_string()),
timestamp: Some(timestamp),
..Default::default()
};
send(&mut manager, &uuid, message).await?;
}
Cmd::SendToGroup {
message,
Expand Down Expand Up @@ -626,8 +665,8 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
}
}
Cmd::Whoami => {
let manager = Manager::load_registered(config_store).await?;
println!("{:?}", &manager.whoami().await?);
let mut manager = Manager::load_registered(config_store).await?;
println!("{:?}", manager.whoami().await?);
}
Cmd::GetContact { ref uuid } => {
let manager = Manager::load_registered(config_store).await?;
Expand All @@ -654,8 +693,17 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
}
#[cfg(feature = "quirks")]
Cmd::RequestSyncContacts => {
use presage::prelude::proto::sync_message;

let mut manager = Manager::load_registered(config_store).await?;
manager.request_contacts_sync().await?;
let uuid = manager.state().service_ids.aci;
let sync_message = SyncMessage {
gferon marked this conversation as resolved.
Show resolved Hide resolved
request: Some(sync_message::Request {
r#type: Some(sync_message::request::Type::Contacts as i32),
}),
..Default::default()
};
send(&mut manager, &uuid, sync_message).await?;
}
Cmd::ListMessages {
group_master_key,
Expand Down
8 changes: 3 additions & 5 deletions presage-store-sled/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ use presage::{
session_store::SessionStoreExt,
Profile, ServiceAddress,
},
ContentTimestamp,
ContentTimestamp, GroupMasterKeyBytes, RegisteredData, Store, Thread,
};
use prost::Message;
use protobuf::ContentProto;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sha2::{Digest, Sha256};
use sled::{Batch, IVec};

use presage::{GroupMasterKeyBytes, Registered, Store, Thread};

mod error;
mod protobuf;

Expand Down Expand Up @@ -373,11 +371,11 @@ impl Store for SledStore {

/// State

fn load_state(&self) -> Result<Option<Registered>, SledStoreError> {
fn load_state(&self) -> Result<Option<RegisteredData>, SledStoreError> {
self.get(SLED_TREE_STATE, SLED_KEY_REGISTRATION)
}

fn save_state(&mut self, state: &Registered) -> Result<(), SledStoreError> {
fn save_state(&mut self, state: &RegisteredData) -> Result<(), SledStoreError> {
self.insert(SLED_TREE_STATE, SLED_KEY_REGISTRATION, state)?;
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions presage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
# be a sign or warning of (an imminent event, typically an unwelcome one).
name = "presage"
version = "0.6.0-dev"
version = "0.7.0-dev"
authors = ["Gabriel Féron <[email protected]>"]
edition = "2021"

[dependencies]
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "454d234" }
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "16695d0" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "16695d0" }

base64 = "0.21"
futures = "0.3"
Expand Down
5 changes: 4 additions & 1 deletion presage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ mod serde;
mod store;

pub use errors::Error;
pub use manager::{Confirmation, Linking, Manager, Registered, Registration, RegistrationOptions};
pub use manager::{
Confirmation, Linking, Manager, ReceivingMode, Registered, RegisteredData, Registration,
RegistrationOptions,
};
pub use store::{ContentTimestamp, Store, StoreError, Thread};

#[deprecated(note = "Please help use improve the prelude module instead")]
Expand Down
Loading
Loading