Skip to content

Commit

Permalink
LIHADOOP-49587: [Writer schema evolution] Update ReassignIds to assig…
Browse files Browse the repository at this point in the history
…n ids to an incoming schema as per the table schema (#9)
  • Loading branch information
rdsr authored and shardulm94 committed Oct 23, 2019
1 parent 3bc1946 commit 2818100
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 51 deletions.
41 changes: 32 additions & 9 deletions api/src/main/java/org/apache/iceberg/types/ReassignIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,22 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.apache.iceberg.Schema;

class ReassignIds extends TypeUtil.CustomOrderSchemaVisitor<Type> {
private final Schema sourceSchema;
private Type sourceType;
private int nextId;
private boolean newField;

ReassignIds(Schema sourceSchema) {
this.sourceSchema = sourceSchema;
//TODO: this will fail if we allow dropping columns. We need to update this so that new ids > lastColumnId are used
nextId = Collections.max(TypeUtil.indexById(sourceSchema.asStruct()).keySet()) + 1;
newField = false;
}

@Override
Expand All @@ -56,32 +62,43 @@ public Type struct(Types.StructType struct, Iterable<Type> fieldTypes) {
List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(length);
for (int i = 0; i < length; i += 1) {
Types.NestedField field = fields.get(i);
int sourceFieldId = sourceStruct.field(field.name()).fieldId();
Types.NestedField sourceField = sourceStruct.field(field.name());
int fieldId = fieldId(sourceField);
if (field.isRequired()) {
newFields.add(Types.NestedField.required(sourceFieldId, field.name(), types.get(i), field.doc()));
newFields.add(Types.NestedField.required(fieldId, field.name(), types.get(i), field.doc()));
} else {
newFields.add(Types.NestedField.optional(sourceFieldId, field.name(), types.get(i), field.doc()));
newFields.add(Types.NestedField.optional(fieldId, field.name(), types.get(i), field.doc()));
}
}

return Types.StructType.of(newFields);
}

private int fieldId(Types.NestedField sourceField) {
if (sourceField == null || newField) {
return allocateId();
}
return sourceField.fieldId();
}

@Override
public Type field(Types.NestedField field, Supplier<Type> future) {
Preconditions.checkArgument(sourceType.isStructType(), "Not a struct: %s", sourceType);

Types.StructType sourceStruct = sourceType.asStructType();
Types.NestedField sourceField = sourceStruct.field(field.name());
boolean previousValue = newField;
if (sourceField == null) {
throw new IllegalArgumentException("Field " + field.name() + " not found in source schema");
sourceType = field.type();
newField = true;
} else {
sourceType = sourceField.type();
}

this.sourceType = sourceField.type();
try {
return future.get();
} finally {
sourceType = sourceStruct;
newField = previousValue;
}
}

Expand All @@ -90,7 +107,7 @@ public Type list(Types.ListType list, Supplier<Type> elementTypeFuture) {
Preconditions.checkArgument(sourceType.isListType(), "Not a list: %s", sourceType);

Types.ListType sourceList = sourceType.asListType();
int sourceElementId = sourceList.elementId();
int sourceElementId = newField ? allocateId() : sourceList.elementId();

this.sourceType = sourceList.elementType();
try {
Expand All @@ -110,8 +127,8 @@ public Type map(Types.MapType map, Supplier<Type> keyTypeFuture, Supplier<Type>
Preconditions.checkArgument(sourceType.isMapType(), "Not a map: %s", sourceType);

Types.MapType sourceMap = sourceType.asMapType();
int sourceKeyId = sourceMap.keyId();
int sourceValueId = sourceMap.valueId();
int sourceKeyId = newField ? allocateId() : sourceMap.keyId();
int sourceValueId = newField ? allocateId() : sourceMap.valueId();

try {
this.sourceType = sourceMap.keyType();
Expand All @@ -135,4 +152,10 @@ public Type map(Types.MapType map, Supplier<Type> keyTypeFuture, Supplier<Type>
public Type primitive(Type.PrimitiveType primitive) {
return primitive; // nothing to reassign
}

private int allocateId() {
int current = nextId;
nextId += 1;
return current;
}
}
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public static Schema assignIncreasingFreshIds(Schema schema) {

/**
* Reassigns ids in a schema from another schema.
* For newer fields assigns increasing unique ids
* TODO: Newer fields can be assigned duplicate ids if an existing field was dropped,
* but since we never drop fields, this should not affect us. This does still need
* fixing once we decide to push this upstream
*
* <p>
* Ids are determined by field names. If a field in the schema cannot be found in the source
* schema, this will throw IllegalArgumentException.
Expand Down
81 changes: 81 additions & 0 deletions api/src/test/java/org/apache/iceberg/types/TestReassignIds.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.types;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.junit.Assert;
import org.junit.Test;

public class TestReassignIds {
@Test
public void testReassignIds() {
Schema schema = TypeUtil.reassignIds(
new Schema(
Types.NestedField.required(10, "a", Types.LongType.get()),
Types.NestedField.required(11, "b", Types.StringType.get())
),
new Schema(
Types.NestedField.required(0, "a", Types.LongType.get())));

check(ImmutableMap.of("b", 1, "a", 0), schema);

schema = TypeUtil.reassignIds(
new Schema(
Types.NestedField.required(20, "a", Types.MapType.ofRequired(21, 22,
Types.StringType.get(),
Types.StructType.of(
Types.NestedField.required(25, "b", Types.StringType.get()),
Types.NestedField.required(27, "c", Types.StructType.of(
Types.NestedField.required(28, "d", Types.FloatType.get()),
Types.NestedField.required(29, "e", Types.FloatType.get())
)))))),
new Schema(
Types.NestedField.required(1, "a", Types.MapType.ofRequired(2, 3,
Types.StringType.get(),
Types.StructType.of(
Types.NestedField.required(4, "b", Types.StringType.get()))))));

check(ImmutableMap.of("a.c", 7, "a.c.d", 5, "a.c.e", 6), schema);

schema = TypeUtil.reassignIds(
new Schema(
Types.NestedField.required(9, "e", Types.IntegerType.get()),
Types.NestedField.required(10, "a",
Types.StructType.of(
Types.NestedField.required(11, "b",
Types.StructType.of(
Types.NestedField.required(12, "c",
Types.StructType.of(
Types.NestedField.required(13, "d", Types.IntegerType.get())))))))),

new Schema(
Types.NestedField.required(0, "e", Types.IntegerType.get())));

check(ImmutableMap.of("a.b.c.d", 1, "a.b.c", 2, "a.b", 3, "a", 4, "e", 0), schema);
}

private void check(Map<String, Integer> expectedNameToId, Schema schema) {
for (Map.Entry<String, Integer> e : expectedNameToId.entrySet()) {
Assert.assertEquals(e.getValue().intValue(), schema.findField(e.getKey()).fieldId());
}
}
}
42 changes: 0 additions & 42 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java

This file was deleted.

0 comments on commit 2818100

Please sign in to comment.