diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroType.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroType.java index c1e7574d5..1e045f80f 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroType.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroType.java @@ -6,11 +6,20 @@ package com.linkedin.coral.schema.avro; import com.linkedin.coral.com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import javax.annotation.Nonnull; import org.apache.avro.Schema; +import org.apache.calcite.rel.type.DynamicRecordType; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.sql.type.ArraySqlType; import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.MapSqlType; +import org.apache.calcite.sql.type.MultisetSqlType; +import org.apache.calcite.sql.type.SqlTypeName; /** @@ -26,29 +35,50 @@ private RelDataTypeToAvroType() { /** * This method converts RelDataType to avro data type * + * The returned schema is always non-nullable (instead of [null, type]) since nullability is decided + * outside of this method + * * @param relDataType * @return Schema of Avro data type corresponding to input RelDataType */ - static Schema relDataTypeToAvroType(@Nonnull RelDataType relDataType) { + static Schema relDataTypeToAvroTypeNonNullable(@Nonnull RelDataType relDataType, String recordName) { Preconditions.checkNotNull(relDataType); + if (relDataType instanceof RelRecordType || relDataType instanceof DynamicRecordType) { + return relRecordTypeToAvroType(relDataType, null, recordName, "rel_avro", null); + } + if (relDataType instanceof BasicSqlType) { return basicSqlTypeToAvroType((BasicSqlType) relDataType); } - if (relDataType instanceof 
ArraySqlType) { - return Schema.createArray(relDataTypeToAvroType(relDataType.getComponentType())); + if (relDataType instanceof MultisetSqlType || relDataType instanceof ArraySqlType) { + return Schema.createArray(relDataTypeToAvroType(relDataType.getComponentType(), recordName)); } - // TODO: support more RelDataType if necessary - // For example: MultisetSqlType, RelRecordType, DynamicRecordType, MapSqlType + if (relDataType instanceof MapSqlType) { + final MapSqlType mapSqlType = (MapSqlType) relDataType; + if (!SqlTypeName.CHAR_TYPES.contains(mapSqlType.getKeyType().getSqlTypeName())) { + throw new UnsupportedOperationException("Key of Map can only be a String: " + + mapSqlType.getKeyType().getSqlTypeName().getName()); + } + return Schema.createMap(relDataTypeToAvroType(mapSqlType.getValueType(), recordName)); + } - // TODO: improve logging - throw new UnsupportedOperationException("Unsupported RelDataType: " + relDataType.toString()); + throw new UnsupportedOperationException("Unsupported RelDataType to be converted to Avro type: " + + relDataType.toString()); } - private static Schema basicSqlTypeToAvroType(BasicSqlType relType) { - switch (relType.getSqlTypeName()) { + private static Schema relDataTypeToAvroType(RelDataType relDataType, String recordName) { + final Schema avroSchema = relDataTypeToAvroTypeNonNullable(relDataType, recordName); + if (relDataType.isNullable() && avroSchema.getType() != Schema.Type.NULL) { + return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), avroSchema)); + } + return avroSchema; + } + + private static Schema basicSqlTypeToAvroType(BasicSqlType relDataType) { + switch (relDataType.getSqlTypeName()) { case BOOLEAN: return Schema.create(Schema.Type.BOOLEAN); case TINYINT: @@ -70,7 +100,42 @@ private static Schema basicSqlTypeToAvroType(BasicSqlType relType) { case ANY: return Schema.create(Schema.Type.BYTES); default: - throw new UnsupportedOperationException(relType.getSqlTypeName() + " is not supported."); 
+ throw new UnsupportedOperationException(relDataType.getSqlTypeName() + " is not supported."); + } + } + + /** + * This method converts RelRecordType or DynamicRecordType to Avro type + * + * It iterates over each RelDataTypeField in the field list of the record type and converts it recursively + * + * @param relRecord RelRecordType or DynamicRecordType to convert to Avro Record type + * @param fieldComments documentation of sub-fields in Avro Record type + * @param recordName record name of Avro Record type + * @param recordNamespace record namespace of Avro Record type + * @param doc documentation of Avro Record type + * @return Avro type corresponding to RelDataType + */ + private static Schema relRecordTypeToAvroType(RelDataType relRecord, + List fieldComments, + String recordName, + String recordNamespace, + String doc) { + final List fields = new ArrayList(); + final Schema avroSchema = Schema.createRecord(recordName, doc, recordNamespace, false); + + for (RelDataTypeField relField : relRecord.getFieldList()) { + final String comment = fieldComments != null && fieldComments.size() > relField.getIndex() ? 
fieldComments.get( + relField.getIndex()) : null; + fields.add(new Schema.Field( + toAvroQualifiedName(relField.getName()), relDataTypeToAvroType(relField.getType(), toAvroQualifiedName(relField.getName())), comment, null)); } + + avroSchema.setFields(fields); + return avroSchema; + } + + private static String toAvroQualifiedName(String relName) { + return relName.replace("$", "_"); } } diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index a0719985b..85c13b94d 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -172,7 +172,7 @@ static void appendField(@Nonnull String fieldName, Preconditions.checkNotNull(fieldRelDataType); Preconditions.checkNotNull(fieldAssembler); - Schema fieldSchema = RelDataTypeToAvroType.relDataTypeToAvroType(fieldRelDataType); + Schema fieldSchema = RelDataTypeToAvroType.relDataTypeToAvroTypeNonNullable(fieldRelDataType, fieldName); // TODO: handle default value properly if (isNullable) { diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroTypeTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroTypeTests.java new file mode 100644 index 000000000..314b98ab8 --- /dev/null +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/RelDataTypeToAvroTypeTests.java @@ -0,0 +1,39 @@ +/** + * Copyright 2020 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. 
+ */ +package com.linkedin.coral.schema.avro; + +import com.linkedin.coral.hive.hive2rel.HiveToRelConverter; +import org.apache.avro.Schema; +import org.apache.calcite.rel.RelNode; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class RelDataTypeToAvroTypeTests { + private HiveToRelConverter hiveToRelConverter; + + @BeforeClass + public void beforeClass() throws HiveException, MetaException { + hiveToRelConverter = TestUtils.setupRelDataTypeToAvroTypeTests(); + } + + @Test + public void testNestedRecord() { + String viewSql = "CREATE VIEW v AS SELECT * FROM basecomplex"; + + TestUtils.executeCreateViewQuery("default", "v", viewSql); + RelNode relNode = hiveToRelConverter.convertView("default", "v"); + Schema actualAvroType = RelDataTypeToAvroType.relDataTypeToAvroTypeNonNullable(relNode.getRowType(), "nestedRecord"); + + Assert.assertEquals(actualAvroType.toString(true), + TestUtils.loadSchema("rel2avro-testNestedRecord-expected.avsc")); + } + + +} diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/TestUtils.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/TestUtils.java index 5a618fc0a..e45560507 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/TestUtils.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/TestUtils.java @@ -5,6 +5,7 @@ */ package com.linkedin.coral.schema.avro; +import com.linkedin.coral.hive.hive2rel.HiveToRelConverter; import com.linkedin.coral.hive.hive2rel.functions.StaticHiveFunctionRegistry; import com.linkedin.coral.hive.hive2rel.HiveMetastoreClient; import com.linkedin.coral.hive.hive2rel.HiveMscAdapter; @@ -14,6 +15,7 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; import 
org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.hadoop.hive.conf.HiveConf; @@ -43,6 +45,13 @@ public static HiveMetastoreClient setup() throws HiveException, MetaException { return metastoreClient; } + public static HiveToRelConverter setupRelDataTypeToAvroTypeTests() throws HiveException, MetaException { + HiveMetastoreClient metastoreClient = setup(); + HiveToRelConverter hiveToRelConverter = HiveToRelConverter.create(metastoreClient); + + return hiveToRelConverter; + } + public static void executeCreateViewQuery(String dbName, String viewName, String sql) { executeQuery("DROP VIEW IF EXISTS " + dbName + "." + viewName); executeQuery(sql); @@ -71,6 +80,7 @@ private static void initializeTables() { String baseNullabilitySchema = loadSchema("base-nullability.avsc"); String baseCasePreservation = loadSchema("base-casepreservation.avsc"); String baseComplexFieldSchema = loadSchema("base-complex-fieldschema"); + String baseNestedComplexSchema = loadSchema("base-nested-complex.avsc"); executeCreateTableQuery("default", "basecomplex", baseComplexSchema); executeCreateTableQuery("default", "basecomplexunioncompatible", baseComplexUnionCompatible); @@ -79,6 +89,7 @@ private static void initializeTables() { executeCreateTableQuery("default", "basenullability", baseNullabilitySchema); executeCreateTableWithPartitionQuery("default", "basecasepreservation", baseCasePreservation); executeCreateTableWithPartitionFieldSchemaQuery("default", "basecomplexfieldschema", baseComplexFieldSchema); + executeCreateTableWithPartitionQuery("default", "basenestedcomplex", baseNestedComplexSchema); } private static void initializeUdfs() { diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/ViewToAvroSchemaConverterTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/ViewToAvroSchemaConverterTests.java index ac24b2ea1..2a61935c7 100644 --- 
a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/ViewToAvroSchemaConverterTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/ViewToAvroSchemaConverterTests.java @@ -725,6 +725,19 @@ public void testCompatibleUnion() { TestUtils.loadSchema("testCompatibleUnion-expected.avsc")); } + @Test + public void testSelectStarFromNestComplex() { + String viewSql = "CREATE VIEW v AS SELECT * FROM basenestedcomplex"; + + TestUtils.executeCreateViewQuery("default", "v", viewSql); + + ViewToAvroSchemaConverter viewToAvroSchemaConverter = ViewToAvroSchemaConverter.create(hiveMetastoreClient); + Schema actualSchema = viewToAvroSchemaConverter.toAvroSchema("default", "v"); + + Assert.assertEquals(actualSchema.toString(true), + TestUtils.loadSchema("testSelectStarFromNestComplex-expected.avsc")); + } + @Test public void testSubQueryWhere() { // TODO: implement this test diff --git a/coral-schema/src/test/resources/base-nested-complex.avsc b/coral-schema/src/test/resources/base-nested-complex.avsc new file mode 100644 index 000000000..47a8e247d --- /dev/null +++ b/coral-schema/src/test/resources/base-nested-complex.avsc @@ -0,0 +1,123 @@ +{ + "type" : "record", + "name" : "basenestedcomplex", + "namespace" : "coral.schema.avro.base.nested.complex", + "fields" : [ { + "name" : "id", + "type" : "int" + }, { + "name" : "array_col", + "type" : [ "null", { + "type" : "array", + "items" : [ "null", { + "type" : "record", + "name" : "Array_Col", + "namespace" : "coral.schema.avro.base.nested.complex.basenestedcomplex", + "fields" : [ { + "name" : "Bool_Field", + "type" : [ "null", "boolean" ], + "default" : null + }, { + "name" : "Int_Field", + "type" : [ "null", "int" ] + }, { + "name" : "Bigint_Field", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "Float_Field", + "type" : [ "null", "float" ] + }, { + "name" : "Double_Field", + "type" : [ "null", "double" ], + "default" : null + }, { + "name" : "Date_String_Field", + "type" : [ 
"null", "string" ] + }, { + "name" : "String_Field", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "Array_Col_1", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "array", + "items" : "string" + } + } ], + "default" : null + }, { + "name" : "Array_Col_2", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "map", + "values" : "string" + } + } ], + "default" : null + }, { + "name" : "Map_Col_3", + "type" : [ "null", { + "type" : "map", + "values" : { + "type" : "map", + "values" : "string" + } + } ], + "default" : null + }, { + "name" : "Map_Col_4", + "type" : [ "null", { + "type" : "map", + "values" : { + "type" : "array", + "items" : "string" + } + } ], + "default" : null + } ] + } ] + } ], + "default" : null + }, { + "name" : "struct_col", + "type" : [ "null", { + "type" : "record", + "name" : "Struct_col", + "namespace" : "coral.schema.avro.base.nested.complex.basenestedcomplex", + "fields" : [ { + "name" : "bool_field", + "type" : [ "null", "boolean" ], + "default" : null + }, { + "name" : "int_field", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "bigint_field", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "float_field", + "type" : [ "null", "float" ], + "default" : null + }, { + "name" : "double_field", + "type" : [ "null", "double" ], + "default" : null + }, { + "name" : "date_string_field", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "string_field", + "type" : [ "null", "string" ], + "default" : null + } ] + } ], + "default" : null + } ] +} \ No newline at end of file diff --git a/coral-schema/src/test/resources/rel2avro-testNestedRecord-expected.avsc b/coral-schema/src/test/resources/rel2avro-testNestedRecord-expected.avsc new file mode 100644 index 000000000..786b14a57 --- /dev/null +++ b/coral-schema/src/test/resources/rel2avro-testNestedRecord-expected.avsc @@ -0,0 +1,49 @@ +{ + "type" : "record", + "name" : "nestedRecord", + 
"namespace" : "rel_avro", + "fields" : [ { + "name" : "id", + "type" : [ "null", "int" ] + }, { + "name" : "array_col", + "type" : [ "null", { + "type" : "array", + "items" : [ "null", "string" ] + } ] + }, { + "name" : "map_col", + "type" : [ "null", { + "type" : "map", + "values" : [ "null", "string" ] + } ] + }, { + "name" : "struct_col", + "type" : [ "null", { + "type" : "record", + "name" : "struct_col", + "fields" : [ { + "name" : "bool_field", + "type" : [ "null", "boolean" ] + }, { + "name" : "int_field", + "type" : [ "null", "int" ] + }, { + "name" : "bigint_field", + "type" : [ "null", "long" ] + }, { + "name" : "float_field", + "type" : [ "null", "float" ] + }, { + "name" : "double_field", + "type" : [ "null", "double" ] + }, { + "name" : "date_string_field", + "type" : [ "null", "string" ] + }, { + "name" : "string_field", + "type" : [ "null", "string" ] + } ] + } ] + } ] +} \ No newline at end of file diff --git a/coral-schema/src/test/resources/testSelectStarFromNestComplex-expected.avsc b/coral-schema/src/test/resources/testSelectStarFromNestComplex-expected.avsc new file mode 100644 index 000000000..f9d2160b8 --- /dev/null +++ b/coral-schema/src/test/resources/testSelectStarFromNestComplex-expected.avsc @@ -0,0 +1,126 @@ +{ + "type" : "record", + "name" : "v", + "namespace" : "default.v", + "fields" : [ { + "name" : "id", + "type" : "int" + }, { + "name" : "array_col", + "type" : [ "null", { + "type" : "array", + "items" : [ "null", { + "type" : "record", + "name" : "Array_Col", + "namespace" : "default.v.v", + "fields" : [ { + "name" : "Bool_Field", + "type" : [ "null", "boolean" ], + "default" : null + }, { + "name" : "Int_Field", + "type" : [ "null", "int" ] + }, { + "name" : "Bigint_Field", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "Float_Field", + "type" : [ "null", "float" ] + }, { + "name" : "Double_Field", + "type" : [ "null", "double" ], + "default" : null + }, { + "name" : "Date_String_Field", + "type" : [ "null", 
"string" ] + }, { + "name" : "String_Field", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "Array_Col_1", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "array", + "items" : "string" + } + } ], + "default" : null + }, { + "name" : "Array_Col_2", + "type" : [ "null", { + "type" : "array", + "items" : { + "type" : "map", + "values" : "string" + } + } ], + "default" : null + }, { + "name" : "Map_Col_3", + "type" : [ "null", { + "type" : "map", + "values" : { + "type" : "map", + "values" : "string" + } + } ], + "default" : null + }, { + "name" : "Map_Col_4", + "type" : [ "null", { + "type" : "map", + "values" : { + "type" : "array", + "items" : "string" + } + } ], + "default" : null + } ] + } ] + } ], + "default" : null + }, { + "name" : "struct_col", + "type" : [ "null", { + "type" : "record", + "name" : "Struct_col", + "namespace" : "default.v.v", + "fields" : [ { + "name" : "bool_field", + "type" : [ "null", "boolean" ], + "default" : null + }, { + "name" : "int_field", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "bigint_field", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "float_field", + "type" : [ "null", "float" ], + "default" : null + }, { + "name" : "double_field", + "type" : [ "null", "double" ], + "default" : null + }, { + "name" : "date_string_field", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "string_field", + "type" : [ "null", "string" ], + "default" : null + } ] + } ], + "default" : null + }, { + "name" : "datepartition", + "type" : "string" + } ] +} \ No newline at end of file