Skip to content

Commit

Permalink
Fixed manual acknowledge with missed kafka factory property
Browse files Browse the repository at this point in the history
  • Loading branch information
vsadokhin committed Sep 30, 2018
1 parent fc13b78 commit 1eae970
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
Expand Down Expand Up @@ -39,6 +40,7 @@ public ConcurrentKafkaListenerContainerFactory<String, Metric> kafkaListenerCont
ConcurrentKafkaListenerContainerFactory<String, Metric> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;

import java.util.HashMap;
Expand All @@ -22,6 +23,8 @@
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@RunWith(PowerMockRunner.class)
Expand Down Expand Up @@ -127,6 +130,7 @@ public void kafkaListenerContainerFactory_checkResult() throws Exception {
// setup
ConcurrentKafkaListenerContainerFactory mockConcurrentKafkaListenerContainerFactory = mock(ConcurrentKafkaListenerContainerFactory.class);
whenNew(ConcurrentKafkaListenerContainerFactory.class).withNoArguments().thenReturn(mockConcurrentKafkaListenerContainerFactory);
when(mockConcurrentKafkaListenerContainerFactory.getContainerProperties()).thenReturn(new ContainerProperties(""));
kafkaConsumerConfig = spy(kafkaConsumerConfig);
doReturn(mock(ConsumerFactory.class)).when(kafkaConsumerConfig).consumerFactory();

Expand All @@ -151,6 +155,20 @@ public void kafkaListenerContainerFactory_checkResultConsumerFactory() {
assertThat(result.getConsumerFactory(), is(mockConsumerFactory));
}

@Test
public void kafkaListenerContainerFactory_checkResultAckMode() {
// setup
kafkaConsumerConfig = spy(kafkaConsumerConfig);
ConsumerFactory mockConsumerFactory = mock(ConsumerFactory.class);
doReturn(mockConsumerFactory).when(kafkaConsumerConfig).consumerFactory();

// act
ConcurrentKafkaListenerContainerFactory<String, Metric> result = kafkaConsumerConfig.kafkaListenerContainerFactory();

// verify
assertThat(result.getContainerProperties().getAckMode(), is(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE));
}

@Test
public void metricRepository() throws Exception {
// setup
Expand Down

0 comments on commit 1eae970

Please sign in to comment.