Skip to content

Commit

Permalink
Add configurable deserializer and fix build (#109)
Browse files Browse the repository at this point in the history
* Add MyConfigurableDeserializer example

* Update tpolecat and add warn exclusion for scalaPB

* Update README and MyConfigurableDeserializer
  • Loading branch information
qboileau authored Apr 4, 2024
1 parent 10473f3 commit d5aa7be
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 1 deletion.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,48 @@ message Person {
repeated PhoneNumber phones = 4;
}
```

## Configurable example
`io.example.conduktor.custom.deserializers.MyConfigurableDeserializer`

[located here](./src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala)

This example allow to show deserializer configuration to change it's behavior.
To configure the behabor, the Deserializer check for a `output` property in it's configuration.

### Passthrough mode :
With configuration :
```properties
output=passthrough
```
The data on record are not de coded and returned as-is in bytes array form.

### Config mode :
With configuration :
```properties
output=config
```
The configuration is returned on each record deserialization.
For example with configuration
```properties
output=config
other.property=some value
```
Will always return JSON like
```json
{
"output": "config",
"other.property": "some value"
}
```

### Constant mode :

With configuration output defined to something else other than `config` or `passthrough` and not empty like:
```properties
output=some constant output
```
The Deserializer will always return String value like
```json
"some constant output"
```
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.typelevel.scalacoptions.ScalacOptions

name := "my_custom_deserializers"
version := sys.env.getOrElse("CREATED_TAG", "0.1")
scalaVersion := "2.13.10"
Expand All @@ -8,6 +10,10 @@ libraryDependencies ++= Seq(
"com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0"
)

Compile / tpolecatExcludeOptions ++= Set(
ScalacOptions.warnNonUnitStatement, // for scalaPB gen sources
)

assembly / assemblyJarName := "plugins.jar"

// ## Github Packages publish configs
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.example.conduktor.custom.deserializers

import org.apache.kafka.common.serialization.Deserializer

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

case object MyConfigurableDeserializerException
extends RuntimeException(
"ConfigurableDeserializer fail when its `::configure` method is called without `output` property"
)

sealed trait Output
final case class Constant(value: String) extends Output
final case class Config(config: util.Map[String, _]) extends Output
final case object Passthrough extends Output
final case object Unconfigured extends Output

final class MyConfigurableDeserializer extends Deserializer[Any] {

var output: Output = Unconfigured

override def deserialize(topic: String, data: Array[Byte]): Any = output match {
case Constant(value) => value
case Config(config) => config
case Passthrough => data
case Unconfigured => throw MyConfigurableDeserializerException
}

override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
configs.asScala.get("output").map(_.asInstanceOf[String]) match {
case Some("config") => output = Config(configs)
case Some("passthrough") => output = Passthrough
case Some(value) => output = Constant(value)
case None => throw MyConfigurableDeserializerException
}
}

0 comments on commit d5aa7be

Please sign in to comment.