Skip to content

Commit

Permalink
Fix Utf8 coercions (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil authored Feb 16, 2023
1 parent 5649bbf commit 240ca1e
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.datastax.oss.pulsar.functions.transforms;

import com.datastax.oss.pulsar.functions.transforms.jstl.CustomTypeConverter;
import com.datastax.oss.pulsar.functions.transforms.jstl.JstlTypeConverter;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
Expand Down Expand Up @@ -50,7 +50,7 @@ public void process(TransformContext transformContext) {
}

private Object convertValue(Object originalValue, SchemaType schemaType) {
return CustomTypeConverter.INSTANCE.convert(originalValue, getJavaType(schemaType));
return JstlTypeConverter.INSTANCE.coerceToType(originalValue, getJavaType(schemaType));
}

private Class<?> getJavaType(SchemaType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import static org.apache.pulsar.common.schema.SchemaType.AVRO;

import com.datastax.oss.pulsar.functions.transforms.jstl.CustomTypeConverter;
import com.datastax.oss.pulsar.functions.transforms.jstl.JstlTypeConverter;
import com.datastax.oss.pulsar.functions.transforms.model.ComputeField;
import com.datastax.oss.pulsar.functions.transforms.model.ComputeFieldType;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -287,7 +287,7 @@ private Object getAvroValue(Schema schema, Object value) {

private Long getAvroTimestampMillis(Object value, LogicalType logicalType) {
validateLogicalType(value, logicalType, Instant.class, Timestamp.class, LocalDateTime.class);
return CustomTypeConverter.INSTANCE.convert(value, Long.class);
return JstlTypeConverter.INSTANCE.coerceToType(value, Long.class);
}

private Integer getAvroDate(Object value, LogicalType logicalType) {
Expand All @@ -299,7 +299,7 @@ private Integer getAvroDate(Object value, LogicalType logicalType) {

private Integer getAvroTimeMillis(Object value, LogicalType logicalType) {
validateLogicalType(value, logicalType, Time.class, LocalTime.class);
return CustomTypeConverter.INSTANCE.convert(value, Integer.class);
return JstlTypeConverter.INSTANCE.coerceToType(value, Integer.class);
}

private LogicalType getLogicalType(Schema schema) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static String replace(Object input, Object regex, Object replacement) {
}

public static String toString(Object input) {
return CustomTypeConverter.INSTANCE.coerceToString(input);
return JstlTypeConverter.INSTANCE.coerceToString(input);
}

public static Instant now() {
Expand All @@ -77,7 +77,7 @@ public static Instant timestampAdd(Object input, Object delta, Object unit) {
}

ChronoUnit chronoUnit;
switch (CustomTypeConverter.INSTANCE.coerceToString(unit)) {
switch (JstlTypeConverter.INSTANCE.coerceToString(unit)) {
case "years":
chronoUnit = ChronoUnit.YEARS;
break;
Expand Down Expand Up @@ -109,14 +109,14 @@ public static Instant timestampAdd(Object input, Object delta, Object unit) {
+ ". Should be one of [years, months, days, hours, minutes, seconds, millis]");
}
if (chronoUnit == ChronoUnit.MONTHS || chronoUnit == ChronoUnit.YEARS) {
return CustomTypeConverter.INSTANCE
return JstlTypeConverter.INSTANCE
.coerceToOffsetDateTime(input)
.plus(CustomTypeConverter.INSTANCE.coerceToLong(delta), chronoUnit)
.plus(JstlTypeConverter.INSTANCE.coerceToLong(delta), chronoUnit)
.toInstant();
}
return CustomTypeConverter.INSTANCE
return JstlTypeConverter.INSTANCE
.coerceToInstant(input)
.plus(CustomTypeConverter.INSTANCE.coerceToLong(delta), chronoUnit);
.plus(JstlTypeConverter.INSTANCE.coerceToLong(delta), chronoUnit);
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Date;
import java.util.Map;
import org.apache.avro.util.Utf8;
import org.apache.el.lang.ELSupport;
import org.apache.el.util.MessageFactory;
import org.apache.pulsar.client.api.Schema;

Expand All @@ -47,7 +48,7 @@ protected Boolean coerceToBoolean(Object value) {
if (value instanceof byte[]) {
return Schema.BOOL.decode((byte[]) value);
}
return null;
return ELSupport.coerceToBoolean(null, value, false);
}

protected Double coerceToDouble(Object value) {
Expand All @@ -74,7 +75,7 @@ protected Double coerceToDouble(Object value) {
Instant instant = coerceToInstant(value);
return (double) instant.getEpochSecond() * 1000 + (double) instant.getNano() / 1_000_000;
}
return null;
return ELSupport.coerceToNumber(null, value, Double.class).doubleValue();
}

protected Float coerceToFloat(Object value) {
Expand All @@ -84,7 +85,7 @@ protected Float coerceToFloat(Object value) {
if (value instanceof LocalDate) {
return (float) (((LocalDate) value).toEpochDay());
}
return null;
return ELSupport.coerceToNumber(null, value, Double.class).floatValue();
}

protected Long coerceToLong(Object value) {
Expand All @@ -106,7 +107,7 @@ protected Long coerceToLong(Object value) {
if (value instanceof TemporalAccessor) {
return coerceToInstant(value).toEpochMilli();
}
return null;
return ELSupport.coerceToNumber(null, value, Double.class).longValue();
}

protected Integer coerceToInteger(Object value) {
Expand All @@ -122,21 +123,21 @@ protected Integer coerceToInteger(Object value) {
if (value instanceof LocalDate) {
return Math.toIntExact(((LocalDate) value).toEpochDay());
}
return null;
return ELSupport.coerceToNumber(null, value, Double.class).intValue();
}

protected Short coerceToShort(Object value) {
if (value instanceof byte[]) {
return Schema.INT16.decode((byte[]) value);
}
return null;
return ELSupport.coerceToNumber(null, value, Double.class).shortValue();
}

protected Byte coerceToByte(Object value) {
if (value instanceof byte[]) {
return Schema.INT8.decode((byte[]) value);
}
return null;
return ELSupport.coerceToType(null, value, Byte.class);
}

protected String coerceToString(Object value) {
Expand All @@ -149,66 +150,66 @@ protected String coerceToString(Object value) {
if (value instanceof byte[]) {
return Schema.STRING.decode((byte[]) value);
}
return null;
return ELSupport.coerceToString(null, value);
}

protected Object coerceToType(Object value, Class<?> type) {
public <T> T coerceToType(Object value, Class<T> type) {
if (value == null) {
return null;
}
if (value instanceof Utf8) {
return coerceToType(value.toString(), type);
}
if (type == Boolean.class) {
return coerceToBoolean(value);
return (T) coerceToBoolean(value);
}
if (type == Double.class) {
return coerceToDouble(value);
return (T) coerceToDouble(value);
}
if (type == Float.class) {
return coerceToFloat(value);
return (T) coerceToFloat(value);
}
if (type == Long.class) {
return coerceToLong(value);
return (T) coerceToLong(value);
}
if (type == Short.class) {
return coerceToShort(value);
return (T) coerceToShort(value);
}
if (type == Integer.class) {
return coerceToInteger(value);
return (T) coerceToInteger(value);
}
if (type == Byte.class) {
return coerceToByte(value);
return (T) coerceToByte(value);
}
if (type == String.class) {
return coerceToString(value);
return (T) coerceToString(value);
}
if (type == byte[].class) {
return coerceToBytes(value);
return (T) coerceToBytes(value);
}
if (type == Timestamp.class) {
return coerceToTimestamp(value);
return (T) coerceToTimestamp(value);
}
if (type == Time.class) {
return coerceToTime(value);
return (T) coerceToTime(value);
}
if (type == Date.class) {
return coerceToDate(value);
return (T) coerceToDate(value);
}
if (type == LocalDateTime.class) {
return coerceToLocalDateTime(value);
return (T) coerceToLocalDateTime(value);
}
if (type == LocalDate.class) {
return coerceToLocalDate(value);
return (T) coerceToLocalDate(value);
}
if (type == LocalTime.class) {
return coerceToLocalTime(value);
return (T) coerceToLocalTime(value);
}
if (type == Instant.class) {
return coerceToInstant(value);
return (T) coerceToInstant(value);
}
if (type == OffsetDateTime.class) {
return coerceToOffsetDateTime(value);
return (T) coerceToOffsetDateTime(value);
}
return null;
}
Expand Down Expand Up @@ -455,7 +456,7 @@ public <T> T convertToType(ELContext elContext, Object value, Class<T> type) {
elContext.setPropertyResolved(true);
return null;
}
Object coercedValue = coerceToType(value, type);
T coercedValue = coerceToType(value, type);
elContext.setPropertyResolved(coercedValue != null);
return (T) coercedValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void testAvro() throws Exception {
recordSchemaBuilder.field("date").type(SchemaType.DATE);
recordSchemaBuilder.field("timestamp").type(SchemaType.TIMESTAMP);
recordSchemaBuilder.field("time").type(SchemaType.TIME);
recordSchemaBuilder.field("integerStr").type(SchemaType.STRING);

SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
GenericSchema<GenericRecord> genericSchema = Schema.generic(schemaInfo);
Expand All @@ -107,6 +108,7 @@ void testAvro() throws Exception {
.set("date", (int) LocalDate.of(2023, 1, 2).toEpochDay())
.set("timestamp", Instant.parse("2023-01-02T23:04:05.006Z").toEpochMilli())
.set("time", (int) (LocalTime.parse("23:04:05.006").toNanoOfDay() / 1_000_000))
.set("integerStr", "13360")
.build();

Record<GenericObject> record = new Utils.TestRecord<>(genericSchema, genericRecord, "test-key");
Expand Down Expand Up @@ -136,6 +138,12 @@ void testAvro() throws Exception {
.expression("value.time")
.type(ComputeFieldType.STRING)
.build());
fields.add(
ComputeField.builder()
.scopedName("value.integer")
.expression("value.integerStr")
.type(ComputeFieldType.INT32)
.build());
ComputeStep step = ComputeStep.builder().fields(fields).build();
Record<?> outputRecord = Utils.process(record, step);
assertEquals(outputRecord.getKey().orElse(null), "test-key");
Expand All @@ -146,6 +154,7 @@ void testAvro() throws Exception {
assertEquals(read.get("dateStr"), new Utf8("2023-01-02"));
assertEquals(read.get("timestampStr"), new Utf8("2023-01-02T23:04:05.006Z"));
assertEquals(read.get("timeStr"), new Utf8("23:04:05.006"));
assertEquals(read.get("integer"), 13360);

assertTrue(read.hasField("newStringField"));
assertEquals(read.getSchema().getField("newStringField").schema(), STRING_SCHEMA);
Expand Down
Loading

0 comments on commit 240ca1e

Please sign in to comment.