Skip to content

Commit

Permalink
Progress trying to improve things
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Oct 16, 2023
1 parent 5706fa1 commit dcc57e1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 22 deletions.
5 changes: 2 additions & 3 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ async fn send<C: Store + 'static>(
manager: &mut Manager<C, Registered>,
uuid: &Uuid,
content_body: impl Into<ContentBody>,
timestamp: u64,
) -> anyhow::Result<()> {
let local = task::LocalSet::new();

Expand All @@ -224,7 +223,7 @@ async fn send<C: Store + 'static>(
sleep(Duration::from_secs(5)).await;

manager
.send_message(*uuid, content_body, timestamp)
.send_message(*uuid, content_body)
.await
.unwrap();

Expand Down Expand Up @@ -526,7 +525,7 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
timestamp: Some(timestamp),
..Default::default()
};
send(&mut manager, &uuid, message, timestamp).await?;
send(&mut manager, &uuid, message).await?;
}
Cmd::SendToGroup {
message,
Expand Down
73 changes: 54 additions & 19 deletions presage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use rand::{
use serde::{Deserialize, Serialize};
use url::Url;

use libsignal_service::{proto::EditMessage, messagepipe::Incoming};
use libsignal_service::push_service::{RegistrationMethod, VerificationTransport};
use libsignal_service::{
attachment_cipher::decrypt_in_place,
Expand All @@ -27,10 +26,7 @@ use libsignal_service::{
messagepipe::ServiceCredentials,
models::Contact,
prelude::{phonenumber::PhoneNumber, Content, ProfileKey, PushService, Uuid},
proto::{
data_message::Delete, sync_message, AttachmentPointer, GroupContextV2,
NullMessage,
},
proto::{data_message::Delete, sync_message, AttachmentPointer, GroupContextV2, NullMessage},
protocol::{KeyPair, PrivateKey, PublicKey, SenderCertificate},
provisioning::{generate_registration_id, LinkingManager, SecondaryDeviceProvisioning},
push_service::{
Expand All @@ -47,6 +43,7 @@ use libsignal_service::{
websocket::SignalWebSocket,
AccountManager, Profile, ServiceAddress,
};
use libsignal_service::{messagepipe::Incoming, proto::EditMessage};
use libsignal_service_hyper::push_service::HyperPushService;

use crate::cache::CacheCell;
Expand Down Expand Up @@ -140,7 +137,7 @@ impl fmt::Debug for Registered {
enum ReceivingMode {
InitialSync,
WaitForContacts,
Forever
Forever,
}

impl Registered {
Expand Down Expand Up @@ -637,19 +634,26 @@ impl<C: Store> Manager<C, Registered> {
Ok(())
}


async fn sync_contacts(&mut self) -> Result<(), Error<C::Error>> {
let messages = self.receive_messages_stream(ReceivingMode::InitialSync).await?;
let messages = self
.receive_messages_stream(ReceivingMode::InitialSync)
.await?;
pin_mut!(messages);
while let Some(_msg) = messages.next().await {
// debug!("{msg:?}");
}

self.request_configuration_sync().await?;
self.request_block_sync().await?;
self.request_contacts_sync().await?;

let messages = self.receive_messages_stream(ReceivingMode::WaitForContacts).await?;
let messages = self
.receive_messages_stream(ReceivingMode::WaitForContacts)
.await?;
pin_mut!(messages);
while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await {
while let Ok(Some(_msg)) =
tokio::time::timeout(Duration::from_secs(60), messages.next()).await
{
// debug!("{msg:?}");
}

Expand All @@ -670,13 +674,40 @@ impl<C: Store> Manager<C, Registered> {
..Default::default()
};

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
// first request the sync
self.send_message(self.state.service_ids.aci, sync_message)
.await?;

Ok(())
}

async fn request_block_sync(&mut self) -> Result<(), Error<C::Error>> {
trace!("requesting blocked sync");
let sync_message = SyncMessage {
request: Some(sync_message::Request {
r#type: Some(sync_message::request::Type::Blocked as i32),
}),
..Default::default()
};

// first request the sync
self.send_message(self.state.service_ids.aci, sync_message, timestamp)
self.send_message(self.state.service_ids.aci, sync_message)
.await?;

Ok(())
}

async fn request_configuration_sync(&mut self) -> Result<(), Error<C::Error>> {
trace!("requesting configuration sync");
let sync_message = SyncMessage {
request: Some(sync_message::Request {
r#type: Some(sync_message::request::Type::Configuration as i32),
}),
..Default::default()
};

// first request the sync
self.send_message(self.state.service_ids.aci, sync_message)
.await?;

Ok(())
Expand Down Expand Up @@ -982,7 +1013,9 @@ impl<C: Store> Manager<C, Registered> {
}
Some(Ok(Incoming::QueueEmpty)) => {
debug!("empty queue");
if state.mode == ReceivingMode::InitialSync { return None; }
if state.mode == ReceivingMode::InitialSync {
return None;
}
}
Some(Err(e)) => error!("Error: {}", e),
None => return None,
Expand All @@ -1001,10 +1034,13 @@ impl<C: Store> Manager<C, Registered> {
&mut self,
recipient_addr: impl Into<ServiceAddress>,
message: impl Into<ContentBody>,
timestamp: u64,
) -> Result<(), Error<C::Error>> {
let mut sender = self.new_message_sender().await?;

let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let online_only = false;
let recipient = recipient_addr.into();
let mut content_body: ContentBody = message.into();
Expand Down Expand Up @@ -1192,15 +1228,14 @@ impl<C: Store> Manager<C, Registered> {
pub async fn send_session_reset(
&mut self,
recipient: &ServiceAddress,
timestamp: u64,
) -> Result<(), Error<C::Error>> {
log::trace!("Resetting session for address: {}", recipient.uuid);
let message = DataMessage {
flags: Some(DataMessageFlags::EndSession as u32),
..Default::default()
};

self.send_message(*recipient, message, timestamp).await?;
self.send_message(*recipient, message).await?;

Ok(())
}
Expand Down

0 comments on commit dcc57e1

Please sign in to comment.