Skip to content

Commit

Permalink
Update README and MyConfigurableDeserializer
Browse files Browse the repository at this point in the history
  • Loading branch information
qboileau committed Apr 4, 2024
1 parent 922dc50 commit 05fbdb0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,48 @@ message Person {
repeated PhoneNumber phones = 4;
}
```

## Configurable example
`io.example.conduktor.custom.deserializers.MyConfigurableDeserializer`

[located here](./src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala)

This example allow to show deserializer configuration to change it's behavior.
To configure the behabor, the Deserializer check for a `output` property in it's configuration.

### Passthrough mode :
With configuration :
```properties
output=passthrough
```
The data on record are not de coded and returned as-is in bytes array form.

### Config mode :
With configuration :
```properties
output=config
```
The configuration is returned on each record deserialization.
For example with configuration
```properties
output=config
other.property=some value
```
Will always return JSON like
```json
{
"output": "config",
"other.property": "some value"
}
```

### Constant mode :

With configuration output defined to something else other than `config` or `passthrough` and not empty like:
```properties
output=some constant output
```
The Deserializer will always return String value like
```json
"some constant output"
```
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,28 @@ case object MyConfigurableDeserializerException
"ConfigurableDeserializer fail when its `::configure` method is called without `output` property"
)

sealed trait Output
final case class Constant(value: String) extends Output
final case class Config(config: util.Map[String, _]) extends Output
final case object Passthrough extends Output
final case object Unconfigured extends Output

final class MyConfigurableDeserializer extends Deserializer[Any] {

var output: Any = "unconfigured"
var output: Output = Unconfigured

override def deserialize(topic: String, data: Array[Byte]): Any = output
override def deserialize(topic: String, data: Array[Byte]): Any = output match {
case Constant(value) => value
case Config(config) => config
case Passthrough => data
case Unconfigured => throw MyConfigurableDeserializerException
}

override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
configs.asScala.get("output").map(_.asInstanceOf[String]) match {
case Some("config") => output = configs
case Some(value) => output = value
case None => throw MyConfigurableDeserializerException
case Some("config") => output = Config(configs)
case Some("passthrough") => output = Passthrough
case Some(value) => output = Constant(value)
case None => throw MyConfigurableDeserializerException
}
}

0 comments on commit 05fbdb0

Please sign in to comment.