Skip to content

Commit

Permalink
Made consumer restart on error programmatically to replay unacknowled…
Browse files Browse the repository at this point in the history
…ged messages for robust delivery
  • Loading branch information
vsadokhin committed Sep 30, 2018
1 parent 1eae970 commit 3485d36
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public KafkaConsumer(MetricRepository metricRepository) {
this.metricRepository = metricRepository;
}

@KafkaListener(topics = "metric", groupId = "stream")
@KafkaListener(id = "metric-listener", topics = "metric", groupId = "stream", errorHandler = "metricListenerErrorHandler")
public void processMessage(Metric metric, Acknowledgment acknowledgment) {
for (MetricTable metricTable : MetricTable.values()) {
metricRepository.insert(metric, metricTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
Expand Down Expand Up @@ -45,7 +46,12 @@ public ConcurrentKafkaListenerContainerFactory<String, Metric> kafkaListenerCont
}

@Bean
public MetricRepository sensorRepository() {
KafkaListenerErrorHandler errorHandler() {
return new KafkaConsumerErrorHandler();
}

@Bean
public MetricRepository metricRepository() {
return new MetricRepository();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package me.vsadokhin.iot.stream.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;

public class KafkaConsumerErrorHandler implements KafkaListenerErrorHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerErrorHandler.class);

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
LOGGER.error("Exception occurred on metric receiving: " + message, exception);
StreamConsumerApplication.restart();
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,42 @@
import me.vsadokhin.iot.data.utility.CassandraClusterUtility;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication(scanBasePackages = { "me.vsadokhin.iot.data", "me.vsadokhin.iot.stream.consumer" })
public class StreamConsumerApplication {

private static String[] args;
private static ConfigurableApplicationContext context;

public static void main(String... args) {
SpringApplication.run(StreamConsumerApplication.class, args);
StreamConsumerApplication.args = args;
context = SpringApplication.run(StreamConsumerApplication.class, args);
}

public static void restart() {
context.close();
context = SpringApplication.run(StreamConsumerApplication.class, args);
}

@PreDestroy
public void destroy() {
CassandraClusterUtility.closeCluster();
}

static void setArgs(String[] args) {
StreamConsumerApplication.args = args;
}

static String[] getArgs() {
return args;
}

static void setContext(ConfigurableApplicationContext context) {
StreamConsumerApplication.context = context;
}

static ConfigurableApplicationContext getContext() {
return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;

Expand Down Expand Up @@ -176,9 +177,22 @@ public void metricRepository() throws Exception {
whenNew(MetricRepository.class).withNoArguments().thenReturn(mockMetricRepository);

// act
MetricRepository result = kafkaConsumerConfig.sensorRepository();
MetricRepository result = kafkaConsumerConfig.metricRepository();

// verify
assertThat(result, is(mockMetricRepository));
}

@Test
public void errorHandler() throws Exception {
// setup
KafkaConsumerErrorHandler mockKafkaConsumerErrorHandler = mock(KafkaConsumerErrorHandler.class);
whenNew(KafkaConsumerErrorHandler.class).withNoArguments().thenReturn(mockKafkaConsumerErrorHandler);

// act
KafkaListenerErrorHandler result = kafkaConsumerConfig.errorHandler();

// verify
assertThat(result, is(mockKafkaConsumerErrorHandler));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package me.vsadokhin.iot.stream.consumer;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
import static org.powermock.api.mockito.PowerMockito.when;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;

@RunWith(PowerMockRunner.class)
@PrepareForTest({StreamConsumerApplication.class, LoggerFactory.class })
public class KafkaConsumerErrorHandlerTest {

private KafkaConsumerErrorHandler handler;
private static Logger MOCK_LOGGER= mock(Logger.class);

@BeforeClass
public static void setUpClass() {
mockStatic(LoggerFactory.class);
when(LoggerFactory.getLogger(KafkaConsumerErrorHandler.class)).thenReturn(MOCK_LOGGER);
}

@Before
public void setUp() {
reset(MOCK_LOGGER);
mockStatic(StreamConsumerApplication.class);
handler = new KafkaConsumerErrorHandler();
}

@Test
public void handleError_callLoggerError() {
// setup
Message mockMessage = mock(Message.class);
ListenerExecutionFailedException mockListenerExecutionFailedException = mock(ListenerExecutionFailedException.class);

// act
handler.handleError(mockMessage, mockListenerExecutionFailedException);

// verify
verify(MOCK_LOGGER).error("Exception occurred on metric receiving: " + mockMessage, mockListenerExecutionFailedException);
}

@Test
public void handleError_callStreamConsumerApplicationError() {
// act
handler.handleError(mock(Message.class), new ListenerExecutionFailedException(null));

// verify
verifyStatic(StreamConsumerApplication.class);
StreamConsumerApplication.restart();
}

@Test
public void handleError_checkResultIsNull() {
// act
Object result = handler.handleError(mock(Message.class), new ListenerExecutionFailedException(null));

// verify
assertThat(result, is(nullValue()));
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
package me.vsadokhin.iot.stream.consumer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
import static org.powermock.api.mockito.PowerMockito.when;

import me.vsadokhin.iot.data.utility.CassandraClusterUtility;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;

@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplication.class, CassandraClusterUtility.class })
@PrepareForTest({ SpringApplication.class, CassandraClusterUtility.class })
public class StreamConsumerApplicationTest {

@Test
public void main_callSpringApplicationRun() {
// setup
@Before
public void setUp() {
mockStatic(SpringApplication.class);
}

@Test
public void main_callSpringApplicationRun() {
// act
StreamConsumerApplication.main("arg", "1", "2");

Expand All @@ -27,6 +36,70 @@ public void main_callSpringApplicationRun() {
SpringApplication.run(StreamConsumerApplication.class, "arg", "1", "2");
}

@Test
public void main_checkArgsProperty() {
// act
StreamConsumerApplication.main("args", "1", "2");

// verify
assertThat(StreamConsumerApplication.getArgs(), is(new String[]{ "args", "1", "2" }));
}

@Test
public void main_checkContextProperty() {
// setup
ConfigurableApplicationContext mockConfigurableApplicationContext = mock(ConfigurableApplicationContext.class);
when(SpringApplication.run(StreamConsumerApplication.class, "args")).thenReturn(mockConfigurableApplicationContext);

// act
StreamConsumerApplication.main("args");

// verify
assertThat(StreamConsumerApplication.getContext(), is(mockConfigurableApplicationContext));
}

@Test
public void restart_callContextClose() {
// setup
ConfigurableApplicationContext mockConfigurableApplicationContext = mock(ConfigurableApplicationContext.class);
StreamConsumerApplication.setContext(mockConfigurableApplicationContext);

// act
StreamConsumerApplication.restart();

// verify
verify(mockConfigurableApplicationContext).close();
}

@Test
public void restart_callSpringApplicationRun() {
// setup
StreamConsumerApplication.setContext(mock(ConfigurableApplicationContext.class));
StreamConsumerApplication.setArgs(new String[]{ "1", "2", "3" });

// act
StreamConsumerApplication.restart();

// verify
verifyStatic(SpringApplication.class);
SpringApplication.run(StreamConsumerApplication.class, "1", "2", "3");
}

@Test
public void restart_checkContextProperty() {
// setup
StreamConsumerApplication.setArgs(new String[]{ "1", "2", "3" });
StreamConsumerApplication.setContext(mock(ConfigurableApplicationContext.class));
ConfigurableApplicationContext mockConfigurableApplicationContext = mock(ConfigurableApplicationContext.class);
when(SpringApplication.run(StreamConsumerApplication.class, "1", "2", "3")).thenReturn(mockConfigurableApplicationContext);

// act
StreamConsumerApplication.restart();

// verify
assertThat(StreamConsumerApplication.getContext(), is(mockConfigurableApplicationContext));
}

@Test
public void destroy_callCassandraClusterUtilityCloseCluster() {
// setup
Expand Down

0 comments on commit 3485d36

Please sign in to comment.