Skip to content

Commit

Permalink
Shift authority for message availability to the pub/sub event system
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-signal committed Nov 7, 2024
1 parent c91242e commit ef716aa
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,6 @@ public boolean handleNewMessagesAvailable() {
PRESENCE_MANAGER_TAG, "legacy")
.increment();

storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);

processStoredMessages();

return true;
}

Expand All @@ -484,6 +480,10 @@ public void handleNewMessageAvailable() {
Metrics.counter(MESSAGE_AVAILABLE_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "pubsub")
.increment();

storedMessageState.compareAndSet(StoredMessageState.EMPTY, StoredMessageState.CACHED_NEW_MESSAGES_AVAILABLE);

processStoredMessages();
}

@Override
Expand All @@ -498,10 +498,6 @@ public boolean handleMessagesPersisted() {
PRESENCE_MANAGER_TAG, "legacy")
.increment();

storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);

processStoredMessages();

return true;
}

Expand All @@ -510,6 +506,10 @@ public void handleMessagesPersistedPubSub() {
Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME,
PRESENCE_MANAGER_TAG, "pubsub")
.increment();

storedMessageState.set(StoredMessageState.PERSISTED_NEW_MESSAGES_AVAILABLE);

processStoredMessages();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,15 @@ public void testOnlineSend() {
// or wait for anything.
connection.start();

connection.handleNewMessagesAvailable();
connection.handleNewMessageAvailable();

synchronized (sendCounter) {
while (sendCounter.get() < 1) {
sendCounter.wait();
}
}

connection.handleNewMessagesAvailable();
connection.handleNewMessageAvailable();

synchronized (sendCounter) {
while (sendCounter.get() < 2) {
Expand Down Expand Up @@ -693,7 +693,7 @@ public void testRequeryOnStateMismatch() {

when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(List.class), any(Optional.class)))
.thenAnswer(invocation -> {
connection.handleNewMessagesAvailable();
connection.handleNewMessageAvailable();

return CompletableFuture.completedFuture(successResponse);
});
Expand Down Expand Up @@ -741,7 +741,7 @@ void testProcessCachedMessagesOnly() {

verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device, false);

connection.handleNewMessagesAvailable();
connection.handleNewMessageAvailable();

verify(messagesManager).getMessagesForDeviceReactive(account.getUuid(), device, true);
}
Expand Down Expand Up @@ -769,7 +769,7 @@ void testProcessDatabaseMessagesAfterPersist() {
// whenComplete method will get called immediately on THIS thread, so we don't need to synchronize or wait for
// anything.
connection.processStoredMessages();
connection.handleMessagesPersisted();
connection.handleMessagesPersistedPubSub();

verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device, false);
}
Expand Down

0 comments on commit ef716aa

Please sign in to comment.