From 6a1f4906c500da6261718bd63e55df2407ebdf8e Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Wed, 6 Nov 2024 16:29:27 -0500 Subject: [PATCH] Retire the legacy message availability system --- .../textsecuregcm/WhisperServerService.java | 17 +- .../push/ClientEventListener.java | 2 +- .../push/PubSubClientEventManager.java | 2 +- .../storage/MessageAvailabilityListener.java | 26 --- .../textsecuregcm/storage/MessagesCache.java | 216 +----------------- .../storage/MessagesManager.java | 27 --- .../AuthenticatedConnectListener.java | 12 - .../websocket/WebSocketConnection.java | 37 +-- .../workers/CommandDependencies.java | 5 +- .../MessagePersisterServiceCommand.java | 1 - ...LocalFaultTolerantRedisClusterFactory.java | 3 - .../push/PubSubClientEventManagerTest.java | 4 +- .../MessagePersisterIntegrationTest.java | 38 +-- .../storage/MessagePersisterTest.java | 2 +- .../storage/MessagesCacheTest.java | 176 +------------- .../WebSocketConnectionIntegrationTest.java | 2 +- .../websocket/WebSocketConnectionTest.java | 2 +- 17 files changed, 47 insertions(+), 525 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 3754f38c3..59a0d6c0c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -50,7 +50,6 @@ import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -459,9 +458,6 @@ public void run(WhisperServerConfiguration config, Environment environment) thro FaultTolerantRedisClient pubsubClient = config.getRedisPubSubConfiguration().build("pubsub", sharedClientResources); - final BlockingQueue keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000); - Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(), - keyspaceNotificationDispatchQueue); final BlockingQueue receiptSenderQueue = new LinkedBlockingQueue<>(); Metrics.gaugeCollectionSize(name(getClass(), "receiptSenderQueue"), Collections.emptyList(), receiptSenderQueue); final BlockingQueue fcmSenderQueue = new LinkedBlockingQueue<>(); @@ -474,14 +470,6 @@ public void run(WhisperServerConfiguration config, Environment environment) thro .scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build(); ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle() .scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build(); - ExecutorService keyspaceNotificationDispatchExecutor = ExecutorServiceMetrics.monitor(Metrics.globalRegistry, - environment.lifecycle() - .executorService(name(getClass(), "keyspaceNotification-%d")) - .maxThreads(16) - .workQueue(keyspaceNotificationDispatchQueue) - .build(), - MetricsUtil.name(getClass(), "keyspaceNotificationExecutor"), - MetricsUtil.PREFIX); ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d")) .maxThreads(1).minThreads(1).build(); ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d")) @@ -611,8 +599,8 @@ public void run(WhisperServerConfiguration config, Environment environment) thro storageServiceExecutor, storageServiceRetryExecutor, config.getSecureStorageServiceConfiguration()); PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); - MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor, - messageDeliveryScheduler, messageDeletionAsyncExecutor, clock, dynamicConfigurationManager); + MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler, + messageDeletionAsyncExecutor, clock, dynamicConfigurationManager); ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases, recurringJobExecutor, config.getClientReleaseConfiguration().refreshInterval(), @@ -733,7 +721,6 @@ public void run(WhisperServerConfiguration config, Environment environment) thro environment.lifecycle().manage(apnSender); environment.lifecycle().manage(pushNotificationScheduler); environment.lifecycle().manage(provisioningManager); - environment.lifecycle().manage(messagesCache); environment.lifecycle().manage(pubSubClientEventManager); environment.lifecycle().manage(currencyManager); environment.lifecycle().manage(registrationServiceClient); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java index 140f9fcb3..405a59445 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientEventListener.java @@ -19,7 +19,7 @@ public interface ClientEventListener { /** * Indicates that messages for the client have been persisted from short-term storage to long-term storage. */ - void handleMessagesPersistedPubSub(); + void handleMessagesPersisted(); /** * Indicates that the client's presence has been displaced and the listener should close the client's underlying diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 2ffb2fdfc..bedcd120c 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -371,7 +371,7 @@ public void smessage(final RedisClusterNode node, final byte[] shardChannel, fin case DISCONNECT_REQUESTED -> listenerEventExecutor.execute(() -> listener.handleConnectionDisplaced(false)); - case MESSAGES_PERSISTED -> listenerEventExecutor.execute(listener::handleMessagesPersistedPubSub); + case MESSAGES_PERSISTED -> listenerEventExecutor.execute(listener::handleMessagesPersisted); default -> logger.warn("Unexpected client event type: {}", clientEvent.getClass()); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java deleted file mode 100644 index e7fed470a..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessageAvailabilityListener.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2013-2021 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.storage; - -/** - * A message availability listener is notified when new messages are available for a specific device for a specific - * account. Availability listeners are also notified when messages are moved from the message cache to long-term storage - * as an optimization hint to implementing classes. - */ -public interface MessageAvailabilityListener { - - /** - * @return whether the listener is still active. {@code false} indicates the listener can no longer handle messages - * and may be discarded - */ - boolean handleNewMessagesAvailable(); - - /** - * @return whether the listener is still active. {@code false} indicates the listener can no longer handle messages - * and may be discarded - */ - boolean handleMessagesPersisted(); -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index dbb2fd7bd..413ea877e 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -10,12 +10,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; -import io.dropwizard.lifecycle.Managed; import io.lettuce.core.ZAddArgs; import io.lettuce.core.cluster.SlotHash; -import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; @@ -28,18 +24,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; -import javax.annotation.Nullable; import org.reactivestreams.Publisher; import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; import org.signal.libsignal.protocol.ServiceId; @@ -51,11 +42,9 @@ import org.whispersystems.textsecuregcm.experiment.Experiment; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; -import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; -import org.whispersystems.textsecuregcm.util.Util; import reactor.core.observability.micrometer.Micrometer; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -118,13 +107,11 @@ * @see MessagesCacheRemoveRecipientViewFromMrmDataScript * @see MessagesCacheRemoveQueueScript */ -public class MessagesCache extends RedisClusterPubSubAdapter implements Managed { +public class MessagesCache { private final FaultTolerantRedisClusterClient redisCluster; - private final FaultTolerantPubSubClusterConnection pubSubConnection; private final Clock clock; - private final ExecutorService notificationExecutorService; private final Scheduler messageDeliveryScheduler; private final ExecutorService messageDeletionExecutorService; // messageDeletionExecutorService wrapped into a reactor Scheduler @@ -140,10 +127,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript; private final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript; - private final ReentrantLock messageListenersLock = new ReentrantLock(); - private final Map messageListenersByQueueName = new HashMap<>(); - private final Map queueNamesByMessageListener = new IdentityHashMap<>(); - private final Timer insertTimer = Metrics.timer(name(MessagesCache.class, "insert")); private final Timer insertSharedMrmPayloadTimer = Metrics.timer(name(MessagesCache.class, "insertSharedMrmPayload")); private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get")); @@ -151,17 +134,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final Timer removeByGuidTimer = Metrics.timer(name(MessagesCache.class, "removeByGuid")); private final Timer removeRecipientViewTimer = Metrics.timer(name(MessagesCache.class, "removeRecipientView")); private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear")); - private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage")); - private final Counter newMessageNotificationCounter = Metrics.counter( - name(MessagesCache.class, "newMessageNotification")); - private final Counter queuePersistedNotificationCounter = Metrics.counter( - name(MessagesCache.class, "queuePersisted")); private final Counter staleEphemeralMessagesCounter = Metrics.counter( name(MessagesCache.class, "staleEphemeralMessages")); - private final Counter messageAvailabilityListenerRemovedAfterAddCounter = Metrics.counter( - name(MessagesCache.class, "messageAvailabilityListenerRemovedAfterAdd")); - private final Counter prunedStaleSubscriptionCounter = Metrics.counter( - name(MessagesCache.class, "prunedStaleSubscription")); private final Counter mrmContentRetrievedCounter = Metrics.counter(name(MessagesCache.class, "mrmViewRetrieved")); private final Counter sharedMrmDataKeyRemovedCounter = Metrics.counter( name(MessagesCache.class, "sharedMrmKeyRemoved")); @@ -169,9 +143,6 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); - private static final String QUEUE_KEYSPACE_PREFIX = "__keyspace@0__:user_queue::"; - private static final String PERSISTING_KEYSPACE_PREFIX = "__keyspace@0__:user_queue_persisting::"; - private static final String MRM_VIEWS_EXPERIMENT_NAME = "mrmViews"; @VisibleForTesting @@ -184,13 +155,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class); - public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService, - final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, - final DynamicConfigurationManager dynamicConfigurationManager) + public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, + final Scheduler messageDeliveryScheduler, + final ExecutorService messageDeletionExecutorService, + final Clock clock, + final DynamicConfigurationManager dynamicConfigurationManager) throws IOException { + this( redisCluster, - notificationExecutorService, messageDeliveryScheduler, messageDeletionExecutorService, clock, @@ -206,8 +179,9 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final E } @VisibleForTesting - MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService, - final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, + MessagesCache(final FaultTolerantRedisClusterClient redisCluster, + final Scheduler messageDeliveryScheduler, + final ExecutorService messageDeletionExecutorService, final Clock clock, final DynamicConfigurationManager dynamicConfigurationManager, final MessagesCacheInsertScript insertScript, final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript, @@ -218,10 +192,8 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final E throws IOException { this.redisCluster = redisCluster; - this.pubSubConnection = redisCluster.createPubSubConnection(); this.clock = clock; - this.notificationExecutorService = notificationExecutorService; this.messageDeliveryScheduler = messageDeliveryScheduler; this.messageDeletionExecutorService = messageDeletionExecutorService; this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion"); @@ -237,34 +209,6 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final E this.removeRecipientViewFromMrmDataScript = removeRecipientViewFromMrmDataScript; } - @Override - public void start() { - pubSubConnection.usePubSubConnection(connection -> connection.addListener(this)); - pubSubConnection.subscribeToClusterTopologyChangedEvents(this::resubscribeAll); - } - - @Override - public void stop() { - pubSubConnection.usePubSubConnection(connection -> connection.sync().upstream().commands().unsubscribe()); - } - - private void resubscribeAll(final ClusterTopologyChangedEvent event) { - - final Set queueNames; - - messageListenersLock.lock(); - try { - queueNames = new HashSet<>(messageListenersByQueueName.keySet()); - } finally { - messageListenersLock.unlock(); - } - - for (final String queueName : queueNames) { - // avoid overwhelming a newly recovered node by processing synchronously, rather than using CompletableFuture.allOf() - subscribeForKeyspaceNotifications(queueName).join(); - } - } - public long insert(final UUID guid, final UUID destinationUuid, final byte destinationDevice, final MessageProtos.Envelope message) { final MessageProtos.Envelope messageWithGuid = message.toBuilder().setServerGuid(guid.toString()).build(); @@ -642,146 +586,6 @@ void unlockQueueForPersistence(final UUID accountUuid, final byte deviceId) { connection -> connection.sync().del(getPersistInProgressKey(accountUuid, deviceId))); } - public void addMessageAvailabilityListener(final UUID destinationUuid, final byte deviceId, - final MessageAvailabilityListener listener) { - final String queueName = getQueueName(destinationUuid, deviceId); - - final CompletableFuture subscribeFuture; - messageListenersLock.lock(); - try { - messageListenersByQueueName.put(queueName, listener); - queueNamesByMessageListener.put(listener, queueName); - // Submit to the Redis queue while holding the lock, but don’t wait until exiting - subscribeFuture = subscribeForKeyspaceNotifications(queueName); - } finally { - messageListenersLock.unlock(); - } - - subscribeFuture.join(); - } - - public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { - @Nullable final String queueName; - messageListenersLock.lock(); - try { - queueName = queueNamesByMessageListener.get(listener); - } finally { - messageListenersLock.unlock(); - } - - if (queueName != null) { - - final CompletableFuture unsubscribeFuture; - messageListenersLock.lock(); - try { - queueNamesByMessageListener.remove(listener); - if (messageListenersByQueueName.remove(queueName, listener)) { - // Submit to the Redis queue holding the lock, but don’t wait until exiting - unsubscribeFuture = unsubscribeFromKeyspaceNotifications(queueName); - } else { - messageAvailabilityListenerRemovedAfterAddCounter.increment(); - unsubscribeFuture = CompletableFuture.completedFuture(null); - } - } finally { - messageListenersLock.unlock(); - } - - unsubscribeFuture.join(); - } - } - - private void pruneStaleSubscription(final String channel) { - unsubscribeFromKeyspaceNotifications(getQueueNameFromKeyspaceChannel(channel)) - .thenRun(prunedStaleSubscriptionCounter::increment); - } - - private CompletableFuture subscribeForKeyspaceNotifications(final String queueName) { - final int slot = SlotHash.getSlot(queueName); - - return pubSubConnection.withPubSubConnection( - connection -> connection.async() - .nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) - .commands() - .subscribe(getKeyspaceChannels(queueName))).toCompletableFuture() - .thenRun(Util.NOOP); - } - - private CompletableFuture unsubscribeFromKeyspaceNotifications(final String queueName) { - final int slot = SlotHash.getSlot(queueName); - - return pubSubConnection.withPubSubConnection( - connection -> connection.async() - .nodes(node -> node.is(RedisClusterNode.NodeFlag.UPSTREAM) && node.hasSlot(slot)) - .commands() - .unsubscribe(getKeyspaceChannels(queueName))) - .toCompletableFuture() - .thenRun(Util.NOOP); - } - - private static String[] getKeyspaceChannels(final String queueName) { - return new String[]{ - QUEUE_KEYSPACE_PREFIX + "{" + queueName + "}", - PERSISTING_KEYSPACE_PREFIX + "{" + queueName + "}" - }; - } - - @Override - public void message(final RedisClusterNode node, final String channel, final String message) { - pubSubMessageCounter.increment(); - - if (channel.startsWith(QUEUE_KEYSPACE_PREFIX) && "zadd".equals(message)) { - newMessageNotificationCounter.increment(); - notificationExecutorService.execute(() -> { - try { - findListener(channel).ifPresentOrElse(listener -> { - if (!listener.handleNewMessagesAvailable()) { - removeMessageAvailabilityListener(listener); - } - }, () -> pruneStaleSubscription(channel)); - } catch (final Exception e) { - logger.warn("Unexpected error handling new message", e); - } - }); - } else if (channel.startsWith(PERSISTING_KEYSPACE_PREFIX) && "del".equals(message)) { - queuePersistedNotificationCounter.increment(); - notificationExecutorService.execute(() -> { - try { - findListener(channel).ifPresentOrElse(listener -> { - if (!listener.handleMessagesPersisted()) { - removeMessageAvailabilityListener(listener); - } - }, () -> pruneStaleSubscription(channel)); - } catch (final Exception e) { - logger.warn("Unexpected error handling messages persisted", e); - } - }); - } - } - - private Optional findListener(final String keyspaceChannel) { - final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel); - - messageListenersLock.lock(); - try { - return Optional.ofNullable(messageListenersByQueueName.get(queueName)); - } finally { - messageListenersLock.unlock(); - } - } - - @VisibleForTesting - static String getQueueName(final UUID accountUuid, final byte deviceId) { - return accountUuid + "::" + deviceId; - } - - @VisibleForTesting - static String getQueueNameFromKeyspaceChannel(final String channel) { - final int startOfHashTag = channel.indexOf('{'); - final int endOfHashTag = channel.lastIndexOf('}'); - - return channel.substring(startOfHashTag + 1, endOfHashTag); - } - static byte[] getMessageQueueKey(final UUID accountUuid, final byte deviceId) { return ("user_queue::{" + accountUuid.toString() + "::" + deviceId + "}").getBytes(StandardCharsets.UTF_8); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java index 35ece5bbb..a3f5381c8 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; -import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.util.Pair; import reactor.core.observability.micrometer.Micrometer; @@ -128,10 +127,6 @@ private Publisher getMessagesForDevice(UUID destinationUuid, Device de .tap(Micrometer.metrics(Metrics.globalRegistry)); } - public Mono getEarliestUndeliveredTimestampForDevice(UUID destinationUuid, Device destinationDevice) { - return Mono.from(messagesDynamoDb.load(destinationUuid, destinationDevice, 1)).map(Envelope::getServerTimestamp); - } - public CompletableFuture clear(UUID destinationUuid) { return messagesCache.clear(destinationUuid); } @@ -190,17 +185,6 @@ public int persistMessages( return messagesRemovedFromCache; } - public void addMessageAvailabilityListener( - final UUID destinationUuid, - final byte destinationDeviceId, - final MessageAvailabilityListener listener) { - messagesCache.addMessageAvailabilityListener(destinationUuid, destinationDeviceId, listener); - } - - public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { - messagesCache.removeMessageAvailabilityListener(listener); - } - /** * Inserts the shared multi-recipient message payload to storage. * @@ -211,15 +195,4 @@ public byte[] insertSharedMultiRecipientMessagePayload( final SealedSenderMultiRecipientMessage sealedSenderMultiRecipientMessage) { return messagesCache.insertSharedMultiRecipientMessagePayload(sealedSenderMultiRecipientMessage); } - - /** - * Removes the recipient's view from shared MRM data if necessary - */ - public void removeRecipientViewFromMrmData(final byte destinationDeviceId, final Envelope message) { - if (message.hasSharedMrmKey()) { - messagesCache.removeRecipientViewFromMrmData(List.of(message.getSharedMrmKey().toByteArray()), - ServiceIdentifier.valueOf(message.getDestinationServiceId()), - destinationDeviceId); - } - } } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index f83afda3f..a9606b720 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -19,7 +19,6 @@ import org.whispersystems.textsecuregcm.push.PushNotificationManager; import org.whispersystems.textsecuregcm.push.PushNotificationScheduler; import org.whispersystems.textsecuregcm.push.ReceiptSender; -import org.whispersystems.textsecuregcm.redis.RedisOperation; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.websocket.session.WebSocketSessionContext; @@ -109,23 +108,12 @@ public void onWebSocketConnect(WebSocketSessionContext context) { pubSubClientEventManager.handleClientDisconnected(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId()); - // Next, we stop listening for inbound messages. If a message arrives after this call, the websocket connection - // will not be notified and will not change its state, but that's okay because it has already closed and - // attempts to deliver mesages via this connection will not succeed. - RedisOperation.unchecked(() -> messagesManager.removeMessageAvailabilityListener(connection)); - // Finally, stop trying to deliver messages and send a push notification if the connection is aware of any // undelivered messages. connection.stop(); }); try { - // Once we add this connection as a message availability listener, it will be notified any time a new message - // arrives in the message cache. This updates the connection's "may have messages" state. It's important that - // we do this first because we want to make sure we're accurately tracking message availability in the - // connection's internal state. - messagesManager.addMessageAvailabilityListener(auth.getAccount().getUuid(), auth.getAuthenticatedDevice().getId(), connection); - // Once we "start" the websocket connection, we'll cancel any scheduled "you may have new messages" push // notifications and begin delivering any stored messages for the connected device. We have not yet declared the // client as "present" yet. If a message arrives at this point, we will update the message availability state diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index e357df125..2735699cf 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -51,7 +51,6 @@ import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.storage.ClientReleaseManager; import org.whispersystems.textsecuregcm.storage.Device; -import org.whispersystems.textsecuregcm.storage.MessageAvailabilityListener; import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.util.HeaderUtils; import org.whispersystems.websocket.WebSocketClient; @@ -63,7 +62,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -public class WebSocketConnection implements MessageAvailabilityListener, ClientEventListener { +public class WebSocketConnection implements ClientEventListener { private static final DistributionSummary messageTime = Metrics.summary( name(MessageController.class, "messageDeliveryDuration")); @@ -81,8 +80,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, ClientE private static final String DISPLACEMENT_COUNTER_NAME = name(WebSocketConnection.class, "displacement"); private static final String NON_SUCCESS_RESPONSE_COUNTER_NAME = name(WebSocketConnection.class, "clientNonSuccessResponse"); - private static final String CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME = name(WebSocketConnection.class, - "messageAvailableAfterClientClosed"); private static final String SEND_MESSAGES_FLUX_NAME = MetricsUtil.name(WebSocketConnection.class, "sendMessages"); private static final String SEND_MESSAGE_ERROR_COUNTER = MetricsUtil.name(WebSocketConnection.class, @@ -460,21 +457,6 @@ private CompletableFuture sendMessage(Envelope envelope) { } } - @Override - public boolean handleNewMessagesAvailable() { - if (!client.isOpen()) { - // The client may become closed without successful removal of references to the `MessageAvailabilityListener` - Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment(); - return false; - } - - Metrics.counter(MESSAGE_AVAILABLE_COUNTER_NAME, - PRESENCE_MANAGER_TAG, "legacy") - .increment(); - - return true; - } - @Override public void handleNewMessageAvailable() { Metrics.counter(MESSAGE_AVAILABLE_COUNTER_NAME, @@ -487,22 +469,7 @@ public void handleNewMessageAvailable() { } @Override - public boolean handleMessagesPersisted() { - if (!client.isOpen()) { - // The client may become without successful removal of references to the `MessageAvailabilityListener` - Metrics.counter(CLIENT_CLOSED_MESSAGE_AVAILABLE_COUNTER_NAME).increment(); - return false; - } - - Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME, - PRESENCE_MANAGER_TAG, "legacy") - .increment(); - - return true; - } - - @Override - public void handleMessagesPersistedPubSub() { + public void handleMessagesPersisted() { Metrics.counter(MESSAGES_PERSISTED_COUNTER_NAME, PRESENCE_MANAGER_TAG, "pubsub") .increment(); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index c63f111ac..df1f18189 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -119,8 +119,6 @@ static CommandDependencies build( Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService( environment.lifecycle().executorService("messageDelivery").minThreads(4).maxThreads(4).build()); - ExecutorService keyspaceNotificationDispatchExecutor = environment.lifecycle() - .executorService(name(name, "keyspaceNotification-%d")).minThreads(4).maxThreads(4).build(); ExecutorService messageDeletionExecutor = environment.lifecycle() .executorService(name(name, "messageDeletion-%d")).minThreads(4).maxThreads(4).build(); ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle() @@ -209,7 +207,7 @@ static CommandDependencies build( SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator, storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration()); PubSubClientEventManager pubSubClientEventManager = new PubSubClientEventManager(messagesCluster, clientEventExecutor); - MessagesCache messagesCache = new MessagesCache(messagesCluster, keyspaceNotificationDispatchExecutor, + MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, @@ -262,7 +260,6 @@ static CommandDependencies build( Clock.systemUTC()); environment.lifecycle().manage(apnSender); - environment.lifecycle().manage(messagesCache); environment.lifecycle().manage(pubSubClientEventManager); environment.lifecycle().manage(new ManagedAwsCrt()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java index b261465d0..081574d87 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java @@ -68,7 +68,6 @@ protected void run(Environment environment, Namespace namespace, WhisperServerCo Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()), namespace.getInt(WORKER_COUNT)); - environment.lifecycle().manage(deps.messagesCache()); environment.lifecycle().manage(messagePersister); MetricsUtil.registerSystemResourceMetrics(environment); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java index a49935fd1..6287a600c 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/configuration/LocalFaultTolerantRedisClusterFactory.java @@ -22,9 +22,6 @@ private LocalFaultTolerantRedisClusterFactory() { try { redisClusterExtension.beforeAll(null); redisClusterExtension.beforeEach(null); - - redisClusterExtension.getRedisCluster().useCluster(connection -> - connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz")); } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 7a6244ddc..fd4f89bbd 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -59,7 +59,7 @@ public void handleNewMessageAvailable() { } @Override - public void handleMessagesPersistedPubSub() { + public void handleMessagesPersisted() { } @Override @@ -174,7 +174,7 @@ void handleMessagesPersisted(final boolean messagesPersistedRemotely) throws Int localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { @Override - public void handleMessagesPersistedPubSub() { + public void handleMessagesPersisted() { messagesPersistedLatch.countDown(); } }).toCompletableFuture().join(); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index b7bac9f71..4e1230902 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -9,7 +9,6 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; @@ -33,6 +32,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.ClientEventListener; import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; @@ -50,9 +50,9 @@ class MessagePersisterIntegrationTest { @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder().build(); - private ExecutorService notificationExecutorService; private Scheduler messageDeliveryScheduler; private ExecutorService messageDeletionExecutorService; + private ExecutorService clientEventExecutorService; private MessagesCache messagesCache; private MessagesManager messagesManager; private PubSubClientEventManager pubSubClientEventManager; @@ -65,7 +65,6 @@ class MessagePersisterIntegrationTest { void setUp() throws Exception { REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> { connection.sync().flushall(); - connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"); }); @SuppressWarnings("unchecked") final DynamicConfigurationManager dynamicConfigurationManager = @@ -80,12 +79,14 @@ void setUp() throws Exception { messageDeletionExecutorService); final AccountsManager accountsManager = mock(AccountsManager.class); - notificationExecutorService = Executors.newSingleThreadExecutor(); - messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), notificationExecutorService, + messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC(), dynamicConfigurationManager); messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messageDeletionExecutorService); - pubSubClientEventManager = mock(PubSubClientEventManager.class); + + clientEventExecutorService = Executors.newVirtualThreadPerTaskExecutor(); + pubSubClientEventManager = new PubSubClientEventManager(REDIS_CLUSTER_EXTENSION.getRedisCluster(), clientEventExecutorService); + pubSubClientEventManager.start(); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, pubSubClientEventManager, dynamicConfigurationManager, PERSIST_DELAY, 1); @@ -100,19 +101,19 @@ void setUp() throws Exception { when(account.getDevice(Device.PRIMARY_ID)).thenReturn(Optional.of(DevicesHelper.createDevice(Device.PRIMARY_ID))); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); - - messagesCache.start(); } @AfterEach void tearDown() throws Exception { - notificationExecutorService.shutdown(); - notificationExecutorService.awaitTermination(15, TimeUnit.SECONDS); - messageDeletionExecutorService.shutdown(); messageDeletionExecutorService.awaitTermination(15, TimeUnit.SECONDS); + clientEventExecutorService.shutdown(); + clientEventExecutorService.awaitTermination(15, TimeUnit.SECONDS); + messageDeliveryScheduler.dispose(); + + pubSubClientEventManager.stop(); } @Test @@ -141,21 +142,22 @@ void testScheduledPersistMessages() { final AtomicBoolean messagesPersisted = new AtomicBoolean(false); - messagesManager.addMessageAvailabilityListener(account.getUuid(), Device.PRIMARY_ID, - new MessageAvailabilityListener() { + pubSubClientEventManager.handleClientConnected(account.getUuid(), Device.PRIMARY_ID, new ClientEventListener() { @Override - public boolean handleNewMessagesAvailable() { - return true; + public void handleNewMessageAvailable() { } @Override - public boolean handleMessagesPersisted() { + public void handleMessagesPersisted() { synchronized (messagesPersisted) { messagesPersisted.set(true); messagesPersisted.notifyAll(); - return true; } } + + @Override + public void handleConnectionDisplaced(final boolean connectedElsewhere) { + } }); messagePersister.start(); @@ -183,8 +185,6 @@ public boolean handleMessagesPersisted() { .toList(); assertEquals(expectedMessages, persistedMessages); - - verify(pubSubClientEventManager).handleMessagesPersisted(account.getUuid(), Device.PRIMARY_ID); }); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index d032bc11e..a0f7f992d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -100,7 +100,7 @@ void setUp() throws Exception { sharedExecutorService = Executors.newSingleThreadExecutor(); resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); - messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, + messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); pubSubClientEventManager = mock(PubSubClientEventManager.class); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, pubSubClientEventManager, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 9af62e4e7..93b3b19de 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -50,9 +50,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.RandomStringUtils; @@ -105,12 +103,6 @@ class WithRealCluster { @BeforeEach void setUp() throws Exception { - - REDIS_CLUSTER_EXTENSION.getRedisCluster().useCluster(connection -> { - connection.sync().flushall(); - connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"); - }); - final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class); when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(new DynamicMessagesConfiguration(true, true)); dynamicConfigurationManager = mock(DynamicConfigurationManager.class); @@ -119,16 +111,12 @@ void setUp() throws Exception { sharedExecutorService = Executors.newSingleThreadExecutor(); resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); - messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, + messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); - - messagesCache.start(); } @AfterEach void tearDown() throws Exception { - messagesCache.stop(); - sharedExecutorService.shutdown(); sharedExecutorService.awaitTermination(1, TimeUnit.SECONDS); @@ -303,8 +291,7 @@ void testGetMessagesPublisher(final boolean expectStale) throws Exception { } final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, cacheClock, - dynamicConfigurationManager); + messageDeliveryScheduler, sharedExecutorService, cacheClock, dynamicConfigurationManager); final List actualMessages = Flux.from( messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID)) @@ -394,13 +381,6 @@ void testGetDeviceIdFromQueueName() { StandardCharsets.UTF_8))); } - @Test - void testGetQueueNameFromKeyspaceChannel() { - assertEquals("1b363a31-a429-4fb6-8959-984a025e72ff::7", - MessagesCache.getQueueNameFromKeyspaceChannel( - "__keyspace@0__:user_queue::{1b363a31-a429-4fb6-8959-984a025e72ff::7}")); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) public void testGetQueuesToPersist(final boolean sealedSender) { @@ -415,152 +395,8 @@ public void testGetQueuesToPersist(final boolean sealedSender) { final List queues = messagesCache.getQueuesToPersist(slot, Instant.now().plusSeconds(60), 100); assertEquals(1, queues.size()); - assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.get(0))); - assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.get(0))); - } - - @Test - void testNotifyListenerNewMessage() { - final AtomicBoolean notified = new AtomicBoolean(false); - final UUID messageGuid = UUID.randomUUID(); - - final MessageAvailabilityListener listener = new MessageAvailabilityListener() { - @Override - public boolean handleNewMessagesAvailable() { - synchronized (notified) { - notified.set(true); - notified.notifyAll(); - - return true; - } - } - - @Override - public boolean handleMessagesPersisted() { - return true; - } - }; - - assertTimeoutPreemptively(Duration.ofSeconds(5), () -> { - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - messagesCache.insert(messageGuid, DESTINATION_UUID, DESTINATION_DEVICE_ID, - generateRandomMessage(messageGuid, true)); - - synchronized (notified) { - while (!notified.get()) { - notified.wait(); - } - } - - assertTrue(notified.get()); - }); - } - - @Test - void testNotifyListenerPersisted() { - final AtomicBoolean notified = new AtomicBoolean(false); - - final MessageAvailabilityListener listener = new MessageAvailabilityListener() { - @Override - public boolean handleNewMessagesAvailable() { - return true; - } - - @Override - public boolean handleMessagesPersisted() { - synchronized (notified) { - notified.set(true); - notified.notifyAll(); - - return true; - } - } - }; - - assertTimeoutPreemptively(Duration.ofSeconds(5), () -> { - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener); - - messagesCache.lockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); - messagesCache.unlockQueueForPersistence(DESTINATION_UUID, DESTINATION_DEVICE_ID); - - synchronized (notified) { - while (!notified.get()) { - notified.wait(); - } - } - - assertTrue(notified.get()); - }); - } - - - /** - * Helper class that implements {@link MessageAvailabilityListener#handleNewMessagesAvailable()} by always returning - * {@code false}. Its {@code counter} field tracks how many times {@code handleNewMessagesAvailable} has been - * called. - *

- * It uses a {@link CompletableFuture} to signal that it has received a “messages available” callback for the first - * time. - */ - private static class NewMessagesAvailabilityClosedListener implements MessageAvailabilityListener { - - private int counter; - - private final Consumer messageHandledCallback; - private final CompletableFuture firstMessageHandled = new CompletableFuture<>(); - - private NewMessagesAvailabilityClosedListener(final Consumer messageHandledCallback) { - this.messageHandledCallback = messageHandledCallback; - } - - @Override - public boolean handleNewMessagesAvailable() { - counter++; - messageHandledCallback.accept(counter); - firstMessageHandled.complete(null); - - return false; - - } - - @Override - public boolean handleMessagesPersisted() { - return true; - } - } - - @Test - void testAvailabilityListenerResponses() { - final NewMessagesAvailabilityClosedListener listener1 = new NewMessagesAvailabilityClosedListener( - count -> assertEquals(1, count)); - final NewMessagesAvailabilityClosedListener listener2 = new NewMessagesAvailabilityClosedListener( - count -> assertEquals(1, count)); - - assertTimeoutPreemptively(Duration.ofSeconds(30), () -> { - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener1); - final UUID messageGuid1 = UUID.randomUUID(); - messagesCache.insert(messageGuid1, DESTINATION_UUID, DESTINATION_DEVICE_ID, - generateRandomMessage(messageGuid1, true)); - - listener1.firstMessageHandled.get(); - - // Avoid a race condition by blocking on the message handled future *and* the current notification executor task— - // the notification executor task includes unsubscribing `listener1`, and, if we don’t wait, sometimes - // `listener2` will get subscribed before `listener1` is cleaned up - sharedExecutorService.submit(() -> listener1.firstMessageHandled.get()).get(); - - final UUID messageGuid2 = UUID.randomUUID(); - messagesCache.insert(messageGuid2, DESTINATION_UUID, DESTINATION_DEVICE_ID, - generateRandomMessage(messageGuid2, true)); - - messagesCache.addMessageAvailabilityListener(DESTINATION_UUID, DESTINATION_DEVICE_ID, listener2); - - final UUID messageGuid3 = UUID.randomUUID(); - messagesCache.insert(messageGuid3, DESTINATION_UUID, DESTINATION_DEVICE_ID, - generateRandomMessage(messageGuid3, true)); - - listener2.firstMessageHandled.get(); - }); + assertEquals(DESTINATION_UUID, MessagesCache.getAccountUuidFromQueueName(queues.getFirst())); + assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.getFirst())); } @ParameterizedTest @@ -621,7 +457,7 @@ void testMultiRecipientMessage(final boolean sharedMrmKeyPresent) throws Excepti @ParameterizedTest @ValueSource(booleans = {true, false}) - void testGetMessagesToPersist(final boolean sharedMrmKeyPresent) throws Exception { + void testGetMessagesToPersist(final boolean sharedMrmKeyPresent) { final UUID destinationUuid = UUID.randomUUID(); final byte deviceId = 1; @@ -697,7 +533,7 @@ void setup() throws Exception { messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); - messagesCache = new MessagesCache(mockCluster, mock(ExecutorService.class), messageDeliveryScheduler, + messagesCache = new MessagesCache(mockCluster, messageDeliveryScheduler, Executors.newSingleThreadExecutor(), Clock.systemUTC(), mock(DynamicConfigurationManager.class)); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 47621eeb0..065ec7dc5 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -98,7 +98,7 @@ void setUp() throws Exception { messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); dynamicConfigurationManager = mock(DynamicConfigurationManager.class); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); - messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, + messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java index 1af3b9a67..e4c039a31 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionTest.java @@ -769,7 +769,7 @@ void testProcessDatabaseMessagesAfterPersist() { // whenComplete method will get called immediately on THIS thread, so we don't need to synchronize or wait for // anything. connection.processStoredMessages(); - connection.handleMessagesPersistedPubSub(); + connection.handleMessagesPersisted(); verify(messagesManager, times(2)).getMessagesForDeviceReactive(account.getUuid(), device, false); }