Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to wait for initial messages sync on startup #197

Closed
wants to merge 14 commits into from
44 changes: 28 additions & 16 deletions presage-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,11 @@ async fn main() -> anyhow::Result<()> {
}

async fn send<C: Store + 'static>(
msg: &str,
uuid: &Uuid,
manager: &mut Manager<C, Registered>,
uuid: &Uuid,
content_body: impl Into<ContentBody>,
timestamp: u64,
) -> anyhow::Result<()> {
let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let message = ContentBody::DataMessage(DataMessage {
body: Some(msg.to_string()),
timestamp: Some(timestamp),
..Default::default()
});

let local = task::LocalSet::new();

local
Expand All @@ -234,7 +224,7 @@ async fn send<C: Store + 'static>(
sleep(Duration::from_secs(5)).await;

manager
.send_message(*uuid, message, timestamp)
.send_message(*uuid, content_body, timestamp)
.await
.unwrap();

Expand Down Expand Up @@ -527,7 +517,16 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
}
Cmd::Send { uuid, message } => {
let mut manager = Manager::load_registered(config_store).await?;
send(&message, &uuid, &mut manager).await?;
let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let message = DataMessage {
body: Some(message.to_string()),
timestamp: Some(timestamp),
..Default::default()
};
send(&mut manager, &uuid, message, timestamp).await?;
}
Cmd::SendToGroup {
message,
Expand Down Expand Up @@ -654,8 +653,21 @@ async fn run<C: Store + 'static>(subcommand: Cmd, config_store: C) -> anyhow::Re
}
#[cfg(feature = "quirks")]
Cmd::RequestSyncContacts => {
use presage::prelude::proto::sync_message;

let mut manager = Manager::load_registered(config_store).await?;
manager.request_contacts_sync().await?;
let uuid = manager.state().service_ids.aci;
let timestamp = std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let sync_message = SyncMessage {
gferon marked this conversation as resolved.
Show resolved Hide resolved
request: Some(sync_message::Request {
r#type: Some(sync_message::request::Type::Contacts as i32),
}),
..Default::default()
};
send(&mut manager, &uuid, sync_message, timestamp).await?;
}
Cmd::ListMessages {
group_master_key,
Expand Down
71 changes: 30 additions & 41 deletions presage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,43 +630,9 @@ impl<C: Store> Manager<C, Registered> {
Ok(())
}

async fn wait_for_contacts_sync(
&mut self,
mut messages: impl Stream<Item = Content> + Unpin,
) -> Result<(), Error<C::Error>> {
let mut message_receiver = MessageReceiver::new(self.push_service()?);
while let Some(Content { body, .. }) = messages.next().await {
if let ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(contacts),
..
}) = body
{
let contacts = message_receiver.retrieve_contacts(&contacts).await?;
let _ = self.config_store.clear_contacts();
self.config_store
.save_contacts(contacts.filter_map(Result::ok))?;
info!("saved contacts");
return Ok(());
}
}
Ok(())
}

async fn sync_contacts(&mut self) -> Result<(), Error<C::Error>> {
let messages = self.receive_messages_stream(true).await?;
pin_mut!(messages);

let _messages = self.receive_messages_stream(true).await?;
gferon marked this conversation as resolved.
Show resolved Hide resolved
self.request_contacts_sync().await?;

info!("waiting for contacts sync for up to 60 seconds");

tokio::time::timeout(
Duration::from_secs(60),
self.wait_for_contacts_sync(messages),
)
.await
.map_err(Error::from)??;

Ok(())
}

Expand Down Expand Up @@ -876,22 +842,24 @@ impl<C: Store> Manager<C, Registered> {

async fn receive_messages_stream(
&mut self,
include_internal_events: bool,
stop_on_contacts_sync: bool,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
struct StreamState<S, C> {
encrypted_messages: S,
message_receiver: MessageReceiver<HyperPushService>,
service_cipher: ServiceCipher<C>,
config_store: C,
groups_manager: GroupsManager<HyperPushService, InMemoryCredentialsCache>,
include_internal_events: bool,
stop_on_contacts_sync: bool,
}

let init = StreamState {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
message_receiver: MessageReceiver::new(self.push_service()?),
service_cipher: self.new_service_cipher()?,
config_store: self.config_store.clone(),
groups_manager: self.groups_manager()?,
include_internal_events,
stop_on_contacts_sync,
};

Ok(futures::stream::unfold(init, |mut state| async move {
Expand All @@ -902,12 +870,33 @@ impl<C: Store> Manager<C, Registered> {
Ok(Some(content)) => {
// contacts synchronization sent from the primary device (happens after linking, or on demand)
if let ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(_),
contacts: Some(contacts),
..
}) = &content.body
{
if state.include_internal_events {
return Some((content, state));
match state.message_receiver.retrieve_contacts(&contacts).await
{
Ok(contacts) => {
let _ = state.config_store.clear_contacts();
match state
.config_store
.save_contacts(contacts.filter_map(Result::ok))
{
Ok(()) => {
info!("saved contacts");
}
Err(e) => {
warn!("failed to save contacts: {e}");
}
}
}
Err(e) => {
warn!("failed to retrieve contacts: {e}");
}
}

if state.stop_on_contacts_sync {
return None;
} else {
continue;
}
Expand Down