Skip to content

Commit

Permalink
Enhanced consumer to acknowledge message manually after insert
Browse files Browse the repository at this point in the history
  • Loading branch information
vsadokhin committed Sep 30, 2018
1 parent b6dbf1e commit 8ee2d66
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import me.vsadokhin.iot.data.domain.Metric;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -18,9 +19,11 @@ public KafkaConsumer(MetricRepository metricRepository) {
}

@KafkaListener(topics = "metric", groupId = "stream")
public void processMessage(Metric metric) {
metricRepository.insert(metric, MetricTable.METRIC_BY_SENSOR);
metricRepository.insert(metric, MetricTable.METRIC_BY_TYPE);
public void processMessage(Metric metric, Acknowledgment acknowledgment) {
for (MetricTable metricTable : MetricTable.values()) {
metricRepository.insert(metric, metricTable);
}
acknowledgment.acknowledge();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ public class KafkaConsumerConfig {

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("kafka.endpoints", "localhost:9092"));
return props;
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("kafka.endpoints", "localhost:9092"));
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return properties;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ public void consumerConfigs_kafkaEndpointsAreSpecifiedAsSystemProperty_checkResu
assertThat(result.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), is(customKafkaEndpoints));
}

@Test
public void consumerConfigs_checkEnableAutoCommitConfigKey() {
// act
Map result = kafkaConsumerConfig.consumerConfigs();

// verify
assertThat(result.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), is(false));
}

@Test
public void consumerFactory_checkResult() throws Exception {
// setup
Expand Down Expand Up @@ -143,7 +152,7 @@ public void kafkaListenerContainerFactory_checkResultConsumerFactory() {
}

@Test
public void sensorRepository() throws Exception {
public void metricRepository() throws Exception {
// setup
MetricRepository mockMetricRepository = mock(MetricRepository.class);
whenNew(MetricRepository.class).withNoArguments().thenReturn(mockMetricRepository);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package me.vsadokhin.iot.stream.consumer;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import me.vsadokhin.iot.data.MetricRepository;
import me.vsadokhin.iot.data.MetricTable;
import me.vsadokhin.iot.data.domain.Metric;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaConsumerTest {

Expand All @@ -21,27 +26,52 @@ public void setUp() {
}

@Test
public void processMessage_callSensorRepositoryInsert_withMetricBySensor() {
public void processMessage_callMetricRepositoryInsert_withMetricByMetric() {
// setup
Metric mockMetric = mock(Metric.class);

// act
kafkaConsumer.processMessage(mockMetric);
kafkaConsumer.processMessage(mockMetric, mock(Acknowledgment.class));

// verify
verify(mockMetricRepository).insert(mockMetric, MetricTable.METRIC_BY_SENSOR);
}

@Test
public void processMessage_callSensorRepositoryInsert_withMetricBySensorType() {
public void processMessage_callMetricRepositoryInsert_withMetricByMetricType() {
// setup
Metric mockMetric = mock(Metric.class);

// act
kafkaConsumer.processMessage(mockMetric);
kafkaConsumer.processMessage(mockMetric, mock(Acknowledgment.class));

// verify
verify(mockMetricRepository).insert(mockMetric, MetricTable.METRIC_BY_TYPE);
}

@Test
public void processMessage_callAcknowledgmentAcknowledge() {
// setup
Acknowledgment mockAcknowledgment = mock(Acknowledgment.class);

// act
kafkaConsumer.processMessage(new Metric(), mockAcknowledgment);

// verify
verify(mockAcknowledgment).acknowledge();
}

@Test
public void processMessage_callAcknowledgmentAcknowledgeAfterCallMetricRepositoryInsert() {
// setup
Acknowledgment mockAcknowledgment = mock(Acknowledgment.class);

// act
kafkaConsumer.processMessage(new Metric(), mockAcknowledgment);

// verify
InOrder inOrder = Mockito.inOrder(mockMetricRepository, mockAcknowledgment);
inOrder.verify(mockMetricRepository, times(MetricTable.values().length)).insert(any(), any());
inOrder.verify(mockAcknowledgment).acknowledge();
}
}

0 comments on commit 8ee2d66

Please sign in to comment.