Skip to content

Commit

Permalink
Support to field filtering of complex union type in ORC (#150)
Browse files Browse the repository at this point in the history
* Support to field filtering of complex union type in Orc

* using a constant variable instead of literal

* fix typos

* address comments

---------

Co-authored-by: Yiqiang Ding
  • Loading branch information
yiqiangin authored Oct 13, 2023
1 parent 3959284 commit 563e7d9
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 27 deletions.
40 changes: 32 additions & 8 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public TypeDescription type() {
public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type";
static final String ICEBERG_FIELD_LENGTH = "iceberg.length";

public static final String ICEBERG_UNION_TAG_FIELD_NAME = "tag";
public static final int ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH = 5;
private static final ImmutableMultimap<Type.TypeID, TypeDescription.Category> TYPE_MAPPING =
ImmutableMultimap.<Type.TypeID, TypeDescription.Category>builder()
.put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN)
Expand Down Expand Up @@ -381,9 +383,9 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ

private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isRequired, Map<Integer, OrcField> mapping,
OrcField orcField) {
TypeDescription orcType;

if (orcField.type.getChildren().size() == 1) { // single type union
orcType = TypeDescription.createUnion();
TypeDescription orcType = TypeDescription.createUnion();

TypeDescription childOrcStructType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
Expand All @@ -399,13 +401,35 @@ private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isReq
}

orcType.addUnionChild(childOrcStructType);
return orcType;
} else { // complex union
orcType = TypeDescription.createUnion();
List<Types.NestedField> nestedFields = type.asStructType().fields();
for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
return getOrcSchemaForComplexUnionType(type, isRequired, mapping, orcField);
}
}

private static TypeDescription getOrcSchemaForComplexUnionType(Type type, boolean isRequired,
Map<Integer, OrcField> mapping,
OrcField orcField) {
TypeDescription orcType = TypeDescription.createUnion();
List<Types.NestedField> nestedFieldsFromIcebergSchema = type.asStructType().fields();
for (int i = 0; i < orcField.type.getChildren().size(); ++i) {
TypeDescription childOrcType = orcField.type.getChildren().get(i);
boolean typeProjectedInIcebergSchema = false;
for (Types.NestedField nestedField : nestedFieldsFromIcebergSchema) {
// the name of the field in the struct of Iceberg schema is in the pattern of "fieldx"
// where x is an integer index starting from 0
if (!nestedField.name().equals(ICEBERG_UNION_TAG_FIELD_NAME) &&
Integer.parseInt(nestedField.name().substring(ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH)) == i) {
// child type is projected in Iceberg schema
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
typeProjectedInIcebergSchema = true;
break;
}
}
if (!typeProjectedInIcebergSchema) {
orcType.addUnionChild(childOrcType);
}
}
return orcType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
package org.apache.iceberg.orc;

import java.util.List;
import java.util.Optional;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;


public abstract class OrcSchemaWithTypeVisitor<T> {
private static final String PSEUDO_ICEBERG_FIELD_ID = "-1";

public static <T> T visit(
org.apache.iceberg.Schema iSchema, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
return visit(iSchema.asStruct(), schema, visitor);
Expand Down Expand Up @@ -78,14 +80,75 @@ protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisito
if (types.size() == 1) { // single type union
options.add(visit(type, types.get(0), visitor));
} else { // complex union
for (int i = 0; i < types.size(); i += 1) {
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
}
visitComplexUnion(type, union, visitor, options);
}

return visitor.union(type, union, options);
}

/*
A complex union with multiple types of Orc schema is converted into a struct with multiple fields of Iceberg schema.
Also an extra tag field is added into the struct of Iceberg schema during the conversion.
Given an example of complex union in both Orc and Iceberg:
Orc schema: {"name":"unionCol","type":["int","string"]}
Iceberg schema: struct<0: tag: required int, 1: field0: optional int, 2: field1: optional string>
The fields in the struct of Iceberg schema are expected to be stored in the same order
as the corresponding types in the union of Orc schema.
Except the tag field, the fields in the struct of Iceberg schema are the same as the types in the union of Orc schema
in the general case. In case of field projection, the fields in the struct of Iceberg schema only contains
the fields to be projected which equals to a subset of the types in the union of ORC schema.
Therefore, this function visits the complex union with the consideration of both cases.
Noted that null value and default value for complex union is not a consideration in case of ORC
*/
private <T> void visitComplexUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor,
List<T> options) {
int typeIndex = 0;
int fieldIndexInStruct = 0;
while (typeIndex < union.getChildren().size()) {
TypeDescription schema = union.getChildren().get(typeIndex);
boolean relatedFieldInStructFound = false;
Types.StructType struct = type.asStructType();
if (fieldIndexInStruct < struct.fields().size() &&
ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME
.equals(struct.fields().get(fieldIndexInStruct).name())) {
fieldIndexInStruct++;
}

if (fieldIndexInStruct < struct.fields().size()) {
String structFieldName = type.asStructType().fields().get(fieldIndexInStruct).name();
int indexFromStructFieldName = Integer.parseInt(structFieldName
.substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH));
if (typeIndex == indexFromStructFieldName) {
relatedFieldInStructFound = true;
T option = visit(type.asStructType().fields().get(fieldIndexInStruct).type(), schema, visitor);
options.add(option);
fieldIndexInStruct++;
}
}
if (!relatedFieldInStructFound) {
visitNotProjectedTypeInComplexUnion(schema, visitor, options, typeIndex);
}
typeIndex++;
}
}

// If a field is not projected, a corresponding field in the struct of Iceberg schema cannot be found
// for current type of union in Orc schema, a reader for current type still needs to be created and
// used to make the reading of Orc file successfully. In this case, a pseudo Iceberg type is converted from
// the Orc schema and is used to create the option for the reader of the current type which still can
// read the corresponding content in Orc file successfully.
private static <T> void visitNotProjectedTypeInComplexUnion(TypeDescription schema,
OrcSchemaWithTypeVisitor<T> visitor,
List<T> options,
int typeIndex) {
OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor();
schemaConverter.beforeField("field" + typeIndex, schema);
schema.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, PSEUDO_ICEBERG_FIELD_ID);
Optional<Types.NestedField> icebergSchema = OrcToIcebergVisitor.visit(schema, schemaConverter);
schemaConverter.afterField("field" + typeIndex, schema);
options.add(visit(icebergSchema.get().type(), schema, visitor));
}

public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public OrcValueReader<?> record(

@Override
public OrcValueReader<?> union(Type expected, TypeDescription union, List<OrcValueReader<?>> options) {
return SparkOrcValueReaders.union(options);
return SparkOrcValueReaders.union(options, expected);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package org.apache.iceberg.spark.data;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
Expand Down Expand Up @@ -71,8 +74,8 @@ static OrcValueReader<?> struct(
return new StructReader(readers, struct, idToConstant);
}

static OrcValueReader<?> union(List<OrcValueReader<?>> readers) {
return new UnionReader(readers);
static OrcValueReader<?> union(List<OrcValueReader<?>> readers, Type expected) {
return new UnionReader(readers, expected);
}

static OrcValueReader<?> array(OrcValueReader<?> elementReader) {
Expand Down Expand Up @@ -166,12 +169,40 @@ protected void set(InternalRow struct, int pos, Object value) {

static class UnionReader implements OrcValueReader<Object> {
private final OrcValueReader[] readers;
private final Type expectedIcebergSchema;
private int[] projectedFieldIdsToIdxInReturnedRow;
private boolean isTagFieldProjected;
private int numOfFieldsInReturnedRow;

private UnionReader(List<OrcValueReader<?>> readers) {
private UnionReader(List<OrcValueReader<?>> readers, Type expected) {
this.readers = new OrcValueReader[readers.size()];
for (int i = 0; i < this.readers.length; i += 1) {
this.readers[i] = readers.get(i);
}
this.expectedIcebergSchema = expected;

if (this.readers.length > 1) {
// Creating an integer array to track the mapping between the index of fields to be projected
// and the index of the value for the field stored in the returned row,
// if the value for a field equals to Integer.MIN_VALUE, it means the value of this field should not be stored
// in the returned row
this.projectedFieldIdsToIdxInReturnedRow = new int[readers.size()];
Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, Integer.MIN_VALUE);
this.numOfFieldsInReturnedRow = 0;
this.isTagFieldProjected = false;

for (Types.NestedField expectedStructField : expectedIcebergSchema.asStructType().fields()) {
String fieldName = expectedStructField.name();
if (fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
this.isTagFieldProjected = true;
this.numOfFieldsInReturnedRow++;
continue;
}
int projectedFieldIndex = Integer.valueOf(fieldName
.substring(ORCSchemaUtil.ICEBERG_UNION_TYPE_FIELD_NAME_PREFIX_LENGTH));
this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] = this.numOfFieldsInReturnedRow++;
}
}
}

@Override
Expand All @@ -183,12 +214,17 @@ public Object nonNullRead(ColumnVector vector, int row) {
if (readers.length == 1) {
return value;
} else {
InternalRow struct = new GenericInternalRow(readers.length + 1);
for (int i = 0; i < readers.length; i += 1) {
struct.setNullAt(i + 1);
InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow);
for (int i = 0; i < struct.numFields(); i += 1) {
struct.setNullAt(i);
}
if (this.isTagFieldProjected) {
struct.update(0, fieldIndex);
}

if (this.projectedFieldIdsToIdxInReturnedRow[fieldIndex] != Integer.MIN_VALUE) {
struct.update(this.projectedFieldIdsToIdxInReturnedRow[fieldIndex], value);
}
struct.update(0, fieldIndex);
struct.update(fieldIndex + 1, value);

return struct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcBatchReader;
import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
import org.apache.iceberg.orc.OrcValueReader;
Expand Down Expand Up @@ -434,10 +435,19 @@ public ColumnVector getChild(int ordinal) {
private static class UnionConverter implements Converter {
private final Type type;
private final List<Converter> optionConverters;
private boolean isTagFieldProjected;

private UnionConverter(Type type, List<Converter> optionConverters) {
this.type = type;
this.optionConverters = optionConverters;
if (optionConverters.size() > 1) {
for (Types.NestedField field : type.asStructType().fields()) {
if (field.name().equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
this.isTagFieldProjected = true;
break;
}
}
}
}

@Override
Expand All @@ -449,13 +459,23 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector v
List<Types.NestedField> fields = type.asStructType().fields();
List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());

LongColumnVector longColumnVector = new LongColumnVector();
longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();
// Adding ColumnVector for tag field into fieldVectors when the tag field is projected in Iceberg schema
if (isTagFieldProjected) {
LongColumnVector longColumnVector = new LongColumnVector();
longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();

fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
OrcValueReaders.ints(), batchOffsetInFile));
for (int i = 0; i < fields.size() - 1; i += 1) {
fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile));
fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
OrcValueReaders.ints(), batchOffsetInFile));
}

// Adding ColumnVector for each field projected in Iceberg schema into fieldVectors
for (int i = 0; i < fields.size(); ++i) {
Types.NestedField field = fields.get(i);
if (!field.name().equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
int typeIdx = Integer.parseInt(field.name().substring(5));
fieldVectors.add(optionConverters.get(typeIdx)
.convert(unionColumnVector.fields[typeIdx], batchSize, batchOffsetInFile));
}
}

return new BaseOrcColumnVector(type.asStructType(), batchSize, vector) {
Expand Down
Loading

0 comments on commit 563e7d9

Please sign in to comment.