Coral-Schema: Make Coral-Schema compatible with both Avro 1.7 and 1.10 (
ljfgem authored Oct 17, 2022
1 parent d0a4708 commit 6723e22
Showing 43 changed files with 388 additions and 375 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -33,6 +33,9 @@ allprojects {

repositories {
mavenCentral()
maven {
url 'https://linkedin.jfrog.io/artifactory/avro-util/'
}
maven {
url 'https://linkedin.bintray.com/maven/'
}
@@ -32,7 +32,6 @@
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -186,7 +185,7 @@ private Deserializer getDeserializer() {
private Deserializer getDeserializerFromMetaStore() {
try {
return MetaStoreUtils.getDeserializer(new Configuration(false), hiveTable, false);
} catch (MetaException e) {
} catch (Throwable e) { // Catch Throwable here because it may throw Exception or Error
throw new RuntimeException(e);
}
}
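
Widening the catch from MetaException to Throwable matters because an Avro version mismatch at runtime usually surfaces as a linkage Error such as NoSuchMethodError, which a catch (Exception) clause does not cover. A minimal, self-contained illustration (the helper below is hypothetical and only stands in for MetaStoreUtils.getDeserializer):

public final class CatchWidthDemo {
  // Hypothetical stand-in for MetaStoreUtils.getDeserializer: depending on which Avro
  // version is on the classpath, it can fail with either an Exception or an Error.
  private static Object loadDeserializer() {
    throw new NoSuchMethodError("Avro method compiled against a different Avro version");
  }

  public static void main(String[] args) {
    try {
      loadDeserializer();
    } catch (Exception e) {
      System.out.println("never reached: NoSuchMethodError extends Error, not Exception");
    } catch (Throwable t) {
      System.out.println("caught here: " + t);  // only catch (Throwable) sees linkage Errors
    }
  }
}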
7 changes: 7 additions & 0 deletions coral-schema/build.gradle
@@ -1,5 +1,6 @@
dependencies {
compile project(path: ':coral-hive')
compile deps.'avroCompatHelper'

testCompile(deps.'hive'.'hive-exec-core') {
exclude group: 'org.apache.avro', module: 'avro-tools'
@@ -12,3 +13,9 @@ dependencies {
testCompile deps.'hadoop'.'hadoop-mapreduce-client-core'
testCompile deps.'kryo'
}

configurations.all {
resolutionStrategy {
force 'org.apache.avro:avro:1.10.2'
}
}
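
The forced resolution pins the coral-schema test classpath to Avro 1.10.2 so the AvroCompatibilityHelper code paths are exercised against the newer runtime instead of whatever Avro version Hive's transitive dependencies resolve to. A quick way to confirm which Avro the tests actually see is a probe like the sketch below; it assumes avro-util exposes the runtime-version lookup as getRuntimeAvroVersion(), so treat that method name as an assumption:

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

public class AvroVersionProbe {
  public static void main(String[] args) {
    // Reports the Avro version detected on the classpath at runtime (assumed API).
    System.out.println("Runtime Avro version: " + AvroCompatibilityHelper.getRuntimeAvroVersion());
  }
}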
@@ -6,10 +6,12 @@
package com.linkedin.coral.schema.avro;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.Jackson1Utils;

import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -19,7 +21,6 @@
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.JsonNodeFactory;

import com.linkedin.coral.com.google.common.base.Preconditions;
@@ -58,7 +59,10 @@ public Schema struct(StructTypeInfo struct, Schema partner, List<Schema.Field> f
} else {
result = SchemaUtilities.copyRecord(SchemaUtilities.extractIfOption(partner), fieldResults);
}
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result) : result;
// While calling `makeNullable`, we should respect the option order of `partner`
// i.e. if the schema of `partner` is [int, null], the resultant schema should also be [int, null] rather than [null, int]
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
: result;
}
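
The two-argument makeNullable and isNullSecond used here live in Coral's SchemaUtilities and are not part of this diff; a rough sketch of the intended behavior (only the names come from the code above, the bodies below are an assumption):

import java.util.Arrays;
import org.apache.avro.Schema;

final class NullableOrderSketch {
  // True when `schema` is a two-branch union whose second branch is null, e.g. ["int","null"].
  static boolean isNullSecond(Schema schema) {
    return schema != null && schema.getType() == Schema.Type.UNION && schema.getTypes().size() == 2
        && schema.getTypes().get(1).getType() == Schema.Type.NULL;
  }

  // Wraps `schema` in a union with null, putting null second when the partner had it second,
  // so an ["int","null"] partner keeps producing ["int","null"] rather than ["null","int"].
  static Schema makeNullable(Schema schema, boolean nullSecond) {
    if (schema.getType() == Schema.Type.UNION) {
      return schema;  // already an option schema
    }
    Schema nullSchema = Schema.create(Schema.Type.NULL);
    return nullSecond ? Schema.createUnion(Arrays.asList(schema, nullSchema))
        : Schema.createUnion(Arrays.asList(nullSchema, schema));
  }

  public static void main(String[] args) {
    Schema partner = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL)));
    // Prints ["long","null"]: the null branch stays second, matching the partner's order.
    System.out.println(makeNullable(Schema.create(Schema.Type.LONG), isNullSecond(partner)));
  }
}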

@Override
@@ -67,41 +71,26 @@ public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Sch
// in their field results if required
if (partner == null) {
// if there was no matching Avro field, use name from the Hive schema and set a null default
return new Schema.Field(SchemaUtilities.makeCompatibleName(name), fieldResult, null, null);
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
null);
} else {
// TODO: How to ensure that field default value is compatible with new field type generated from Hive?
// Copy field type from the visitor result, copy everything else from the partner
// Avro requires the default value to match the first type in the option, reorder option if required
Schema reordered = reorderOptionIfRequired(fieldResult, partner.defaultValue());
Schema reordered = SchemaUtilities.reorderOptionIfRequired(fieldResult, SchemaUtilities.defaultValue(partner));
return SchemaUtilities.copyField(partner, reordered);
}
}

/**
* Reorders an option schema so that the type of the provided default value is the first type in the option schema
*
* e.g. If the schema is (NULL, INT) and the default value is 1, the returned schema is (INT, NULL)
* If the schema is not an option schema or if there is no default value, schema is returned as-is
*/
private Schema reorderOptionIfRequired(Schema schema, JsonNode defaultValue) {
if (isNullableType(schema) && defaultValue != null) {
boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL;
if (isNullFirstOption && defaultValue.isNull()) {
return schema;
} else {
return Schema.createUnion(Arrays.asList(schema.getTypes().get(1), schema.getTypes().get(0)));
}
} else {
return schema;
}
}
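
reorderOptionIfRequired now lives in SchemaUtilities, and the default value is read through SchemaUtilities.defaultValue(partner) because Schema.Field.defaultValue(), which returns a Jackson 1 JsonNode, is gone in newer Avro releases. The rule itself is unchanged: Avro only accepts a field default that matches the first branch of a union. A small standalone illustration of that rule (not Coral code):

import java.util.Arrays;
import org.apache.avro.Schema;

final class OptionOrderSketch {
  public static void main(String[] args) {
    Schema nullFirst = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)));
    // A non-null default such as 0 requires the non-null branch first, so the union is flipped:
    Schema intFirst = Schema.createUnion(
        Arrays.asList(nullFirst.getTypes().get(1), nullFirst.getTypes().get(0)));
    System.out.println(nullFirst);  // ["null","int"] -> only valid with "default": null
    System.out.println(intFirst);   // ["int","null"] -> valid with e.g. "default": 0
  }
}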

@Override
public Schema list(ListTypeInfo list, Schema partner, Schema elementResult) {
// if there was no matching Avro list, or if matching Avro list was an option, return an optional list
boolean shouldResultBeOptional = partner == null || isNullableType(partner);
Schema result = Schema.createArray(elementResult);
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result) : result;
// While calling `makeNullable`, we should respect the option order of `partner`
// i.e. if the schema of `partner` is [int, null], the resultant schema should also be [int, null] rather than [null, int]
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
: result;
}

@Override
@@ -111,7 +100,10 @@ public Schema map(MapTypeInfo map, Schema partner, Schema keyResult, Schema valu
// if there was no matching Avro map, or if matching Avro map was an option, return an optional map
boolean shouldResultBeOptional = partner == null || isNullableType(partner);
Schema result = Schema.createMap(valueResult);
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result) : result;
// While calling `makeNullable`, we should respect the option order of `partner`
// i.e. if the schema of `partner` is [int, null], the resultant schema should also be [int, null] rather than [null, int]
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
: result;
}

@Override
@@ -120,7 +112,10 @@ public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) {
Schema hivePrimitive = hivePrimitiveToAvro(primitive);
// if there was no matching Avro primitive, use the Hive primitive
Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner);
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result) : result;
// While calling `makeNullable`, we should respect the option order of `partner`
// i.e. if the schema of `partner` is [int, null], the resultant schema should also be [int, null] rather than [null, int]
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
: result;
}

@Override
@@ -265,8 +260,10 @@ public Schema hivePrimitiveToAvro(PrimitiveTypeInfo primitive) {
JsonNodeFactory factory = JsonNodeFactory.instance;
Schema decimalSchema = Schema.create(Schema.Type.BYTES);
decimalSchema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, AvroSerDe.DECIMAL_TYPE_NAME);
decimalSchema.addProp(AvroSerDe.AVRO_PROP_PRECISION, factory.numberNode(dti.getPrecision()));
decimalSchema.addProp(AvroSerDe.AVRO_PROP_SCALE, factory.numberNode(dti.getScale()));
AvroCompatibilityHelper.setSchemaPropFromJsonString(decimalSchema, AvroSerDe.AVRO_PROP_PRECISION,
Jackson1Utils.toJsonString(factory.numberNode(dti.getPrecision())), false);
AvroCompatibilityHelper.setSchemaPropFromJsonString(decimalSchema, AvroSerDe.AVRO_PROP_SCALE,
Jackson1Utils.toJsonString(factory.numberNode(dti.getScale())), false);

return decimalSchema;
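
Avro 1.7's addProp(String, JsonNode) overload (Jackson 1) is gone in Avro 1.10, so the numeric precision and scale props are now written through the compatibility helper as JSON strings, while the string-valued logicalType prop can keep using plain addProp. A self-contained sketch for a Hive DECIMAL(4,2) column, using only the calls that appear in this diff (string literals stand in for the AvroSerDe constants):

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.Jackson1Utils;
import org.apache.avro.Schema;
import org.codehaus.jackson.node.JsonNodeFactory;

final class DecimalSchemaSketch {
  public static void main(String[] args) {
    JsonNodeFactory factory = JsonNodeFactory.instance;
    Schema decimalSchema = Schema.create(Schema.Type.BYTES);
    decimalSchema.addProp("logicalType", "decimal");  // string overload exists on every Avro version
    // Numeric props go through the helper so the same code runs on Avro 1.7 and 1.10:
    AvroCompatibilityHelper.setSchemaPropFromJsonString(decimalSchema, "precision",
        Jackson1Utils.toJsonString(factory.numberNode(4)), false);
    AvroCompatibilityHelper.setSchemaPropFromJsonString(decimalSchema, "scale",
        Jackson1Utils.toJsonString(factory.numberNode(2)), false);
    // Expected to serialize to roughly:
    // {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
    System.out.println(decimalSchema);
  }
}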

@@ -11,6 +11,9 @@

import javax.annotation.Nonnull;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.Jackson1Utils;

import org.apache.avro.Schema;
import org.apache.calcite.rel.type.DynamicRecordType;
import org.apache.calcite.rel.type.RelDataType;
@@ -111,8 +114,10 @@ private static Schema basicSqlTypeToAvroType(BasicSqlType relDataType) {
JsonNodeFactory factory = JsonNodeFactory.instance;
Schema decimalSchema = Schema.create(Schema.Type.BYTES);
decimalSchema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, AvroSerDe.DECIMAL_TYPE_NAME);
decimalSchema.addProp(AvroSerDe.AVRO_PROP_PRECISION, factory.numberNode(relDataType.getPrecision()));
decimalSchema.addProp(AvroSerDe.AVRO_PROP_SCALE, factory.numberNode(relDataType.getScale()));
AvroCompatibilityHelper.setSchemaPropFromJsonString(decimalSchema, AvroSerDe.AVRO_PROP_PRECISION,
Jackson1Utils.toJsonString(factory.numberNode(relDataType.getPrecision())), false);
AvroCompatibilityHelper.setSchemaPropFromJsonString(decimalSchema, AvroSerDe.AVRO_PROP_SCALE,
Jackson1Utils.toJsonString(factory.numberNode(relDataType.getScale())), false);
return decimalSchema;
default:
throw new UnsupportedOperationException(relDataType.getSqlTypeName() + " is not supported.");
@@ -139,7 +144,7 @@ private static Schema relRecordTypeToAvroType(RelDataType relRecord, List<String
for (RelDataTypeField relField : relRecord.getFieldList()) {
final String comment = fieldComments != null && fieldComments.size() > relField.getIndex()
? fieldComments.get(relField.getIndex()) : null;
fields.add(new Schema.Field(toAvroQualifiedName(relField.getName()),
fields.add(AvroCompatibilityHelper.createSchemaField(toAvroQualifiedName(relField.getName()),
relDataTypeToAvroType(relField.getType(), toAvroQualifiedName(relField.getName())), comment, null));
}

@@ -232,6 +232,7 @@ public RelNode visit(LogicalProject logicalProject) {

@Override
public RelNode visit(LogicalJoin logicalJoin) {
// TODO: Modify this method so that no two schema fields share the same name in Avro 1.10, and enable the tests for join
RelNode relNode = super.visit(logicalJoin);

Schema leftInputSchema = schemaMap.get(logicalJoin.getLeft());
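
The TODO above concerns joins where both inputs expose a column with the same name: merging the two input schemas into a single record then produces duplicate Avro field names, which Avro 1.10 rejects. One plain way to sidestep that while merging, shown purely as a hypothetical sketch and not as the fix the TODO asks for, is to rename clashing fields from the right input before assembling the joined record:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import org.apache.avro.Schema;

final class JoinSchemaSketch {
  // Hypothetical helper: copies fields from both join inputs, suffixing duplicates
  // (e.g. "id" from the right input becomes "id0"). Field defaults are dropped for brevity.
  static List<Schema.Field> mergeJoinFields(Schema left, Schema right) {
    List<Schema.Field> merged = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    for (Schema.Field f : left.getFields()) {
      merged.add(AvroCompatibilityHelper.createSchemaField(f.name(), f.schema(), f.doc(), null));
      seen.add(f.name());
    }
    for (Schema.Field f : right.getFields()) {
      String name = f.name();
      int suffix = 0;
      while (seen.contains(name)) {
        name = f.name() + suffix++;  // id -> id0, id1, ...
      }
      merged.add(AvroCompatibilityHelper.createSchemaField(name, f.schema(), f.doc(), null));
      seen.add(name);
    }
    return merged;
  }
}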