Skip to content

Commit

Permalink
Add ReceivingMode enum
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Oct 15, 2023
1 parent 30a8755 commit 5706fa1
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions presage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ impl fmt::Debug for Registered {
}
}

#[derive(PartialEq, Eq)]
enum ReceivingMode {
InitialSync,
WaitForContacts,
Forever
}

impl Registered {
pub fn device_id(&self) -> u32 {
self.device_id.unwrap_or(DEFAULT_DEVICE_ID)
Expand Down Expand Up @@ -630,21 +637,21 @@ impl<C: Store> Manager<C, Registered> {
Ok(())
}

async fn wait_for_initial_sync(&mut self) -> Result<(), Error<C::Error>> {
let messages = self.receive_messages_stream(true).await?;

async fn sync_contacts(&mut self) -> Result<(), Error<C::Error>> {
let messages = self.receive_messages_stream(ReceivingMode::InitialSync).await?;
pin_mut!(messages);
while let Some(msg) = messages.next().await {
debug!("{msg:?}");
while let Some(_msg) = messages.next().await {
// debug!("{msg:?}");
}
Ok(())
}

async fn sync_contacts(&mut self) -> Result<(), Error<C::Error>> {
self.wait_for_initial_sync().await?;
self.request_contacts_sync().await?;

// XXX: this will not work, too fast for the primary phone to send contacts
self.wait_for_initial_sync().await?;
let messages = self.receive_messages_stream(ReceivingMode::WaitForContacts).await?;
pin_mut!(messages);
while let Ok(Some(_msg)) = tokio::time::timeout(Duration::from_secs(60), messages.next()).await {
// debug!("{msg:?}");
}

Ok(())
}
Expand Down Expand Up @@ -833,7 +840,7 @@ impl<C: Store> Manager<C, Registered> {
pub async fn receive_messages(
&mut self,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
self.receive_messages_stream(false).await
self.receive_messages_stream(ReceivingMode::Forever).await
}

fn groups_manager(
Expand All @@ -855,15 +862,15 @@ impl<C: Store> Manager<C, Registered> {

async fn receive_messages_stream(
&mut self,
initial_sync: bool,
mode: ReceivingMode,
) -> Result<impl Stream<Item = Content>, Error<C::Error>> {
struct StreamState<S, C> {
encrypted_messages: S,
message_receiver: MessageReceiver<HyperPushService>,
service_cipher: ServiceCipher<C>,
config_store: C,
groups_manager: GroupsManager<HyperPushService, InMemoryCredentialsCache>,
initial_sync: bool,
mode: ReceivingMode,
}

let init = StreamState {
Expand All @@ -872,7 +879,7 @@ impl<C: Store> Manager<C, Registered> {
service_cipher: self.new_service_cipher()?,
config_store: self.config_store.clone(),
groups_manager: self.groups_manager()?,
initial_sync,
mode,
};

Ok(futures::stream::unfold(init, |mut state| async move {
Expand Down Expand Up @@ -908,7 +915,11 @@ impl<C: Store> Manager<C, Registered> {
}
}

continue;
if state.mode == ReceivingMode::WaitForContacts {
return None;
} else {
continue;
}
}

if let ContentBody::DataMessage(DataMessage {
Expand Down Expand Up @@ -971,7 +982,7 @@ impl<C: Store> Manager<C, Registered> {
}
Some(Ok(Incoming::QueueEmpty)) => {
debug!("empty queue");
if state.initial_sync { return None; }
if state.mode == ReceivingMode::InitialSync { return None; }
}
Some(Err(e)) => error!("Error: {}", e),
None => return None,
Expand Down

0 comments on commit 5706fa1

Please sign in to comment.