
How does one create a Kafka Streams setup to read AVRO-encoded messages? #375

Open
StankovicMarko opened this issue Jul 26, 2024 · 0 comments

Comments


StankovicMarko commented Jul 26, 2024

Hi,

I've been trying to use Kafka Streams to read messages from a Kafka topic that contains AVRO-encoded messages; the AVRO schemas live in AWS Glue.

I've tried many variations of the configuration below, but haven't made any progress.

(defn build-topology [builder input-topics]
  (let [topic-maps (map topic-config input-topics)]
    (-> (jstreams/kstreams builder topic-maps)
        ;; for now, just forward each record and print it
        (jstreams/peek (fn [[k v]] (prn k v)))))
  builder)
(mount/defstate kafka
  :start (let [builder (jstreams/streams-builder)
               topology (build-topology builder ["test"])
               application-config {"bootstrap.servers" (:kafka-servers config/config)
                                   "consumer.auto.offset.reset" "latest"
                                   "application.id" (str (:profile (mount/args)) "-match-status-" (random-uuid))
                                   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                                   "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
                                   "avroRecordType" "GENERIC_RECORD"
                                   "registry.name" "test"
                                   "schemaName" "test"}
               application (jstreams/kafka-streams
                            topology
                            application-config)]
           (timbre/info "Starting Kafka Streams application..." {:application-config application-config})
           (jstreams/start application)

           {:application application
            :application-config application-config
            :builder builder
            :topology topology})

  :stop (do (jstreams/close (:application kafka))
            {}))
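
One thing I've been wondering about: Kafka Streams doesn't seem to read the consumer-style key.deserializer / value.deserializer keys at all; it builds serdes from default.key.serde / default.value.serde (or from serdes attached to each topic). A minimal sketch of what I mean, assuming the aws-glue-schema-registry library ships a Streams serde class called GlueSchemaRegistryKafkaStreamsSerde (I haven't confirmed the exact class name or package, so please double-check against the library version):

;; Sketch only: configure serdes the Streams way instead of the consumer way.
;; "GlueSchemaRegistryKafkaStreamsSerde" is an assumption; verify the class
;; name and package against the aws-glue-schema-registry docs.
(def example-application-config
  {"bootstrap.servers"   (:kafka-servers config/config)
   "application.id"      "glue-avro-streams-example"
   "default.key.serde"   "org.apache.kafka.common.serialization.Serdes$StringSerde"
   "default.value.serde" "com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaStreamsSerde"
   "avroRecordType"      "GENERIC_RECORD"
   "registry.name"       "test"
   "schemaName"          "test"})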

I've set up a basic consumer to make sure I'm not losing my sanity, and this works.

(def consumer (jc/subscribed-consumer {"bootstrap.servers" (:kafka-servers config/config)
                                         "consumer.auto.offset.reset" "latest"
                                         "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                                         "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
                                         "avroRecordType" "GENERIC_RECORD"
                                         "registry.name" "test123"
                                         "group.id" "test123"
                                         "max.poll.records" (.intValue 1)
                                         "schemaName" "test123"}
                                 [{:topic-name "test123"}]))
(jc/poll consumer 1000)

(.close consumer)
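
Another variation I've been considering is wrapping the Glue deserializer in a proper Serde and attaching it to the jackdaw topic map (the topic-config helper referenced above). Again, just a sketch; the Glue Streams serde class and its configure call are guesses I haven't verified:

;; Sketch only: attach explicit serdes to the jackdaw topic map.
;; The Glue serde class below is an assumption; verify against the
;; aws-glue-schema-registry version in use.
(import '(org.apache.kafka.common.serialization Serdes)
        '(com.amazonaws.services.schemaregistry.serializers GlueSchemaRegistryKafkaStreamsSerde))

(defn glue-value-serde []
  (doto (GlueSchemaRegistryKafkaStreamsSerde.)
    ;; second arg false => configure as a value (not key) serde
    (.configure {"avroRecordType" "GENERIC_RECORD"
                 "registry.name"  "test"
                 "schemaName"     "test"}
                false)))

(defn topic-config [topic-name]
  {:topic-name  topic-name
   :key-serde   (Serdes/String)
   :value-serde (glue-value-serde)})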

I think using Streams with AVRO-encoded messages and Glue is possible, but my lack of skill/knowledge is showing.
Any guidance would be appreciated.

Just want to mention that I tried asking for help in Slack and didn't get a response for a few days, which is why I'm opening an issue.
