Skip to content

Commit

Permalink
feat: kafka message with payload and headers and without key (#159)
Browse files Browse the repository at this point in the history
* feat: kafka simple send without key and with headers for scala and javaapi

* feat: enable test in CI

* feat: add test in scala

* fix: scalafmt

* fix: links on examples

---------

Co-authored-by: a.ugodnikov <[email protected]>
  • Loading branch information
daylikon and a.ugodnikov authored May 18, 2023
1 parent e05d5e0 commit 2eb0e15
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 5 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ jobs:

- name: Test Release
run: sbt clean scalafmtCheckAll scalafmtSbtCheck compile coverage "Gatling / testOnly ru.tinkoff.gatling.kafka.examples.KafkaGatlingTest" test coverageOff


- name: Test Javaapi Methods
run: sbt compile "Gatling / testOnly ru.tinkoff.gatling.kafka.examples.KafkaJavaapiMethodsGatlingTest"

- name: Coverage Report
run: sbt coverageReport

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ val de = KafkaAvroDeserializer(CachedSchemaRegistryClient("schRegUrl".split(',')

Example [scala](src/test/scala/ru/tinkoff/gatling/kafka/examples/AvroClassWithRequestReplySimulation.scala)

Example [java](src/test/java/ru/tinkoff/gatling/kafka/examples/AvroClassWithRequestReplySimulation.java)
Example [java](src/test/java/ru/tinkoff/gatling/kafka/javaapi/examples/AvroClassWithRequestReplySimulation.java)

Example [kotlin](src/test/kotlin/ru/tinkoff/gatling/kafka/examples/AvroClassWithRequestReplySimulation.kt)
Example [kotlin](src/test/kotlin/ru/tinkoff/gatling/kafka/javaapi/examples/AvroClassWithRequestReplySimulation.kt)
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ public KafkaRequestBuilderBase(ru.tinkoff.gatling.kafka.request.builder.KafkaReq
ru.tinkoff.gatling.kafka.request.builder.Sender.noSchemaSender()));
}

public <K, V> RequestBuilder<?, ?> send(V payload, Headers headers) {
return new RequestBuilder<>(
wrapped.send(
null,
calculateExpression(payload),
toStaticValueExpression(headers),
ru.tinkoff.gatling.kafka.request.builder.Sender.noSchemaSender()
));
}

public ReqRepBase requestReply() {
return new ReqRepBase(requestName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ case class KafkaRequestBuilderBase(requestName: Expression[String]) {
headers: Expression[Headers] = List.empty[Header],
)(implicit
sender: Sender[K, V],
): RequestBuilder[K, V] =
sender.send(requestName, Some(key), payload, Some(headers))
): RequestBuilder[K, V] = {
if (key == null)
sender.send(requestName, None, payload, Some(headers))
else
sender.send(requestName, Some(key), payload, Some(headers))
}

def send[V](payload: Expression[V])(implicit sender: Sender[Nothing, V]): RequestBuilder[_, V] =
sender.send(requestName, None, payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ class KafkaGatlingTest extends Simulation {
kafka("Request String")
.send[String]("foo"),
)
.exec(
kafka("Request Stirng With null key")
.send[Any, String](null, "nullkey"),
)

val scn: ScenarioBuilder = scenario("Request String")
.exec(kafka("Request String 2").send[String, String]("testCheckJson", """{ "m": "dkf" }"""))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ru.tinkoff.gatling.kafka.examples

import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.header.internals.RecordHeaders
import ru.tinkoff.gatling.kafka.javaapi.KafkaDsl._

class KafkaJavaapiMethodsGatlingTest extends Simulation {

val kafkaConfwoKey = kafka
.topic("myTopic3")
.properties(
java.util.Map.of(
ProducerConfig.ACKS_CONFIG,
"1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer",
),
)
.protocol()

setUp(
scenario("Request String without key")
.exec(
kafka("Request String without headers and key")
.send("testJavaWithoutKeyAndHeaders")
.asScala(),
)
.exec(
kafka("Request String with headers without key")
.send("testJavaWithHeadersWithoutKey", new RecordHeaders().add("test-header", "test_value".getBytes()))
.asScala(),
)
.inject(nothingFor(1), atOnceUsers(1))
.protocols(kafkaConfwoKey),
)

}

0 comments on commit 2eb0e15

Please sign in to comment.