
Commit

Perform automatic type conversion from a Python function result into the specified result type (#68)

* Combination of KSML Producer, XML and SOAP features

* Making progress

* Update schema

* Some improvements and cleanups

* Continued work on SOAP and XML

* SOAP improvements

* Improved SOAP and UserFunction type checking

* Bug fix for properly encoding defaultValues in schema in native format

* Added support for automatic conversion between notations and types

* Small bug fixes and cleanups

* Bugfix for queryable state stores, which now work as intended

* Improved on internal XML structure and handling

* Improved toString output of internal data types

* Split up the producer examples into separate files

* Made XML schema handling work in the same way as AVRO schema. First stab at JSON schema. And added initial CSV support.

* CSV and JSON schema added and tested

* Renamed the producer to data-generator

* Updated to a single multistage Dockerfile
Refactored Example Producer image to data generator
Updated example docker compose to use the new data generator
Updated GitHub workflow to use new Docker image approach

* Fixed configuration bug causing missing schema registry configuration properties with Kafka Client Factory backend

* [skip ci] Updated ReadMe file

* Polishing for release

* Add automatic type conversion from a Python return value into the specified function return type

* Support global functions to register Python globals

* Check metadata fields of NamedSchema to ensure compatibility with other schema

* Allow producing null values

* Enable conversion from simple types to unions

* Add the convertKeyValue operation to convert message keys and values in one go

* First steps towards error handling

* Added Streams Error Handling configuration, updated ExecutionContext to handle errors

* Added KSMLErrorHandlingConfig to define error handling settings

* Reimplemented error handling

* Converted KSMLConfig into a record

* Set default name for error handlers + small fix

* Small improvements

---------

Co-authored-by: Richard Bosch <[email protected]>
jeroenvandisseldorp and richard-axual authored Mar 15, 2023
1 parent 33e5bfc commit f8f151b
Showing 47 changed files with 820 additions and 366 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -6,7 +6,10 @@ Axual KSML
KSML is a wrapper around Kafka Streams that allows for development of low code stream processing applications. It was developed by Axual early 2021 and released as open source in May 2021.

## Introduction
Kafka Streams has captured the hearts and minds of many developers that want to develop streaming applications on top of Kafka. But as powerful as the framework is, Kafka Streams has had a hard time getting around the requirement of writing Java code and setting up build pipelines. There were some attempts to rebuild Kafka Streams, but up until now popular languages like Python did not receive equally powerful (and maintained) stream processing frameworks. KSML provides a new declarative approach to unlock Kafka Streamsto a wider audience. Using only a few simple basic rules and Python snippets, you will be able to write streaming applications in very little time.
Kafka Streams has captured the hearts and minds of many developers that want to develop streaming applications on top of Kafka. But as powerful as the framework is, Kafka Streams has had a hard time getting around the requirement of writing Java code and setting up build pipelines. There were some attempts to rebuild Kafka Streams, but up until now popular languages like Python did not receive equally powerful (and maintained) stream processing frameworks. KSML provides a new declarative approach to unlock Kafka Streams to a wider audience. Using only a few simple basic rules and Python snippets, you will be able to write streaming applications in very little time.

## Language
To quickly jump to the KSML specification, use this link: https://axual.github.io/ksml/

## Project Overview
The project is divided into modules based functionality in order to be included separately depending
18 changes: 18 additions & 0 deletions docs/operations.md
@@ -8,6 +8,7 @@
1. [Operations](#transform-operations)
* [aggregate](#aggregate)
* [convertKey](#convertkey)
* [convertKeyValue](#convertkeyvalue)
* [convertValue](#convertvalue)
* [count](#count)
* [filter](#filter)
@@ -134,6 +135,23 @@ via:
to: output_stream
```

### convertKeyValue

This built-in operation takes a message and converts the key and value into a given type.

|Stream Type | Returns |Parameter |Value Type| Description
|:---|:-----------------|:---|:---|:---
|KStream`<K,V>`| KStream`<KR,VR>` |`into`|string|The type to convert to

Example:
```yaml
from: input_stream
via:
- type: convertKeyValue
into: (string,avro:SensorData)
to: output_stream
```
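
Note that with this commit's headline change, KSML also converts a Python function's return value into the function's declared result type automatically, so an explicit convert step is often unnecessary when a function declares a `resultType`. A minimal sketch, assuming the standard KSML function attributes `code`, `expression`, and `resultType` (the function name, function type, and schema are illustrative):
```yaml
functions:
  rekey_sensor_data:
    type: keyValueMapper
    resultType: (string, avro:SensorData)
    code: |
      new_key = str(value["sensorId"])
    # A plain Python tuple is returned here; KSML converts it into the declared resultType
    expression: (new_key, value)
```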

### convertValue

This built-in operation takes a message and converts the value into a given type.
KSMLDataGenerator.java
@@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.axual.ksml.data.mapper.DataObjectConverter;
import io.axual.ksml.datagenerator.config.KSMLDataGeneratorConfig;
import io.axual.ksml.datagenerator.execution.ExecutableProducer;
import io.axual.ksml.datagenerator.execution.IntervalSchedule;
@@ -49,7 +50,6 @@
public class KSMLDataGenerator {
private static final Logger LOG = LoggerFactory.getLogger(KSMLDataGenerator.class);
private static final String DEFAULT_CONFIG_FILE_SHORT = "ksml-data-generator.yaml";
private static final PythonContext context = new PythonContext();
private static final IntervalSchedule<ExecutableProducer> schedule = new IntervalSchedule<>();

private static Map<String, Object> getGenericConfigs() {
@@ -107,16 +107,20 @@ public static void main(String[] args) {
}

// Read all producer definitions from the configured YAML files
var producers = new ProducerDefinitionFileParser(config.getProducer()).create(factory.getNotationLibrary());
var notationLibrary = factory.getNotationLibrary();
var context = new PythonContext(new DataObjectConverter(notationLibrary));
var producers = new ProducerDefinitionFileParser(config.getProducer()).create(notationLibrary, context);

// Load all functions into the Python context

// Schedule all defined producers
for (var entry : producers.entrySet()) {
var target = entry.getValue().target();
var generator = new PythonFunction(context, entry.getKey(), entry.getValue().generator());
var condition = entry.getValue().condition() != null ? new PythonFunction(context, entry.getKey() + "_producercondition", entry.getValue().condition()) : null;
var keySerde = notationLibrary.get(target.keyType.notation()).getSerde(target.keyType.dataType(), true);
var valueSerde = notationLibrary.get(target.valueType.notation()).getSerde(target.valueType.dataType(), false);
var ep = new ExecutableProducer(notationLibrary, generator, target.topic, target.keyType, target.valueType, keySerde.serializer(), valueSerde.serializer());
var ep = new ExecutableProducer(notationLibrary, generator, condition, target.topic, target.keyType, target.valueType, keySerde.serializer(), valueSerde.serializer());
schedule.schedule(entry.getValue().interval().toMillis(), ep);
LOG.info("Scheduled producers: {}", entry.getKey());
}
ProducerDefinition.java
@@ -25,5 +25,5 @@

import java.time.Duration;

public record ProducerDefinition(FunctionDefinition generator, Duration interval, StreamDefinition target) {
public record ProducerDefinition(FunctionDefinition generator, Duration interval, FunctionDefinition condition, StreamDefinition target) {
}
ProducerDSL.java
@@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,5 +29,6 @@ private ProducerDSL() {
public static final String PRODUCERS_DEFINITION = "producers";
public static final String GENERATOR_ATTRIBUTE = "generator";
public static final String INTERVAL_ATTRIBUTE = "interval";
public static final String CONDITION_ATTRIBUTE = "condition";
public static final String TARGET_ATTRIBUTE = "to";
}
ExecutableProducer.java
@@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,10 +21,12 @@
*/

import io.axual.ksml.data.mapper.DataObjectConverter;
import io.axual.ksml.data.object.DataBoolean;
import io.axual.ksml.data.object.DataObject;
import io.axual.ksml.data.object.DataTuple;
import io.axual.ksml.data.type.UserType;
import io.axual.ksml.exception.KSMLExecutionException;
import io.axual.ksml.execution.FatalError;
import io.axual.ksml.notation.NotationLibrary;
import io.axual.ksml.notation.binary.NativeDataObjectMapper;
import io.axual.ksml.user.UserFunction;
@@ -41,6 +43,7 @@
public class ExecutableProducer {
private static final Logger LOG = LoggerFactory.getLogger(ExecutableProducer.class);
private final UserFunction generator;
private final UserFunction condition;
private final String topic;
private final UserType keyType;
private final UserType valueType;
@@ -51,13 +54,15 @@ public class ExecutableProducer {

public ExecutableProducer(NotationLibrary notationLibrary,
UserFunction generator,
UserFunction condition,
String topic,
UserType keyType,
UserType valueType,
Serializer<Object> keySerializer,
Serializer<Object> valueSerializer) {
this.dataObjectConverter = new DataObjectConverter(notationLibrary);
this.generator = generator;
this.condition = condition;
this.topic = topic;
this.keyType = keyType;
this.valueType = valueType;
@@ -68,46 +73,60 @@ public ExecutableProducer(NotationLibrary notationLibrary,
public void produceMessage(Producer<byte[], byte[]> producer) {
DataObject result = generator.call();
if (result instanceof DataTuple tuple && tuple.size() == 2) {
var key = dataObjectConverter.convert(DEFAULT_NOTATION, tuple.get(0), keyType);
var value = dataObjectConverter.convert(DEFAULT_NOTATION, tuple.get(1), valueType);
var okay = true;
var key = tuple.get(0);
var value = tuple.get(1);

if (!keyType.dataType().isAssignableFrom(key.type())) {
LOG.error("Can not convert {} to topic key type {}", key.type(), keyType);
okay = false;
}
if (!valueType.dataType().isAssignableFrom(value.type())) {
LOG.error("Can not convert {} to topic value type {}", value.type(), valueType);
okay = false;
}
if (checkCondition(key, value)) {
key = dataObjectConverter.convert(DEFAULT_NOTATION, key, keyType);
value = dataObjectConverter.convert(DEFAULT_NOTATION, value, valueType);
var okay = true;

if (okay) {
var keyStr = key != null ? key.toString() : "null";
var valueStr = value != null ? value.toString() : "null";
if (key != null && !keyType.dataType().isAssignableFrom(key.type())) {
LOG.error("Can not convert {} to topic key type {}", key.type(), keyType);
okay = false;
}
if (value != null && !valueType.dataType().isAssignableFrom(value.type())) {
LOG.error("Can not convert {} to topic value type {}", value.type(), valueType);
okay = false;
}

keyStr = keyStr.replaceAll("\n", "\\\\n");
valueStr = valueStr.replaceAll("\n", "\\\\n");
LOG.info("Message: key={}, value={}", keyStr, valueStr);
if (okay) {
var keyStr = key != null ? key.toString() : "null";
var valueStr = value != null ? value.toString() : "null";

var serializedKey = keySerializer.serialize(topic, nativeMapper.fromDataObject(key));
var serializedValue = valueSerializer.serialize(topic, nativeMapper.fromDataObject(value));
ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(
topic,
serializedKey,
serializedValue
);
var future = producer.send(message);
try {
var metadata = future.get();
if (metadata != null && metadata.hasOffset()) {
LOG.info("Produced message to {}, partition {}, offset {}", metadata.topic(), metadata.partition(), metadata.offset());
} else {
LOG.error("Error producing message to topic {}", topic);
keyStr = keyStr.replaceAll("\n", "\\\\n");
valueStr = valueStr.replaceAll("\n", "\\\\n");
LOG.info("Message: key={}, value={}", keyStr, valueStr);

var serializedKey = keySerializer.serialize(topic, nativeMapper.fromDataObject(key));
var serializedValue = valueSerializer.serialize(topic, nativeMapper.fromDataObject(value));
ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(
topic,
serializedKey,
serializedValue
);
var future = producer.send(message);
try {
var metadata = future.get();
if (metadata != null && metadata.hasOffset()) {
LOG.info("Produced message to {}, partition {}, offset {}", metadata.topic(), metadata.partition(), metadata.offset());
} else {
LOG.error("Error producing message to topic {}", topic);
}
} catch (InterruptedException | ExecutionException e) {
throw new KSMLExecutionException("Could not produce to topic " + topic, e);
}
} catch (InterruptedException | ExecutionException e) {
throw new KSMLExecutionException("Could not produce to topic " + topic, e);
}
} else {
LOG.info("Condition FALSE, skipping message");
}
}
}

private boolean checkCondition(DataObject key, DataObject value) {
if (condition == null) return true;
DataObject result = condition.call(key, value);
if (result instanceof DataBoolean resultBoolean) return resultBoolean.value();
throw FatalError.executionError("Producer condition did not return a boolean value: " + condition.name);
}
}
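
The new `checkCondition` helper above calls the optional condition function with the generated key and value and requires a boolean result; anything else raises a fatal error. A minimal sketch of such a predicate in KSML YAML, using the `predicate` function type handled by the `PredicateDefinitionParser` in the parser below (the function name is illustrative):
```yaml
functions:
  skip_nulls:
    type: predicate
    # Must evaluate to a boolean; when False, the generated message is skipped
    expression: value is not None
```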
ProducerDefinitionFileParser.java
@@ -23,6 +23,8 @@

import io.axual.ksml.KSMLConfig;
import io.axual.ksml.data.schema.SchemaLibrary;
import io.axual.ksml.datagenerator.config.producer.DataGeneratorConfig;
import io.axual.ksml.datagenerator.definition.ProducerDefinition;
import io.axual.ksml.definition.parser.StreamDefinitionParser;
import io.axual.ksml.generator.YAMLDefinition;
import io.axual.ksml.generator.YAMLObjectMapper;
@@ -38,8 +40,8 @@
import io.axual.ksml.notation.xml.XmlSchemaLoader;
import io.axual.ksml.parser.MapParser;
import io.axual.ksml.parser.YamlNode;
import io.axual.ksml.datagenerator.config.producer.DataGeneratorConfig;
import io.axual.ksml.datagenerator.definition.ProducerDefinition;
import io.axual.ksml.python.PythonContext;
import io.axual.ksml.python.PythonFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -50,9 +52,9 @@
import java.util.Map;
import java.util.TreeMap;

import static io.axual.ksml.datagenerator.dsl.ProducerDSL.PRODUCERS_DEFINITION;
import static io.axual.ksml.dsl.KSMLDSL.FUNCTIONS_DEFINITION;
import static io.axual.ksml.dsl.KSMLDSL.STREAMS_DEFINITION;
import static io.axual.ksml.datagenerator.dsl.ProducerDSL.PRODUCERS_DEFINITION;

/**
* Generate a Kafka Streams topology from a KSML configuration, using a Python interpreter.
@@ -79,7 +81,7 @@ private List<YAMLDefinition> readDefinitions() {
return new ArrayList<>();
}

public Map<String, ProducerDefinition> create(NotationLibrary notationLibrary) {
public Map<String, ProducerDefinition> create(NotationLibrary notationLibrary, PythonContext pythonContext) {
// Register schema loaders
SchemaLibrary.registerLoader(AvroNotation.NOTATION_NAME, new AvroSchemaLoader(config.workingDirectory));
SchemaLibrary.registerLoader(CsvNotation.NOTATION_NAME, new CsvSchemaLoader(config.workingDirectory));
@@ -89,7 +91,7 @@ public Map<String, ProducerDefinition> create(NotationLibrary notationLibrary) {
List<YAMLDefinition> definitions = readDefinitions();
Map<String, ProducerDefinition> producers = new TreeMap<>();
for (YAMLDefinition definition : definitions) {
producers.putAll(generate(YamlNode.fromRoot(definition.root(), "definition"), notationLibrary));
producers.putAll(generate(YamlNode.fromRoot(definition.root(), "definition"), notationLibrary, pythonContext));
}

StringBuilder output = new StringBuilder("\n\nRegistered producers:\n");
@@ -129,17 +131,21 @@ private String getPrefix(String source) {
return source;
}

private Map<String, ProducerDefinition> generate(YamlNode node, NotationLibrary notationLibrary) {
private Map<String, ProducerDefinition> generate(YamlNode node, NotationLibrary notationLibrary, PythonContext pythonContext) {
if (node == null) return null;

// Set up the parse context, which will gather toplevel information on the streams topology
var context = new ProducerParseContext(notationLibrary);
var context = new ProducerParseContext(notationLibrary, pythonContext);

// Parse all defined streams
new MapParser<>("stream definition", new StreamDefinitionParser()).parse(node.get(STREAMS_DEFINITION)).forEach(context::registerStreamDefinition);

// Parse all defined functions
new MapParser<>("function definition", new TypedFunctionDefinitionParser()).parse(node.get(FUNCTIONS_DEFINITION)).forEach(context::registerFunction);
// Generate all the function code in the Python context
for (var function : context.getFunctionDefinitions().entrySet()) {
new PythonFunction(pythonContext, function.getKey(), function.getValue());
}

// Parse all defined message producers
return new HashMap<>(new MapParser<>("producer definition", new ProducerDefinitionParser(context)).parse(node.get(PRODUCERS_DEFINITION)));
ProducerDefinitionParser.java
@@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,14 +21,13 @@
*/


import io.axual.ksml.definition.parser.PredicateDefinitionParser;
import io.axual.ksml.definition.parser.StreamDefinitionParser;
import io.axual.ksml.parser.ReferenceOrInlineParser;
import io.axual.ksml.parser.YamlNode;
import io.axual.ksml.datagenerator.definition.ProducerDefinition;

import static io.axual.ksml.datagenerator.dsl.ProducerDSL.GENERATOR_ATTRIBUTE;
import static io.axual.ksml.datagenerator.dsl.ProducerDSL.INTERVAL_ATTRIBUTE;
import static io.axual.ksml.datagenerator.dsl.ProducerDSL.TARGET_ATTRIBUTE;
import static io.axual.ksml.datagenerator.dsl.ProducerDSL.*;

public class ProducerDefinitionParser extends ContextAwareParser<ProducerDefinition> {
public ProducerDefinitionParser(ParseContext context) {
@@ -41,6 +40,7 @@ public ProducerDefinition parse(YamlNode node) {
return new ProducerDefinition(
new ReferenceOrInlineParser<>("generator", GENERATOR_ATTRIBUTE, context.getFunctionDefinitions()::get, new GeneratorDefinitionParser()).parse(node),
parseDuration(node, INTERVAL_ATTRIBUTE),
new ReferenceOrInlineParser<>("condition", CONDITION_ATTRIBUTE, context.getFunctionDefinitions()::get, new PredicateDefinitionParser()).parse(node),
new ReferenceOrInlineParser<>("to", TARGET_ATTRIBUTE, context.getStreamDefinitions()::get, new StreamDefinitionParser()).parse(node));
}
}
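
With the new `CONDITION_ATTRIBUTE`, a producer definition can now reference (or inline) a condition function next to its generator, interval, and target. A sketch under the attribute names defined in `ProducerDSL`; the stream, topic, and function names are hypothetical, and the interval shorthand assumes `parseDuration` accepts values like `3s`:
```yaml
streams:
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData

producers:
  sensor_producer:
    generator: generate_sensor_data   # referenced function, or an inline definition
    interval: 3s
    condition: skip_nulls             # optional; produce only when this returns True
    to: sensor_source
```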
(remaining changed files not shown)
