From 3485d36f5407047e5a83d7a4b0b9a34e7a78dfa3 Mon Sep 17 00:00:00 2001 From: Vasiliy Sadokhin Date: Sun, 30 Sep 2018 23:55:52 +0800 Subject: [PATCH] Made consumer restart on error programmatically to replay unacknowledged messages for robust delivery --- .../iot/stream/consumer/KafkaConsumer.java | 2 +- .../stream/consumer/KafkaConsumerConfig.java | 8 +- .../consumer/KafkaConsumerErrorHandler.java | 19 +++++ .../consumer/StreamConsumerApplication.java | 27 ++++++- .../consumer/KafkaConsumerConfigTest.java | 16 +++- .../KafkaConsumerErrorHandlerTest.java | 75 +++++++++++++++++ .../StreamConsumerApplicationTest.java | 81 ++++++++++++++++++- 7 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandler.java create mode 100644 stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandlerTest.java diff --git a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumer.java b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumer.java index 2f6ffce..e34495d 100644 --- a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumer.java +++ b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumer.java @@ -18,7 +18,7 @@ public KafkaConsumer(MetricRepository metricRepository) { this.metricRepository = metricRepository; } - @KafkaListener(topics = "metric", groupId = "stream") + @KafkaListener(id = "metric-listener", topics = "metric", groupId = "stream", errorHandler = "metricListenerErrorHandler") public void processMessage(Metric metric, Acknowledgment acknowledgment) { for (MetricTable metricTable : MetricTable.values()) { metricRepository.insert(metric, metricTable); diff --git a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfig.java b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfig.java index b613eaf..dd7eea6 100644 --- a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfig.java +++ b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfig.java @@ -13,6 +13,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.support.serializer.JsonDeserializer; @Configuration @@ -45,7 +46,12 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont } @Bean - public MetricRepository sensorRepository() { + KafkaListenerErrorHandler errorHandler() { + return new KafkaConsumerErrorHandler(); + } + + @Bean + public MetricRepository metricRepository() { return new MetricRepository(); } } diff --git a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandler.java b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandler.java new file mode 100644 index 0000000..7ba43cf --- /dev/null +++ b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandler.java @@ -0,0 +1,19 @@ +package me.vsadokhin.iot.stream.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.messaging.Message; + +public class KafkaConsumerErrorHandler implements KafkaListenerErrorHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerErrorHandler.class); + + @Override + public Object handleError(Message message, ListenerExecutionFailedException exception) { + LOGGER.error("Exception occurred on metric receiving: " + message, exception); + StreamConsumerApplication.restart(); + return null; + } +} diff --git a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplication.java b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplication.java index 6485515..67f1d00 100644 --- a/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplication.java +++ b/stream-consumer/src/main/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplication.java @@ -5,12 +5,22 @@ import me.vsadokhin.iot.data.utility.CassandraClusterUtility; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication(scanBasePackages = { "me.vsadokhin.iot.data", "me.vsadokhin.iot.stream.consumer" }) public class StreamConsumerApplication { + private static String[] args; + private static ConfigurableApplicationContext context; + public static void main(String... args) { - SpringApplication.run(StreamConsumerApplication.class, args); + StreamConsumerApplication.args = args; + context = SpringApplication.run(StreamConsumerApplication.class, args); + } + + public static void restart() { + context.close(); + context = SpringApplication.run(StreamConsumerApplication.class, args); } @PreDestroy @@ -18,4 +28,19 @@ public void destroy() { CassandraClusterUtility.closeCluster(); } + static void setArgs(String[] args) { + StreamConsumerApplication.args = args; + } + + static String[] getArgs() { + return args; + } + + static void setContext(ConfigurableApplicationContext context) { + StreamConsumerApplication.context = context; + } + + static ConfigurableApplicationContext getContext() { + return context; + } } diff --git a/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfigTest.java b/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfigTest.java index b9a3c95..aaa7b03 100644 --- a/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfigTest.java +++ b/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerConfigTest.java @@ -24,6 +24,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.serializer.JsonDeserializer; @@ -176,9 +177,22 @@ public void metricRepository() throws Exception { whenNew(MetricRepository.class).withNoArguments().thenReturn(mockMetricRepository); // act - MetricRepository result = kafkaConsumerConfig.sensorRepository(); + MetricRepository result = kafkaConsumerConfig.metricRepository(); // verify assertThat(result, is(mockMetricRepository)); } + + @Test + public void errorHandler() throws Exception { + // setup + KafkaConsumerErrorHandler mockKafkaConsumerErrorHandler = mock(KafkaConsumerErrorHandler.class); + whenNew(KafkaConsumerErrorHandler.class).withNoArguments().thenReturn(mockKafkaConsumerErrorHandler); + + // act + KafkaListenerErrorHandler result = kafkaConsumerConfig.errorHandler(); + + // verify + assertThat(result, is(mockKafkaConsumerErrorHandler)); + } } \ No newline at end of file diff --git a/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandlerTest.java b/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandlerTest.java new file mode 100644 index 0000000..79fb784 --- /dev/null +++ b/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/KafkaConsumerErrorHandlerTest.java @@ -0,0 +1,75 @@ +package me.vsadokhin.iot.stream.consumer; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.verifyStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.messaging.Message; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({StreamConsumerApplication.class, LoggerFactory.class }) +public class KafkaConsumerErrorHandlerTest { + + private KafkaConsumerErrorHandler handler; + private static Logger MOCK_LOGGER= mock(Logger.class); + + @BeforeClass + public static void setUpClass() { + mockStatic(LoggerFactory.class); + when(LoggerFactory.getLogger(KafkaConsumerErrorHandler.class)).thenReturn(MOCK_LOGGER); + } + + @Before + public void setUp() { + reset(MOCK_LOGGER); + mockStatic(StreamConsumerApplication.class); + handler = new KafkaConsumerErrorHandler(); + } + + @Test + public void handleError_callLoggerError() { + // setup + Message mockMessage = mock(Message.class); + ListenerExecutionFailedException mockListenerExecutionFailedException = mock(ListenerExecutionFailedException.class); + + // act + handler.handleError(mockMessage, mockListenerExecutionFailedException); + + // verify + verify(MOCK_LOGGER).error("Exception occurred on metric receiving: " + mockMessage, mockListenerExecutionFailedException); + } + + @Test + public void handleError_callStreamConsumerApplicationError() { + // act + handler.handleError(mock(Message.class), new ListenerExecutionFailedException(null)); + + // verify + verifyStatic(StreamConsumerApplication.class); + StreamConsumerApplication.restart(); + } + + @Test + public void handleError_checkResultIsNull() { + // act + Object result = handler.handleError(mock(Message.class), new ListenerExecutionFailedException(null)); + + // verify + assertThat(result, is(nullValue())); + } +} \ No newline at end of file diff --git a/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplicationTest.java b/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplicationTest.java index 650285e..970d427 100644 --- a/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplicationTest.java +++ b/stream-consumer/src/test/java/me/vsadokhin/iot/stream/consumer/StreamConsumerApplicationTest.java @@ -1,24 +1,33 @@ package me.vsadokhin.iot.stream.consumer; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.verifyStatic; +import static org.powermock.api.mockito.PowerMockito.when; import me.vsadokhin.iot.data.utility.CassandraClusterUtility; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.boot.SpringApplication; +import org.springframework.context.ConfigurableApplicationContext; @RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplication.class, CassandraClusterUtility.class }) +@PrepareForTest({ SpringApplication.class, CassandraClusterUtility.class }) public class StreamConsumerApplicationTest { - @Test - public void main_callSpringApplicationRun() { - // setup + @Before + public void setUp() { mockStatic(SpringApplication.class); + } + @Test + public void main_callSpringApplicationRun() { // act StreamConsumerApplication.main("arg", "1", "2"); @@ -27,6 +36,70 @@ public void main_callSpringApplicationRun() { SpringApplication.run(StreamConsumerApplication.class, "arg", "1", "2"); } + @Test + public void main_checkArgsProperty() { + // act + StreamConsumerApplication.main("args", "1", "2"); + + // verify + assertThat(StreamConsumerApplication.getArgs(), is(new String[]{ "args", "1", "2" })); + } + + @Test + public void main_checkContextProperty() { + // setup + ConfigurableApplicationContext mockConfigurableApplicationContext = mock(ConfigurableApplicationContext.class); + when(SpringApplication.run(StreamConsumerApplication.class, "args")).thenReturn(mockConfigurableApplicationContext); + + // act + StreamConsumerApplication.main("args"); + + // verify + assertThat(StreamConsumerApplication.getContext(), is(mockConfigurableApplicationContext)); + } + + @Test + public void restart_callContextClose() { + // setup + ConfigurableApplicationContext mockConfigurableApplicationContext = mock(ConfigurableApplicationContext.class); + StreamConsumerApplication.setContext(mockConfigurableApplicationContext); + + // act + StreamConsumerApplication.restart(); + + // verify + verify(mockConfigurableApplicationContext).close(); + } + + @Test + public void restart_callSpringApplicationRun() { + // setup + StreamConsumerApplication.setContext(mock(ConfigurableApplicationContext.class)); + StreamConsumerApplication.setArgs(new String[]{ "1", "2", "3" }); + + // act + StreamConsumerApplication.restart(); + + // verify + verifyStatic(SpringApplication.class); + SpringApplication.run(StreamConsumerApplication.class, "1", "2", "3"); + } + + @Test + public void restart_checkContextProperty() { + // setup + StreamConsumerApplication.setArgs(new String[]{ "1", "2", "3" }); + StreamConsumerApplication.setContext(mock(ConfigurableApplicationContext.class)); + ConfigurableApplicationContext mockConfigurableApplicationContext = mock(ConfigurableApplicationContext.class); + when(SpringApplication.run(StreamConsumerApplication.class, "1", "2", "3")).thenReturn(mockConfigurableApplicationContext); + + // act + StreamConsumerApplication.restart(); + + // verify + assertThat(StreamConsumerApplication.getContext(), is(mockConfigurableApplicationContext)); + } + @Test public void destroy_callCassandraClusterUtilityCloseCluster() { // setup