diff --git a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java index f58670365..883d9a0cb 100644 --- a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java +++ b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java @@ -54,11 +54,11 @@ public Type struct(Types.StructType struct, List fieldResults) { } else if (projectedType != null) { sameTypes = false; // signal that some types were altered if (field.isOptional()) { - selectedFields.add( - Types.NestedField.optional(field.fieldId(), field.name(), projectedType, field.doc())); + selectedFields.add(Types.NestedField.optional( + field.fieldId(), field.name(), projectedType, field.getDefaultValue(), field.doc())); } else { - selectedFields.add( - Types.NestedField.required(field.fieldId(), field.name(), projectedType, field.doc())); + selectedFields.add(Types.NestedField.required( + field.fieldId(), field.name(), projectedType, field.getDefaultValue(), field.doc())); } } } diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 1b050725a..88d1244fe 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -20,6 +20,7 @@ package org.apache.iceberg.types; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; @@ -415,42 +416,112 @@ public int hashCode() { public static class NestedField implements Serializable { public static NestedField optional(int id, String name, Type type) { - return new NestedField(true, id, name, type, null); + return new NestedField(true, id, name, type, null, null); } public static NestedField optional(int id, String name, Type type, String doc) { - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, null, doc); + } + + public static NestedField 
optional(int id, String name, Type type, Object defaultValue, String doc) { + return new NestedField(true, id, name, type, defaultValue, doc); } public static NestedField required(int id, String name, Type type) { - return new NestedField(false, id, name, type, null); + return new NestedField(false, id, name, type, null, null); } public static NestedField required(int id, String name, Type type, String doc) { - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, null, doc); + } + + public static NestedField required(int id, String name, Type type, Object defaultValue, String doc) { + return new NestedField(false, id, name, type, defaultValue, doc); } public static NestedField of(int id, boolean isOptional, String name, Type type) { - return new NestedField(isOptional, id, name, type, null); + return new NestedField(isOptional, id, name, type, null, null); } public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) { - return new NestedField(isOptional, id, name, type, doc); + return new NestedField(isOptional, id, name, type, null, doc); + } + + public static NestedField of(int id, boolean isOptional, String name, Type type, Object defaultValue, String doc) { + return new NestedField(isOptional, id, name, type, defaultValue, doc); + } + + private static void validateDefaultValue(Object defaultValue, Type type) { + if (defaultValue == null) { + return; + } + switch (type.typeId()) { + case STRUCT: + Preconditions.checkArgument(Map.class.isInstance(defaultValue), + "defaultValue should be a Map from fields names to values, for StructType"); + Map defaultStruct = (Map) defaultValue; + if (defaultStruct.isEmpty()) { + return; + } + Preconditions.checkArgument(defaultStruct.size() == type.asStructType().fields().size()); + for (String fieldName : defaultStruct.keySet()) { + NestedField.validateDefaultValue(defaultStruct.get(fieldName), type.asStructType().field(fieldName).type); + } + 
break; + + case LIST: + Preconditions.checkArgument(defaultValue instanceof ArrayList, + "defaultValue should be an ArrayList of Objects, for ListType"); + List defaultArrayList = (ArrayList) defaultValue; + if (defaultArrayList.size() == 0) { + return; + } + defaultArrayList.forEach(dv -> NestedField.validateDefaultValue(dv, type.asListType().elementField.type)); + break; + + case MAP: + Preconditions.checkArgument(Map.class.isInstance(defaultValue), + "defaultValue should be an instance of Map for MapType"); + Map defaultMap = (Map) defaultValue; + if (defaultMap.isEmpty()) { + return; + } + for (Map.Entry e : defaultMap.entrySet()) { + NestedField.validateDefaultValue(e.getKey(), type.asMapType().keyField.type); + NestedField.validateDefaultValue(e.getValue(), type.asMapType().valueField.type); + } + break; + + case FIXED: + case BINARY: + Preconditions.checkArgument(byte[].class.isInstance(defaultValue), + "defaultValue should be an instance of byte[] for TypeId.%s, but defaultValue.class = %s", + type.typeId().name(), defaultValue.getClass().getCanonicalName()); + break; + + default: + Preconditions.checkArgument(type.typeId().javaClass().isInstance(defaultValue), + "defaultValue should be and instance of %s for TypeId.%s, but defaultValue.class = %s", + type.typeId().javaClass(), type.typeId().name(), defaultValue.getClass().getCanonicalName()); + } } private final boolean isOptional; private final int id; private final String name; private final Type type; + private final Object defaultValue; private final String doc; - private NestedField(boolean isOptional, int id, String name, Type type, String doc) { + private NestedField(boolean isOptional, int id, String name, Type type, Object defaultValue, String doc) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); + validateDefaultValue(defaultValue, type); this.isOptional = isOptional; this.id = id; this.name = name; this.type = type; + 
this.defaultValue = defaultValue; this.doc = doc; } @@ -462,7 +533,7 @@ public NestedField asOptional() { if (isOptional) { return this; } - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, defaultValue, doc); } public boolean isRequired() { @@ -473,7 +544,15 @@ public NestedField asRequired() { if (!isOptional) { return this; } - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, defaultValue, doc); + } + + public boolean hasDefaultValue() { + return defaultValue != null; + } + + public Object getDefaultValue() { + return defaultValue; } public int fieldId() { @@ -496,6 +575,7 @@ public String doc() { public String toString() { return String.format("%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type) + + (hasDefaultValue() ? ", default value: " + defaultValue + ", " : "") + (doc != null ? " (" + doc + ")" : ""); } @@ -514,6 +594,8 @@ public boolean equals(Object o) { return false; } else if (!name.equals(that.name)) { return false; + } else if (!Objects.equals(defaultValue, that.defaultValue)) { + return false; } else if (!Objects.equals(doc, that.doc)) { return false; } @@ -522,7 +604,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(NestedField.class, id, isOptional, name, type); + return hasDefaultValue() ? 
Objects.hash(NestedField.class, id, isOptional, name, type, defaultValue) : + Objects.hash(NestedField.class, id, isOptional, name, type); } } @@ -740,7 +823,7 @@ public boolean equals(Object o) { } else if (!(o instanceof ListType)) { return false; } ListType listType = (ListType) o; return elementField.equals(listType.elementField); } diff --git a/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java b/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java new file mode 100644 index 000000000..bc68fe1da --- /dev/null +++ b/api/src/test/java/org/apache/iceberg/types/TestDefaultValuesForContainerTypes.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.types; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField; +import static org.apache.iceberg.types.Types.StructType; + +public class TestDefaultValuesForContainerTypes { + + static NestedField intFieldType; + static NestedField stringFieldType; + static StructType structType; + + @BeforeClass + public static void beforeClass() { + intFieldType = NestedField.optional(0, "optionalIntField", Types.IntegerType.get()); + stringFieldType = NestedField.required(1, "requiredStringField", Types.StringType.get()); + structType = StructType.of(Arrays.asList(intFieldType, stringFieldType)); + } + + @Test + public void testStructTypeDefault() { + Map structDefaultvalue = new HashMap<>(); + structDefaultvalue.put(intFieldType.name(), Integer.valueOf(1)); + structDefaultvalue.put(stringFieldType.name(), "two"); + NestedField structField = NestedField.optional(2, "optionalStructField", structType, structDefaultvalue, "doc"); + Assert.assertTrue(structField.hasDefaultValue()); + Assert.assertEquals(structDefaultvalue, structField.getDefaultValue()); + } + + @Test (expected = IllegalArgumentException.class) + public void testStructTypeDefaultInvalidFieldsTypes() { + List structDefaultvalue = new ArrayList<>(); + structDefaultvalue.add("one"); + structDefaultvalue.add("two"); + NestedField.optional(2, "optionalStructField", structType, structDefaultvalue, "doc"); + } + + @Test (expected = IllegalArgumentException.class) + public void testStructTypeDefaultInvalidNumberFields() { + List structDefaultvalue = new ArrayList<>(); + structDefaultvalue.add(Integer.valueOf(1)); + structDefaultvalue.add("two"); + structDefaultvalue.add("three"); + NestedField.optional(2, "optionalStructField", structType, structDefaultvalue, "doc"); + } +} 
diff --git a/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java b/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java new file mode 100644 index 000000000..abe53f930 --- /dev/null +++ b/api/src/test/java/org/apache/iceberg/types/TestNestedFieldDefaultValues.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.types; + +import org.apache.iceberg.types.Types.NestedField; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + + +public class TestNestedFieldDefaultValues { + + private final int id = 1; + private final String fieldName = "fieldName"; + private final Type fieldType = Types.IntegerType.get(); + private final String doc = "field doc"; + private final Integer defaultValue = 100; + + @Test + public void testConstructorsValidCases() { + // optional constructors + Assert.assertFalse(optional(id, fieldName, fieldType).hasDefaultValue()); + Assert.assertFalse(optional(id, fieldName, fieldType, doc).hasDefaultValue()); + NestedField nestedFieldWithDefault = optional(id, fieldName, fieldType, defaultValue, doc); + Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue()); + Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); + nestedFieldWithDefault = optional(id, fieldName, fieldType, defaultValue, null); + Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue()); + Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); + + // required constructors + Assert.assertFalse(required(id, fieldName, fieldType).hasDefaultValue()); + Assert.assertFalse(required(id, fieldName, fieldType, doc).hasDefaultValue()); + Assert.assertFalse(required(id, fieldName, fieldType, null, doc).hasDefaultValue()); + nestedFieldWithDefault = required(id, fieldName, fieldType, defaultValue, doc); + Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue()); + Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); + nestedFieldWithDefault = required(id, fieldName, fieldType, defaultValue, null); + Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue()); + Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); + + // of constructors 
+ Assert.assertFalse(NestedField.of(id, true, fieldName, fieldType).hasDefaultValue()); + Assert.assertFalse(NestedField.of(id, true, fieldName, fieldType, doc).hasDefaultValue()); + nestedFieldWithDefault = NestedField.of(id, true, fieldName, fieldType, defaultValue, doc); + Assert.assertTrue(nestedFieldWithDefault.hasDefaultValue()); + Assert.assertEquals(defaultValue, nestedFieldWithDefault.getDefaultValue()); + } + + @Test (expected = IllegalArgumentException.class) + public void testOptionalWithInvalidDefaultValueClass() { + // class of default value does not match class of type + Long wrongClassDefaultValue = 100L; + optional(id, fieldName, fieldType, wrongClassDefaultValue, doc); + } + + @Test (expected = IllegalArgumentException.class) + public void testReqiredWithInvalidDefaultValueClass() { + // class of default value does not match class of type + Long wrongClassDefaultValue = 100L; + required(id, fieldName, fieldType, wrongClassDefaultValue, doc); + } +} diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java index 65fe5a93d..6e0d2a7e5 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaParser.java +++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java @@ -19,14 +19,24 @@ package org.apache.iceberg; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import java.io.IOException; import java.io.StringWriter; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import 
org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -34,6 +44,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.JsonUtil; + public class SchemaParser { private SchemaParser() { @@ -56,6 +67,26 @@ private SchemaParser() { private static final String REQUIRED = "required"; private static final String ELEMENT_REQUIRED = "element-required"; private static final String VALUE_REQUIRED = "value-required"; + private static final String DEFAULT = "default"; + + private static final List primitiveClasses = Arrays.asList(Boolean.class, Integer.class, Long.class, + Float.class, Double.class, CharSequence.class, String.class, java.util.UUID.class, BigDecimal.class); + + private static void writeDefaultValue(Object defaultValue, Type type, JsonGenerator generator) throws IOException { + if (defaultValue == null) { + return; + } + generator.writeFieldName(DEFAULT); + if (type.isListType()) { + generator.writeString(defaultValueToJsonString((List) defaultValue)); + } else if (type.isStructType() || type.isMapType()) { + generator.writeString(defaultValueToJsonString((Map) defaultValue)); + } else if (isFixedOrBinary(type)) { + generator.writeString(defaultValueToJsonString((byte[]) defaultValue)); + } else { + generator.writeString(defaultValueToJsonString(defaultValue)); + } + } static void toJson(Types.StructType struct, JsonGenerator generator) throws IOException { generator.writeStartObject(); @@ -69,13 +100,14 @@ static void toJson(Types.StructType struct, JsonGenerator generator) throws IOEx generator.writeBooleanField(REQUIRED, field.isRequired()); generator.writeFieldName(TYPE); toJson(field.type(), generator); + writeDefaultValue(field.getDefaultValue(), field.type(), generator); if (field.doc() != null) { generator.writeStringField(DOC, field.doc()); } + generator.writeEndObject(); } 
generator.writeEndArray(); generator.writeEndObject(); } @@ -88,7 +120,7 @@ static void toJson(Types.ListType list, JsonGenerator generator) throws IOExcept generator.writeFieldName(ELEMENT); toJson(list.elementType(), generator); generator.writeBooleanField(ELEMENT_REQUIRED, !list.isElementOptional()); generator.writeEndObject(); } @@ -176,6 +207,32 @@ private static Type typeFromJson(JsonNode json) { throw new IllegalArgumentException("Cannot parse type from json: " + json); } + private static boolean isFixedOrBinary(Type type) { + return type.typeId() == Type.TypeID.FIXED || type.typeId() == Type.TypeID.BINARY; + } + + private static Object defaultValueFromJson(JsonNode field, Type type) { + if (!field.has(DEFAULT)) { + return null; + } + + String defaultValueString = field.get(DEFAULT).asText(); + + if (isFixedOrBinary(type)) { + return defaultValueFromJsonBytesField(defaultValueString); + } + + if (type.isPrimitiveType()) { + return primitiveDefaultValueFromJsonString(defaultValueString, type); + } + + try { + return defaultValueFromJsonString(defaultValueString, type); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static Types.StructType structFromJson(JsonNode json) { JsonNode fieldArray = json.get(FIELDS); Preconditions.checkArgument(fieldArray.isArray(), @@ -191,13 +248,13 @@ private static Types.StructType structFromJson(JsonNode json) { int id = JsonUtil.getInt(ID, field); String name = JsonUtil.getString(NAME, field); Type type = typeFromJson(field.get(TYPE)); - + Object defaultValue = defaultValueFromJson(field, type); String doc = JsonUtil.getStringOrNull(DOC, field); boolean isRequired = JsonUtil.getBool(REQUIRED, field); if (isRequired) { - fields.add(Types.NestedField.required(id, name, type, doc)); + fields.add(Types.NestedField.required(id, name, type, defaultValue, doc)); } else { - fields.add(Types.NestedField.optional(id, name, type, doc)); + fields.add(Types.NestedField.optional(id, name, type,
defaultValue, doc)); } } @@ -253,4 +310,134 @@ public static Schema fromJson(String json) { } }); } + + private static String defaultValueToJsonString(Map map) { + Map jsonStringElementsMap = new LinkedHashMap<>(); + map.entrySet().forEach( + entry -> jsonStringElementsMap.put(entry.getKey(), defaultValueToJsonString(entry.getValue()))); + return defaultValueToJsonString(jsonStringElementsMap); + } + + private static String defaultValueToJsonString(List list) { + List jsonStringItemsList = new ArrayList<>(); + list.forEach(item -> jsonStringItemsList.add(defaultValueToJsonString(item))); + return defaultValueToJsonString(jsonStringItemsList); + } + + private static String defaultValueToJsonString(byte[] bytes) { + try { + return JsonUtil.mapper().writeValueAsString(ByteBuffer.wrap(bytes)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static String defaultValueToJsonString(Object value) { + if (isPrimitiveClass(value)) { + return value.toString(); + } + + try { + return JsonUtil.mapper().writeValueAsString(new SerDeValue(value)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static boolean isPrimitiveClass(Object value) { + return primitiveClasses.contains(value.getClass()); + } + + private static Object defaultValueFromJsonBytesField(String value) { + try { + return JsonUtil.mapper().readValue(value, ByteBuffer.class).array(); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private static Object defaultValueFromJsonString(String jsonString, Type type) throws IOException { + Preconditions.checkArgument(!type.isPrimitiveType(), "jsonString %s is for primitive type %s", jsonString, type); + Object jsonStringCollection = JsonUtil.mapper().readValue(jsonString, SerDeValue.class).getValue(); + + if (type.isListType()) { + Preconditions.checkArgument(jsonStringCollection instanceof List, + "deserialized Json object: (%s) is not List for List 
type", jsonStringCollection); + List list = new ArrayList<>(); + Type elementType = type.asListType().elementType(); + for (String item : (List) jsonStringCollection) { + list.add(elementType.isPrimitiveType() ? primitiveDefaultValueFromJsonString(item, elementType) : + JsonUtil.mapper().readValue(item, SerDeValue.class).getValue()); + } + return list; + } + + Preconditions.checkArgument((type.isMapType() || type.isStructType()) && jsonStringCollection instanceof Map, + "deserialized Json object: (%s) is not Map for type: %s", jsonStringCollection, type); + + // map (MapType or StructType) case + Map map = new HashMap<>(); + Map jsonStringMap = (HashMap) jsonStringCollection; + for (Map.Entry entry : jsonStringMap.entrySet()) { + String key = entry.getKey().toString(); + String valueString = entry.getValue().toString(); + Type elementType = type.isMapType() ? type.asMapType().valueType() : type.asStructType().field(key).type(); + Object value = elementType.isPrimitiveType() ? primitiveDefaultValueFromJsonString(valueString, elementType) + : JsonUtil.mapper().readValue(valueString, SerDeValue.class).getValue(); + map.put(key, value); + } + return map; + } + + private static Object primitiveDefaultValueFromJsonString(String jsonString, Type type) { + switch (type.typeId()) { + case BOOLEAN: + return Boolean.valueOf(jsonString); + case INTEGER: + case DATE: + return Integer.valueOf(jsonString); + case DECIMAL: + return BigDecimal.valueOf(Long.valueOf(jsonString)); + case LONG: + case TIME: + case TIMESTAMP: + return Long.valueOf(jsonString); + case FLOAT: + return Float.valueOf(jsonString); + case DOUBLE: + return Double.valueOf(jsonString); + case STRING: + return jsonString; + case UUID: + return java.util.UUID.fromString(jsonString); + case FIXED: + case BINARY: + return defaultValueFromJsonBytesField(jsonString); + default: + throw new RuntimeException("non-primitive type: " + type); + } + } + + /** + * SerDeValue class: + * This is used so that the value to 
serialize is specified + * as a property, so that the type information gets included in + * the serialized String. + */ + private static class SerDeValue { + // Name of the field used in the intermediate JSON representation + private static final String VALUE_FIELD = "__value__"; + + @JsonProperty(VALUE_FIELD) + private final Object value; + + @JsonCreator + private SerDeValue(@JsonProperty(VALUE_FIELD) Object value) { + this.value = value; + } + + private Object getValue() { + return value; + } + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index b3ef5eb15..7cf6d15a9 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -157,10 +157,15 @@ public static boolean isOptionalComplexUnion(Schema schema) { } public static Schema toOption(Schema schema) { + return toOption(schema, false); + } + + public static Schema toOption(Schema schema, boolean nullIsSecondElement) { if (schema.getType() == UNION) { - Preconditions.checkArgument(isOptionSchema(schema), - "Union schemas are not supported: %s", schema); + Preconditions.checkArgument(isOptionSchema(schema), "Union schemas are not supported: %s", schema); return schema; + } else if (nullIsSecondElement) { + return Schema.createUnion(schema, NULL); } else { return Schema.createUnion(NULL, schema); } @@ -429,4 +434,12 @@ private static String sanitize(char character) { } return "_x" + Integer.toHexString(character).toUpperCase(); } + + static boolean hasNonNullDefaultValue(Schema.Field field) { + // the schema should use JsonProperties.NULL_VALUE (i.e., null) as the null default + // value, but a user might also use "null" to indicate null while it is actually a String, so + // need to account for it. 
+ return field.hasDefaultValue() && field.defaultVal() != JsonProperties.NULL_VALUE && + !(field.defaultVal() instanceof String && ((String) field.defaultVal()).equalsIgnoreCase("null")); + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index ecdfc34c9..4ad709d9c 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -82,28 +82,55 @@ public Schema record(Schema record, List names, Iterable s List updatedFields = Lists.newArrayListWithExpectedSize(struct.fields().size()); List expectedFields = struct.fields(); for (int i = 0; i < expectedFields.size(); i += 1) { - Types.NestedField field = expectedFields.get(i); - + Types.NestedField expectedField = expectedFields.get(i); // detect reordering - if (i < fields.size() && !field.name().equals(fields.get(i).name())) { + if (i < fields.size() && !expectedField.name().equals(fields.get(i).name())) { hasChange = true; } - Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name())); + Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(expectedField.name())); if (avroField != null) { - updatedFields.add(avroField); - + // if the expectedField has a defaultValue, but the avroField does not, we need to + // create a newField to copy over the non-null default value + if (expectedField.hasDefaultValue() && !AvroSchemaUtil.hasNonNullDefaultValue(avroField)) { + Schema newFiledSchema = (expectedField.isOptional()) ? 
+ AvroSchemaUtil.toOption(AvroSchemaUtil.convert(expectedField.type()), true) : + AvroSchemaUtil.convert(expectedField.type()); + Schema.Field newField = + new Schema.Field(avroField.name(), newFiledSchema, avroField.doc(), expectedField.getDefaultValue()); + newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, expectedField.fieldId()); + updatedFields.add(newField); + hasChange = true; + } else { + // otherwise (i.e., expectedFiled has no default value, or it is null) we can use avroField as is + updatedFields.add(avroField); + } } else { - Preconditions.checkArgument( - field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()), - "Missing required field: %s", field.name()); - // Create a field that will be defaulted to null. We assign a unique suffix to the field - // to make sure that even if records in the file have the field it is not projected. - Schema.Field newField = new Schema.Field( - field.name() + "_r" + field.fieldId(), - AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE); - newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId()); + // here the expectedField is missing from the file schema, so we verify it is either + // an optional field, a metadata column or one that has default value + Preconditions.checkArgument(expectedField.isOptional() || + MetadataColumns.metadataFieldIds().contains(expectedField.fieldId()) || + expectedField.hasDefaultValue(), + "Missing required field that has no default value: expectedField: %s, avroField: null, record: %s", + expectedField, record); + + // Create a field that will be defaulted to the expectedField's default value. If no default value, + // then default to null and assign a unique suffix to the field to make sure that even if records in the + // file have the field it is not projected. 
+ String newFieldName = expectedField.name(); + Schema newFieldSchema; + Object defaultValue; + if (expectedField.hasDefaultValue()) { + newFieldSchema = AvroSchemaUtil.convert(expectedField.type()); + defaultValue = expectedField.getDefaultValue(); + } else { + newFieldName = newFieldName + "_r" + expectedField.fieldId(); + newFieldSchema = AvroSchemaUtil.toOption(AvroSchemaUtil.convert(expectedField.type())); + defaultValue = JsonProperties.NULL_VALUE; + } + Schema.Field newField = new Schema.Field(newFieldName, newFieldSchema, null, defaultValue); + newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, expectedField.fieldId()); updatedFields.add(newField); hasChange = true; } @@ -153,10 +180,11 @@ public Schema union(Schema union, Iterable options) { Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options)); if (nonNullOriginal != nonNullResult) { - return AvroSchemaUtil.toOption(nonNullResult); + boolean nullIsSecondOption = union.getTypes().get(1).getType() == Schema.Type.NULL; + return AvroSchemaUtil.toOption(nonNullResult, nullIsSecondOption); } } return union; } diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index 828883ef3..097160eba 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -261,16 +261,22 @@ private static Schema copyRecord(Schema record, List newFields) { private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) { Schema newSchemaReordered; - // if the newSchema is an optional schema, make sure the NULL option is always the first - if (isOptionSchemaWithNonNullFirstOption(newSchema)) { + // if the newSchema is an optional schema with no, or null, default value, then make sure the + // NULL option is the first + boolean hasNonNullDefaultValue = AvroSchemaUtil.hasNonNullDefaultValue(field); + if
(isOptionSchemaWithNonNullFirstOption(newSchema) && !hasNonNullDefaultValue) { newSchemaReordered = AvroSchemaUtil.toOption(AvroSchemaUtil.fromOption(newSchema)); + } else if (AvroSchemaUtil.isOptionSchema(newSchema) && hasNonNullDefaultValue) { + // o.w. if the newSchema is an optional that has a non-null default value, then make sure the + // NULL option is the second + newSchemaReordered = AvroSchemaUtil.toOption(AvroSchemaUtil.fromOption(newSchema), true); } else { newSchemaReordered = newSchema; } - // do not copy over default values as the file is expected to have values for fields already in the file schema - Schema.Field copy = new Schema.Field(field.name(), - newSchemaReordered, field.doc(), - AvroSchemaUtil.isOptionSchema(newSchemaReordered) ? JsonProperties.NULL_VALUE : null, field.order()); + // copy over non-null default values + Object defaultValue = hasNonNullDefaultValue ? field.defaultVal() : + (AvroSchemaUtil.isOptionSchema(newSchemaReordered) ? JsonProperties.NULL_VALUE : null); + Schema.Field copy = new Schema.Field(field.name(), newSchemaReordered, field.doc(), defaultValue, field.order()); for (Map.Entry prop : field.getObjectProps().entrySet()) { copy.addProp(prop.getKey(), prop.getValue()); diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java index 933042c44..cf8cd4ecd 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java +++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java @@ -92,10 +92,12 @@ public Type record(Schema record, List names, List fieldTypes) { Type fieldType = fieldTypes.get(i); int fieldId = getId(field); + Object defaultValue = AvroSchemaUtil.hasNonNullDefaultValue(field) ? 
field.defaultVal() : null; + if (AvroSchemaUtil.isOptionSchema(field.schema()) || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) { - newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc())); + newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, defaultValue, field.doc())); } else { - newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc())); + newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, defaultValue, field.doc())); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java index 3f1885b37..a3cc60eba 100644 --- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java +++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java @@ -101,9 +101,9 @@ public Schema struct(Types.StructType struct, List fieldSchemas) { String origFieldName = structField.name(); boolean isValidFieldName = AvroSchemaUtil.validAvroName(origFieldName); String fieldName = isValidFieldName ? origFieldName : AvroSchemaUtil.sanitize(origFieldName); - Schema.Field field = new Schema.Field( - fieldName, fieldSchemas.get(i), structField.doc(), - structField.isOptional() ? JsonProperties.NULL_VALUE : null); + Object defaultValue = structField.hasDefaultValue() ? structField.getDefaultValue() : + (structField.isOptional() ? 
JsonProperties.NULL_VALUE : null); + Schema.Field field = new Schema.Field(fieldName, fieldSchemas.get(i), structField.doc(), defaultValue); if (!isValidFieldName) { field.addProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP, origFieldName); } @@ -121,7 +121,7 @@ public Schema struct(Types.StructType struct, List fieldSchemas) { @Override public Schema field(Types.NestedField field, Schema fieldSchema) { if (field.isOptional()) { - return AvroSchemaUtil.toOption(fieldSchema); + return AvroSchemaUtil.toOption(fieldSchema, field.hasDefaultValue()); } else { return fieldSchema; } diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaParserForDefaultValues.java b/core/src/test/java/org/apache/iceberg/TestSchemaParserForDefaultValues.java new file mode 100644 index 000000000..853b9ad2b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSchemaParserForDefaultValues.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types.NestedField; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.avro.Schema.Type.BOOLEAN; +import static org.apache.avro.Schema.Type.BYTES; +import static org.apache.avro.Schema.Type.DOUBLE; +import static org.apache.avro.Schema.Type.FLOAT; +import static org.apache.avro.Schema.Type.INT; +import static org.apache.avro.Schema.Type.LONG; +import static org.apache.avro.Schema.Type.NULL; +import static org.apache.avro.Schema.Type.STRING; + + +public class TestSchemaParserForDefaultValues { + + private void assertEqualStructs(org.apache.iceberg.Schema expected, org.apache.iceberg.Schema actual) { + if (expected == null) { + Assert.assertNull(actual); + return; + } + Assert.assertNotNull(actual); + List expectedFields = expected.asStruct().fields(); + List actualFields = actual.asStruct().fields(); + + Assert.assertEquals(expectedFields.size(), actualFields.size()); + + for (int i = 0; i < expectedFields.size(); i++) { + NestedField expectedField = expectedFields.get(i); + NestedField actualField = actualFields.get(i); + Assert.assertEquals(expectedField.fieldId(), actualField.fieldId()); + Assert.assertEquals(expectedField.name(), actualField.name()); + Assert.assertEquals(expectedField.type(), actualField.type()); + Assert.assertEquals(expectedField.doc(), actualField.doc()); + if (expectedField.hasDefaultValue()) { + Assert.assertTrue(actualField.hasDefaultValue()); + switch (expectedField.type().typeId()) { + case BINARY: + case FIXED: + Assert.assertTrue( + 
Arrays.equals((byte[]) expectedField.getDefaultValue(), (byte[]) actualField.getDefaultValue())); + break; + default: + Assert.assertEquals(expectedField.getDefaultValue(), actualField.getDefaultValue()); + } + } else { + Assert.assertFalse(actualField.hasDefaultValue()); + } + } + } + + private void testToFromJsonPreservingDefaultValues(String[] fieldNames, Schema[] fieldsSchemas, Object[] defaults) { + List fields = new ArrayList<>(); + IntStream.range(0, defaults.length).forEach( + i -> fields.add(new Schema.Field(fieldNames[i], fieldsSchemas[i], null, defaults[i]))); + + Schema schema = Schema.createRecord("root", null, null, false, fields); + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema); + String jsonString = SchemaParser.toJson(icebergSchema); + + Assert.assertTrue(jsonString.contains("default")); + + org.apache.iceberg.Schema icebergSchemaFromJson = SchemaParser.fromJson(jsonString); + + assertEqualStructs(icebergSchema, icebergSchemaFromJson); + } + + @Test + public void testPrimitiveTypes() { + Boolean defaultBoolean = true; + Integer defaultInt = 1; + Long defaultLong = -1L; + Double defaultDouble = 0.1; + Float defaultFloat = 0.1f; + String defaultString = "default string"; + String defaultBytes = "1111"; + int fixedSize = defaultBytes.getBytes().length; + + String[] fieldNames = { + "booleanField", + "intField", + "longField", + "doubleField", + "floatField", + "stringField", + "binaryField", + "fixedField"}; + + Object[] defaults = { + defaultBoolean, + defaultInt, + defaultLong, + defaultDouble, + defaultFloat, + defaultString, + defaultBytes, + defaultBytes}; + + Schema[] primitives = { + Schema.create(BOOLEAN), + Schema.create(INT), + Schema.create(LONG), + Schema.create(DOUBLE), + Schema.create(FLOAT), + Schema.create(STRING), + Schema.create(BYTES), + Schema.createFixed("md5", null, "namespace", fixedSize)}; + + testToFromJsonPreservingDefaultValues(fieldNames, primitives, defaults); + } + + @Test + public void 
testLogicalTypes() { + Long longDefault = Long.valueOf(1234556789); + String[] fieldNames = { + "dateField", + "timeField", + "timestampField", + "uuidField", + "decimalField"}; + + Object[] defaults = { + Integer.valueOf(123446), + longDefault, + "randomUUID", + longDefault}; + + Schema dateSchema = Schema.create(INT); + dateSchema.addProp("logicaltype", "date"); + Schema timestampSchema = Schema.create(LONG); + timestampSchema.addProp("logicaltype", "timestamp"); + Schema uuidSchema = Schema.create(STRING); + uuidSchema.addProp("logicaltype", "UUID"); + Schema bigDecimalSchema = Schema.create(LONG); + bigDecimalSchema.addProp("logicaltype", "decimal"); + + Schema[] logicals = { + dateSchema, + timestampSchema, + uuidSchema, + bigDecimalSchema}; + + testToFromJsonPreservingDefaultValues(fieldNames, logicals, defaults); + } + + @Test + public void testNestedTypes() { + String structStringFieldName = "stringFieldOfStruct"; + String structBooleanFieldName = "booleanFieldOfStruct"; + Map defaultStruct = ImmutableMap.of(structStringFieldName, "default string", + structBooleanFieldName, Boolean.TRUE); + List defaultList = Arrays.asList(1, 2); + Map defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L)); + List structFields = ImmutableList.of( + new Schema.Field(structStringFieldName, Schema.create(STRING), null), + new Schema.Field(structBooleanFieldName, Schema.create(BOOLEAN), null)); + + String[] fieldNames = {"structField", "listField", "mapField"}; + Object[] defaults = {defaultStruct, defaultList, defaultMap}; + Schema[] nested = { + Schema.createRecord("name", null, "namespace", false, structFields), + Schema.createArray(Schema.create(INT)), + Schema.createMap(Schema.create(LONG))}; + + testToFromJsonPreservingDefaultValues(fieldNames, nested, defaults); + } + + @Test + public void testOptionalWithDefault() { + Integer defaultInt = 1; + Map defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L)); + + String[] 
fieldNames = {"optionalPrimitive", "optionalNested"}; + Schema[] optionals = { + Schema.createUnion(Schema.create(INT), Schema.create(NULL)), + Schema.createUnion(Schema.createMap(Schema.create(LONG)), Schema.create(NULL))}; + Object[] defaults = {defaultInt, defaultMap}; + + testToFromJsonPreservingDefaultValues(fieldNames, optionals, defaults); + } + + @Test + public void testNestedOfNestedWithDefault() { + Integer defaultInt = 1; + Map defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L)); + + String structIntField = "intFieldOfStruct"; + String structMapFieldName = "mapFieldOfStruct"; + List structFields = ImmutableList.of( + new Schema.Field(structIntField, Schema.create(INT), null, defaultInt), + new Schema.Field(structMapFieldName, Schema.createMap(Schema.create(LONG)), null, defaultMap)); + + String[] fieldNames = {"intFieldNoDefault", "structFieldNoDefault"}; + Schema[] topLevelFields = { + Schema.create(INT), + Schema.createRecord("name", null, "namespace", false, structFields)}; + + List fields = new ArrayList<>(); + IntStream.range(0, fieldNames.length).forEach( + i -> fields.add(new Schema.Field(fieldNames[i], topLevelFields[i], null))); + + Schema schema = org.apache.avro.Schema.createRecord("root", null, null, false, fields); + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema); + String jsonString = SchemaParser.toJson(icebergSchema); + + Assert.assertTrue(jsonString.contains("default")); + + org.apache.iceberg.Schema fromJsonIcebergSchema = SchemaParser.fromJson(jsonString); + Assert.assertEquals(icebergSchema.toString(), fromJsonIcebergSchema.toString()); + } + + @Test + public void testDeepNestedWithDefault() { + Integer defaultInt = 1; + Map defaultMap = ImmutableMap.of("key1", Long.valueOf(1L), "key2", Long.valueOf(2L)); + + String structIntField = "intFieldOfStruct"; + String structMapFieldName = "mapFieldOfStruct"; + List structFields = ImmutableList.of( + new Schema.Field(structIntField, 
Schema.create(INT), null, defaultInt), + new Schema.Field(structMapFieldName, Schema.createMap(Schema.create(LONG)), null, defaultMap)); + + Schema downLevelStruct = Schema.createRecord("name", null, "namespace0", false, structFields); + + List intermediateStructFields = ImmutableList.of( + new Schema.Field("intermediateIntField", Schema.create(INT), null), + new Schema.Field("intermediateStructField", downLevelStruct, null)); + + Schema intermediateStruct = Schema.createRecord("name", null, "namespace1", false, intermediateStructFields); + String[] fieldNames = {"topLevelLong", "topLevelString", "topLevelStruct"}; + Schema[] topLevelFields = { + Schema.create(LONG), + Schema.create(STRING), + intermediateStruct}; + + List fields = new ArrayList<>(); + IntStream.range(0, fieldNames.length).forEach( + i -> fields.add(new Schema.Field(fieldNames[i], topLevelFields[i], null))); + + Schema schema = org.apache.avro.Schema.createRecord("root", null, null, false, fields); + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema); + String jsonString = SchemaParser.toJson(icebergSchema); + + Assert.assertTrue(jsonString.contains("default")); + + org.apache.iceberg.Schema fromJsonIcebergSchema = SchemaParser.fromJson(jsonString); + Assert.assertEquals(icebergSchema.toString(), fromJsonIcebergSchema.toString()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index f3e75e1f2..8d8fbb97f 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -173,7 +173,7 @@ public void testMissingRequiredFields() { Schema readSchema = writeSchema; AssertHelpers.assertThrows("Missing required field in nameMapping", - IllegalArgumentException.class, "Missing required field: x", + IllegalArgumentException.class, // In this case, pruneColumns result is an empty record 
() -> writeAndRead(writeSchema, readSchema, record, nameMapping)); } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java index 13efc18a7..575563a08 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroOptionsWithNonNullDefaults.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; @@ -40,6 +41,9 @@ public class TestAvroOptionsWithNonNullDefaults { + private static final String fieldWithDefaultName = "fieldWithDefault"; + private static final String noDefaultFiledName = "noDefaultField"; + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -125,4 +129,116 @@ public void writeAndValidateOptionWithNonNullDefaultsEvolution() throws IOExcept AvroTestHelpers.assertEquals(readIcebergSchema.asStruct(), expected.get(i), rows.get(i)); } } + + @Test + public void testDefaultValueUsedPrimitiveType() throws IOException { + Schema writeSchema = Schema.createRecord("root", null, null, false, ImmutableList.of( + new Schema.Field(noDefaultFiledName, Schema.create(INT), null, null))); + // evolved schema + Schema readSchema = Schema.createRecord("root", null, null, false, ImmutableList.of( + new Schema.Field(noDefaultFiledName, Schema.create(INT), null, null), + new Schema.Field(fieldWithDefaultName, Schema.create(INT), null, -1))); + + GenericData.Record record1 = new GenericData.Record(writeSchema); + record1.put(noDefaultFiledName, 1); + GenericData.Record record2 = new GenericData.Record(writeSchema); + record2.put(noDefaultFiledName, 2); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (DataFileWriter writer = new DataFileWriter<>(new 
GenericDatumWriter<>())) { + writer.create(writeSchema, testFile); + writer.append(record1); + writer.append(record2); + } + + + List expected = ImmutableList.of(record1, record2); + org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema); + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)).project(readIcebergSchema).build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + Assert.assertEquals(expected.get(i).get(noDefaultFiledName), rows.get(i).get(noDefaultFiledName)); + // default should be used for records missing the field + Assert.assertEquals(-1, rows.get(i).get(fieldWithDefaultName)); + } + } + + @Test + public void testDefaultValueNotUsedWhenFiledHasValue() throws IOException { + Schema readSchema = Schema.createRecord("root", null, null, false, ImmutableList.of( + new Schema.Field(noDefaultFiledName, Schema.create(INT), null, null), + new Schema.Field(fieldWithDefaultName, Schema.create(INT), null, -1))); + + GenericData.Record record1 = new GenericData.Record(readSchema); + record1.put(noDefaultFiledName, 3); + record1.put(fieldWithDefaultName, 3); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(readSchema, testFile); + writer.append(record1); + } + + List expected = ImmutableList.of(record1); + org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema); + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)).project(readIcebergSchema).build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + Assert.assertEquals(expected.get(i).get(noDefaultFiledName), rows.get(i).get(noDefaultFiledName)); + // default value should NOT be used if field is populated + 
Assert.assertEquals(expected.get(i).get(fieldWithDefaultName), rows.get(i).get(fieldWithDefaultName)); + } + } + + @Test + public void testDefaultValueUsedComplexType() throws IOException { + Schema writeSchema = Schema.createRecord("root", null, null, false, ImmutableList.of( + new Schema.Field(noDefaultFiledName, Schema.create(INT), null, null))); + // evolved schema + List defaultArray = Arrays.asList(-1, -2); + Schema readSchema = Schema.createRecord("root", null, null, false, ImmutableList.of( + new Schema.Field(noDefaultFiledName, Schema.create(INT), null, null), + new Schema.Field(fieldWithDefaultName, Schema.createArray(Schema.create(INT)), null, defaultArray))); + + GenericData.Record record1 = new GenericData.Record(writeSchema); + record1.put(noDefaultFiledName, 1); + GenericData.Record record2 = new GenericData.Record(writeSchema); + record2.put(noDefaultFiledName, 2); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(writeSchema, testFile); + writer.append(record1); + writer.append(record2); + } + + + List expected = ImmutableList.of(record1, record2); + org.apache.iceberg.Schema readIcebergSchema = AvroSchemaUtil.toIceberg(readSchema); + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)).project(readIcebergSchema).build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + Assert.assertEquals(expected.get(i).get(noDefaultFiledName), rows.get(i).get(noDefaultFiledName)); + // default should be used for records missing the field + Assert.assertEquals(defaultArray, rows.get(i).get(fieldWithDefaultName)); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestDefaultValuePreserving.java b/core/src/test/java/org/apache/iceberg/avro/TestDefaultValuePreserving.java new file mode 100644 index 000000000..a4c132c49 --- 
/dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestDefaultValuePreserving.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.avro.Schema.Type.INT; + + +/** + * Testing the preserving of fields; default values in {@link SchemaToType} and {@link TypeToSchema} + */ +public class TestDefaultValuePreserving { + + String noDefaultFiledName = "fieldWithNoDefaultValue"; + String fieldWithDefaultName = "fieldWithDefaultValue"; + Integer defaultValue = -1; + + @Test + public void testSchemaToTypeRecord() { + Schema recordSchema = Schema.createRecord("root", null, null, false, ImmutableList.of( + new Schema.Field(noDefaultFiledName, Schema.create(INT), null, null), + new Schema.Field(fieldWithDefaultName, Schema.create(INT), null, 
defaultValue))); + SchemaToType schemaToType = new SchemaToType(recordSchema); + List names = recordSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList()); + List types = ImmutableList.of(Types.IntegerType.get(), Types.IntegerType.get()); + + Type record = schemaToType.record(recordSchema, names, types); + + Assert.assertNotNull(record); + Assert.assertTrue(record.isStructType()); + Assert.assertEquals(names.size(), record.asStructType().fields().size()); + Assert.assertFalse(record.asStructType().field(noDefaultFiledName).hasDefaultValue()); + Assert.assertTrue(record.asStructType().field(fieldWithDefaultName).hasDefaultValue()); + Assert.assertEquals(defaultValue, record.asStructType().field(fieldWithDefaultName).getDefaultValue()); + } + + @Test + public void testTypeToSchemaStruct() { + List nestedFields = ImmutableList.of( + Types.NestedField.required(0, noDefaultFiledName, Types.IntegerType.get()), + Types.NestedField.required(1, fieldWithDefaultName, Types.IntegerType.get(), defaultValue, null)); + Types.StructType structType = Types.StructType.of(nestedFields); + Map names = ImmutableMap.of(structType, "tableName"); + TypeToSchema typeToSchema = new TypeToSchema(names); + List fieldSchemas = ImmutableList.of(Schema.create(INT), Schema.create(INT)); + + Schema structSchema = typeToSchema.struct(structType, fieldSchemas); + + Assert.assertNotNull(structSchema); + Assert.assertEquals(nestedFields.size(), structSchema.getFields().size()); + for (int i = 0; i < nestedFields.size(); i++) { + if (nestedFields.get(i).hasDefaultValue()) { + Assert.assertTrue(structSchema.getFields().get(i).hasDefaultValue()); + Assert.assertEquals(nestedFields.get(i).getDefaultValue(), structSchema.getFields().get(i).defaultVal()); + } else { + Assert.assertFalse(structSchema.getFields().get(i).hasDefaultValue()); + } + } + } +} diff --git a/site/docs/spec.md b/site/docs/spec.md index 176094906..0a46d480c 100644 --- a/site/docs/spec.md +++ 
b/site/docs/spec.md @@ -116,6 +116,8 @@ A table's **schema** is a list of named columns. All data types are either primi For the representations of these types in Avro, ORC, and Parquet file formats, see Appendix A. +Default values for fields are supported, see Nested Types below. + #### Nested Types A **`struct`** is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. Fields may have an optional comment or doc string. @@ -124,6 +126,13 @@ A **`list`** is a collection of values with some element type. The element field A **`map`** is a collection of key-value pairs with a key type and a value type. Both the key field and value field each have an integer id that is unique in the table schema. Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. +Iceberg supports default-value semantics for fields of nested types (i.e., struct, list and map). Specifically, a field +of a nested type can have a default value that will be returned upon reading this field, if it is not manifested. +The default value can be defined with both required and optional fields. Null default values are allowed with optional +fields only, and its behavior is identical to optional fields with no default value, that is, null is returned upon +reading this field when it is not manifested.
+ + #### Primitive Types | Primitive type | Description | Requirements | @@ -925,7 +934,6 @@ This serialization scheme is for storing single values as individual binary valu | **`list`** | Not supported | | **`map`** | Not supported | - ## Format version changes ### Version 2 diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java new file mode 100644 index 000000000..388397f0e --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.avro.Schema.Type.INT; +import static org.apache.avro.Schema.Type.NULL; +import static org.apache.iceberg.spark.SparkSchemaUtil.convert; + +public class TestSparkAvroReaderForFieldsWithDefaultValue { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testAvroDefaultValues() throws IOException { + String indexFiledName = "index"; + String nullableFiledName = "optionalFieldWithDefault"; + String requiredFiledName = "requiredFieldWithDefault"; + int defaultValue = -1; + + // write records with initial writeSchema + org.apache.avro.Schema writeSchema = org.apache.avro.Schema.createRecord("root", null, null, false, + ImmutableList.of(new org.apache.avro.Schema.Field(indexFiledName, org.apache.avro.Schema.create(INT), + null, null), new org.apache.avro.Schema.Field(nullableFiledName, + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(INT), + org.apache.avro.Schema.create(NULL)), null, defaultValue))); + + Schema icebergWriteSchema = AvroSchemaUtil.toIceberg(writeSchema); + List expected = RandomData.generateList(icebergWriteSchema, 2, 0L); + + File testFile = temp.newFile(); + 
Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = Avro.write(Files.localOutput(testFile)) + .schema(icebergWriteSchema) + .named("test") + .build()) { + for (GenericData.Record rec : expected) { + writer.add(rec); + } + } + + // evolve schema by adding a required field with default value + org.apache.avro.Schema evolvedSchema = org.apache.avro.Schema.createRecord("root", null, null, false, + ImmutableList.of(new org.apache.avro.Schema.Field(indexFiledName, org.apache.avro.Schema.create(INT), + null, null), + new org.apache.avro.Schema.Field(nullableFiledName, + org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(INT), + org.apache.avro.Schema.create(NULL)), null, defaultValue), + new org.apache.avro.Schema.Field(requiredFiledName, org.apache.avro.Schema.create(INT), null, defaultValue) + )); + + // read written rows with evolved schema + List rows; + Schema icebergReadSchema = AvroSchemaUtil.toIceberg(evolvedSchema); + try (AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(icebergReadSchema) + .build()) { + rows = Lists.newArrayList(reader); + } + + // validate all rows, and all fields are read properly + Assert.assertNotNull(rows); + Assert.assertEquals(expected.size(), rows.size()); + for (int row = 0; row < expected.size(); row++) { + GenericData.Record expectedRow = expected.get(row); + InternalRow actualRow = rows.get(row); + List fields = icebergReadSchema.asStruct().fields(); + + for (int i = 0; i < fields.size(); i += 1) { + Object expectedValue = null; + if (i >= writeSchema.getFields().size() && fields.get(i).hasDefaultValue()) { + expectedValue = fields.get(i).getDefaultValue(); + } else if (i < writeSchema.getFields().size()) { + expectedValue = expectedRow.get(i); + } + Type fieldType = fields.get(i).type(); + Object actualValue = actualRow.isNullAt(i) ? 
null : actualRow.get(i, convert(fieldType)); + Assert.assertEquals(expectedValue, actualValue); + } + } + } +} +