Pulsar Transformations is a Pulsar Function that implements common transformations on the data. The intent is to provide a low-code approach, so you don't need to write code, understand Pulsar Schemas, or know one of the languages supported by Pulsar Functions to transform the data flowing in your Pulsar cluster. The goal is also that the Pulsar Transformations Function is easy to use in the Pulsar cluster without having to install and operate other software. Only basic transformations are available. For more complex use cases, such as aggregation, joins and lookups, more sophisticated tools such as SQL stream processing engines should be used.
Currently available transformations are:
- cast: modifies the key or value schema to a target compatible schema.
- drop-fields: drops fields from structured data.
- merge-key-value: merges the fields of KeyValue records where both the key and value are structured data with the same schema type.
- unwrap-key-value: if the record is a KeyValue, extract the KeyValue's key or value and make it the record value.
- flatten: flattens structured data.
- drop: drops a record from further processing.
- compute: computes new properties, values or field values on the fly or replaces existing ones.
Pulsar Transformations requires Pulsar 2.11+ or Luna Streaming 2.10+ to run.
The TransformFunction reads its configuration as JSON from the Function userConfig parameter in the format:
{
"steps": [
{
"type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
},
{
"type": "merge-key-value"
},
{
"type": "unwrap-key-value"
},
{
"type": "cast", "schema-type": "STRING"
}
]
}
The transformations are done in the order in which they appear in the steps array. Each step is defined by its type and uses its own arguments. Additionally, each step can be dynamically toggled on or off by supplying a when condition that evaluates to true or false.
This example config applied on a KeyValue<AVRO, AVRO> input record with value {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} will return after each step:
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}}(KeyValue<AVRO, AVRO>)
|
| "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
|
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "merge-key-value"
|
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
|
| "type": "unwrap-key-value"
|
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
|
| "type": "cast", "schema-type": "STRING"
|
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
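The walkthrough above can be mimicked with plain dictionaries. The following is only a rough Python sketch of the data flow, not the TransformFunction implementation: the helper names are hypothetical, the cast to STRING is approximated with JSON serialization, and letting value fields win on a name collision during the merge is an assumption.

```python
import json

def drop_fields(record, fields, part=None):
    """Remove the named fields from the key, the value, or both (part=None)."""
    out = {"key": dict(record["key"]), "value": dict(record["value"])}
    for target in ("key", "value") if part is None else (part,):
        for name in fields.split(","):
            out[target].pop(name, None)
    return out

def merge_key_value(record):
    """Copy the key fields into the value; the key itself is kept.
    Assumption: on a name collision the value field wins."""
    return {"key": dict(record["key"]),
            "value": {**record["key"], **record["value"]}}

def unwrap_key_value(record, unwrap_key=False):
    """Make the key or (by default) the value the whole record."""
    return dict(record["key"] if unwrap_key else record["value"])

def cast_to_string(value):
    """Stand-in for a cast to STRING: serialize the structure as JSON text."""
    return json.dumps(value)

record = {
    "key": {"keyField1": "key1", "keyField2": "key2", "keyField3": "key3"},
    "value": {"valueField1": "value1", "valueField2": "value2",
              "valueField3": "value3"},
}

# Apply the four steps in the same order as the steps array.
step1 = drop_fields(record, "keyField1,keyField2", part="key")
step2 = merge_key_value(step1)
step3 = unwrap_key_value(step2)
result = cast_to_string(step3)
print(result)
```

Because each helper returns a new record, the intermediate results (step1 to step3) can be compared with the intermediate states listed in the walkthrough.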
Some step operations like cast or compute involve conversions from one type to another. When this happens, the rules are:
- timestamp, date and time related object conversions assume UTC time zone if it is not explicit.
- date and time related object conversions to/from STRING use the RFC3339 format.
- timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z).
- date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01).
- time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00).
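To make these rules concrete, here is a small Python sketch that reproduces the numeric conversions with the standard library. The function names are hypothetical and the sketch only illustrates the arithmetic (UTC when no zone is given, milliseconds since the epoch, days since the epoch, milliseconds since midnight); it is not the code used by the function.

```python
from datetime import date, datetime, time, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def timestamp_to_millis(text):
    """RFC 3339 timestamp string -> milliseconds since the epoch."""
    ts = datetime.fromisoformat(text.replace("Z", "+00:00"))
    if ts.tzinfo is None:
        # No explicit zone: assume UTC, as stated in the rules above.
        ts = ts.replace(tzinfo=timezone.utc)
    return (ts - EPOCH) // timedelta(milliseconds=1)

def date_to_days(text):
    """Date string -> number of days since 1970-01-01."""
    return (date.fromisoformat(text) - date(1970, 1, 1)).days

def time_to_millis(text):
    """Time string -> milliseconds since midnight."""
    t = time.fromisoformat(text)
    seconds = t.hour * 3600 + t.minute * 60 + t.second
    return seconds * 1000 + t.microsecond // 1000

print(timestamp_to_millis("2022-10-02T01:02:03+02:00"))  # 1664665323000
print(date_to_days("2022-10-02"))                        # 19267
print(time_to_millis("10:15:30"))                        # 36930000
```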
Transforms the data to a target compatible schema. Conversions are done using the rules described in Type conversions.
Step name: cast
Parameters:
Name | Description |
---|---|
schema-type | the target schema type. |
part | when used with KeyValue data, defines if the transformation is done on the key or on the value. If null or absent the transformation applies to both the key and the value. |
UserConfig: {"steps": [{"type": "cast", "schema-type": "STRING"}]}
Input: {field1: value1, field2: value2} (AVRO)
Output: {"field1": "value1", "field2": "value2"} (STRING)
Drops fields of structured data (Currently only AVRO and JSON are supported).
Step name: drop-fields
Parameters:
Name | Description |
---|---|
fields | the comma-separated list of fields to drop |
part | when used with KeyValue data, defines if the transformation is done on the key or on the value. If null or absent the transformation applies to both the key and the value. |
UserConfig: {"steps": [{"type": "drop-fields", "fields": "password,other"}]}
Input: {name: value1, password: value2} (AVRO)
Output: {name: value1} (AVRO)
Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. (Currently only AVRO and JSON are supported).
Step name: merge-key-value
Parameters: N/A
UserConfig: {"steps": [{"type": "merge-key-value"}]}
Input: {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)
Output: {key={keyField: key}, value={keyField: key, valueField: value}} (KeyValue<AVRO, AVRO>)
If the record value is a KeyValue, extracts the KeyValue's key or value and makes it the record value.
Step name: unwrap-key-value
Parameters:
Name | Description |
---|---|
unwrapKey | by default, the value is unwrapped. Set this parameter to true to unwrap the key instead. |
UserConfig: {"steps": [{"type": "unwrap-key-value"}]}
Input: {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)
Output: {valueField: value} (AVRO)
Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names.
Step name: flatten
Parameters:
Name | Description |
---|---|
delimiter | the delimiter to use when concatenating the field names (default: _) |
part | when used with KeyValue data, defines if the transformation is done on the key or on the value. If null or absent the transformation applies to both the key and the value. |
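As a rough illustration of how flatten builds the new field names from the intermediate levels and the delimiter, consider this Python sketch on nested dictionaries. The flatten helper is hypothetical and ignores schemas entirely; it only shows the naming rule.

```python
def flatten(record, delimiter="_", prefix=""):
    """Collapse nested dictionaries into a single level.

    Field names are built by joining the names of the intermediate
    levels with the delimiter.
    """
    flat = {}
    for name, value in record.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse into nested structures, carrying the path so far.
            flat.update(flatten(value, delimiter, full_name))
        else:
            flat[full_name] = value
    return flat

nested = {"field1": {"field11": "value11", "field12": "value12"}}
print(flatten(nested))
print(flatten(nested, delimiter="."))
```

The first call uses the default delimiter, the second shows the effect of setting delimiter to a dot.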
UserConfig: {"steps": [{"type": "flatten"}]}
Input: {field1: {field11: value11, field12: value12}} (AVRO)
Output: {field1_field11: value11, field1_field12: value12} (AVRO)
Drops the record from further processing. Use in conjunction with when to selectively drop records.
Step name: drop
Parameters:
Name | Description |
---|---|
when | by default, the record is dropped. Set this parameter to selectively choose when to drop a message. |
UserConfig: {"steps": [{"type": "drop", "when": "value.firstName == 'value1'"}]}
Input: {firstName: value1, lastName: value2} (AVRO)
Output: N/A. Record is dropped.
Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.
Step name: compute
Parameters:
Name | Description |
---|---|
fields | an array of JSON objects describing how to calculate the field values. The JSON object represents a field as described in the next table |
Name (field) | Description |
---|---|
name | the name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message. In addition, you can compute values on the following message headers [destinationTopic, messageKey, properties.]. Please note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0. |
expression | supports the Expression Language syntax. It is evaluated at runtime and the result of the evaluation is assigned to the field. |
type | the type of the computed field. This will translate to the schema type of the new field in the transformed message. The following types are currently supported [STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT]. For more details about each type, please check the next table. Conversions are done using the rules described in Type conversions. The type field is not required for the message headers [destinationTopic, messageKey, properties.] and STRING will be used. For the value and key, if it is not provided, then the type will be inferred from the result of the expression evaluation. |
optional (default: true) | if true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression. |
Name (field.type) | Description | Pulsar Schema Type | AVRO Schema Type | Expression Examples |
---|---|---|---|---|
STRING | A unicode character sequence. | STRING | string | "'first name'", "fn:str(value)", "fn:concat(value, '-suffix')" |
INT8 | An 8-bit integer. | INT8 | int | "127", "1 + 1" |
INT16 | A 16-bit integer. | INT16 | int | "32767" |
INT32 | A 32-bit integer. | INT32 | int | "2147483647" |
INT64 | A 64-bit integer. | INT64 | long | "9223372036854775807" |
FLOAT | A 32-bit floating point. | FLOAT | float | "340282346638528859999999999999999999999.999999", "1.1 + 1.1" |
DOUBLE | A 64-bit floating point. | DOUBLE | double | "1.79769313486231570e+308" |
BOOLEAN | true or false | BOOLEAN | boolean | "true", "1 == 1", "value.stringField == 'matching string'" |
DATE | A date without a time-zone. | Not supported | date (logical) | "'2022-10-02'", "19267" (days) |
TIME | A time without a time-zone. | TIME | time-millis (logical) | "'10:15:30'", "36930000" (millis since 00:00:00) |
TIMESTAMP | A timestamp in UTC time-zone. | TIMESTAMP | timestamp-millis (logical) | "'2022-10-02T01:02:03+02:00'", "1664665323000" (millis), "fn:now()" |
INSTANT | A timestamp in UTC time-zone. | INSTANT | timestamp-millis (logical) | "'2022-10-02T01:02:03+02:00'", "1664665323000" (millis), "fn:now()" |
LOCAL_DATE | A date without a time-zone. | LOCAL_DATE | date (logical) | "'2022-10-02'", "19267" (days) |
LOCAL_TIME | A time without a time-zone. | LOCAL_TIME | time-millis (logical) | "'10:15:30'", "36930000" (millis since 00:00:00) |
LOCAL_DATE_TIME | A timestamp without a time-zone. | LOCAL_DATE_TIME | timestamp-millis (logical) | "'2022-10-02T01:02:03+02:00'", "1664665323000" (millis) |
BYTES | A sequence of 8-bit unsigned bytes. | BYTES | bytes | "'input'.bytes" |
DECIMAL | An arbitrary-precision signed decimal number of the form unscaled × 10^(-scale). | Not supported | decimal (logical) | "fn:decimalFromUnscaled(value, 38)", where value is a byte array containing the two's-complement representation of the unscaled integer value in big-endian byte order |
UserConfig: {"steps": [{"type": "compute", "fields":[ {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"}, {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING"}]} ]}
Input: {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)
Output: {key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>)
UserConfig: {"steps": [{"type": "compute", "fields":[ {"name": "destinationTopic", "expression" : "'routed'"}, {"name": "properties.k1", "expression" : "'overwritten'"}, {"name": "properties.k2", "expression" : "'new'"}]} ]}
Input: {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic: out1, properties: {k1:v1}
Output: {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}
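Compute configurations nest quoted expressions inside JSON, which makes hand-written configs easy to get wrong. One possible way to avoid quoting mistakes is to build the userConfig programmatically. The Python sketch below uses two hypothetical helpers (field and compute_step) and only the parameters described in the tables above.

```python
import json

def field(name, expression, type_=None):
    """Describe one computed field; type is omitted when not given."""
    spec = {"name": name, "expression": expression}
    if type_ is not None:
        spec["type"] = type_
    return spec

def compute_step(*fields):
    """Wrap field descriptions into a compute step."""
    return {"type": "compute", "fields": list(fields)}

steps = [
    compute_step(
        field("key.newKeyField", "5*3", "INT32"),
        field("value.valueField",
              "fn:concat(value.valueField, '_suffix')", "STRING"),
        # Message headers do not need a type; STRING is used for them.
        field("destinationTopic", "'routed'"),
    )
]

# json.dumps takes care of escaping, so the result is always valid JSON.
user_config = json.dumps({"steps": steps})
print(user_config)
```

The resulting string can be passed as the userConfig of the function.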
In order to support Condition Steps and the Compute Transform, an expression language is required to evaluate the conditional step when or the compute step expression.
The syntax is EL-like and uses the dot notation to access field properties or map keys.
It supports the following operators and functions:
The Expression Language supports the following operators:
- Arithmetic: +, - (binary), *, / and div, % and mod, - (unary)
- Logical: and, &&, or, ||, not, !
- Relational: ==, eq, !=, ne, <, lt, >, gt, <=, le, >=, ge.
Utility methods are available under the fn namespace. For example, to get the current timestamp, use 'fn:now()'.
The Expression Language supports the following functions:
- toDouble(input): Converts the input value to a DOUBLE number. If the input is null, it returns null.
- toInt(input): Converts the input value to an INTEGER number. If the input is null, it returns null.
- uppercase(input): Returns the string input uppercased. If the input is null, it returns null.
- lowercase(input): Returns the string input lowercased. If the input is null, it returns null.
- contains(input, value): Returns the boolean true if value exists in input. If input or value is null, it returns false.
- trim(input): Returns the input string with all leading and trailing spaces removed.
- concat(input1, input2): Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string.
- coalesce(value, valueIfNull): Returns value if it is not null, otherwise returns valueIfNull.
- replace(input, regex, replacement): Replaces each substring of input that matches the regex regular expression with replacement. See Java's replaceAll.
- str(input): Converts input to a string.
- toJson(input): Converts input to a JSON string.
- fromJson(input): Parses input as JSON.
- split(input, separatorExpression): Splits the input into a list of strings, internally using the String.split() function. An empty input corresponds to an empty list. The input is converted to a String using the str() function.
- now(): Returns the current timestamp.
- timestampAdd(input, delta, unit): Returns a timestamp formed by adding delta in unit to the input timestamp.
  - input: a timestamp to add to.
  - delta: a long amount of unit to add to input. Can be a negative value to perform subtraction.
  - unit: the string unit of time to add or subtract. Can be one of [years, months, days, hours, minutes, seconds, millis].
- decimalFromUnscaled(input, scale): Converts input to a BigDecimal with the given scale.
  - input: unscaled value of the BigDecimal. Can be any of STRING, INTEGER, LONG or Array of bytes containing the two's-complement representation in big-endian byte order.
  - scale: the scale of the BigDecimal to create.
- decimalFromNumber(input): Converts input to a BigDecimal.
  - input: value of the BigDecimal in DOUBLE or FLOAT. If INTEGER or LONG is provided, an unscaled BigDecimal value will be returned.
- filter(collection, expression): Returns a new collection containing only the elements of collection for which expression is true. The current element is available under the record variable. An example is fn:filter(value.queryResults, "fn:toDouble(record.similarity) >= 0.5").
- unpack(input, fieldsList): Returns a map containing the elements of input; for each field in fieldsList there is an entry in the map. If the input is a string, it is converted to a list using the split() function with the ',' separator.
For all methods, if a parameter is not of the right type, a conversion will be done using the rules described in Type conversions. For instance, you can do fn:timestampAdd('2022-10-02T01:02:03Z', '42', 'hours'.bytes).
When a function returns a timestamp, its type is INSTANT.
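The parameter handling described for timestampAdd and decimalFromUnscaled can be approximated in Python as follows. This is a sketch under assumptions, not the function's implementation: the Python names are hypothetical, and clamping the day of month when adding months or years (for example January 31 plus one month giving the last day of February) is an assumption modeled on common calendar libraries.

```python
import calendar
from datetime import datetime, timedelta, timezone
from decimal import Decimal

# Units with a fixed length map directly onto timedelta keyword arguments.
_FIXED_UNITS = {"days": "days", "hours": "hours", "minutes": "minutes",
                "seconds": "seconds", "millis": "milliseconds"}

def _to_text(value):
    """Decode byte input so arguments such as 'hours'.bytes are accepted."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode("utf-8")
    return str(value)

def timestamp_add(ts, delta, unit):
    """Add delta units to ts, converting loosely typed arguments first."""
    if not isinstance(ts, datetime):
        ts = datetime.fromisoformat(_to_text(ts).replace("Z", "+00:00"))
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # UTC when no zone is given
    delta, unit = int(_to_text(delta)), _to_text(unit)
    if unit in _FIXED_UNITS:
        return ts + timedelta(**{_FIXED_UNITS[unit]: delta})
    if unit in ("months", "years"):
        months = delta * (12 if unit == "years" else 1)
        year, month0 = divmod(ts.year * 12 + ts.month - 1 + months, 12)
        # Assumption: clamp to the last valid day of the target month.
        day = min(ts.day, calendar.monthrange(year, month0 + 1)[1])
        return ts.replace(year=year, month=month0 + 1, day=day)
    raise ValueError(f"unsupported unit: {unit}")

def decimal_from_unscaled(unscaled, scale):
    """Build the decimal unscaled x 10^(-scale) without rounding."""
    if isinstance(unscaled, (bytes, bytearray)):
        # Two's-complement, big-endian, as described for byte arrays.
        number = int.from_bytes(unscaled, byteorder="big", signed=True)
    else:
        number = int(unscaled)
    digits = tuple(int(d) for d in str(abs(number)))
    return Decimal((1 if number < 0 else 0, digits, -scale))

print(timestamp_add("2022-10-02T01:02:03Z", "42", b"hours").isoformat())
print(decimal_from_unscaled((12345).to_bytes(2, "big", signed=True), 2))
```

The first call mirrors the fn:timestampAdd example above, with the delta given as a string and the unit given as bytes.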
Each step accepts an optional when configuration that is evaluated at step execution time against the current record (i.e. the record as seen by the current step in the transformation pipeline). The when condition supports the Expression Language syntax. It provides access to the record attributes as follows:
- key: the message key or the key portion of the record in a KeyValue schema.
- value: the value portion of the record in a KeyValue schema, or the message payload itself.
- topicName: the optional name of the topic which the record originated from (aka. Input Topic).
- destinationTopic: the name of the topic on which the transformed record will be sent (aka. Output Topic).
- eventTime: the optional timestamp attached to the record from its source. For example, the original timestamp attached to the Pulsar message.
- properties: the optional user-defined properties attached to the record.
You can use the . operator to access top level or nested properties on a schema-full key or value. For example, key.keyField1 or value.valueField1.nestedValueField. You can also use it to access different keys of the user-defined properties. For example, properties.prop1.
{
  "key": {
    "compound": {
      "uuid": "uuidValue",
      "timestamp": 1663616014
    }
  },
  "value": {
    "first": "f1",
    "last": "l1",
    "rank": 1,
    "address": {
      "zipcode": "abc-def"
    }
  }
}
when | Evaluates to |
---|---|
"key.compound.uuid == 'uuidValue'" | True |
"key.compound.timestamp <= 10" | False |
"value.first == 'f1' && value.last.toUpperCase() == 'L1'" | True |
"value.rank <= 1 && value.address.zipcode.substring(0, 3) == 'abc'" | True |
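The dot notation used in these conditions amounts to walking nested fields one name at a time. The Python sketch below mimics that lookup on a dictionary, assuming key and value are sibling attributes of the record (as the conditions use them). The resolve helper is hypothetical, and returning None for a missing field is an assumption, not documented behavior.

```python
def resolve(record, path):
    """Follow a dotted path such as 'value.address.zipcode'."""
    current = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None  # assumption: a missing field resolves to None
        current = current[part]
    return current

record = {
    "key": {"compound": {"uuid": "uuidValue", "timestamp": 1663616014}},
    "value": {"first": "f1", "last": "l1", "rank": 1,
              "address": {"zipcode": "abc-def"}},
}

# Python equivalents of the conditions in the table.
uuid_matches = resolve(record, "key.compound.uuid") == "uuidValue"
old_timestamp = resolve(record, "key.compound.timestamp") <= 10
name_matches = (resolve(record, "value.first") == "f1"
                and resolve(record, "value.last").upper() == "L1")
zip_matches = (resolve(record, "value.rank") <= 1
               and resolve(record, "value.address.zipcode")[0:3] == "abc")
print(uuid_matches, old_timestamp, name_matches, zip_matches)
```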
- Partition Key: key1
- Source topic: topic1
- User defined k/v: {"prop1": "p1", "prop2": "p2"}
- Payload (String): Hello world!
when | Evaluates to |
---|---|
"key == 'key1' or topicName == 'topic1'" | True |
"value == 'Hello world!'" | True |
"properties.prop1 == 'p2'" | False |
See the Pulsar docs for more details on how to deploy a Function.
- Create a Transformation Function providing the path to the Pulsar Transformations NAR.
pulsar-admin functions create \
--jar pulsar-transformations-2.0.0.nar \
--name my-function \
--inputs my-input-topic \
--output my-output-topic \
--user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
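When the step list grows, quoting the JSON by hand on the command line becomes fragile. One option is to generate the command from a script. The Python sketch below assembles the same pulsar-admin invocation as above, using only the flags shown in this document; the script itself is an illustration, not part of Pulsar Transformations.

```python
import json
import shlex

steps = [
    {"type": "drop-fields", "fields": "password"},
    {"type": "merge-key-value"},
    {"type": "unwrap-key-value"},
    {"type": "cast", "schema-type": "STRING"},
]

# Keep the command as a list of arguments so that no manual quoting is needed.
command = [
    "pulsar-admin", "functions", "create",
    "--jar", "pulsar-transformations-2.0.0.nar",
    "--name", "my-function",
    "--inputs", "my-input-topic",
    "--output", "my-output-topic",
    "--user-config", json.dumps({"steps": steps}),
]

# shlex.join produces a shell-safe string that can be pasted into a terminal.
print(shlex.join(command))
```

The argument list could also be passed directly to subprocess.run(command, check=True) on a machine where pulsar-admin is installed.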
- Put the Pulsar Transformations NAR in the functions directory of the Pulsar Function worker (or broker).
cp pulsar-transformations-2.0.0.nar $PULSAR_HOME/functions/pulsar-transformations-2.0.0.nar
- Restart the function worker (or broker) instance or reload all the built-in functions:
pulsar-admin functions reload
- Create a Transformation Function with the admin CLI. The built-in function type is transforms.
pulsar-admin functions create \
--function-type transforms \
--name my-function \
--inputs my-input-topic \
--output my-output-topic \
--user-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'
This requires DataStax Luna Streaming 2.10.1.6+.
Thanks to PIP-193, it's possible to execute a function inside a sink process, removing the need for temporary topics.
- Create a Pulsar Sink instance with the transform-function set to the Transformation Function, using the admin CLI.
pulsar-admin sinks create \
--sink-type <sink_type> \
--inputs my-input-topic \
--tenant public \
--namespace default \
--name my-sink \
--transform-function "builtin://transforms" \
--transform-function-config '{"steps": [{"type": "drop-fields", "fields": "password"}, {"type": "merge-key-value"}, {"type": "unwrap-key-value"}, {"type": "cast", "schema-type": "STRING"}]}'