Skip to content

Commit

Permalink
Update remove method in RedisMessageListenerContainer to handle null …
Browse files Browse the repository at this point in the history
…listener.
  • Loading branch information
sujl95 committed Sep 29, 2024
1 parent f2752d1 commit 3930884
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
* @author Thomas Darimont
* @author Mark Paluch
* @author John Blum
* @author SEONGJUN LEE
* @see MessageListener
* @see SubscriptionListener
*/
Expand Down Expand Up @@ -770,33 +771,35 @@ else if (isListening()) {
}

private void remove(@Nullable MessageListener listener, Topic topic, ByteArrayWrapper holder,
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {

Collection<MessageListener> listeners = mapping.get(holder);
Collection<MessageListener> listenersToRemove = null;

if (listeners != null) {
// remove only one listener
listeners.remove(listener);
listenersToRemove = Collections.singletonList(listener);

// start removing listeners
for (MessageListener messageListener : listenersToRemove) {
Set<Topic> topics = listenerTopics.get(messageListener);
if (topics != null) {
topics.remove(topic);
}
if (CollectionUtils.isEmpty(topics)) {
listenerTopics.remove(messageListener);
}
}
if (listeners == null || listeners.isEmpty()) {
return;
}

// if we removed everything, remove the empty holder collection
if (listeners.isEmpty()) {
mapping.remove(holder);
topicToRemove.add(holder.getArray());
Collection<MessageListener> listenersToRemove = (listener == null) ? new ArrayList<>(listeners)
: Collections.singletonList(listener);

// Remove the specified listener(s) from the original collection
listeners.removeAll(listenersToRemove);

// Start removing listeners
for (MessageListener messageListener : listenersToRemove) {
Set<Topic> topics = listenerTopics.get(messageListener);
if (topics != null) {
topics.remove(topic);
}
if (CollectionUtils.isEmpty(topics)) {
listenerTopics.remove(messageListener);
}
}

// If all listeners were removed, clean up the mapping and the holder
if (listeners.isEmpty()) {
mapping.remove(holder);
topicToRemove.add(holder.getArray());
}
}

private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.*;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,10 +31,7 @@
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
Expand Down Expand Up @@ -221,4 +219,34 @@ void shouldRecoverFromConnectionFailure() throws Exception {
void failsOnDuplicateInit() {
assertThatIllegalStateException().isThrownBy(() -> container.afterPropertiesSet());
}

@Test
void shouldRemoveSpecificListenerFromMappingAndListenerTopics() {
MessageListener listener1 = mock(MessageListener.class);
MessageListener listener2 = mock(MessageListener.class);
Topic topic = new ChannelTopic("topic1");

container.addMessageListener(listener1, Collections.singletonList(topic));
container.addMessageListener(listener2, Collections.singletonList(topic));

container.removeMessageListener(listener1, Collections.singletonList(topic));

container.addMessageListener(listener2, Collections.singletonList(topic));
verify(listener1, never()).onMessage(any(), any());
}

@Test
void shouldRemoveAllListenersWhenListenerIsNull() {
MessageListener listener1 = mock(MessageListener.class);
MessageListener listener2 = mock(MessageListener.class);
Topic topic = new ChannelTopic("topic1");

container.addMessageListener(listener1, Collections.singletonList(topic));
container.addMessageListener(listener2, Collections.singletonList(topic));

container.removeMessageListener(null, Collections.singletonList(topic));

verify(listener1, never()).onMessage(any(), any());
verify(listener2, never()).onMessage(any(), any());
}
}

0 comments on commit 3930884

Please sign in to comment.