Move to Avro 1.10.2 (#82)
* Update to Avro 1.10.2 and disable Avro default-value validation

* Backport the fix that makes Parquet work with Avro 1.10.2 from upstream PR #1648

Co-authored-by: Shenoda Guirguis <[email protected]>
Shenoda Guirguis authored Sep 21, 2021
1 parent 508e49a commit a0fc79b
Showing 8 changed files with 136 additions and 107 deletions.
8 changes: 5 additions & 3 deletions build.gradle
@@ -95,6 +95,7 @@ subprojects {
       force 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.2'
       force 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.2'
       force 'com.fasterxml.jackson.module:jackson-module-paranamer:2.10.2'
+      force 'com.fasterxml.jackson.core:jackson-databind:2.10.2'
     }
   }

@@ -532,10 +533,11 @@ project(':iceberg-mr') {
     testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')

-    testCompile("org.apache.avro:avro:1.9.2")
+    testCompile("org.apache.avro:avro:1.10.2")
     testCompile("org.apache.calcite:calcite-core")
     testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
-    testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
+    testCompile("com.fasterxml.jackson.core:jackson-annotations:2.10.2")
+    testCompile("com.fasterxml.jackson.core:jackson-databind:2.10.2")
     testCompile("org.apache.hive:hive-service") {
       exclude group: 'org.apache.hive', module: 'hive-exec'
     }
@@ -602,7 +604,7 @@ if (jdkVersion == '8') {
     testCompile("org.apache.avro:avro:1.9.2")
     testCompile("org.apache.calcite:calcite-core")
     testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
-    testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
+    testCompile("com.fasterxml.jackson.core:jackson-annotations:2.10.2")
     testCompile("org.apache.hive:hive-service:3.1.2") {
       exclude group: 'org.apache.hive', module: 'hive-exec'
     }
core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -27,6 +27,7 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.avro.TestReadProjection;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
@@ -132,9 +133,9 @@ public void testV1ForwardCompatibility() throws IOException {
Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
TestReadProjection.assertNotProjected("Content", generic, ManifestFile.MANIFEST_CONTENT.name());
TestReadProjection.assertNotProjected("Sequence number", generic, ManifestFile.SEQUENCE_NUMBER.name());
TestReadProjection.assertNotProjected("Min sequence number", generic, ManifestFile.MIN_SEQUENCE_NUMBER.name());
}

@Test
@@ -154,9 +155,9 @@ public void testV2ForwardCompatibility() throws IOException {
Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
TestReadProjection.assertNotProjected("Content", generic, ManifestFile.MANIFEST_CONTENT.name());
TestReadProjection.assertNotProjected("Sequence number", generic, ManifestFile.SEQUENCE_NUMBER.name());
TestReadProjection.assertNotProjected("Min sequence number", generic, ManifestFile.MIN_SEQUENCE_NUMBER.name());
}

@Test
97 changes: 53 additions & 44 deletions core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java

Large diffs are not rendered by default.
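The `Assert.assertNull` checks above had to go because Avro 1.10 changed `GenericData.Record.get(String)` to throw `AvroRuntimeException` for a field that is not in the record's schema, where Avro 1.9 silently returned null. Since the TestReadProjection diff is not rendered here, the following is only a minimal sketch of what the new `assertNotProjected` helper plausibly does; the signature is taken from the call sites above, but the body is an assumption:

```java
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericRecord;
import org.junit.Assert;

// Hypothetical reconstruction of the helper added to TestReadProjection.
// It passes when the field was not projected, whether the runtime signals
// that by returning null (Avro 1.9) or by throwing AvroRuntimeException
// ("Not a valid schema field: ...", Avro 1.10).
public class AssertNotProjectedSketch {
  public static void assertNotProjected(String message, GenericRecord record, String field) {
    try {
      Assert.assertNull(message, record.get(field));
    } catch (AvroRuntimeException e) {
      Assert.assertTrue(message, e.getMessage().contains("Not a valid schema field"));
    }
  }
}
```

Wrapping the lookup this way keeps the forward-compatibility tests valid against both Avro behaviors instead of hard-coding the 1.9 null contract.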

@@ -56,7 +56,9 @@ private LegacyHiveTableUtils() {
   static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
     Map<String, String> props = getTableProperties(table);
     String schemaStr = props.get("avro.schema.literal");
-    org.apache.avro.Schema avroSchema = schemaStr != null ? new org.apache.avro.Schema.Parser().parse(schemaStr) : null;
+    // Disable default value validation for backward compatibility with Avro 1.7
+    org.apache.avro.Schema avroSchema =
+        schemaStr != null ? new org.apache.avro.Schema.Parser().setValidateDefaults(false).parse(schemaStr) : null;
     Schema schema;
     if (avroSchema != null) {
       String serde = table.getSd().getSerdeInfo().getSerializationLib();
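Avro 1.9 turned schema default-value validation on by default, so `Schema.Parser` now rejects schemas that Avro 1.7 happily accepted, for example a `null` default on a union whose first branch is not `"null"`. A small sketch of the behavior this change works around; the schema string is an invented example, not taken from any real table:

```java
import org.apache.avro.Schema;

public class DefaultValidationDemo {
  public static void main(String[] args) {
    // Invented legacy schema: Avro 1.7 tolerated a null default on a
    // ["string", "null"] union, but a validating parser rejects it because
    // the default must match the union's *first* branch.
    String legacy = "{\"type\": \"record\", \"name\": \"r\", \"fields\": ["
        + "{\"name\": \"f\", \"type\": [\"string\", \"null\"], \"default\": null}]}";

    // new Schema.Parser().parse(legacy) would throw AvroTypeException here.

    // With validation disabled, the legacy schema still parses:
    Schema schema = new Schema.Parser().setValidateDefaults(false).parse(legacy);
    System.out.println(schema.getField("f").defaultVal());
  }
}
```

Disabling validation at the single point where `avro.schema.literal` is read keeps old Hive tables loadable without rewriting their stored schemas.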
21 changes: 13 additions & 8 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
@@ -20,8 +20,10 @@
 package org.apache.iceberg.parquet;

 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.List;
 import java.util.Map;
+import java.util.WeakHashMap;
 import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
@@ -30,6 +32,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.specific.SpecificData;
+import org.apache.commons.math3.util.Pair;
 import org.apache.iceberg.avro.AvroSchemaVisitor;
 import org.apache.iceberg.avro.UUIDConversion;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -48,8 +51,8 @@ static Schema parquetAvroSchema(Schema avroSchema) {
   static class ParquetDecimal extends LogicalType {
     private static final String NAME = "parquet-decimal";

-    private int precision;
-    private int scale;
+    private final int precision;
+    private final int scale;

     ParquetDecimal(int precision, int scale) {
       super(NAME);
@@ -154,12 +157,10 @@ public Long toLong(BigDecimal value, org.apache.avro.Schema schema, LogicalType
   }

   private static class FixedDecimalConversion extends Conversions.DecimalConversion {
-    private final LogicalType[] decimalsByScale = new LogicalType[39];
+    private final WeakHashMap<Pair<Integer, Integer>, LogicalType> decimalsByScale;

     private FixedDecimalConversion() {
-      for (int i = 0; i < decimalsByScale.length; i += 1) {
-        decimalsByScale[i] = LogicalTypes.decimal(i, i);
-      }
+      this.decimalsByScale = new WeakHashMap<>();
     }

     @Override
@@ -169,12 +170,16 @@ public String getLogicalTypeName() {

     @Override
     public BigDecimal fromFixed(GenericFixed value, Schema schema, LogicalType type) {
-      return super.fromFixed(value, schema, decimalsByScale[((ParquetDecimal) type).scale()]);
+      ParquetDecimal dec = (ParquetDecimal) type;
+      return new BigDecimal(new BigInteger(value.bytes()), dec.scale());
     }

     @Override
     public GenericFixed toFixed(BigDecimal value, Schema schema, LogicalType type) {
-      return super.toFixed(value, schema, decimalsByScale[((ParquetDecimal) type).scale()]);
+      ParquetDecimal dec = (ParquetDecimal) type;
+      Pair<Integer, Integer> key = new Pair<>(dec.precision(), dec.scale());
+      return super.toFixed(value, schema,
+          decimalsByScale.computeIfAbsent(key, k -> LogicalTypes.decimal(k.getFirst(), k.getSecond())));
     }
   }
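Two things change in `FixedDecimalConversion`. The read path no longer needs a `LogicalTypes.decimal` lookup at all, because the `BigDecimal` can be rebuilt directly from the fixed's big-endian two's-complement bytes plus the scale. The write path now caches logical types keyed by the full `(precision, scale)` pair, where the old `decimalsByScale[scale]` array conflated precision with scale and only covered indexes 0 to 38. A short worked example of the read-path reconstruction, with invented values:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class FixedDecimalDemo {
  public static void main(String[] args) {
    // Avro 'fixed' decimals store the unscaled value as big-endian
    // two's-complement bytes; the scale comes from the logical type.
    byte[] bytes = BigInteger.valueOf(-1234).toByteArray();  // unscaled -1234

    // Same reconstruction as the new fromFixed: bytes -> BigInteger -> BigDecimal.
    BigDecimal value = new BigDecimal(new BigInteger(bytes), 2);
    System.out.println(value);  // prints -12.34
  }
}
```

Using a `WeakHashMap` for the write-side cache means rarely used `(precision, scale)` entries can be garbage-collected instead of being pinned for the life of the JVM.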