This project is a demo for learning about Spring for Apache Kafka.
Located under consumer/
, this service is a Spring Boot application that listens to Kafka topics and processes incoming messages.
The ConsumerConfig
class is a @Configuration
bean, annotated also with @EnableKafka
. The @EnableKafka
enables spring's application context to register
the consumers annotated with the @KafkaListener
annotation:
public static final String LIBRARY_EVENTS = "library-events";
@KafkaListener(topics = {LIBRARY_EVENTS})
@Override
public void consume(ConsumerRecord<Integer, String> consumerRecord) throws JsonProcessingException, MyRetriableException {
LibraryEvent libraryEvent = new ObjectMapper().readValue(consumerRecord.value(), LibraryEvent.class);
if (libraryEvent.getLibraryEventId() == null)
throw new IllegalArgumentException(LIBRARY_EVENT_ID_CANNOT_BE_NULL);
else if (libraryEvent.getLibraryEventId() == 0)
throw new MyRetriableException(LIBRARY_EVENT_ID_0);
log.info("Consumer Record: {}", consumerRecord);
}
The following configurations are specified in the application.properties
file.
- Bootstrap Servers:
kafka:29092
- The Kafka broker addresses. - Key Deserializer:
org.apache.kafka.common.serialization.IntegerDeserializer
- Deserializer for the key that is used when consuming messages. - Value Deserializer:
org.apache.kafka.common.serialization.StringDeserializer
- Deserializer for the value that is used when consuming messages. - Group ID:
library-events-listener-group
This method defines a ConcurrentKafkaListenerContainerFactory
bean that configures the Kafka listener containers and allows the creation of multiple concurrent consumer threads for each @KafkaListener
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler());
return factory;
}
Found in producer/
, the Producer Application serves as a Kafka message producer. It publishes library event messages to Kafka topics, handling serialization and response handling through callbacks. Key components include:
- LibraryEventProducer: Sends library events to Kafka, implementing error handling and successful message callbacks.
- Producer: Defines the generic producer interface, encapsulating the production of messages.
Both applications share a common domain context with two primary entities:
- LibraryEvent: Represents an event in the library, containing identifiers and associated book details.
- Book: Contains book details like ID, name, and author.
Check the docker-compose file in the root project and start from there to build & run