Spring Cloud Streams example
Start kafka
docker compose -f infra/redpanda.yml up redpanda
docker exec -it infra-redpanda-1 /bin/bash
docker exec -it infra-redpanda-1 rpk version
docker exec -it infra-redpanda-1 rpk cluster info
docker exec -it infra-redpanda-1 rpk topic delete state-out-0 city-in-0
# produce
docker exec -it infra-redpanda-1 rpk topic produce all-in-topic -k my-key
{"name": "Red", "city": "nuur", "state": "ca"}
{"name": "Red2", "city": "nuur2", "state": "ca"}
# or
docker exec -it infra-redpanda-1 /bin/bash
echo '{"name": "Red", "city": "nuur", "state": "ca"}' | rpk topic produce all-in-topic -k my-key
# consume
docker exec -it infra-redpanda-1 rpk topic consume all-in-topic
docker exec -it infra-redpanda-1 rpk topic consume state-out-topic
docker exec -it infra-redpanda-1 rpk topic consume city-out-topic
Start µService
gradle :services:streams:bootRun
# log at debug level
gradle :services:streams:bootRun --debug
gradle services:streams:spotlessApply
gradle services:streams:build
# list all schemas
curl -s \
"http://localhost:8081/subjects" \
| jq .
# get schemas for `all-in-topic-value`
curl -s \
"http://localhost:8081/subjects/all-in-topic-value/versions/1" \
| jq '.schema | fromjson'
# (or) you can see ` "sensitive": "true"` property.
curl -s \
"http://localhost:8081/subjects/all-in-topic-value/versions/latest/schema" \
| jq .
http :8080/actuator
http :8080/actuator/health
http :8080/actuator/metrics
http :8080/actuator/metrics/kafka.admin.client.request.total
http :8080/actuator/bindings
http :8080/actuator/bindings/state-out-0
http :8080/actuator/bindings/generate-in-0
http :8080/actuator/bindings/print-in-0
http :8080/actuator/kafkastreamstopology
http :8080/actuator/kafkastreamstopology/<application-id of the processor>
http :8080/actuator/kafkastreamstopology/state-applicationId
http :8080/actuator/kafkastreamstopology/city-applicationId
http :8080/actuator/kafkastreamstopology/print-applicationId
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
http :8080/actuator/bindings/print-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
we need add kafka
binder for Supplier
functions to work
We can only use Consumer
and Function
functions with KStream
binder.
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
- Fictional Spring Cloud Streams
- Introducing Java Functions for Spring Cloud Stream Applications - Part 0
- spring-cloud-stream-binder-kafka Docs
- No need for Schema Registry in your Spring-Kafka tests
- https://github.com/spring-cloud/spring-cloud-stream-samples/
- https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples
- InteractiveQueryService https://github.com/piomin/sample-spring-cloud-stream-kafka/blob/master/stock-service/src/main/java/pl/piomin/samples/kafka/stock/controller/TransactionController.java
- https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/kafka-streams-samples/kafka-streams-inventory-count/src/main/java/kafka/streams/inventory/count/KafkaStreamsInventoryCountApplication.java
- https://github.com/ru-rocker/kafka-stream-employee-example