From 1fcd6a98947ec1dfaef799c6bc5660adf066e322 Mon Sep 17 00:00:00 2001 From: Russell Spitzer Date: Sat, 1 Feb 2025 12:00:24 +0100 Subject: [PATCH] API, Core: Metadata Row Lineage (#11948) --- .../java/org/apache/iceberg/Snapshot.java | 25 ++ .../java/org/apache/iceberg/BaseSnapshot.java | 20 +- .../org/apache/iceberg/MetadataUpdate.java | 7 + .../apache/iceberg/MetadataUpdateParser.java | 6 + .../apache/iceberg/RewriteTablePathUtil.java | 8 +- .../org/apache/iceberg/SnapshotParser.java | 17 +- .../org/apache/iceberg/SnapshotProducer.java | 30 +- .../org/apache/iceberg/TableMetadata.java | 85 ++++- .../apache/iceberg/TableMetadataParser.java | 20 +- .../org/apache/iceberg/TableProperties.java | 2 + .../org/apache/iceberg/util/JsonUtil.java | 7 + .../apache/iceberg/TestDataTaskParser.java | 22 +- .../iceberg/TestMetadataUpdateParser.java | 26 +- .../iceberg/TestRowLineageMetadata.java | 318 ++++++++++++++++++ .../org/apache/iceberg/TestSnapshotJson.java | 65 +++- .../org/apache/iceberg/TestTableMetadata.java | 169 ++++++++-- .../org/apache/iceberg/util/TestJsonUtil.java | 20 ++ 17 files changed, 801 insertions(+), 46 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index c035259e0e2c..52280a41620f 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -171,4 +171,29 @@ default Iterable removedDeleteFiles(FileIO io) { default Integer schemaId() { return null; } + + /** + * The row-id of the first newly added row in this snapshot. All rows added in this snapshot will + * have a row-id assigned to them greater than or equal to this value. All rows with a row-id less than this + * value were created in a snapshot that was added to the table (but not necessarily committed to + * this branch) in the past. 
+ * + * @return the first row-id to be used in this snapshot or null if row lineage was not enabled + * when the snapshot was created. + */ + default Long firstRowId() { + return null; + } + + /** + * The total number of newly added rows in this snapshot. It should be the summation of {@link + * ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot. + * + *

This field is optional but is required when row lineage is enabled. + * + * @return the total number of new rows in this snapshot or null if the value was not stored. + */ + default Long addedRows() { + return null; + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 58dec570d1fb..c3c1159ef8df 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -43,6 +43,8 @@ class BaseSnapshot implements Snapshot { private final Map summary; private final Integer schemaId; private final String[] v1ManifestLocations; + private final Long firstRowId; + private final Long addedRows; // lazily initialized private transient List allManifests = null; @@ -61,7 +63,9 @@ class BaseSnapshot implements Snapshot { String operation, Map summary, Integer schemaId, - String manifestList) { + String manifestList, + Long firstRowId, + Long addedRows) { this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -71,6 +75,8 @@ class BaseSnapshot implements Snapshot { this.schemaId = schemaId; this.manifestListLocation = manifestList; this.v1ManifestLocations = null; + this.firstRowId = firstRowId; + this.addedRows = addedRows; } BaseSnapshot( @@ -91,6 +97,8 @@ class BaseSnapshot implements Snapshot { this.schemaId = schemaId; this.manifestListLocation = null; this.v1ManifestLocations = v1ManifestLocations; + this.firstRowId = null; + this.addedRows = null; } @Override @@ -128,6 +136,16 @@ public Integer schemaId() { return schemaId; } + @Override + public Long firstRowId() { + return firstRowId; + } + + @Override + public Long addedRows() { + return addedRows; + } + private void cacheManifests(FileIO fileIO) { if (fileIO == null) { throw new IllegalArgumentException("Cannot cache changes: FileIO is null"); diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java 
b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index b76fcf9f017d..2f6c0c326b1f 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -526,4 +526,11 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) { viewMetadataBuilder.setCurrentVersionId(versionId); } } + + class EnableRowLineage implements MetadataUpdate { + @Override + public void applyTo(TableMetadata.Builder metadataBuilder) { + metadataBuilder.enableRowLineage(); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index 6a6a34c92d18..b985dbc0897e 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -60,6 +60,7 @@ private MetadataUpdateParser() {} static final String SET_PARTITION_STATISTICS = "set-partition-statistics"; static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics"; static final String REMOVE_PARTITION_SPECS = "remove-partition-specs"; + static final String ENABLE_ROW_LINEAGE = "enable-row-lineage"; // AssignUUID private static final String UUID = "uuid"; @@ -154,6 +155,7 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION) .put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION) .put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS) + .put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE) .buildOrThrow(); public static String toJson(MetadataUpdate metadataUpdate) { @@ -249,6 +251,8 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator case REMOVE_PARTITION_SPECS: writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator); break; + case ENABLE_ROW_LINEAGE: + break; default: throw new IllegalArgumentException( 
String.format( @@ -322,6 +326,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) { return readCurrentViewVersionId(jsonNode); case REMOVE_PARTITION_SPECS: return readRemovePartitionSpecs(jsonNode); + case ENABLE_ROW_LINEAGE: + return new MetadataUpdate.EnableRowLineage(); default: throw new UnsupportedOperationException( String.format("Cannot convert metadata update action to json: %s", action)); diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index 44ccf2fe4098..72b084b5ef2a 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -131,7 +131,9 @@ public static TableMetadata replacePaths( // TODO: update statistic file paths metadata.statisticsFiles(), metadata.partitionStatisticsFiles(), - metadata.changes()); + metadata.changes(), + metadata.rowLineageEnabled(), + metadata.nextRowId()); } private static Map updateProperties( @@ -188,7 +190,9 @@ private static List updatePathInSnapshots( snapshot.operation(), snapshot.summary(), snapshot.schemaId(), - newManifestListLocation); + newManifestListLocation, + snapshot.firstRowId(), + snapshot.addedRows()); newSnapshots.add(newSnapshot); } return newSnapshots; diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index b5ac3ec718ac..85fa71756e71 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -51,6 +51,8 @@ private SnapshotParser() {} private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; private static final String SCHEMA_ID = "schema-id"; + private static final String FIRST_ROW_ID = "first-row-id"; + private static final String ADDED_ROWS = "added-rows"; static void toJson(Snapshot snapshot, 
JsonGenerator generator) throws IOException { generator.writeStartObject(); @@ -96,6 +98,14 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); } + if (snapshot.firstRowId() != null) { + generator.writeNumberField(FIRST_ROW_ID, snapshot.firstRowId()); + } + + if (snapshot.addedRows() != null) { + generator.writeNumberField(ADDED_ROWS, snapshot.addedRows()); + } + generator.writeEndObject(); } @@ -158,6 +168,9 @@ static Snapshot fromJson(JsonNode node) { Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node); + Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, node); + Long addedRows = JsonUtil.getLongOrNull(ADDED_ROWS, node); + if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); @@ -169,7 +182,9 @@ static Snapshot fromJson(JsonNode node) { operation, summary, schemaId, - manifestList); + manifestList, + firstRowId, + addedRows); } else { // fall back to an embedded manifest list. 
pass in the manifest's InputFile so length can be diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index bc65e90eaeae..6703a04dc69f 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -282,6 +283,13 @@ public Snapshot apply() { throw new RuntimeIOException(e, "Failed to write manifest list file"); } + Long addedRows = null; + Long lastRowId = null; + if (base.rowLineageEnabled()) { + addedRows = calculateAddedRows(manifests); + lastRowId = base.nextRowId(); + } + return new BaseSnapshot( sequenceNumber, snapshotId(), @@ -290,7 +298,27 @@ public Snapshot apply() { operation(), summary(base), base.currentSchemaId(), - manifestList.location()); + manifestList.location(), + lastRowId, + addedRows); + } + + private Long calculateAddedRows(List manifests) { + return manifests.stream() + .filter( + manifest -> + manifest.snapshotId() == null + || Objects.equals(manifest.snapshotId(), this.snapshotId)) + .mapToLong( + manifest -> { + Preconditions.checkArgument( + manifest.addedRowsCount() != null, + "Cannot determine number of added rows in snapshot because" + + " the entry for manifest %s is missing the field `added-rows-count`", + manifest.path()); + return manifest.addedRowsCount(); + }) + .sum(); } protected abstract Map summary(); diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index a532ff00ee4e..86d055c642bc 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -56,6 +56,9 @@ public class TableMetadata implements Serializable { static final int 
INITIAL_SPEC_ID = 0; static final int INITIAL_SORT_ORDER_ID = 1; static final int INITIAL_SCHEMA_ID = 0; + static final int INITIAL_ROW_ID = 0; + static final boolean DEFAULT_ROW_LINEAGE = false; + static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1); @@ -262,6 +265,8 @@ public String toString() { private volatile Map snapshotsById; private volatile Map refs; private volatile boolean snapshotsLoaded; + private final Boolean rowLineageEnabled; + private final long nextRowId; @SuppressWarnings("checkstyle:CyclomaticComplexity") TableMetadata( @@ -288,7 +293,9 @@ public String toString() { Map refs, List statisticsFiles, List partitionStatisticsFiles, - List changes) { + List changes, + boolean rowLineageEnabled, + long nextRowId) { Preconditions.checkArgument( specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty"); Preconditions.checkArgument( @@ -307,6 +314,10 @@ public String toString() { Preconditions.checkArgument( metadataFileLocation == null || changes.isEmpty(), "Cannot create TableMetadata with a metadata location and changes"); + Preconditions.checkArgument( + formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE || !rowLineageEnabled, + "Cannot enable row lineage when Table Version is less than V3. 
Table Version is %s", + formatVersion); this.metadataFileLocation = metadataFileLocation; this.formatVersion = formatVersion; @@ -341,6 +352,10 @@ public String toString() { this.statisticsFiles = ImmutableList.copyOf(statisticsFiles); this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles); + // row lineage + this.rowLineageEnabled = rowLineageEnabled; + this.nextRowId = nextRowId; + HistoryEntry last = null; for (HistoryEntry logEntry : snapshotLog) { if (last != null) { @@ -563,6 +578,14 @@ public TableMetadata withUUID() { return new Builder(this).assignUUID().build(); } + public boolean rowLineageEnabled() { + return rowLineageEnabled; + } + + public long nextRowId() { + return nextRowId; + } + /** * Updates the schema * @@ -615,10 +638,15 @@ public TableMetadata replaceProperties(Map rawProperties) { int newFormatVersion = PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion); + Boolean newRowLineage = + PropertyUtil.propertyAsBoolean( + rawProperties, TableProperties.ROW_LINEAGE, rowLineageEnabled); + return new Builder(this) .setProperties(updated) .removeProperties(removed) .upgradeFormatVersion(newFormatVersion) + .setRowLineage(newRowLineage) .build(); } @@ -903,6 +931,8 @@ public static class Builder { private final Map> statisticsFiles; private final Map> partitionStatisticsFiles; private boolean suppressHistoricalSnapshots = false; + private boolean rowLineage; + private long nextRowId; // change tracking private final List changes; @@ -949,6 +979,8 @@ private Builder(int formatVersion) { this.schemasById = Maps.newHashMap(); this.specsById = Maps.newHashMap(); this.sortOrdersById = Maps.newHashMap(); + this.rowLineage = DEFAULT_ROW_LINEAGE; + this.nextRowId = INITIAL_ROW_ID; } private Builder(TableMetadata base) { @@ -982,6 +1014,9 @@ private Builder(TableMetadata base) { this.schemasById = Maps.newHashMap(base.schemasById); this.specsById = Maps.newHashMap(base.specsById); this.sortOrdersById 
= Maps.newHashMap(base.sortOrdersById); + + this.rowLineage = base.rowLineageEnabled; + this.nextRowId = base.nextRowId; } public Builder withMetadataLocation(String newMetadataLocation) { @@ -1230,6 +1265,22 @@ public Builder addSnapshot(Snapshot snapshot) { snapshotsById.put(snapshot.snapshotId(), snapshot); changes.add(new MetadataUpdate.AddSnapshot(snapshot)); + if (rowLineage) { + ValidationException.check( + snapshot.firstRowId() != null && snapshot.firstRowId() >= nextRowId, // null guard: unboxing a missing first-row-id would otherwise throw a bare NPE + "Cannot add a snapshot whose 'first-row-id' (%s) is less than the metadata 'next-row-id' (%s) because this will end up generating duplicate row_ids.", + snapshot.firstRowId(), + nextRowId); + ValidationException.check( + snapshot.addedRows() != null, + "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); + Preconditions.checkArgument( + snapshot.addedRows() >= 0, + "Cannot decrease 'last-row-id'. 'last-row-id' must increase monotonically. Snapshot reports %s added rows", snapshot.addedRows()); // supplies the value for the %s placeholder + + this.nextRowId += snapshot.addedRows(); + } + return this; } @@ -1481,6 +1532,34 @@ public Builder setPreviousFileLocation(String previousFileLocation) { return this; } + private Builder setRowLineage(Boolean newRowLineage) { + if (newRowLineage == null) { + return this; + } + + boolean disablingRowLineage = rowLineage && !newRowLineage; + + Preconditions.checkArgument( + !disablingRowLineage, "Cannot disable row lineage once it has been enabled"); + + if (!rowLineage && newRowLineage) { + return enableRowLineage(); + } else { + return this; + } + } + + public Builder enableRowLineage() { + Preconditions.checkArgument( + formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE, + "Cannot use row lineage with format version %s. 
Only format version %s or higher support row lineage", + formatVersion, + MIN_FORMAT_VERSION_ROW_LINEAGE); + this.rowLineage = true; + changes.add(new MetadataUpdate.EnableRowLineage()); + return this; + } + private boolean hasChanges() { return changes.size() != startingChangeCount || (discardChanges && !changes.isEmpty()) @@ -1548,7 +1627,9 @@ public TableMetadata build() { partitionStatisticsFiles.values().stream() .flatMap(List::stream) .collect(Collectors.toList()), - discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes)); + discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes), + rowLineage, + nextRowId); } private int addSchemaInternal(Schema schema, int newLastColumnId) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index d7f2b29be75a..ba8f2f78c5e9 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -110,6 +110,8 @@ private TableMetadataParser() {} static final String METADATA_LOG = "metadata-log"; static final String STATISTICS = "statistics"; static final String PARTITION_STATISTICS = "partition-statistics"; + static final String ROW_LINEAGE = "row-lineage"; + static final String NEXT_ROW_ID = "next-row-id"; public static void overwrite(TableMetadata metadata, OutputFile outputFile) { internalWrite(metadata, outputFile, true); @@ -220,6 +222,11 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw generator.writeNullField(CURRENT_SNAPSHOT_ID); } + if (metadata.rowLineageEnabled()) { + generator.writeBooleanField(ROW_LINEAGE, metadata.rowLineageEnabled()); + generator.writeNumberField(NEXT_ROW_ID, metadata.nextRowId()); + } + toJson(metadata.refs(), generator); generator.writeArrayFieldStart(SNAPSHOTS); @@ -454,6 +461,15 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { 
currentSnapshotId = -1L; } + Boolean rowLineage = JsonUtil.getBoolOrNull(ROW_LINEAGE, node); + long lastRowId; + if (rowLineage != null && rowLineage) { + lastRowId = JsonUtil.getLong(NEXT_ROW_ID, node); + } else { + rowLineage = TableMetadata.DEFAULT_ROW_LINEAGE; + lastRowId = TableMetadata.INITIAL_ROW_ID; + } + long lastUpdatedMillis = JsonUtil.getLong(LAST_UPDATED_MILLIS, node); Map refs; @@ -545,7 +561,9 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) { refs, statisticsFiles, partitionStatisticsFiles, - ImmutableList.of() /* no changes from the file */); + ImmutableList.of() /* no changes from the file */, + rowLineage, + lastRowId); } private static Map refsFromJson(JsonNode refMap) { diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index c137bcd3a2c3..cd7cda23c2d3 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -388,4 +388,6 @@ private TableProperties() {} public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; + + public static final String ROW_LINEAGE = "row-lineage"; } diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index 2810ff5f23c0..972b89c56717 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -144,6 +144,13 @@ public static long getLong(String property, JsonNode node) { return pNode.asLong(); } + public static Boolean getBoolOrNull(String property, JsonNode node) { + if (!node.hasNonNull(property)) { + return null; + } + return getBool(property, node); + } + public static boolean getBool(String property, JsonNode node) { Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean: %s", property); 
JsonNode pNode = node.get(property); diff --git a/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java index 5a3d119046f5..6e0b4325e4b5 100644 --- a/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java +++ b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java @@ -194,9 +194,27 @@ private DataTask createDataTask() { List snapshots = Arrays.asList( new BaseSnapshot( - 1L, 1L, null, 1234567890000L, "append", summary1, 1, "file:/tmp/manifest1.avro"), + 1L, + 1L, + null, + 1234567890000L, + "append", + summary1, + 1, + "file:/tmp/manifest1.avro", + null, + null), new BaseSnapshot( - 2L, 2L, 1L, 9876543210000L, "append", summary2, 1, "file:/tmp/manifest2.avro")); + 2L, + 2L, + 1L, + 9876543210000L, + "append", + summary2, + 1, + "file:/tmp/manifest2.avro", + null, + null)); return StaticDataTask.of( Files.localInput("file:/tmp/metadata2.json"), diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 1ce319a34eba..741184d612f1 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -360,6 +360,8 @@ public void testAddSnapshotToJson() throws IOException { long parentId = 1; long snapshotId = 2; int schemaId = 3; + Long firstRowId = 4L; + Long addedRows = 10L; String manifestList = createManifestListWithManifestFiles(snapshotId, parentId); @@ -372,7 +374,9 @@ public void testAddSnapshotToJson() throws IOException { DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"), schemaId, - manifestList); + manifestList, + firstRowId, + addedRows); String snapshotJson = SnapshotParser.toJson(snapshot, /* pretty */ false); String expected = String.format("{\"action\":\"%s\",\"snapshot\":%s}", action, snapshotJson); MetadataUpdate update = new 
MetadataUpdate.AddSnapshot(snapshot); @@ -388,6 +392,8 @@ public void testAddSnapshotFromJson() throws IOException { long parentId = 1; long snapshotId = 2; int schemaId = 3; + Long lastRowId = 4L; + Long addedRows = 5L; Map summary = ImmutableMap.of("files-added", "4", "files-deleted", "100"); String manifestList = createManifestListWithManifestFiles(snapshotId, parentId); @@ -400,7 +406,9 @@ public void testAddSnapshotFromJson() throws IOException { DataOperations.REPLACE, summary, schemaId, - manifestList); + manifestList, + lastRowId, + addedRows); String snapshotJson = SnapshotParser.toJson(snapshot, /* pretty */ false); String json = String.format("{\"action\":\"%s\",\"snapshot\":%s}", action, snapshotJson); MetadataUpdate expected = new MetadataUpdate.AddSnapshot(snapshot); @@ -922,6 +930,17 @@ public void testRemovePartitionSpec() { .isEqualTo(json); } + @Test + public void testEnableRowLineage() { + String action = MetadataUpdateParser.ENABLE_ROW_LINEAGE; + String json = "{\"action\":\"enable-row-lineage\"}"; + MetadataUpdate expected = new MetadataUpdate.EnableRowLineage(); + assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); + assertThat(MetadataUpdateParser.toJson(expected)) + .as("Enable row lineage should convert to the correct JSON value") + .isEqualTo(json); + } + public void assertEquals( String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) { switch (action) { @@ -1031,6 +1050,9 @@ public void assertEquals( (MetadataUpdate.RemovePartitionSpecs) expectedUpdate, (MetadataUpdate.RemovePartitionSpecs) actualUpdate); break; + case MetadataUpdateParser.ENABLE_ROW_LINEAGE: + assertThat(actualUpdate).isInstanceOf(MetadataUpdate.EnableRowLineage.class); + break; default: fail("Unrecognized metadata update action: " + action); } diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java new file mode 100644 index 
000000000000..0fb9ee880f6b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.File; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestRowLineageMetadata { + + @Parameters(name = "formatVersion = {0}") + private static List formatVersion() { + return Ints.asList(TestHelpers.ALL_VERSIONS); + } + + @Parameter private int formatVersion; + + private static final String TEST_LOCATION = "s3://bucket/test/location"; + + private static final Schema 
TEST_SCHEMA = + new Schema( + 7, + Types.NestedField.required(1, "x", Types.LongType.get()), + Types.NestedField.required(2, "y", Types.LongType.get(), "comment"), + Types.NestedField.required(3, "z", Types.LongType.get())); + + private TableMetadata baseMetadata() { + return TableMetadata.buildFromEmpty(formatVersion) + .enableRowLineage() + .addSchema(TEST_SCHEMA) + .setLocation(TEST_LOCATION) + .addPartitionSpec(PartitionSpec.unpartitioned()) + .addSortOrder(SortOrder.unsorted()) + .build(); + } + + @TempDir private File tableDir = null; + + @AfterEach + public void cleanup() { + TestTables.clearTables(); + } + + @TestTemplate + public void testRowLineageSupported() { + if (formatVersion >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE) { + assertThat(TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()).isNotNull(); + } else { + assertThatThrownBy(() -> TableMetadata.buildFromEmpty(formatVersion).enableRowLineage()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot use row lineage"); + } + } + + @TestTemplate + public void testSnapshotAddition() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + long newRows = 30L; + + TableMetadata base = baseMetadata(); + + Snapshot addRows = + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), newRows); + + TableMetadata firstAddition = TableMetadata.buildFrom(base).addSnapshot(addRows).build(); + + assertThat(firstAddition.nextRowId()).isEqualTo(newRows); + + Snapshot addMoreRows = + new BaseSnapshot( + 1, 2, 1L, 0, DataOperations.APPEND, null, 1, "foo", firstAddition.nextRowId(), newRows); + + TableMetadata secondAddition = + TableMetadata.buildFrom(firstAddition).addSnapshot(addMoreRows).build(); + + assertThat(secondAddition.nextRowId()).isEqualTo(newRows * 2); + } + + @TestTemplate + public void testInvalidSnapshotAddition() { + 
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + Long newRows = 30L; + + TableMetadata base = baseMetadata(); + + Snapshot invalidLastRow = + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId() - 3, newRows); + + assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidLastRow)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Cannot add a snapshot whose 'first-row-id'"); + + Snapshot invalidNewRows = + new BaseSnapshot( + 0, 1, null, 0, DataOperations.APPEND, null, 1, "foo", base.nextRowId(), null); + + assertThatThrownBy(() -> TableMetadata.buildFrom(base).addSnapshot(invalidNewRows)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled"); + } + + @TestTemplate + public void testFastAppend() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + TableMetadata base = table.ops().current(); + table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); + + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + table.newFastAppend().appendFile(fileWithRows(30)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + table.newFastAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); + } + + @TestTemplate + public void testAppend() { + 
assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + TableMetadata base = table.ops().current(); + table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); + + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + table.newAppend().appendFile(fileWithRows(30)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); + } + + @TestTemplate + public void testAppendBranch() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + // Appends to a branch should still change last-row-id even if not on main, these changes + // should also affect commits to main + + String branch = "some_branch"; + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + TableMetadata base = table.ops().current(); + table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); + + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + // Write to Branch + table.newAppend().appendFile(fileWithRows(30)).toBranch(branch).commit(); + + assertThat(table.currentSnapshot()).isNull(); + assertThat(table.snapshot(branch).firstRowId()).isEqualTo(0L); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + // Write to Main + 
table.newAppend().appendFile(fileWithRows(17)).appendFile(fileWithRows(11)).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11); + + // Write again to branch + table.newAppend().appendFile(fileWithRows(21)).toBranch(branch).commit(); + assertThat(table.snapshot(branch).firstRowId()).isEqualTo(30 + 17 + 11); + assertThat(table.ops().current().nextRowId()).isEqualTo(30 + 17 + 11 + 21); + } + + @TestTemplate + public void testDeletes() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + TableMetadata base = table.ops().current(); + table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); + + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile file = fileWithRows(30); + + table.newAppend().appendFile(file).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + + table.newDelete().deleteFile(file).commit(); + + // Deleting a file should create a new snapshot whose first-row-id is inherited from the + // next-row-id of the previous metadata, and it should not + // change next-row-id for this metadata.
+ assertThat(table.currentSnapshot().firstRowId()).isEqualTo(30); + assertThat(table.currentSnapshot().addedRows()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(30); + } + + @TestTemplate + public void testReplace() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + TableMetadata base = table.ops().current(); + + table.ops().commit(base, TableMetadata.buildFrom(base).enableRowLineage().build()); + + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + assertThat(table.ops().current().nextRowId()).isEqualTo(0L); + + DataFile filePart1 = fileWithRows(30); + DataFile filePart2 = fileWithRows(30); + DataFile fileCompacted = fileWithRows(60); + + table.newAppend().appendFile(filePart1).appendFile(filePart2).commit(); + + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(0); + assertThat(table.ops().current().nextRowId()).isEqualTo(60); + + table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit(); + + // Rewrites are currently just treated as appends. 
In the future we could treat these as no-ops + assertThat(table.currentSnapshot().firstRowId()).isEqualTo(60); + assertThat(table.ops().current().nextRowId()).isEqualTo(120); + } + + @TestTemplate + public void testEnableRowLineageViaProperty() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE); + + TestTables.TestTable table = + TestTables.create( + tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion); + + assertThat(table.ops().current().rowLineageEnabled()).isFalse(); + + // No-op + table.updateProperties().set(TableProperties.ROW_LINEAGE, "false").commit(); + assertThat(table.ops().current().rowLineageEnabled()).isFalse(); + + // Enable row lineage + table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit(); + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + + // Disabling row lineage is not allowed + assertThatThrownBy( + () -> table.updateProperties().set(TableProperties.ROW_LINEAGE, "false").commit()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot disable row lineage once it has been enabled"); + + // No-op + table.updateProperties().set(TableProperties.ROW_LINEAGE, "true").commit(); + assertThat(table.ops().current().rowLineageEnabled()).isTrue(); + } + + private final AtomicInteger fileNum = new AtomicInteger(0); + + private DataFile fileWithRows(long numRows) { + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withRecordCount(numRows) + .withFileSizeInBytes(numRows * 100) + .withPath("file://file_" + fileNum.incrementAndGet() + ".parquet") + .build(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index e4c2ba5ec2df..3fda738e4435 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -43,7 +43,16 @@ public void 
testJsonConversion() throws IOException { Snapshot expected = new BaseSnapshot( - 0, snapshotId, parentId, System.currentTimeMillis(), null, null, 1, manifestList); + 0, + snapshotId, + parentId, + System.currentTimeMillis(), + null, + null, + 1, + manifestList, + null, + null); String json = SnapshotParser.toJson(expected); Snapshot snapshot = SnapshotParser.fromJson(json); @@ -52,6 +61,8 @@ public void testJsonConversion() throws IOException { assertThat(snapshot.operation()).isNull(); assertThat(snapshot.summary()).isNull(); assertThat(snapshot.schemaId()).isEqualTo(1); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); } @Test @@ -62,7 +73,16 @@ public void testJsonConversionWithoutSchemaId() throws IOException { Snapshot expected = new BaseSnapshot( - 0, snapshotId, parentId, System.currentTimeMillis(), null, null, null, manifestList); + 0, + snapshotId, + parentId, + System.currentTimeMillis(), + null, + null, + null, + manifestList, + null, + null); String json = SnapshotParser.toJson(expected); Snapshot snapshot = SnapshotParser.fromJson(json); @@ -71,6 +91,8 @@ public void testJsonConversionWithoutSchemaId() throws IOException { assertThat(snapshot.operation()).isNull(); assertThat(snapshot.summary()).isNull(); assertThat(snapshot.schemaId()).isNull(); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); } @Test @@ -89,7 +111,9 @@ public void testJsonConversionWithOperation() throws IOException { DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"), 3, - manifestList); + manifestList, + null, + null); String json = SnapshotParser.toJson(expected); Snapshot snapshot = SnapshotParser.fromJson(json); @@ -105,6 +129,40 @@ public void testJsonConversionWithOperation() throws IOException { assertThat(snapshot.operation()).isEqualTo(expected.operation()); assertThat(snapshot.summary()).isEqualTo(expected.summary()); 
assertThat(snapshot.schemaId()).isEqualTo(expected.schemaId()); + assertThat(snapshot.firstRowId()).isNull(); + assertThat(snapshot.addedRows()).isNull(); + } + + @Test + public void testJsonConversionWithRowLineage() throws IOException { + int snapshotId = 23; + Long parentId = null; + Long firstRowId = 20L; + Long addedRows = 30L; + String manifestList = createManifestListWithManifestFiles(snapshotId, parentId); + + Snapshot expected = + new BaseSnapshot( + 0, + snapshotId, + parentId, + System.currentTimeMillis(), + null, + null, + null, + manifestList, + firstRowId, + addedRows); + String json = SnapshotParser.toJson(expected); + Snapshot snapshot = SnapshotParser.fromJson(json); + + assertThat(snapshot.snapshotId()).isEqualTo(expected.snapshotId()); + assertThat(snapshot.allManifests(ops.io())).isEqualTo(expected.allManifests(ops.io())); + assertThat(snapshot.operation()).isNull(); + assertThat(snapshot.summary()).isNull(); + assertThat(snapshot.schemaId()).isNull(); + assertThat(snapshot.firstRowId()).isEqualTo(firstRowId); + assertThat(snapshot.addedRows()).isEqualTo(addedRows); } @Test @@ -157,6 +215,7 @@ public void testJsonConversionWithV1Manifests() { assertThat(snapshot.operation()).isEqualTo(expected.operation()); assertThat(snapshot.summary()).isEqualTo(expected.summary()); assertThat(snapshot.schemaId()).isEqualTo(expected.schemaId()); + assertThat(snapshot.firstRowId()).isNull(); } private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId) diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index 45aa211e5187..145ce6c83556 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableMetadataParser.PROPERTIES; import static org.apache.iceberg.TableMetadataParser.SCHEMA; import static 
org.apache.iceberg.TableMetadataParser.SNAPSHOTS; +import static org.apache.iceberg.TestHelpers.MAX_FORMAT_VERSION; import static org.apache.iceberg.TestHelpers.assertSameSchemaList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -105,7 +106,16 @@ public void testJsonConversion() throws Exception { createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -120,7 +130,9 @@ public void testJsonConversion() throws Exception { null, null, 7, - manifestList); + manifestList, + null, + null); List snapshotLog = ImmutableList.builder() @@ -162,7 +174,7 @@ public void testJsonConversion() throws Exception { TableMetadata expected = new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -184,7 +196,9 @@ public void testJsonConversion() throws Exception { refs, statisticsFiles, partitionStatisticsFiles, - ImmutableList.of()); + ImmutableList.of(), + true, + 40); String asJson = TableMetadataParser.toJson(expected); TableMetadata metadata = TableMetadataParser.fromJson(asJson); @@ -217,6 +231,8 @@ public void testJsonConversion() throws Exception { assertThat(metadata.statisticsFiles()).isEqualTo(statisticsFiles); assertThat(metadata.partitionStatisticsFiles()).isEqualTo(partitionStatisticsFiles); assertThat(metadata.refs()).isEqualTo(refs); + assertThat(metadata.rowLineageEnabled()).isEqualTo(expected.rowLineageEnabled()); + assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @Test @@ -231,7 +247,16 @@ public void testBackwardCompat() throws Exception { 
createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -246,7 +271,9 @@ public void testBackwardCompat() throws Exception { null, null, null, - manifestList); + manifestList, + null, + null); TableMetadata expected = new TableMetadata( @@ -273,7 +300,9 @@ public void testBackwardCompat() throws Exception { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of()); + ImmutableList.of(), + false, + 0); String asJson = toJsonWithoutSpecAndSchemaList(expected); TableMetadata metadata = TableMetadataParser.fromJson(asJson); @@ -312,6 +341,8 @@ public void testBackwardCompat() throws Exception { .isEqualTo(previousSnapshot.allManifests(ops.io())); assertThat(metadata.previousFiles()).isEqualTo(expected.previousFiles()); assertThat(metadata.snapshot(previousSnapshotId).schemaId()).isNull(); + assertThat(metadata.rowLineageEnabled()).isEqualTo(expected.rowLineageEnabled()); + assertThat(metadata.nextRowId()).isEqualTo(expected.nextRowId()); } @Test @@ -322,7 +353,16 @@ public void testInvalidMainBranch() throws IOException { createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -338,7 +378,9 @@ public void testInvalidMainBranch() throws IOException { null, null, 7, - manifestList); + manifestList, + null, + null); List snapshotLog = ImmutableList.builder() @@ 
-359,7 +401,7 @@ public void testInvalidMainBranch() throws IOException { () -> new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -381,7 +423,9 @@ public void testInvalidMainBranch() throws IOException { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of())) + ImmutableList.of(), + false, + 0L)) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot ID does not match main branch"); } @@ -393,7 +437,8 @@ public void testMainWithoutCurrent() throws IOException { String manifestList = createManifestListWithManifestFile(snapshotId, null, "file:/tmp/manifest1.avro"); Snapshot snapshot = - new BaseSnapshot(0, snapshotId, null, snapshotId, null, null, null, manifestList); + new BaseSnapshot( + 0, snapshotId, null, snapshotId, null, null, null, manifestList, null, null); Schema schema = new Schema(6, Types.NestedField.required(10, "x", Types.StringType.get())); @@ -404,7 +449,7 @@ public void testMainWithoutCurrent() throws IOException { () -> new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -426,7 +471,9 @@ public void testMainWithoutCurrent() throws IOException { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of())) + ImmutableList.of(), + false, + 0L)) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Current snapshot is not set, but main branch exists"); } @@ -444,7 +491,7 @@ public void testBranchSnapshotMissing() { () -> new TableMetadata( null, - 2, + MAX_FORMAT_VERSION, UUID.randomUUID().toString(), TEST_LOCATION, SEQ_NO, @@ -466,7 +513,9 @@ public void testBranchSnapshotMissing() { refs, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of())) + ImmutableList.of(), + false, + 0L)) .isInstanceOf(IllegalArgumentException.class) .hasMessageEndingWith("does not exist in the existing snapshots list"); } @@ -521,7 +570,16 @@ public void 
testJsonWithPreviousMetadataLog() throws Exception { createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -536,7 +594,9 @@ public void testJsonWithPreviousMetadataLog() throws Exception { null, null, null, - manifestList); + manifestList, + null, + null); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -570,7 +630,9 @@ public void testJsonWithPreviousMetadataLog() throws Exception { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of()); + ImmutableList.of(), + false, + 0L); String asJson = TableMetadataParser.toJson(base); TableMetadata metadataFromJson = TableMetadataParser.fromJson(asJson); @@ -586,7 +648,16 @@ public void testAddPreviousMetadataRemoveNone() throws IOException { createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -601,7 +672,9 @@ public void testAddPreviousMetadataRemoveNone() throws IOException { null, null, null, - manifestList); + manifestList, + null, + null); List reversedSnapshotLog = Lists.newArrayList(); reversedSnapshotLog.add( @@ -646,7 +719,9 @@ public void testAddPreviousMetadataRemoveNone() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of()); + ImmutableList.of(), + false, + 0L); 
previousMetadataLog.add(latestPreviousMetadata); @@ -668,7 +743,16 @@ public void testAddPreviousMetadataRemoveOne() throws IOException { createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -683,7 +767,9 @@ public void testAddPreviousMetadataRemoveOne() throws IOException { null, null, null, - manifestList); + manifestList, + null, + null); List reversedSnapshotLog = Lists.newArrayList(); reversedSnapshotLog.add( @@ -737,7 +823,9 @@ public void testAddPreviousMetadataRemoveOne() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of()); + ImmutableList.of(), + false, + 0L); previousMetadataLog.add(latestPreviousMetadata); @@ -763,7 +851,16 @@ public void testAddPreviousMetadataRemoveMultiple() throws IOException { createManifestListWithManifestFile(previousSnapshotId, null, "file:/tmp/manifest1.avro"); Snapshot previousSnapshot = new BaseSnapshot( - 0, previousSnapshotId, null, previousSnapshotId, null, null, null, manifestList); + 0, + previousSnapshotId, + null, + previousSnapshotId, + null, + null, + null, + manifestList, + null, + null); long currentSnapshotId = System.currentTimeMillis(); manifestList = @@ -778,7 +875,9 @@ public void testAddPreviousMetadataRemoveMultiple() throws IOException { null, null, null, - manifestList); + manifestList, + null, + null); List reversedSnapshotLog = Lists.newArrayList(); reversedSnapshotLog.add( @@ -832,7 +931,9 @@ public void testAddPreviousMetadataRemoveMultiple() throws IOException { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of()); + ImmutableList.of(), + false, + 0L); 
previousMetadataLog.add(latestPreviousMetadata); @@ -878,7 +979,9 @@ public void testV2UUIDValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of())) + ImmutableList.of(), + false, + 0L)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("UUID is required in format v2"); } @@ -913,7 +1016,9 @@ public void testVersionValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of())) + ImmutableList.of(), + false, + 0L)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Unsupported format version: v%s (supported: v%s)", @@ -959,7 +1064,9 @@ public void testVersionValidation() { ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of())) + ImmutableList.of(), + false, + 0L)) .isNotNull(); assertThat( diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java index 7702d691afd0..1c76865e5d4a 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java @@ -195,6 +195,26 @@ public void getBool() throws JsonProcessingException { assertThat(JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": false}"))).isFalse(); } + @Test + public void getBoolOrNull() throws JsonProcessingException { + assertThatThrownBy( + () -> JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a boolean value: x: \"23\""); + + assertThatThrownBy( + () -> JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"true\"}"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot parse to a boolean value: x: \"true\""); + + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{}"))).isNull(); + + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}"))).isNull(); + + 
assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": true}"))).isTrue(); + assertThat(JsonUtil.getBoolOrNull("x", JsonUtil.mapper().readTree("{\"x\": false}"))).isFalse(); + } + @Test public void getIntArrayOrNull() throws JsonProcessingException { assertThat(JsonUtil.getIntArrayOrNull("items", JsonUtil.mapper().readTree("{}"))).isNull();