-
Notifications
You must be signed in to change notification settings - Fork 25
Support Avro classes in ReqRep #102
Comments
It is possible, but not straight out of the box:
import com.sksamuel.avro4s._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol
import path.AvroClass // here goes the path to the generated avro class
// Serde for the generated Avro class, built on Confluent's schema-registry-aware
// Avro serializer and deserializer.

// Creates a fresh schema-registry client for the registry URL list ("url" is a
// placeholder; several URLs may be given comma-separated). The second argument (16)
// is presumably the client's schema cache capacity -- confirm against the Confluent API.
def newSchemaRegistryClient(): CachedSchemaRegistryClient =
  new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16)

// Serializer and deserializer each get their own registry client.
val ser = new KafkaAvroSerializer(newSchemaRegistryClient())
val de  = new KafkaAvroDeserializer(newSchemaRegistryClient())

// Implicit evidence required by the typed `send[...]` calls for AvroClass payloads.
implicit val serdeClass: Serde[AvroClass] = new Serde[AvroClass] {
  // The Confluent (de)serializers are not typed to AvroClass, so they are narrowed with
  // unchecked casts; nothing verifies at runtime that the records really are AvroClass.
  // NOTE(review): KafkaAvroDeserializer may yield GenericRecord unless it is configured
  // with `specific.avro.reader=true` -- confirm before relying on typed replies.
  private val avroSerializer   = ser.asInstanceOf[Serializer[AvroClass]]
  private val avroDeserializer = de.asInstanceOf[Deserializer[AvroClass]]

  override def serializer(): Serializer[AvroClass]     = avroSerializer
  override def deserializer(): Deserializer[AvroClass] = avroDeserializer
}
// Protocol description
// Producer-only protocol: publishes to "myTopic" with String keys and Avro-serialized values.
val kafkaScn1Protocol: KafkaProtocol = {
  // Producer client settings; "url" is a placeholder for the schema registry address.
  val producerProps = Map(
    ProducerConfig.ACKS_CONFIG                   -> "1",
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "localhost:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url"                        -> "url",
  )

  kafka.topic("myTopic").properties(producerProps)
}
// Request/reply protocol: produces requests with String keys and Avro-serialized values,
// and gives up on a reply after 5 seconds.
val kafkaScn2Protocol: KafkaProtocol = kafka.requestReply
  .producerSettings(
    Map(
      ProducerConfig.ACKS_CONFIG                   -> "1",
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
      "schema.registry.url"                        -> "url",
    ),
  )
  .consumeSettings(
    Map(
      // Consumer-side settings are keyed with ConsumerConfig constants (same
      // "bootstrap.servers" string as the producer constant, but the intent is explicit).
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      "schema.registry.url"                   -> "url",
    ),
  )
  .timeout(5.seconds)
// Sending Avro-serialised messages scenario
// Sends a single AvroClass value; the implicit Serde[AvroClass] in scope is the
// evidence that `send[AvroClass]` needs.
val scn: ScenarioBuilder = {
  val payload = new AvroClass("someParam1", "someParam2")

  scenario("Simple request").exec(
    kafka("Simple Request").send[AvroClass](payload),
  )
}
// Request/reply scenario: sends a String key with an AvroClass value to "request.t"
// and uses "reply.t" as the reply topic. The typed `send[String, AvroClass]` relies on
// the implicit Serde[AvroClass] being in scope.
val scn2: ScenarioBuilder = scenario("RequestReply")
  .exec(
    kafka("RequestReply").requestReply
      .requestTopic("request.t")
      .replyTopic("reply.t")
      .send[String, AvroClass]("key", new AvroClass("someParam1", "someParam2")),
  )
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
In the default `Kafka.send` method it is possible to use Avro Java classes (generated by avro-maven-plugin) as the payload in the key or value. The payload then gets serialized as a valid Avro byte stream.
But in the new ReqRepBases (used in the Request/Reply mechanism) `send` method this does not seem to be possible. We are seeing an error like `No implicits found for parameter evidence`.
Is Avro not supported here? Or are we simply using it wrong?
The text was updated successfully, but these errors were encountered: