Skip to content

Commit

Permalink
Alternate implementation to #74 (#79)
Browse files Browse the repository at this point in the history
* Support fuzzy union for field types of schema


Co-authored-by: Jiefan Li <[email protected]>
  • Loading branch information
wmoustafa and ljfgem authored May 12, 2021
1 parent 9b3f5ab commit 68021e1
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public RelNode visit(LogicalUnion logicalUnion) {
Schema inputSchema1 = schemaMap.get(logicalUnion.getInput(0));
Schema inputSchema2 = schemaMap.get(logicalUnion.getInput(1));

Schema mergedSchema = SchemaUtilities.mergeUnionSchema(inputSchema1, inputSchema2, strictMode);
Schema mergedSchema = SchemaUtilities.mergeUnionRecordSchema(inputSchema1, inputSchema2, strictMode);

schemaMap.put(logicalUnion, mergedSchema);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import com.linkedin.coral.com.google.common.base.Strings;
import com.linkedin.coral.schema.avro.exceptions.SchemaNotFoundException;

import static org.apache.avro.Schema.Type.NULL;
import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.getOtherTypeFromNullableType;
import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.isNullableType;


class SchemaUtilities {
private static final Logger LOG = LoggerFactory.getLogger(SchemaUtilities.class);
Expand Down Expand Up @@ -342,49 +346,35 @@ static Schema joinSchemas(@Nonnull Schema leftSchema, @Nonnull Schema rightSchem
return combinedSchema;
}

/**
 * Checks that the two input schemas of a LogicalUnion operator can be unioned and returns the left one.
 *
 * The schemas are accepted when their pretty-printed forms are equal, or otherwise when
 * {@code isUnionRecordSchemaCompatible} reports them as compatible.
 *
 * @param leftSchema Left input schema; must not be null
 * @param rightSchema Right input schema; must not be null
 * @param strictMode Forwarded unchanged to the compatibility check
 * @return {@code leftSchema} when the inputs are accepted
 * @throws RuntimeException if the schemas differ and are not compatible
 */
static Schema mergeUnionSchema(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema, boolean strictMode) {
  Preconditions.checkNotNull(leftSchema);
  Preconditions.checkNotNull(rightSchema);

  // Textually identical schemas need no structural comparison; the compatibility check only runs otherwise.
  boolean textuallyIdentical = leftSchema.toString(true).equals(rightSchema.toString(true));
  if (textuallyIdentical || SchemaUtilities.isUnionRecordSchemaCompatible(leftSchema, rightSchema, strictMode)) {
    return leftSchema;
  }

  throw new RuntimeException("Input schemas of LogicalUnion operator are not compatible. inputSchema1 is: "
      + leftSchema.toString(true) + ", inputSchema2 is: " + rightSchema.toString(true));
}

/**
* This method decides if two input schemas of LogicalUnion operator are compatible.
* This method merges two input schemas of LogicalUnion operator, or throws exception if they can't be merged.
*
* Two schemas are compatible if they have same field names and types.
* namespace and doc etc are ignored.
* @param leftSchema Left schema to be merged
* @param rightSchema Right schema to be merged
* @param strictMode If set to true, namespaces are required to be same.
* If set to false, we don't check namespaces.
*
* @param leftSchema
* @param rightSchema
* @return return true if two schemas are union compatible; false otherwise.
* @return Merged schema if the input schemas can be merged
*/
static boolean isUnionRecordSchemaCompatible(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema,
boolean strictMode) {
static Schema mergeUnionRecordSchema(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema, boolean strictMode) {
Preconditions.checkNotNull(leftSchema);
Preconditions.checkNotNull(rightSchema);
if (leftSchema.toString(true).equals(rightSchema.toString(true))) {
return leftSchema;
}

List<Schema.Field> leftSchemaFields = leftSchema.getFields();
List<Schema.Field> rightSchemaFields = rightSchema.getFields();

if (strictMode) {
// we requires namespace matches in strictMode
// We require namespace to match in strictMode
if (!Objects.equals(leftSchema.getNamespace(), rightSchema.getNamespace())) {
LOG.error("Found namespace mismatch while configured with strict mode. " + "Namespace for "
throw new RuntimeException("Found namespace mismatch while configured with strict mode. " + "Namespace for "
+ leftSchema.getName() + " is: " + leftSchema.getNamespace() + ". " + "Namespace for "
+ rightSchema.getName() + " is: " + rightSchema.getNamespace());

return false;
}
}

List<Schema.Field> leftSchemaFields = leftSchema.getFields();
List<Schema.Field> rightSchemaFields = rightSchema.getFields();

Map<String, Schema.Field> leftSchemaFieldsMap =
leftSchemaFields.stream().collect(Collectors.toMap(Schema.Field::name, Function.identity()));
Map<String, Schema.Field> rightSchemaFieldsMap =
Expand All @@ -393,90 +383,112 @@ static boolean isUnionRecordSchemaCompatible(@Nonnull Schema leftSchema, @Nonnul
for (Schema.Field field : leftSchemaFields) {
if (!rightSchemaFieldsMap.containsKey(field.name())) {
// field in leftSchema is missing in rightSchema
LOG.error(field.name() + " is in schema " + leftSchema.getName() + ": " + leftSchema.toString(true)
+ ", but not in schema " + rightSchema.getName() + ": " + rightSchema.toString(true));
return false;
throw new RuntimeException(
field.name() + " is in schema " + leftSchema.getName() + ": " + leftSchema.toString(true)
+ ", but not in schema " + rightSchema.getName() + ": " + rightSchema.toString(true));
}
}

for (Schema.Field field : rightSchemaFields) {
if (!leftSchemaFieldsMap.containsKey(field.name())) {
// field in rightSchema is missing in leftSchema
LOG.error(field.name() + " is in schema " + rightSchema.getName() + ": " + rightSchema.toString(true)
+ ", but not in schema " + leftSchema.getName() + ": " + leftSchema.toString(true));
return false;
throw new RuntimeException(
field.name() + " is in schema " + rightSchema.getName() + ": " + rightSchema.toString(true)
+ ", but not in schema " + leftSchema.getName() + ": " + leftSchema.toString(true));
}
}

List<Schema.Field> mergedSchemaFields = new ArrayList<>();

for (Schema.Field leftField : leftSchemaFields) {
Schema.Field rightField = rightSchemaFieldsMap.get(leftField.name());
boolean isUnionSchemaCompatible = isUnionSchemaCompatible(leftField.schema(), rightField.schema(), strictMode);

if (!isUnionSchemaCompatible) {
LOG.error(leftField.name() + " is not compatible with " + rightField.name() + " for LogicalUnion operator.");

return false;
}
Schema unionFieldSchema = getUnionFieldSchema(leftField.schema(), rightField.schema(), strictMode);
Schema.Field unionField = new Schema.Field(leftField.name(), unionFieldSchema, leftField.doc(),
leftField.defaultValue(), leftField.order());
leftField.aliases().forEach(unionField::addAlias);
leftField.getJsonProps().forEach(unionField::addProp);
mergedSchemaFields.add(unionField);
}

return true;
Schema schema = Schema.createRecord(leftSchema.getName(), leftSchema.getDoc(), leftSchema.getNamespace(), false);
schema.setFields(mergedSchemaFields);
return schema;
}

private static boolean isUnionSchemaCompatible(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema,
private static Schema getUnionFieldSchema(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema,
boolean strictMode) {
Preconditions.checkNotNull(leftSchema);
Preconditions.checkNotNull(rightSchema);

switch (leftSchema.getType()) {
case BOOLEAN:
case BYTES:
case DOUBLE:
case FLOAT:
case INT:
case LONG:
case STRING:
case NULL:
return leftSchema.getType() == rightSchema.getType();
case FIXED:
boolean isSameType = leftSchema.getType() == rightSchema.getType();
boolean isSameNamespace = Objects.equals(leftSchema.getNamespace(), rightSchema.getNamespace());

return isSameType && (!strictMode || isSameNamespace);
case ENUM:
boolean isSameEnumType = (leftSchema.getType() == rightSchema.getType());
boolean isSameSymbolSize = (leftSchema.getEnumSymbols().size() == rightSchema.getEnumSymbols().size());
boolean isSameEnumNamespace = Objects.equals(leftSchema.getNamespace(), rightSchema.getNamespace());

return isSameEnumType && isSameSymbolSize && (!strictMode || isSameEnumNamespace);
case RECORD:
return leftSchema.getType() == rightSchema.getType()
&& isUnionRecordSchemaCompatible(leftSchema, rightSchema, strictMode);

case MAP:
return leftSchema.getType() == rightSchema.getType()
&& isUnionSchemaCompatible(leftSchema.getValueType(), rightSchema.getValueType(), strictMode);

case ARRAY:
return leftSchema.getType() == rightSchema.getType()
&& isUnionSchemaCompatible(leftSchema.getElementType(), rightSchema.getElementType(), strictMode);

case UNION:
boolean isSameUnionType = (leftSchema.getType() == rightSchema.getType());
boolean isBothNullableType =
AvroSerdeUtils.isNullableType(leftSchema) && AvroSerdeUtils.isNullableType(rightSchema);
Schema.Type leftSchemaType = leftSchema.getType();
Schema.Type rightSchemaType = rightSchema.getType();
if (leftSchemaType == NULL) {
return makeNullable(rightSchema);
}
if (rightSchemaType == NULL) {
return makeNullable(leftSchema);
}
if (isNullableType(leftSchema) || isNullableType(rightSchema)) {
return makeNullable(getUnionFieldSchema(makeNonNullable(leftSchema), makeNonNullable(rightSchema), strictMode));
}

Schema leftOtherType = AvroSerdeUtils.getOtherTypeFromNullableType(leftSchema);
Schema rightOtherType = AvroSerdeUtils.getOtherTypeFromNullableType(rightSchema);
boolean isOtherTypeUnionCompatible = isUnionSchemaCompatible(leftOtherType, rightOtherType, strictMode);
if (leftSchemaType == rightSchemaType) {
switch (leftSchema.getType()) {
case BOOLEAN:
case BYTES:
case DOUBLE:
case FLOAT:
case INT:
case LONG:
case STRING:
return leftSchema;
case FIXED:
if (isSameNamespace(leftSchema, rightSchema, strictMode)) {
return leftSchema;
}
break;
case ENUM:
if (leftSchema.getEnumSymbols().size() == rightSchema.getEnumSymbols().size()) {
return leftSchema;
}
break;
case RECORD:
return mergeUnionRecordSchema(leftSchema, rightSchema, strictMode);
case MAP:
Schema valueType = getUnionFieldSchema(leftSchema.getValueType(), rightSchema.getValueType(), strictMode);
return Schema.createMap(valueType);
case ARRAY:
Schema elementType =
getUnionFieldSchema(leftSchema.getElementType(), rightSchema.getElementType(), strictMode);
return Schema.createArray(elementType);
default:
throw new IllegalArgumentException(
"Unsupported Avro type " + leftSchema.getType() + " in schema: " + leftSchema.toString(true));
}
}
throw new RuntimeException("Found two incompatible schemas for LogicalUnion operator. Left schema is: "
+ leftSchema.toString(true) + ". " + "Right schema is: " + rightSchema.toString(true));
}

return isSameUnionType && isBothNullableType && isOtherTypeUnionCompatible;
/**
 * Removes nullability from a schema.
 *
 * @param schema Schema to strip
 * @return the result of {@code getOtherTypeFromNullableType} when {@code isNullableType} accepts the schema;
 *         otherwise the schema itself, unchanged
 */
private static Schema makeNonNullable(Schema schema) {
  return isNullableType(schema) ? getOtherTypeFromNullableType(schema) : schema;
}

default:
throw new IllegalArgumentException(
"Unsupported Avro type " + leftSchema.getType() + " in schema: " + leftSchema.toString(true));
/**
 * Returns a schema that also admits null.
 *
 * <ul>
 *   <li>A NULL schema, or one that {@code isNullableType} already accepts, is returned unchanged.</li>
 *   <li>A non-union schema is wrapped as the union {@code [null, schema]}.</li>
 *   <li>A union that {@code isNullableType} does not accept is widened in place: a null branch is
 *       prepended to its existing branches (or the union is returned as-is if it already has one).
 *       Wrapping it in another union is not possible because Avro rejects unions that immediately
 *       contain other unions.</li>
 * </ul>
 *
 * @param schema Schema to make nullable
 * @return A schema that accepts null in addition to what {@code schema} accepts
 */
private static Schema makeNullable(Schema schema) {
  if (schema.getType() == NULL || isNullableType(schema)) {
    return schema;
  }

  Schema nullSchema = Schema.create(Schema.Type.NULL);
  if (schema.getType() != Schema.Type.UNION) {
    return Schema.createUnion(Arrays.asList(nullSchema, schema));
  }

  // NOTE(review): which unions isNullableType accepts depends on the Hive version on the classpath,
  // so a union may reach this point with or without a null branch; both cases are handled.
  List<Schema> branches = schema.getTypes();
  for (Schema branch : branches) {
    if (branch.getType() == NULL) {
      return schema;
    }
  }

  List<Schema> widenedBranches = new ArrayList<>(branches.size() + 1);
  widenedBranches.add(nullSchema);
  widenedBranches.addAll(branches);
  return Schema.createUnion(widenedBranches);
}

/**
 * Decides whether two schemas pass the namespace check.
 *
 * @param leftSchema Left schema
 * @param rightSchema Right schema
 * @param strictMode When false the namespaces are not inspected and the check always passes
 * @return true if strictMode is off, or if both namespaces are equal (two null namespaces count as equal)
 */
private static boolean isSameNamespace(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema, boolean strictMode) {
  if (!strictMode) {
    return true;
  }
  return Objects.equals(leftSchema.getNamespace(), rightSchema.getNamespace());
}

private static void appendFieldWithNewNamespace(@Nonnull Schema.Field field, @Nonnull String namespace,
@Nonnull SchemaBuilder.FieldAssembler<Schema> fieldAssembler) {
Preconditions.checkNotNull(field);
Expand Down Expand Up @@ -601,8 +613,8 @@ private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull Stri
Schema recordSchema = setupNestedNamespaceForRecord(schema, namespace);
return recordSchema;
case UNION:
if (AvroSerdeUtils.isNullableType(schema)) {
Schema otherType = AvroSerdeUtils.getOtherTypeFromNullableType(schema);
if (isNullableType(schema)) {
Schema otherType = getOtherTypeFromNullableType(schema);
Schema otherTypeWithNestedNamespace = setupNestedNamespace(otherType, namespace);
Schema nullSchema = Schema.create(Schema.Type.NULL);
List<Schema> types = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,5 +645,80 @@ public void testSelectNullUnionNullField() {
TestUtils.loadSchema("testSelectNullUnionNullField-expected.avsc"));
}

/**
 * Creates a view whose left branch selects a NULL literal and whose right branch selects the Id column of
 * basecomplex, then compares the derived Avro schema with testNullUnionNonNullField-expected.avsc.
 */
@Test
public void testNullUnionNotNullableField() {
  final String createViewStatement =
      "CREATE VIEW v AS SELECT NULL Field FROM basecomplex " + "UNION ALL SELECT Id Field FROM basecomplex";
  TestUtils.executeCreateViewQuery("default", "v", createViewStatement);

  // Non-strict mode (third argument false), as in the other union tests.
  final String derivedSchemaText =
      ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema("default", "v", false).toString(true);

  Assert.assertEquals(derivedSchemaText, TestUtils.loadSchema("testNullUnionNonNullField-expected.avsc"));
}

/**
 * Creates a view whose left branch selects the Id column of basecomplex and whose right branch selects a
 * NULL literal, then compares the derived Avro schema with testNullUnionNonNullField-expected.avsc.
 */
@Test
public void testNotNullableFieldUnionNull() {
  final String createViewStatement =
      "CREATE VIEW v AS SELECT Id Field FROM basecomplex " + "UNION ALL SELECT NULL Field FROM basecomplex";
  TestUtils.executeCreateViewQuery("default", "v", createViewStatement);

  // Non-strict mode (third argument false), as in the other union tests.
  final String derivedSchemaText =
      ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema("default", "v", false).toString(true);

  Assert.assertEquals(derivedSchemaText, TestUtils.loadSchema("testNullUnionNonNullField-expected.avsc"));
}

/**
 * Creates a view whose left branch selects a NULL literal and whose right branch selects Nullable_Field of
 * basenulltypefield, then compares the derived Avro schema with testNullUnionNonNullField-expected.avsc.
 */
@Test
public void testNullUnionNullableField() {
  final String createViewStatement = "CREATE VIEW v AS SELECT NULL Field FROM basenulltypefield "
      + "UNION ALL SELECT Nullable_Field Field FROM basenulltypefield";
  TestUtils.executeCreateViewQuery("default", "v", createViewStatement);

  // Non-strict mode (third argument false), as in the other union tests.
  final String derivedSchemaText =
      ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema("default", "v", false).toString(true);

  Assert.assertEquals(derivedSchemaText, TestUtils.loadSchema("testNullUnionNonNullField-expected.avsc"));
}

/**
 * Creates a view whose left branch selects Nullable_Field of basenulltypefield and whose right branch selects
 * a NULL literal, then compares the derived Avro schema with testNullUnionNonNullField-expected.avsc.
 */
@Test
public void testNullableFieldUnionNull() {
  final String createViewStatement = "CREATE VIEW v AS SELECT Nullable_Field Field FROM basenulltypefield "
      + "UNION ALL SELECT NULL Field FROM basenulltypefield";
  TestUtils.executeCreateViewQuery("default", "v", createViewStatement);

  // Non-strict mode (third argument false), as in the other union tests.
  final String derivedSchemaText =
      ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema("default", "v", false).toString(true);

  Assert.assertEquals(derivedSchemaText, TestUtils.loadSchema("testNullUnionNonNullField-expected.avsc"));
}

/**
 * Creates a view whose left branch selects Not_Nullable_Field and whose right branch selects Nullable_Field,
 * both from basenulltypefield, then compares the derived Avro schema with
 * testNullUnionNonNullField-expected.avsc.
 */
@Test
public void testNotNullableFieldUnionNullableField() {
  final String createViewStatement = "CREATE VIEW v AS SELECT Not_Nullable_Field Field FROM basenulltypefield "
      + "UNION ALL SELECT Nullable_Field Field FROM basenulltypefield";
  TestUtils.executeCreateViewQuery("default", "v", createViewStatement);

  // Non-strict mode (third argument false), as in the other union tests.
  final String derivedSchemaText =
      ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema("default", "v", false).toString(true);

  Assert.assertEquals(derivedSchemaText, TestUtils.loadSchema("testNullUnionNonNullField-expected.avsc"));
}

/**
 * Creates a view whose left branch selects Nullable_Field and whose right branch selects Not_Nullable_Field,
 * both from basenulltypefield, then compares the derived Avro schema with
 * testNullUnionNonNullField-expected.avsc.
 */
@Test
public void testNullableFieldUnionNotNullableField() {
  final String createViewStatement = "CREATE VIEW v AS SELECT Nullable_Field Field FROM basenulltypefield "
      + "UNION ALL SELECT Not_Nullable_Field Field FROM basenulltypefield";
  TestUtils.executeCreateViewQuery("default", "v", createViewStatement);

  // Non-strict mode (third argument false), as in the other union tests.
  final String derivedSchemaText =
      ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema("default", "v", false).toString(true);

  Assert.assertEquals(derivedSchemaText, TestUtils.loadSchema("testNullUnionNonNullField-expected.avsc"));
}
// TODO: add more unit tests
}
6 changes: 6 additions & 0 deletions coral-schema/src/test/resources/base-null-type-field.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,11 @@
"fields" : [ {
"name" : "Null_Field",
"type" : "null"
}, {
"name" : "Nullable_Field",
"type" : ["null", "int"]
}, {
"name" : "Not_Nullable_Field",
"type" : "int"
} ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type" : "record",
"name" : "v",
"namespace" : "default.v",
"fields" : [ {
"name" : "Field",
"type" : [ "null", "int" ]
} ]
}

0 comments on commit 68021e1

Please sign in to comment.