Skip to content

Commit

Permalink
Support more types in converting RelDatatype to Avro type (#29)
Browse files Browse the repository at this point in the history
Co-authored-by: Wenye Zhang <[email protected]>
  • Loading branch information
funcheetah and Wenye Zhang authored Dec 16, 2020
1 parent 5bf50c5 commit 05ebc22
Show file tree
Hide file tree
Showing 8 changed files with 437 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@
package com.linkedin.coral.schema.avro;

import com.linkedin.coral.com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.DynamicRecordType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.sql.type.ArraySqlType;
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.calcite.sql.type.MapSqlType;
import org.apache.calcite.sql.type.MultisetSqlType;
import org.apache.calcite.sql.type.SqlTypeName;


/**
Expand All @@ -26,29 +35,50 @@ private RelDataTypeToAvroType() {
/**
* This method converts RelDataType to avro data type
*
* The return schema is always non nullable (instead of [null, type]) since the nullability if decided
* out of this method
*
* @param relDataType
* @return Schema of Avro data type corresponding to input RelDataType
*/
static Schema relDataTypeToAvroType(@Nonnull RelDataType relDataType) {
static Schema relDataTypeToAvroTypeNonNullable(@Nonnull RelDataType relDataType, String recordName) {
Preconditions.checkNotNull(relDataType);

if (relDataType instanceof RelRecordType || relDataType instanceof DynamicRecordType) {
return relRecordTypeToAvroType(relDataType, null, recordName, "rel_avro", null);
}

if (relDataType instanceof BasicSqlType) {
return basicSqlTypeToAvroType((BasicSqlType) relDataType);
}

if (relDataType instanceof ArraySqlType) {
return Schema.createArray(relDataTypeToAvroType(relDataType.getComponentType()));
if (relDataType instanceof MultisetSqlType || relDataType instanceof ArraySqlType) {
return Schema.createArray(relDataTypeToAvroType(relDataType.getComponentType(), recordName));
}

// TODO: support more RelDataType if necessary
// For example: MultisetSqlType, RelRecordType, DynamicRecordType, MapSqlType
if (relDataType instanceof MapSqlType) {
final MapSqlType mapSqlType = (MapSqlType) relDataType;
if (!SqlTypeName.CHAR_TYPES.contains(mapSqlType.getKeyType().getSqlTypeName())) {
throw new UnsupportedOperationException("Key of Map can only be a String: "
+ mapSqlType.getKeyType().getSqlTypeName().getName());
}
return Schema.createMap(relDataTypeToAvroType(mapSqlType.getValueType(), recordName));
}

// TODO: improve logging
throw new UnsupportedOperationException("Unsupported RelDataType: " + relDataType.toString());
throw new UnsupportedOperationException("Unsupported RelDataType to be converted to Avro type: "
+ relDataType.toString());
}

private static Schema basicSqlTypeToAvroType(BasicSqlType relType) {
switch (relType.getSqlTypeName()) {
private static Schema relDataTypeToAvroType(RelDataType relDataType, String recordName) {
final Schema avroSchema = relDataTypeToAvroTypeNonNullable(relDataType, recordName);
if (relDataType.isNullable() && avroSchema.getType() != Schema.Type.NULL) {
return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), avroSchema));
}
return avroSchema;
}

private static Schema basicSqlTypeToAvroType(BasicSqlType relDataType) {
switch (relDataType.getSqlTypeName()) {
case BOOLEAN:
return Schema.create(Schema.Type.BOOLEAN);
case TINYINT:
Expand All @@ -70,7 +100,42 @@ private static Schema basicSqlTypeToAvroType(BasicSqlType relType) {
case ANY:
return Schema.create(Schema.Type.BYTES);
default:
throw new UnsupportedOperationException(relType.getSqlTypeName() + " is not supported.");
throw new UnsupportedOperationException(relDataType.getSqlTypeName() + " is not supported.");
}
}

/**
* This method converts RelRecordType or DynamicRecordType to Avro type
*
* It iterates each RelDataTypeField in the field list of record type and convert them recursively
*
* @param relRecord RelRecordType or DynamicRecordType to convert to Avro Record type
* @param fieldComments documentations of sub-fields in Avro Record type
* @param recordName record name of Avro Record type
* @param recordNamespace record namespace of Avro Record type
* @param doc documentation of Avro Record type
* @return Avro type corresponding to RelDataType
*/
/**
 * This method converts RelRecordType or DynamicRecordType to Avro type
 *
 * It iterates each RelDataTypeField in the field list of record type and converts them recursively
 *
 * @param relRecord RelRecordType or DynamicRecordType to convert to Avro Record type
 * @param fieldComments documentations of sub-fields in Avro Record type, indexed by field index;
 *                      may be null or shorter than the field list (missing entries get no doc)
 * @param recordName record name of Avro Record type
 * @param recordNamespace record namespace of Avro Record type
 * @param doc documentation of Avro Record type
 * @return Avro type corresponding to RelDataType
 */
private static Schema relRecordTypeToAvroType(RelDataType relRecord,
    List<String> fieldComments,
    String recordName,
    String recordNamespace,
    String doc) {
  // Fix raw-type usage: parameterize the ArrayList
  final List<Schema.Field> fields = new ArrayList<>();
  final Schema avroSchema = Schema.createRecord(recordName, doc, recordNamespace, false);

  for (RelDataTypeField relField : relRecord.getFieldList()) {
    // Attach the per-field comment only when one is provided for this field's index
    final String comment = fieldComments != null && fieldComments.size() > relField.getIndex()
        ? fieldComments.get(relField.getIndex()) : null;
    // Nested records are named after the (sanitized) field that holds them
    fields.add(new Schema.Field(toAvroQualifiedName(relField.getName()),
        relDataTypeToAvroType(relField.getType(), toAvroQualifiedName(relField.getName())),
        comment, null));
  }

  avroSchema.setFields(fields);
  return avroSchema;
}

/**
 * Sanitizes a Rel field name into a valid Avro name by replacing every '$' with '_'.
 */
private static String toAvroQualifiedName(String relName) {
  return relName.replace('$', '_');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ static void appendField(@Nonnull String fieldName,
Preconditions.checkNotNull(fieldRelDataType);
Preconditions.checkNotNull(fieldAssembler);

Schema fieldSchema = RelDataTypeToAvroType.relDataTypeToAvroType(fieldRelDataType);
Schema fieldSchema = RelDataTypeToAvroType.relDataTypeToAvroTypeNonNullable(fieldRelDataType, fieldName);

// TODO: handle default value properly
if (isNullable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2020 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.coral.schema.avro;

import com.linkedin.coral.hive.hive2rel.HiveToRelConverter;
import org.apache.avro.Schema;
import org.apache.calcite.rel.RelNode;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
 * Unit tests for {@link RelDataTypeToAvroType}: converts a view's row RelDataType
 * to an Avro schema and compares against an expected .avsc resource.
 */
public class RelDataTypeToAvroTypeTests {
  private HiveToRelConverter hiveToRelConverter;

  @BeforeClass
  public void beforeClass() throws HiveException, MetaException {
    // One converter instance, backed by the local test metastore, shared by all tests
    hiveToRelConverter = TestUtils.setupRelDataTypeToAvroTypeTests();
  }

  @Test
  public void testNestedRecord() {
    TestUtils.executeCreateViewQuery("default", "v", "CREATE VIEW v AS SELECT * FROM basecomplex");

    final RelNode relNode = hiveToRelConverter.convertView("default", "v");
    final Schema actualAvroType =
        RelDataTypeToAvroType.relDataTypeToAvroTypeNonNullable(relNode.getRowType(), "nestedRecord");

    Assert.assertEquals(actualAvroType.toString(true),
        TestUtils.loadSchema("rel2avro-testNestedRecord-expected.avsc"));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.linkedin.coral.schema.avro;

import com.linkedin.coral.hive.hive2rel.HiveToRelConverter;
import com.linkedin.coral.hive.hive2rel.functions.StaticHiveFunctionRegistry;
import com.linkedin.coral.hive.hive2rel.HiveMetastoreClient;
import com.linkedin.coral.hive.hive2rel.HiveMscAdapter;
Expand All @@ -14,6 +15,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.hadoop.hive.conf.HiveConf;
Expand Down Expand Up @@ -43,6 +45,13 @@ public static HiveMetastoreClient setup() throws HiveException, MetaException {
return metastoreClient;
}

/**
 * Builds a HiveToRelConverter backed by the metastore client from {@link #setup()},
 * for use by the RelDataType-to-Avro-type tests.
 */
public static HiveToRelConverter setupRelDataTypeToAvroTypeTests() throws HiveException, MetaException {
  return HiveToRelConverter.create(setup());
}

public static void executeCreateViewQuery(String dbName, String viewName, String sql) {
executeQuery("DROP VIEW IF EXISTS " + dbName + "." + viewName);
executeQuery(sql);
Expand Down Expand Up @@ -71,6 +80,7 @@ private static void initializeTables() {
String baseNullabilitySchema = loadSchema("base-nullability.avsc");
String baseCasePreservation = loadSchema("base-casepreservation.avsc");
String baseComplexFieldSchema = loadSchema("base-complex-fieldschema");
String baseNestedComplexSchema = loadSchema("base-nested-complex.avsc");

executeCreateTableQuery("default", "basecomplex", baseComplexSchema);
executeCreateTableQuery("default", "basecomplexunioncompatible", baseComplexUnionCompatible);
Expand All @@ -79,6 +89,7 @@ private static void initializeTables() {
executeCreateTableQuery("default", "basenullability", baseNullabilitySchema);
executeCreateTableWithPartitionQuery("default", "basecasepreservation", baseCasePreservation);
executeCreateTableWithPartitionFieldSchemaQuery("default", "basecomplexfieldschema", baseComplexFieldSchema);
executeCreateTableWithPartitionQuery("default", "basenestedcomplex", baseNestedComplexSchema);
}

private static void initializeUdfs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,19 @@ public void testCompatibleUnion() {
TestUtils.loadSchema("testCompatibleUnion-expected.avsc"));
}

@Test
public void testSelectStarFromNestComplex() {
  // Create a view selecting every column of the nested-complex base table
  TestUtils.executeCreateViewQuery("default", "v", "CREATE VIEW v AS SELECT * FROM basenestedcomplex");

  final ViewToAvroSchemaConverter converter = ViewToAvroSchemaConverter.create(hiveMetastoreClient);
  final Schema actualSchema = converter.toAvroSchema("default", "v");

  Assert.assertEquals(actualSchema.toString(true),
      TestUtils.loadSchema("testSelectStarFromNestComplex-expected.avsc"));
}

@Test
public void testSubQueryWhere() {
// TODO: implement this test
Expand Down
123 changes: 123 additions & 0 deletions coral-schema/src/test/resources/base-nested-complex.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
{
"type" : "record",
"name" : "basenestedcomplex",
"namespace" : "coral.schema.avro.base.nested.complex",
"fields" : [ {
"name" : "id",
"type" : "int"
}, {
"name" : "array_col",
"type" : [ "null", {
"type" : "array",
"items" : [ "null", {
"type" : "record",
"name" : "Array_Col",
"namespace" : "coral.schema.avro.base.nested.complex.basenestedcomplex",
"fields" : [ {
"name" : "Bool_Field",
"type" : [ "null", "boolean" ],
"default" : null
}, {
"name" : "Int_Field",
"type" : [ "null", "int" ]
}, {
"name" : "Bigint_Field",
"type" : [ "null", "long" ],
"default" : null
}, {
"name" : "Float_Field",
"type" : [ "null", "float" ]
}, {
"name" : "Double_Field",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "Date_String_Field",
"type" : [ "null", "string" ]
}, {
"name" : "String_Field",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "Array_Col_1",
"type" : [ "null", {
"type" : "array",
"items" : {
"type" : "array",
"items" : "string"
}
} ],
"default" : null
}, {
"name" : "Array_Col_2",
"type" : [ "null", {
"type" : "array",
"items" : {
"type" : "map",
"values" : "string"
}
} ],
"default" : null
}, {
"name" : "Map_Col_3",
"type" : [ "null", {
"type" : "map",
"values" : {
"type" : "map",
"values" : "string"
}
} ],
"default" : null
}, {
"name" : "Map_Col_4",
"type" : [ "null", {
"type" : "map",
"values" : {
"type" : "array",
"items" : "string"
}
} ],
"default" : null
} ]
} ]
} ],
"default" : null
}, {
"name" : "struct_col",
"type" : [ "null", {
"type" : "record",
"name" : "Struct_col",
"namespace" : "coral.schema.avro.base.nested.complex.basenestedcomplex",
"fields" : [ {
"name" : "bool_field",
"type" : [ "null", "boolean" ],
"default" : null
}, {
"name" : "int_field",
"type" : [ "null", "int" ],
"default" : null
}, {
"name" : "bigint_field",
"type" : [ "null", "long" ],
"default" : null
}, {
"name" : "float_field",
"type" : [ "null", "float" ],
"default" : null
}, {
"name" : "double_field",
"type" : [ "null", "double" ],
"default" : null
}, {
"name" : "date_string_field",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "string_field",
"type" : [ "null", "string" ],
"default" : null
} ]
} ],
"default" : null
} ]
}
Loading

0 comments on commit 05ebc22

Please sign in to comment.