Streams

Spring Cloud Stream example

Run

Start Kafka (Redpanda)

docker compose -f infra/redpanda.yml up redpanda

docker exec -it infra-redpanda-1 /bin/bash
docker exec -it infra-redpanda-1 rpk version
docker exec -it infra-redpanda-1 rpk cluster info
docker exec -it infra-redpanda-1 rpk topic delete state-out-0 city-in-0

# produce
docker exec -it infra-redpanda-1 rpk topic produce all-in-topic -k my-key
{"name": "Red", "city": "nuur", "state": "ca"}
{"name": "Red2", "city": "nuur2", "state": "ca"}
# or, from a shell inside the container
docker exec -it infra-redpanda-1 /bin/bash
echo '{"name": "Red", "city": "nuur", "state": "ca"}' | rpk topic produce all-in-topic -k my-key
# consume
docker exec -it infra-redpanda-1 rpk topic consume all-in-topic
docker exec -it infra-redpanda-1 rpk topic consume state-out-topic
docker exec -it infra-redpanda-1 rpk topic consume city-out-topic
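To produce from the host instead of from a shell inside the container, stdin can be piped through `docker exec -i`. Leave out `-t`, since docker rejects a TTY request when stdin is a pipe. A minimal sketch, assuming the container and topic names used above; `make_record` is a hypothetical helper, not part of rpk:

```shell
# Hypothetical helper: print one JSON record in the shape used above.
# Values are inserted verbatim, so they must not contain quotes or backslashes.
make_record() {
  printf '{"name": "%s", "city": "%s", "state": "%s"}\n' "$1" "$2" "$3"
}

# Usage (needs the Redpanda container running):
#   { make_record Red nuur ca; make_record Red2 nuur2 ca; } \
#     | docker exec -i infra-redpanda-1 rpk topic produce all-in-topic -k my-key
```

Each line on stdin becomes one record, all sharing the key `my-key`.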

Start µService

gradle :services:streams:bootRun
# run with Gradle's own debug-level logging (--debug is a Gradle flag, not an application argument)
gradle :services:streams:bootRun --debug

Build

gradle services:streams:spotlessApply
gradle services:streams:build

Test

# list all subjects
curl -s \
  "http://localhost:8081/subjects" \
  | jq .
# get version 1 of the schema registered under `all-in-topic-value`
curl -s \
  "http://localhost:8081/subjects/all-in-topic-value/versions/1" \
  | jq '.schema | fromjson' 
# or fetch the latest schema directly; look for the `"sensitive": "true"` property
curl -s \
  "http://localhost:8081/subjects/all-in-topic-value/versions/latest/schema" \
  | jq .
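The manual check for the `"sensitive": "true"` property can be turned into a pass/fail test. This is only a sketch: `has_sensitive` is a hypothetical helper, and it assumes the property appears literally in the schema JSON returned by the `/schema` endpoint above.

```shell
# Hypothetical check: succeed if the schema JSON on stdin
# contains a "sensitive": "true" property anywhere.
has_sensitive() {
  grep -Eq '"sensitive"[[:space:]]*:[[:space:]]*"true"'
}

# Usage (needs the schema registry on localhost:8081):
#   curl -s "http://localhost:8081/subjects/all-in-topic-value/versions/latest/schema" \
#     | has_sensitive && echo "sensitive property present"
```

It uses grep rather than jq so that it does not depend on where in the schema the property sits.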

Operations

Metrics

http :8080/actuator

http :8080/actuator/health

http :8080/actuator/metrics
http :8080/actuator/metrics/kafka.admin.client.request.total

http :8080/actuator/bindings
http :8080/actuator/bindings/state-out-0
http :8080/actuator/bindings/generate-in-0
http :8080/actuator/bindings/print-in-0

http :8080/actuator/kafkastreamstopology
http :8080/actuator/kafkastreamstopology/<application-id of the processor>
http :8080/actuator/kafkastreamstopology/state-applicationId
http :8080/actuator/kafkastreamstopology/city-applicationId
http :8080/actuator/kafkastreamstopology/print-applicationId

Binding control

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
http :8080/actuator/bindings/print-in-0
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
curl -d '{"state":"PAUSED"}'  -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST localhost:8080/actuator/bindings/print-in-0
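The four calls above differ only in the state value, so they can be wrapped in a small function. A sketch under stated assumptions: `binding_state_payload` and `set_binding_state` are hypothetical names, and the default base URL `localhost:8080` is taken from the commands above.

```shell
# Hypothetical helper: print the JSON body for a binding state change,
# or fail for anything other than the four states used above.
binding_state_payload() {
  case "$1" in
    STOPPED|STARTED|PAUSED|RESUMED) printf '{"state":"%s"}\n' "$1" ;;
    *) echo "unknown state: $1" >&2; return 1 ;;
  esac
}

# Hypothetical wrapper: set_binding_state <binding> <state> [base-url]
set_binding_state() {
  body="$(binding_state_payload "$2")" || return 1
  curl -s -d "$body" -H "Content-Type: application/json" \
    -X POST "${3:-localhost:8080}/actuator/bindings/$1"
}

# Usage (needs the service running):
#   set_binding_state print-in-0 PAUSED
#   set_binding_state print-in-0 RESUMED
```

Validating the state first means a typo fails locally, before any request is sent.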

Binders

The Kafka Streams (KStream) binder only supports Consumer and Function beans, so we also need to add the Kafka binder for Supplier functions to work.

implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
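To confirm that both binders actually end up on the runtime classpath, the Gradle dependency report can be filtered. A rough sketch: `missing_binders` is a hypothetical helper, and the `runtimeClasspath` configuration name is assumed from the standard Gradle Java plugin.

```shell
# Hypothetical check: read a Gradle dependency report on stdin and print
# each of the two binder artifacts that does not appear in it.
missing_binders() {
  report="$(cat)"
  for artifact in spring-cloud-stream-binder-kafka spring-cloud-stream-binder-kafka-streams; do
    # Require whitespace, ':' or end of line after the name so that
    # "...-binder-kafka" does not match "...-binder-kafka-streams".
    printf '%s\n' "$report" \
      | grep -Eq "org\.springframework\.cloud:${artifact}([[:space:]:]|\$)" \
      || echo "$artifact"
  done
}

# Usage:
#   gradle :services:streams:dependencies --configuration runtimeClasspath | missing_binders
```

Empty output means both artifacts were found; otherwise the missing artifact names are printed one per line.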

Reference

Example projects