Skip to content

Commit

Permalink
GH-2357: Remove Remaining Uses of ListenableFuture
Browse files Browse the repository at this point in the history
Resolves #2357

**main only; will issue separate PR to deprecate setters in 2.9**
  • Loading branch information
garyrussell authored and artembilan committed Jul 27, 2022
1 parent f4b73af commit eb7648a
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 75 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ext {
springBootVersion = '2.6.7' // docs module
springDataVersion = '2022.0.0-M5'
springRetryVersion = '1.3.3'
springVersion = '6.0.0-M5'
springVersion = '6.0.0-SNAPSHOT'
zookeeperVersion = '3.6.3'

idPrefix = 'kafka'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

private final List<AsyncListenableTaskExecutor> executors = new ArrayList<>();
private final List<AsyncTaskExecutor> executors = new ArrayList<>();

private int concurrency = 1;

Expand Down Expand Up @@ -237,7 +237,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
stopAbnormally(() -> {
});
});
AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();
AsyncTaskExecutor exec = container.getContainerProperties().getListenerTaskExecutor();
if (exec == null) {
if ((this.executors.size() > index)) {
exec = this.executors.get(index);
Expand All @@ -246,7 +246,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
exec = new SimpleAsyncTaskExecutor(beanName + "-C-");
this.executors.add(exec);
}
container.getContainerProperties().setConsumerTaskExecutor(exec);
container.getContainerProperties().setListenerTaskExecutor(exec);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -231,7 +231,7 @@ public enum EOSMode {
/**
* The executor for threads that poll the consumer.
*/
private AsyncListenableTaskExecutor consumerTaskExecutor;
private AsyncTaskExecutor listenerTaskExecutor;

/**
* The timeout for shutting down the container. This is the maximum amount of
Expand Down Expand Up @@ -380,10 +380,11 @@ public void setAckTime(long ackTime) {

/**
* Set the executor for threads that poll the consumer.
* @param consumerTaskExecutor the executor
* @param listenerTaskExecutor the executor
* @since 2.8.9
*/
public void setConsumerTaskExecutor(@Nullable AsyncListenableTaskExecutor consumerTaskExecutor) {
this.consumerTaskExecutor = consumerTaskExecutor;
public void setListenerTaskExecutor(@Nullable AsyncTaskExecutor listenerTaskExecutor) {
this.listenerTaskExecutor = listenerTaskExecutor;
}

/**
Expand Down Expand Up @@ -466,8 +467,8 @@ public Object getMessageListener() {
* @return the executor.
*/
@Nullable
public AsyncListenableTaskExecutor getConsumerTaskExecutor() {
return this.consumerTaskExecutor;
public AsyncTaskExecutor getListenerTaskExecutor() {
return this.listenerTaskExecutor;
}

public long getShutdownTimeout() {
Expand Down Expand Up @@ -919,8 +920,8 @@ public String toString() {
+ "\n ackCount=" + this.ackCount
+ "\n ackTime=" + this.ackTime
+ "\n messageListener=" + this.messageListener
+ (this.consumerTaskExecutor != null
? "\n consumerTaskExecutor=" + this.consumerTaskExecutor
+ (this.listenerTaskExecutor != null
? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
: "")
+ "\n shutdownTimeout=" + this.shutdownTimeout
+ "\n idleEventInterval="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -72,7 +74,7 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
Expand Down Expand Up @@ -123,8 +125,6 @@
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;


/**
Expand Down Expand Up @@ -175,7 +175,7 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count

private volatile ListenerConsumer listenerConsumer;

private volatile ListenableFuture<?> listenerConsumerFuture;
private volatile CompletableFuture<Void> listenerConsumerFuture;

private volatile CountDownLatch startLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -350,21 +350,22 @@ protected void doStart() {
checkAckMode(containerProperties);

Object messageListener = containerProperties.getMessageListener();
AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
containerProperties.setListenerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = determineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = consumerExecutor
.submitListenable(this.listenerConsumer);
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(),
TimeUnit.MILLISECONDS)) {

this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
publishConsumerFailedToStart();
Expand Down Expand Up @@ -403,7 +404,7 @@ private ListenerType determineListenerType(GenericMessageListener<?> listener) {
@Override
protected void doStop(final Runnable callback, boolean normal) {
if (isRunning()) {
this.listenerConsumerFuture.addCallback(new StopCallback(callback));
this.listenerConsumerFuture.whenComplete(new StopCallback(callback));
setRunning(false);
this.listenerConsumer.wakeIfNecessaryForStop();
setStoppedNormally(normal);
Expand Down Expand Up @@ -3712,7 +3713,7 @@ private static final class OffsetMetadata {

}

private class StopCallback implements ListenableFutureCallback<Object> {
private class StopCallback implements BiConsumer<Object, Throwable> {

private final Runnable callback;

Expand All @@ -3721,20 +3722,19 @@ private class StopCallback implements ListenableFutureCallback<Object> {
}

@Override
public void onFailure(Throwable e) {
KafkaMessageListenerContainer.this.logger
.error(e, "Error while stopping the container: ");
if (this.callback != null) {
this.callback.run();
public void accept(Object result, @Nullable Throwable throwable) {
if (throwable != null) {
KafkaMessageListenerContainer.this.logger.error(throwable, "Error while stopping the container");
if (this.callback != null) {
this.callback.run();
}
}
}

@Override
public void onSuccess(Object result) {
KafkaMessageListenerContainer.this.logger
.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
if (this.callback != null) {
this.callback.run();
else {
KafkaMessageListenerContainer.this.logger
.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
if (this.callback != null) {
this.callback.run();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -58,7 +59,6 @@
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.CannotCreateTransactionException;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand All @@ -71,7 +71,7 @@ public class DefaultKafkaProducerFactoryTests {
@Test
void testProducerClosedAfterBadTransition() throws Exception {
final Producer producer = mock(Producer.class);
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer.send(any(), any())).willReturn(new CompletableFuture());
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -422,7 +421,7 @@ void testWithCallbackFailure() throws Exception {
willAnswer(inv -> {
Callback callback = inv.getArgument(1);
callback.onCompletion(null, new RuntimeException("test"));
return new SettableListenableFuture<RecordMetadata>();
return new CompletableFuture<RecordMetadata>();
}).given(producer).send(any(), any());
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
Expand All @@ -449,7 +448,7 @@ void testWithCallbackFailureFunctional() throws Exception {
willAnswer(inv -> {
Callback callback = inv.getArgument(1);
callback.onCompletion(null, new RuntimeException("test"));
return new SettableListenableFuture<RecordMetadata>();
return new CompletableFuture<RecordMetadata>();
}).given(producer).send(any(), any());
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
given(pf.createProducer()).willReturn(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -79,7 +80,6 @@
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -316,10 +316,10 @@ public void testTransactionSynchronizationExceptionOnCommit() {
public void testDeadLetterPublisherWhileTransactionActive() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
producer1.initTransactions();

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -503,10 +503,10 @@ public void testAbort() {
public void testExecuteInTransactionNewInnerTx() {
@SuppressWarnings("unchecked")
Producer<Object, Object> producer1 = mock(Producer.class);
given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer1.send(any(), any())).willReturn(new CompletableFuture<>());
@SuppressWarnings("unchecked")
Producer<Object, Object> producer2 = mock(Producer.class);
given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(producer2.send(any(), any())).willReturn(new CompletableFuture<>());
producer1.initTransactions();
AtomicBoolean first = new AtomicBoolean(true);

Expand Down Expand Up @@ -596,15 +596,15 @@ public static class DeclarativeConfig {
@Bean
public Producer producer1() {
Producer mock = mock(Producer.class);
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
return mock;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Producer producer2() {
Producer mock = mock(Producer.class);
given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(mock.send(any(), any())).willReturn(new CompletableFuture<>());
return mock;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,13 +25,13 @@

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.Producer;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand All @@ -44,9 +44,9 @@ public class RoutingKafkaTemplateTests {
@Test
public void routing() {
Producer<Object, Object> p1 = mock(Producer.class);
given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(p1.send(any(), any())).willReturn(new CompletableFuture<>());
Producer<Object, Object> p2 = mock(Producer.class);
given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>());
given(p2.send(any(), any())).willReturn(new CompletableFuture<>());
ProducerFactory<Object, Object> pf1 = mock(ProducerFactory.class);
ProducerFactory<Object, Object> pf2 = mock(ProducerFactory.class);
given(pf1.createProducer()).willReturn(p1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void testThreadStarvation() throws InterruptedException {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(1);
exec.afterPropertiesSet();
containerProperties.setConsumerTaskExecutor(exec);
containerProperties.setListenerTaskExecutor(exec);
containerProperties.setConsumerStartTimeout(Duration.ofMillis(50));
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
containerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testDelegateType() throws Exception {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.initialize();
containerProps.setConsumerTaskExecutor(scheduler);
containerProps.setListenerTaskExecutor(scheduler);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("delegate");
Expand Down
Loading

0 comments on commit eb7648a

Please sign in to comment.