Skip to content

Commit

Permalink
Merge branch 'Axual:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp authored Nov 13, 2024
2 parents e41d0bc + dc72ef4 commit 85e0e54
Show file tree
Hide file tree
Showing 62 changed files with 5,832 additions and 676 deletions.
5,121 changes: 5,120 additions & 1 deletion docs/ksml-language-spec.json

Large diffs are not rendered by default.

49 changes: 49 additions & 0 deletions examples/00-example-generate-sensordata-batch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

# This example shows how to generate data and have it sent to a target topic in a given format.

functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate some random sensor measurement data
types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
}
# Build the result value using any of the above measurement types
value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
if random.randrange(10) == 0:
key = None
if random.randrange(10) == 0:
value = None
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value

producers:
# Produce 10k messages in batches of 100 messages with a 1ms interval
sensordata_avro_producer:
generator: generate_sensordata_message
interval: 1
count: 10000
batchSize: 100
to:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData
48 changes: 48 additions & 0 deletions examples/00-example-generate-sensordata-binary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

# This example shows how to generate data and have it sent to a target topic in a given format.

functions:
generate_sensordata_message:
type: generator
globalCode: |
import time
import random
sensorCounter = 0
code: |
global sensorCounter
key = "sensor"+str(sensorCounter) # Set the key to return ("sensor0" to "sensor9")
sensorCounter = (sensorCounter+1) % 10 # Increase the counter for next iteration
# Generate some random sensor measurement data
types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
}
# Build the result value using any of the above measurement types
value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
if random.randrange(10) == 0:
key = None
if random.randrange(10) == 0:
value = None
expression: (key, value) # Return a message tuple with the key and value
resultType: (string, json) # Indicate the type of key and value

producers:
# This example uses the otherAvro notation, using the Apicurio Registry API and serializers.
# See the ksml-data-generator.yaml for the notation definition
sensordata_avro_producer_binary:
generator: generate_sensordata_message
interval: 3s
to:
topic: ksml_sensordata_avro_binary
keyType: string
valueType: otherAvro:SensorData
12 changes: 1 addition & 11 deletions examples/00-example-generate-sensordata.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
# $schema: https://raw.githubusercontent.com/Axual/ksml/main/docs/ksml-language-spec.json

# This example shows how to generate data and have it sent to a target topic in a given format.

Expand Down Expand Up @@ -44,13 +44,3 @@ producers:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData

# This example uses the otherAvro notation, using the Apicurio Registry API and serializers.
# See the ksml-data-generator.yaml for the notation definition
sensordata_avro_producer_binary:
generator: generate_sensordata_message
interval: 3s
to:
topic: ksml_sensordata_avro_binary
keyType: string
valueType: otherAvro:SensorData
8 changes: 0 additions & 8 deletions examples/01-example-inspect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ streams:
keyType: string
valueType: avro:SensorData
offsetResetPolicy: latest
sensor_source_avro_binary:
topic: ksml_sensordata_avro_binary
keyType: string
valueType: otherAvro:SensorData
sensor_source_csv:
topic: ksml_sensordata_csv
keyType: string
Expand Down Expand Up @@ -40,10 +36,6 @@ pipelines:
from: sensor_source_avro
forEach:
code: log_message(key, value, format="AVRO")
consume_avro_binary:
from: sensor_source_avro_binary
forEach:
code: log_message(key, value, format="OTHER_AVRO")
consume_csv:
from: sensor_source_csv
forEach:
Expand Down
44 changes: 44 additions & 0 deletions examples/19-example-performance-measurement.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json

# This example behaves similarly to example 2 (copy) but includes Python code to measure KSML's performance. It does
# so by storing the message count and startup timestamp in global variables, and outputting log statements every 100
# #messages, containing #messages processed, #seconds running since first message came in, and average #msg/sec.

streams:
sensor_source:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData
offsetResetPolicy: latest
sensor_copy:
topic: ksml_sensordata_copy
keyType: string
valueType: avro:SensorData

pipelines:
main:
from: sensor_source
via:
# Use a PEEK operation to initialize the global messageCount and startTime
- type: peek
forEach:
globalCode: |
from datetime import datetime
messageCount, startTime = 0, 0
code: |
# Declare global variables, since we are updating them below
global messageCount, startTime
if messageCount == 0:
startTime = datetime.now()
messageCount += 1
# Output performance thus far, done in separate PEEK to allow easy insertion of other operations above
- type: peek
forEach:
code: |
# No need to include the global statement here, since we only read and don't update the global variables
# For every 100 messages that we process, we output a log statement with performance indication so far
if messageCount % 100 == 0:
# Prevent division by zero by using 1 second as minimum
runtime = max(1, (datetime.now() - startTime).total_seconds())
log.warn("Processed {} messages in {} seconds = {} msg/sec", messageCount, runtime, round(messageCount / runtime, 2))
to: sensor_copy
9 changes: 4 additions & 5 deletions examples/ksml-data-generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ksml:
## Below this line, specify properties to be passed into Confluent's KafkaAvroSerializer and KafkaAvroDeserializer
config:
# The example uses Apicurio, using the Apicurio Confluent Compatibility URL
schema.registry.url: http://schema_registry:8081/apis/ccompat/v7
schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
auto.register.schemas: true
normalize.schemas: true
# Below is an example SSL configuration for Confluent Serialization library
Expand Down Expand Up @@ -81,15 +81,16 @@ ksml:
# apicurio.registry.request.ssl.truststore.type: JKS
# apicurio.registry.request.ssl.truststore.password: password


enableProducers: true # Set to true to allow producer definitions to be parsed in the KSML definitions and be executed.
enablePipelines: false # Set to true to allow pipeline definitions to be parsed in the KSML definitions and be executed.

# Section where you specify which KSML definitions to load, parse and execute.
definitions:
# Format is <namespace>: <ksml_definition_filename>
# generate_alert_setting: 00-example-generate-alertsettings.yaml
generate_alert_setting: 00-example-generate-alertsettings.yaml
generate_sensor_data: 00-example-generate-sensordata.yaml
# generate_sensor_data_batch: 00-example-generate-sensordata-batch.yaml
# generate_sensor_data_binary: 00-example-generate-sensordata-binary.yaml

# This setup connects to the Kafka broker and schema registry started with the example docker-compose file
# These examples are intended to run from a inside a container on the same network
Expand Down Expand Up @@ -118,12 +119,10 @@ kafka:
# These patterns are resolved into the actual name used on Kafka using the values in this configuration map
# and the topic names specified in the definition YAML files


# tenant: "ksmldemo"
# instance: "dta"
# environment: "dev"
# topic.pattern: "{tenant}-{instance}-{environment}-{topic}"
# # Results in Kafka topic ksmldemo-dta-dev-<topic name from KSML definition YAML>
# group.id.pattern: "{tenant}-{instance}-{environment}-{group.id}"
# transactional.id.pattern: "{tenant}-{instance}-{environment}-{transactional.id}"

4 changes: 2 additions & 2 deletions examples/ksml-runner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ksml:
## Below this line, specify properties to be passed into Confluent's KafkaAvroSerializer and KafkaAvroDeserializer
config:
# Link to Apicurio Confluent Compatibility URL
schema.registry.url: http://schema_registry:8081/apis/ccompat/v7
schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
auto.register.schemas: true
normalize.schemas: true
# Below is an example SSL configuration for Confluent Serialization library
Expand Down Expand Up @@ -105,6 +105,7 @@ ksml:
# transform_metadata: 16-example-transform-metadata.yaml
# inspect_with_metrics: 17-example-inspect-with-metrics.yaml
# timestamp_extractor: 18-example-timestamp-extractor.yaml
# performance-measurement: 19-example-performance-measurement.yaml

# This setup connects to the Kafka broker and schema registry started with the example docker-compose file
# These examples are intended to run from a inside a container on the same network
Expand All @@ -119,7 +120,6 @@ kafka:
auto.offset.reset: earliest
acks: all


# These are Kafka SSL configuration properties. Check the documentation at1
# Check the documentation at https://kafka.apache.org/documentation/#producerconfigs for more properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.axual.ksml.data.mapper.DataSchemaMapper;
import io.axual.ksml.data.object.DataList;
import io.axual.ksml.data.object.DataObject;
import io.axual.ksml.data.schema.DataField;
import io.axual.ksml.data.schema.DataSchema;
import io.axual.ksml.data.schema.DataValue;
Expand All @@ -48,7 +49,14 @@ public DataSchema toDataSchema(String namespace, String name, String value) {
private DataSchema toDataSchema(String namespace, String name, DataList fieldNames) {
List<DataField> fields = new ArrayList<>();
for (var fieldName : fieldNames) {
fields.add(new DataField(fieldName.toString(), DataSchema.create(DataSchema.Type.STRING), fieldName.toString(), NO_INDEX, true, false, new DataValue("")));
fields.add(new DataField(
fieldName.toString(DataObject.Printer.INTERNAL),
DataSchema.create(DataSchema.Type.STRING),
fieldName.toString(DataObject.Printer.INTERNAL),
NO_INDEX,
true,
false,
new DataValue("")));
}
return new StructSchema(namespace, name, "CSV schema", fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void elementFromDataObject(ElementCreator elementCreator, Element elemen
return;
}
if (value != null) {
element.setTextContent(value.toString());
element.setTextContent(value.toString(DataObject.Printer.INTERNAL));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,22 @@ private DataField parseField(DataStruct fieldStruct) {
var fieldType = fieldStruct.get(ATTRIBUTES_ELEMENT_NAME) instanceof DataStruct attributeStruct ? attributeStruct.get(TYPE_NAME) : null;
if (fieldType instanceof DataString fieldTypeString) {
var type = fieldTypeString.value().contains(":") ? fieldTypeString.value().substring(fieldTypeString.value().indexOf(":") + 1) : fieldTypeString.value();
return simpleField(fieldName.toString(), type);
return simpleField(fieldName.toString(DataObject.Printer.INTERNAL), type);
} else {
// Field type is not specified, so dig down into the elements below to find out the type
var complexTypeElement = fieldStruct.get(COMPLEX_TYPE_NAME);
if (complexTypeElement instanceof DataStruct complexTypeStruct) {
var sequenceElement = complexTypeStruct.get(SEQUENCE_NAME);
if (sequenceElement instanceof DataStruct sequenceStruct) {
var fields = parseFields(sequenceStruct);
return new DataField(fieldName.toString(), new StructSchema(null, fieldName.toString(), "Converted from XSD", fields), null);
return new DataField(
fieldName.toString(DataObject.Printer.INTERNAL),
new StructSchema(
null,
fieldName.toString(DataObject.Printer.INTERNAL),
"Converted from XSD",
fields),
null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static boolean exists(String notation) {
return notations.containsKey(notation);
}

public static Notation notation(String notation) {
public static Notation get(String notation) {
var result = notation != null ? notations.get(notation) : null;
if (result != null) return result;
throw new DataException("Data notation is not registered in the NotationLibrary: " + (notation != null ? notation : "null"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.axual.ksml.data.type.UnionType;

public class JsonDataObjectConverter implements NotationConverter {
private static final JsonDataObjectMapper DATA_OBJECT_MAPPER = new JsonDataObjectMapper();
private static final JsonDataObjectMapper DATA_OBJECT_MAPPER = new JsonDataObjectMapper(false);

@Override
public DataObject convert(DataObject value, UserType targetType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@

public class JsonDataObjectMapper implements DataObjectMapper<String> {
private static final NativeDataObjectMapper NATIVE_MAPPER = new NativeDataObjectMapper();
private static final StringMapper<Object> STRING_MAPPER = new JsonStringMapper();
private final StringMapper<Object> stringMapper;

public JsonDataObjectMapper(boolean prettyPrint) {
stringMapper = new JsonStringMapper(prettyPrint);
}

@Override
public DataObject toDataObject(DataType expected, String value) {
var object = STRING_MAPPER.fromString(value);
var object = stringMapper.fromString(value);
return NATIVE_MAPPER.toDataObject(expected, object);
}

@Override
public String fromDataObject(DataObject value) {
var object = NATIVE_MAPPER.fromDataObject(value);
return STRING_MAPPER.toString(object);
return stringMapper.toString(object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

@Slf4j
public class JsonSchemaLoader extends SchemaLoader {
private static final JsonSchemaMapper MAPPER = new JsonSchemaMapper();
private static final JsonSchemaMapper MAPPER = new JsonSchemaMapper(false);

public JsonSchemaLoader(String schemaDirectory) {
super("JSON", schemaDirectory, ".json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static io.axual.ksml.data.schema.DataField.NO_INDEX;

public class JsonSchemaMapper implements DataSchemaMapper<String> {
private static final JsonDataObjectMapper MAPPER = new JsonDataObjectMapper();
private static final String TITLE_NAME = "title";
private static final String DESCRIPTION_NAME = "description";
private static final String TYPE_NAME = "type";
Expand All @@ -55,11 +54,16 @@ public class JsonSchemaMapper implements DataSchemaMapper<String> {
private static final String NUMBER_TYPE = "number";
private static final String OBJECT_TYPE = "object";
private static final String STRING_TYPE = "string";
private final JsonDataObjectMapper mapper;

public JsonSchemaMapper(boolean prettyPrint) {
mapper = new JsonDataObjectMapper(prettyPrint);
}

@Override
public DataSchema toDataSchema(String namespace, String name, String value) {
// Convert JSON to internal DataObject format
var schema = MAPPER.toDataObject(value);
var schema = mapper.toDataObject(value);
if (schema instanceof DataStruct schemaStruct) {
return toDataSchema(namespace, name, schemaStruct);
}
Expand Down Expand Up @@ -135,7 +139,7 @@ public String fromDataSchema(DataSchema schema) {
final var result = fromDataSchema(structSchema);
// First translate the schema into DataObjects
// The use the mapper to convert it into JSON
return MAPPER.fromDataObject(result);
return mapper.fromDataObject(result);
}
return null;
}
Expand Down
Loading

0 comments on commit 85e0e54

Please sign in to comment.