WIP - Decimal support #294
Open
simplesteph wants to merge 7 commits into confluentinc:master from simplesteph:decimal-support
Commits (7)
1285799 (simplesteph): WIP - working POC to transform decimals to bytes
aed0de8 (simplesteph): WIP - recursion partially done - not all big decimals are correctly c…
98f3988 (simplesteph): quick fix on numbers missing decimal points
6a793fa (simplesteph): working version for sending decimals and retrieving decimals
bf267ec (simplesteph): removed comments
e7bea71 (simplesteph): reverted to java 7
fcfbbb3 (simplesteph): optimisations and naming changes thanks to @kdrakon
src/main/java/io/confluent/kafkarest/converters/BigDecimalDecoder.java (new file, 21 additions)

package io.confluent.kafkarest.converters;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class BigDecimalDecoder {

  // see: https://github.com/apache/avro/blob/33d495840c896b693b7f37b5ec786ac1acacd3b4/lang/java/avro/src/main/java/org/apache/avro/Conversions.java#L79
  public static BigDecimal fromBytes(ByteBuffer value, int scale) {
    byte[] bytes = new byte[value.remaining()];
    value.get(bytes);
    return new BigDecimal(new BigInteger(bytes), scale);
  }

  public static BigDecimal fromEncodedString(String encoded, int scale) {
    return fromBytes(ByteBuffer.wrap(encoded.getBytes(Charset.forName("ISO-8859-1"))), scale);
  }

}
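For context, a minimal usage sketch (not part of the diff): Avro's decimal logical type stores the unscaled value as big-endian two's-complement bytes, and fromBytes reapplies the scale. The sample bytes below are chosen by hand.

import io.confluent.kafkarest.converters.BigDecimalDecoder;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

public class BigDecimalDecoderExample {
  public static void main(String[] args) {
    // 123.45 at scale 2 has unscaled value 12345, i.e. bytes 0x30 0x39
    ByteBuffer buffer = ByteBuffer.wrap(new byte[] {0x30, 0x39});
    BigDecimal decoded = BigDecimalDecoder.fromBytes(buffer, 2);
    System.out.println(decoded); // prints 123.45
  }
}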
src/main/java/io/confluent/kafkarest/converters/BigDecimalEncoder.java (new file, 18 additions)

package io.confluent.kafkarest.converters;

import java.math.BigDecimal;
import java.nio.charset.Charset;

public class BigDecimalEncoder {

  public static byte[] toByteArray(BigDecimal bigDecimal, int scale) {
    // will throw an error if rounding is necessary (meaning scale is wrong)
    return bigDecimal.setScale(scale, BigDecimal.ROUND_UNNECESSARY).unscaledValue().toByteArray();
  }

  public static String toEncodedString(BigDecimal bigDecimal, int scale, int precision) {
    return new String(toByteArray(bigDecimal, scale), Charset.forName("ISO-8859-1"));
  }

}
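A round-trip sketch (not part of the diff) pairing the two helpers; note that toEncodedString accepts a precision argument but does not use it in this revision.

import io.confluent.kafkarest.converters.BigDecimalDecoder;
import io.confluent.kafkarest.converters.BigDecimalEncoder;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

public class BigDecimalRoundTripExample {
  public static void main(String[] args) {
    BigDecimal original = new BigDecimal("123.45");

    // Encode the unscaled value (12345) to bytes, then decode with the same scale
    byte[] bytes = BigDecimalEncoder.toByteArray(original, 2);
    BigDecimal decoded = BigDecimalDecoder.fromBytes(ByteBuffer.wrap(bytes), 2);
    System.out.println(original.equals(decoded)); // true

    // The String-based variants round-trip the same bytes through ISO-8859-1
    String encoded = BigDecimalEncoder.toEncodedString(original, 2, 5);
    System.out.println(BigDecimalDecoder.fromEncodedString(encoded, 2)); // 123.45
  }
}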
src/main/java/io/confluent/kafkarest/converters/JsonNodeConverter.java (new file, 8 additions)

package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.Schema;

interface JsonNodeConverter {
  JsonNode convert(JsonNode jsonNode, Schema schema);
}
src/main/java/io/confluent/kafkarest/converters/JsonNodeConverters.java (new file, 76 additions)

package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.*;
import org.apache.avro.Schema;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Objects;

public class JsonNodeConverters {

  static JsonNodeConverter withDecimalNodesAsText = new JsonNodeConverter() {
    @Override
    public JsonNode convert(JsonNode jsonNode, Schema schema) {
      if (jsonNode instanceof DoubleNode) {
        BigDecimal bd = jsonNode.decimalValue();
        int scale = schema.getJsonProps().get("scale").asInt();
        int precision = schema.getJsonProps().get("precision").asInt();
        String bdBytesAsUtf8 = BigDecimalEncoder.toEncodedString(bd, scale, precision);
        return new TextNode(bdBytesAsUtf8);
      } else {
        return jsonNode;
      }
    }
  };

  static JsonNodeConverter withTextNodesAsDecimal = new JsonNodeConverter() {
    @Override
    public JsonNode convert(JsonNode jsonNode, Schema schema) {
      if (jsonNode instanceof TextNode) {
        String encodedBigDecimal = jsonNode.asText();
        int scale = schema.getJsonProps().get("scale").asInt();
        BigDecimal bd = BigDecimalDecoder.fromEncodedString(encodedBigDecimal, scale);
        return new DecimalNode(bd);
      } else {
        return jsonNode;
      }
    }
  };

  // private static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema) {
  static JsonNode transformJsonNode(JsonNode jsonNode, Schema schema, JsonNodeConverter jsonNodeConverter) {
    if (schema.getType() == Schema.Type.BYTES && schema.getJsonProps().containsKey("logicalType") &&
        Objects.equals(schema.getJsonProps().get("logicalType").asText(), "decimal")) {
      return jsonNodeConverter.convert(jsonNode, schema);
    } else if (schema.getType() == Schema.Type.RECORD) {
      for (Schema.Field s : schema.getFields()) {
        JsonNode transformed = transformJsonNode(jsonNode.get(s.name()), s.schema(), jsonNodeConverter);
        ((ObjectNode) jsonNode).set(s.name(), transformed);
      }
    } else if (schema.getType() == Schema.Type.UNION) {
      if (jsonNode.has("bytes") && jsonNode.get("bytes").isNumber()) {
        for (Schema subSchema : schema.getTypes()) {
          if (subSchema.getType() == Schema.Type.BYTES && subSchema.getJsonProps().containsKey("logicalType") &&
              Objects.equals(subSchema.getJsonProps().get("logicalType").asText(), "decimal")) {
            JsonNode transformed = transformJsonNode(jsonNode.get("bytes"), subSchema, jsonNodeConverter);
            ((ObjectNode) jsonNode).set("bytes", transformed);
          }
        }
      }
    } else if (schema.getType() == Schema.Type.ARRAY) {
      Schema subSchema = schema.getElementType();
      int i = 0;
      for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
        JsonNode elem = it.next();
        JsonNode transformed = transformJsonNode(elem, subSchema, jsonNodeConverter);
        ((ArrayNode) jsonNode).set(i, transformed);
        i += 1;
      }
    }
    // TODO: More cases - missing MAP and ENUM
    return jsonNode;
  }
}

Inline review comment on the Iterator-based loop in transformJsonNode: "use instead"
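For context, a minimal usage sketch (not part of the diff) of the decimal-to-text direction. It sits in the same package because the converters and transformJsonNode are package-private, and it assumes the Avro Schema#getJsonProps API that the PR itself relies on; the schema string and class name are hypothetical.

package io.confluent.kafkarest.converters;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;

public class JsonNodeConvertersExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"testDecimal\", \"fields\": ["
        + "{\"name\": \"decimal\", \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\","
        + " \"precision\": 5, \"scale\": 2}}]}");

    // Jackson parses 123.45 as a DoubleNode; the converter rewrites it into a TextNode
    // holding the ISO-8859-1 encoding of the unscaled bytes
    JsonNode json = new ObjectMapper().readTree("{\"decimal\": 123.45}");
    JsonNode transformed = JsonNodeConverters.transformJsonNode(
        json, schema, JsonNodeConverters.withDecimalNodesAsText);

    System.out.println(transformed.get("decimal").isTextual()); // true
  }
}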
AvroConverterTest.java (modified)
@@ -20,6 +20,8 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.TextNode;

import io.confluent.kafkarest.converters.BigDecimalDecoder;
import io.confluent.kafkarest.converters.BigDecimalEncoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;

@@ -29,6 +31,7 @@
import org.apache.avro.util.Utf8;
import org.junit.Test;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;

@@ -61,6 +64,7 @@ public class AvroConverterTest {
      + " {\"name\": \"float\", \"type\": \"float\"},\n"
      + " {\"name\": \"double\", \"type\": \"double\"},\n"
      + " {\"name\": \"bytes\", \"type\": \"bytes\"},\n"
      + " {\"name\": \"decimal\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 5, \"scale\": 2} },\n"
      + " {\"name\": \"string\", \"type\": \"string\", \"aliases\": [\"string_alias\"]},\n"
      + " {\"name\": \"null_default\", \"type\": \"null\", \"default\": null},\n"
      + " {\"name\": \"boolean_default\", \"type\": \"boolean\", \"default\": false},\n"

@@ -106,6 +110,43 @@ public class AvroConverterTest {
      + "}"
  );

  private static final Schema decimalSchema = new Schema.Parser().parse(
      "{\"type\": \"record\",\n"
      + " \"name\": \"testDecimal\",\n"
      + " \"fields\": [\n"
      + " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }\n"
      + "]}"
  );

  private static final Schema decimalUnionNullSchema = new Schema.Parser().parse(
      "{\"type\": \"record\",\n"
      + " \"name\": \"testDecimal\",\n"
      + " \"fields\": [\n"
      + " {\"name\": \"decimal\", \"type\": [\"null\",{\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 }] }\n"
      + "]}"
  );

  private static final Schema nestedDecimalSchema = new Schema.Parser().parse(
      "{\"type\": \"record\",\n"
      + " \"name\": \"testDecimal\",\n"
      + " \"fields\": [\n"
      + " {\"name\": \"nested\", \"type\":\n"
      + " {\"type\": \"record\", \"name\":\"nestedRecord\", \"fields\":[\n"
      + " {\"name\": \"decimal\", \"type\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } }]}}\n"
      + "]}"
  );

  private static final Schema decimalArraySchema = new Schema.Parser().parse(
      "{\"namespace\": \"namespace\",\n"
      + " \"type\": \"array\",\n"
      + " \"name\": \"test\",\n"
      + " \"items\": {\"type\":\"bytes\", \"logicalType\": \"decimal\", \"precision\" : 5,\"scale\" : 2 } \n"
      + "}"
  );

  @Test
  public void testPrimitiveTypesToAvro() {
    Object result = AvroConverter.toAvro(null, createPrimitiveSchema("null"));

@@ -188,6 +229,7 @@ public void testRecordToAvro() {
      + " \"float\": 23.4,\n"
      + " \"double\": 800.25,\n"
      + " \"bytes\": \"hello\",\n"
      + " \"decimal\": 123.45,\n"
      + " \"string\": \"string\",\n"
      + " \"null_default\": null,\n"
      + " \"boolean_default\": false,\n"

@@ -210,6 +252,7 @@ public void testRecordToAvro() {
    assertEquals(800.25, resultRecord.get("double"));
    assertEquals(EntityUtils.encodeBase64Binary("hello".getBytes()),
        EntityUtils.encodeBase64Binary(((ByteBuffer) resultRecord.get("bytes")).array()));
    assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes((ByteBuffer) resultRecord.get("decimal"), 2));
    assertEquals("string", resultRecord.get("string").toString());
    // Nothing to check with default values, just want to make sure an exception wasn't thrown
    // when they values weren't specified for their fields.

@@ -263,6 +306,76 @@ public void testEnumToAvro() {
    // serialization.
  }

  @Test
  public void testDecimalToAvro() {
    // this has been tested for numbers ranging from -10000.99 to 10000.99
    for (int i = -100; i <= 100; i++) {
      for (int j = 0; j <= 99; j++) {
        BigDecimal numberBigDecimal = new BigDecimal(i + (float) j / 100);
        numberBigDecimal = numberBigDecimal.setScale(2, BigDecimal.ROUND_HALF_UP);
        String decimal = numberBigDecimal.toString();

        Object result = AvroConverter.toAvro(
            TestUtils.jsonTree(String.format("{\"decimal\": %s}", decimal)),
            decimalSchema);

        ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal"));
        int scale = decimalSchema.getField("decimal").schema().getJsonProp("scale").asInt();

        BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale);
        assertEquals(new BigDecimal(decimal), expected);
      }
    }
  }

  @Test
  public void testDecimalUnionNullToAvro() {
    String decimal = "123.45";
    Object result = AvroConverter.toAvro(
        TestUtils.jsonTree(String.format("{\"decimal\": {\"bytes\": %s }}", decimal)),
        decimalUnionNullSchema);

    ByteBuffer byteBuffer = ((ByteBuffer) ((GenericData.Record) result).get("decimal"));
    int scale = decimalUnionNullSchema.getField("decimal").schema()
        .getTypes().get(1).getJsonProp("scale").asInt();

    BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale);
    assertEquals(new BigDecimal(decimal), expected);
  }

  @Test
  public void testNestedDecimalToAvro() {
    String decimal = "123.45";

    Object result = AvroConverter.toAvro(
        TestUtils.jsonTree(String.format("{\"nested\": {\"decimal\": %s }}", decimal)),
        nestedDecimalSchema);

    ByteBuffer byteBuffer = (ByteBuffer) ((GenericData.Record) ((GenericData.Record) result).get("nested")).get("decimal");
    int scale = 2;

    BigDecimal expected = BigDecimalDecoder.fromBytes(byteBuffer, scale);
    assertEquals(new BigDecimal(decimal), expected);
  }

  @Test
  public void testDecimalArrayToAvro() {
    String json = "[123.45,555.55]";
    Object result = AvroConverter.toAvro(TestUtils.jsonTree(json), decimalArraySchema);
    int scale = 2;

    ByteBuffer byteBuffer0 = (ByteBuffer) ((GenericData.Array) result).get(0);
    assertEquals(new BigDecimal("123.45"), BigDecimalDecoder.fromBytes(byteBuffer0, 2));

    ByteBuffer byteBuffer1 = (ByteBuffer) ((GenericData.Array) result).get(1);
    assertEquals(new BigDecimal("555.55"), BigDecimalDecoder.fromBytes(byteBuffer1, 2));
  }
Review comment: can you add more tests for different scales? You're really only testing a scale of 2.
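One possible shape for such a test (a sketch, not part of this PR) would round-trip a value through the encoder and decoder at several scales; it would live alongside the existing tests in AvroConverterTest and reuse its imports.

  @Test
  public void testDecimalRoundTripAtVariousScales() {
    for (int scale = 0; scale <= 6; scale++) {
      // 12345 shifted by the scale, e.g. 12.345 at scale 3
      BigDecimal value = new BigDecimal("12345").movePointLeft(scale);
      byte[] bytes = BigDecimalEncoder.toByteArray(value, scale);
      assertEquals(value, BigDecimalDecoder.fromBytes(ByteBuffer.wrap(bytes), scale));
    }
  }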
  @Test
  public void testPrimitiveTypesToJson() {

@@ -313,6 +426,7 @@ public void testRecordToJson() {
        .set("float", 23.4f)
        .set("double", 800.25)
        .set("bytes", ByteBuffer.wrap("bytes".getBytes()))
        .set("decimal", ByteBuffer.wrap(BigDecimalEncoder.toByteArray(new BigDecimal("123.45"), 2)))
        .set("string", "string")
        .build();

@@ -334,6 +448,8 @@ public void testRecordToJson() {
    // The bytes value was created from an ASCII string, so Avro's encoding should just give that
    // string back to us in the JSON-serialized version
    assertEquals("bytes", result.json.get("bytes").textValue());
    assertTrue(result.json.get("decimal").isBigDecimal());
    assertEquals(new BigDecimal("123.45"), result.json.get("decimal").decimalValue());
    assertTrue(result.json.get("string").isTextual());
    assertEquals("string", result.json.get("string").textValue());
  }
Review comment: why do you use ISO-8859-1?

Author reply: That's the only charset I found that could encode the bytes the way I wanted for a String, so that my tests passed. I'm not sure why this one works and not others, but I'm hoping Confluent can let me know.
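A small sketch (not part of the PR) of the behaviour behind that answer: ISO-8859-1 maps every byte value 0-255 to exactly one character, so arbitrary bytes survive a String round trip, which is not true of UTF-8.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharsetRoundTrip {
  public static void main(String[] args) {
    byte[] original = {0x30, 0x39, (byte) 0xAB}; // includes a byte above 0x7F

    // ISO-8859-1: lossless, every byte maps to a single char and back
    byte[] latin1 = new String(original, StandardCharsets.ISO_8859_1)
        .getBytes(StandardCharsets.ISO_8859_1);
    System.out.println(Arrays.equals(original, latin1)); // true

    // UTF-8: 0xAB is not a valid byte sequence, so it is replaced and the bytes change
    byte[] utf8 = new String(original, StandardCharsets.UTF_8)
        .getBytes(StandardCharsets.UTF_8);
    System.out.println(Arrays.equals(original, utf8)); // false
  }
}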