From 28181005d82f56c6ed4cfbbe4367a77753de9185 Mon Sep 17 00:00:00 2001 From: Ratandeep Ratti Date: Tue, 22 Oct 2019 21:07:01 -0700 Subject: [PATCH] LIHADOOP-49587: [Writer schema evolution] Update ReassignIds to assign ids to an incoming schema as per the table schema (#9) --- .../org/apache/iceberg/types/ReassignIds.java | 41 +++++++--- .../org/apache/iceberg/types/TypeUtil.java | 5 ++ .../apache/iceberg/types/TestReassignIds.java | 81 +++++++++++++++++++ .../apache/iceberg/types/TestTypeUtil.java | 42 ---------- 4 files changed, 118 insertions(+), 51 deletions(-) create mode 100644 api/src/test/java/org/apache/iceberg/types/TestReassignIds.java delete mode 100644 api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java diff --git a/api/src/main/java/org/apache/iceberg/types/ReassignIds.java b/api/src/main/java/org/apache/iceberg/types/ReassignIds.java index 16099493f..822d4ef54 100644 --- a/api/src/main/java/org/apache/iceberg/types/ReassignIds.java +++ b/api/src/main/java/org/apache/iceberg/types/ReassignIds.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.function.Supplier; import org.apache.iceberg.Schema; @@ -28,9 +29,14 @@ class ReassignIds extends TypeUtil.CustomOrderSchemaVisitor { private final Schema sourceSchema; private Type sourceType; + private int nextId; + private boolean newField; ReassignIds(Schema sourceSchema) { this.sourceSchema = sourceSchema; + //TODO: this will fail if we allow dropping columns. 
We need to update this so that new ids > lastColumnId are used + nextId = Collections.max(TypeUtil.indexById(sourceSchema.asStruct()).keySet()) + 1; + newField = false; } @Override @@ -56,32 +62,43 @@ public Type struct(Types.StructType struct, Iterable fieldTypes) { List newFields = Lists.newArrayListWithExpectedSize(length); for (int i = 0; i < length; i += 1) { Types.NestedField field = fields.get(i); - int sourceFieldId = sourceStruct.field(field.name()).fieldId(); + Types.NestedField sourceField = sourceStruct.field(field.name()); + int fieldId = fieldId(sourceField); if (field.isRequired()) { - newFields.add(Types.NestedField.required(sourceFieldId, field.name(), types.get(i), field.doc())); + newFields.add(Types.NestedField.required(fieldId, field.name(), types.get(i), field.doc())); } else { - newFields.add(Types.NestedField.optional(sourceFieldId, field.name(), types.get(i), field.doc())); + newFields.add(Types.NestedField.optional(fieldId, field.name(), types.get(i), field.doc())); } } return Types.StructType.of(newFields); } + private int fieldId(Types.NestedField sourceField) { + if (sourceField == null || newField) { + return allocateId(); + } + return sourceField.fieldId(); + } + @Override public Type field(Types.NestedField field, Supplier future) { Preconditions.checkArgument(sourceType.isStructType(), "Not a struct: %s", sourceType); Types.StructType sourceStruct = sourceType.asStructType(); Types.NestedField sourceField = sourceStruct.field(field.name()); + boolean previousValue = newField; if (sourceField == null) { - throw new IllegalArgumentException("Field " + field.name() + " not found in source schema"); + sourceType = field.type(); + newField = true; + } else { + sourceType = sourceField.type(); } - - this.sourceType = sourceField.type(); try { return future.get(); } finally { sourceType = sourceStruct; + newField = previousValue; } } @@ -90,7 +107,7 @@ public Type list(Types.ListType list, Supplier elementTypeFuture) { 
Preconditions.checkArgument(sourceType.isListType(), "Not a list: %s", sourceType); Types.ListType sourceList = sourceType.asListType(); - int sourceElementId = sourceList.elementId(); + int sourceElementId = newField ? allocateId() : sourceList.elementId(); this.sourceType = sourceList.elementType(); try { @@ -110,8 +127,8 @@ public Type map(Types.MapType map, Supplier keyTypeFuture, Supplier Preconditions.checkArgument(sourceType.isMapType(), "Not a map: %s", sourceType); Types.MapType sourceMap = sourceType.asMapType(); - int sourceKeyId = sourceMap.keyId(); - int sourceValueId = sourceMap.valueId(); + int sourceKeyId = newField ? allocateId() : sourceMap.keyId(); + int sourceValueId = newField ? allocateId() : sourceMap.valueId(); try { this.sourceType = sourceMap.keyType(); @@ -135,4 +152,10 @@ public Type map(Types.MapType map, Supplier keyTypeFuture, Supplier public Type primitive(Type.PrimitiveType primitive) { return primitive; // nothing to reassign } + + private int allocateId() { + int current = nextId; + nextId += 1; + return current; + } } diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 95bd07f6d..2533df8b0 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -134,6 +134,11 @@ public static Schema assignIncreasingFreshIds(Schema schema) { /** * Reassigns ids in a schema from another schema. + * For newer fields assigns increasing unique ids + * TODO: Newer fields can be assigned duplicate ids if an existing field was dropped, + * but since we never drop fields, this should not affect us. This does still need + * fixing once we decide to push this upstream + * *

<p> * Ids are determined by field names. If a field in the schema cannot be found in the source * schema, this will throw IllegalArgumentException. diff --git a/api/src/test/java/org/apache/iceberg/types/TestReassignIds.java b/api/src/test/java/org/apache/iceberg/types/TestReassignIds.java new file mode 100644 index 000000000..7ab4dde9f --- /dev/null +++ b/api/src/test/java/org/apache/iceberg/types/TestReassignIds.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.types; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.junit.Assert; +import org.junit.Test; + +public class TestReassignIds { + @Test + public void testReassignIds() { + Schema schema = TypeUtil.reassignIds( + new Schema( + Types.NestedField.required(10, "a", Types.LongType.get()), + Types.NestedField.required(11, "b", Types.StringType.get()) + ), + new Schema( + Types.NestedField.required(0, "a", Types.LongType.get()))); + + check(ImmutableMap.of("b", 1, "a", 0), schema); + + schema = TypeUtil.reassignIds( + new Schema( + Types.NestedField.required(20, "a", Types.MapType.ofRequired(21, 22, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(25, "b", Types.StringType.get()), + Types.NestedField.required(27, "c", Types.StructType.of( + Types.NestedField.required(28, "d", Types.FloatType.get()), + Types.NestedField.required(29, "e", Types.FloatType.get()) + )))))), + new Schema( + Types.NestedField.required(1, "a", Types.MapType.ofRequired(2, 3, + Types.StringType.get(), + Types.StructType.of( + Types.NestedField.required(4, "b", Types.StringType.get())))))); + + check(ImmutableMap.of("a.c", 7, "a.c.d", 5, "a.c.e", 6), schema); + + schema = TypeUtil.reassignIds( + new Schema( + Types.NestedField.required(9, "e", Types.IntegerType.get()), + Types.NestedField.required(10, "a", + Types.StructType.of( + Types.NestedField.required(11, "b", + Types.StructType.of( + Types.NestedField.required(12, "c", + Types.StructType.of( + Types.NestedField.required(13, "d", Types.IntegerType.get())))))))), + + new Schema( + Types.NestedField.required(0, "e", Types.IntegerType.get()))); + + check(ImmutableMap.of("a.b.c.d", 1, "a.b.c", 2, "a.b", 3, "a", 4, "e", 0), schema); + } + + private void check(Map expectedNameToId, Schema schema) { + for (Map.Entry e : expectedNameToId.entrySet()) { + Assert.assertEquals(e.getValue().intValue(), 
schema.findField(e.getKey()).fieldId()); + } + } +} diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java deleted file mode 100644 index 1d22e2f35..000000000 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.iceberg.types; - -import org.apache.iceberg.Schema; -import org.junit.Test; - -import static org.apache.iceberg.types.Types.NestedField.required; - - -public class TestTypeUtil { - - @Test(expected = IllegalArgumentException.class) - public void testReassignIdsIllegalArgumentException() { - Schema schema = new Schema( - required(1, "a", Types.IntegerType.get()), - required(2, "b", Types.IntegerType.get()) - ); - Schema sourceSchema = new Schema( - required(1, "a", Types.IntegerType.get()) - ); - TypeUtil.reassignIds(schema, sourceSchema); - } -}