From 6a5ae1ae6a01f1395ee70e046537cd87b990c4ae Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Mon, 14 Oct 2024 18:54:33 +0200
Subject: [PATCH] Core: Switch usage to DataFileSet / DeleteFileSet (#11158)

---
 .../apache/iceberg/BaseOverwriteFiles.java    |  5 +-
 .../org/apache/iceberg/BaseRewriteFiles.java  |  4 +-
 .../java/org/apache/iceberg/FastAppend.java   |  8 +--
 .../apache/iceberg/ManifestFilterManager.java | 58 ++++++++++++-------
 .../iceberg/MergingSnapshotProducer.java      | 28 ++++++---
 .../org/apache/iceberg/SnapshotProducer.java  | 16 ++---
 .../RewriteDataFilesCommitManager.java        |  7 +--
 .../iceberg/actions/RewriteFileGroup.java     | 10 ++--
 .../actions/RewritePositionDeletesGroup.java  | 10 ++--
 .../org/apache/iceberg/TestDeleteFiles.java   | 15 ++++-
 .../apache/iceberg/hive/HiveTableTest.java    |  2 +-
 .../iceberg/nessie/TestNessieTable.java       |  2 +-
 .../source/SparkPositionDeletesRewrite.java   |  4 +-
 .../iceberg/spark/source/SparkWrite.java      |  4 +-
 .../spark/TestFileRewriteCoordinator.java     |  7 ++-
 .../iceberg/spark/data/TestHelpers.java       |  4 +-
 .../source/TestPositionDeletesTable.java      |  3 +-
 17 files changed, 115 insertions(+), 72 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
index d929bc068ec2..16fbc0dd1ebc 100644
--- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg;
 
-import java.util.Set;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -26,11 +25,11 @@
 import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.StrictMetricsEvaluator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
 
 public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
     implements OverwriteFiles {
-  private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
+  private final DataFileSet deletedDataFiles = DataFileSet.create();
   private boolean validateAddedFilesMatchOverwriteFilter = false;
   private Long startingSnapshotId = null;
   private Expression conflictDetectionFilter = null;
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
index d231536d0642..b25681de4238 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
@@ -21,10 +21,10 @@
 import java.util.Set;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
 
 class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
-  private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
+  private final DataFileSet replacedDataFiles = DataFileSet.create();
   private Long startingSnapshotId = null;
 
   BaseRewriteFiles(String tableName, TableOperations ops) {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 1bae2e2fc5a0..1b6e1b3b52bc 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -30,7 +30,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DataFileSet;
 
 /**
  * {@link AppendFiles Append} implementation that adds a new manifest file for the write.
@@ -43,8 +43,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
   private final TableOperations ops;
   private final PartitionSpec spec;
   private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
-  private final List<DataFile> newFiles = Lists.newArrayList();
-  private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
+  private final DataFileSet newFiles = DataFileSet.create();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
   private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
   private List<ManifestFile> newManifests = null;
@@ -86,9 +85,8 @@ protected Map<String, String> summary() {
   @Override
   public FastAppend appendFile(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    if (newFilePaths.add(file.path())) {
+    if (newFiles.add(file)) {
       this.hasNewFiles = true;
-      newFiles.add(file);
       summaryBuilder.addedFile(spec, file);
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
index 106be74fa3ad..fddb1a161637 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -25,6 +25,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
@@ -39,9 +40,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.CharSequenceSet;
-import org.apache.iceberg.util.CharSequenceWrapper;
 import org.apache.iceberg.util.ManifestFileUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PartitionSet;
@@ -71,9 +70,9 @@ public String partition() {
   private final PartitionSet deleteFilePartitions;
   private final PartitionSet dropPartitions;
   private final CharSequenceSet deletePaths = CharSequenceSet.empty();
+  private final Set<F> deleteFiles = newFileSet();
   private Expression deleteExpression = Expressions.alwaysFalse();
   private long minSequenceNumber = 0;
-  private boolean hasPathOnlyDeletes = false;
   private boolean failAnyDelete = false;
   private boolean failMissingDeletePaths = false;
   private int duplicateDeleteCount = 0;
@@ -102,6 +101,8 @@ protected ManifestFilterManager(
 
   protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
 
+  protected abstract Set<F> newFileSet();
+
   protected void failAnyDelete() {
     this.failAnyDelete = true;
   }
@@ -153,7 +154,7 @@ void caseSensitive(boolean newCaseSensitive) {
   void delete(F file) {
     Preconditions.checkNotNull(file, "Cannot delete file: null");
     invalidateFilteredCache();
-    deletePaths.add(file.path());
+    deleteFiles.add(file);
     deleteFilePartitions.add(file.specId(), file.partition());
   }
 
@@ -161,12 +162,12 @@ void delete(F file) {
   void delete(CharSequence path) {
     Preconditions.checkNotNull(path, "Cannot delete file path: null");
     invalidateFilteredCache();
-    this.hasPathOnlyDeletes = true;
     deletePaths.add(path);
   }
 
   boolean containsDeletes() {
     return !deletePaths.isEmpty()
+        || !deleteFiles.isEmpty()
         || deleteExpression != Expressions.alwaysFalse()
         || !dropPartitions.isEmpty();
   }
@@ -233,23 +234,37 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
 
   @SuppressWarnings("CollectionUndefinedEquality")
   private void validateRequiredDeletes(ManifestFile... manifests) {
     if (failMissingDeletePaths) {
-      CharSequenceSet deletedFiles = deletedFiles(manifests);
+      Set<F> deletedFiles = deletedFiles(manifests);
+      ValidationException.check(
+          deletedFiles.containsAll(deleteFiles),
+          "Missing required files to delete: %s",
+          COMMA.join(
+              deleteFiles.stream()
+                  .filter(f -> !deletedFiles.contains(f))
+                  .map(ContentFile::location)
+                  .collect(Collectors.toList())));
+
+      CharSequenceSet deletedFilePaths =
+          deletedFiles.stream()
+              .map(ContentFile::path)
+              .collect(Collectors.toCollection(CharSequenceSet::empty));
+
       ValidationException.check(
-          deletedFiles.containsAll(deletePaths),
+          deletedFilePaths.containsAll(deletePaths),
           "Missing required files to delete: %s",
-          COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
+          COMMA.join(Iterables.filter(deletePaths, path -> !deletedFilePaths.contains(path))));
     }
   }
 
-  private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
-    CharSequenceSet deletedFiles = CharSequenceSet.empty();
+  private Set<F> deletedFiles(ManifestFile[] manifests) {
+    Set<F> deletedFiles = newFileSet();
     if (manifests != null) {
       for (ManifestFile manifest : manifests) {
         Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
         if (manifestDeletes != null) {
           for (F file : manifestDeletes) {
-            deletedFiles.add(file.path());
+            deletedFiles.add(file);
           }
         }
       }
@@ -345,9 +360,9 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
     }
 
     boolean canContainDroppedFiles;
-    if (hasPathOnlyDeletes) {
+    if (!deletePaths.isEmpty()) {
       canContainDroppedFiles = true;
-    } else if (!deletePaths.isEmpty()) {
+    } else if (!deleteFiles.isEmpty()) {
       // because there were no path-only deletes, the set of deleted file partitions is valid
       canContainDroppedFiles =
           ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
@@ -374,6 +389,7 @@ private boolean manifestHasDeletedFiles(
       F file = entry.file();
       boolean markedForDelete =
           deletePaths.contains(file.path())
+              || deleteFiles.contains(file)
              || dropPartitions.contains(file.specId(), file.partition())
              || (isDelete
                  && entry.isLive()
@@ -387,7 +403,7 @@ private boolean manifestHasDeletedFiles(
                 || isDelete, // ignore delete files where some records may not match the expression
             "Cannot delete file where some, but not all, rows match filter %s: %s",
             this.deleteExpression,
-            file.path());
+            file.location());
 
         if (allRowsMatch) {
           if (failAnyDelete) {
@@ -409,8 +425,7 @@ private ManifestFile filterManifestWithDeletedFiles(
     boolean isDelete = reader.isDeleteManifestReader();
     // when this point is reached, there is at least one file that will be deleted in the
     // manifest. produce a copy of the manifest with all deleted files removed.
-    List<F> deletedFiles = Lists.newArrayList();
-    Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
+    Set<F> deletedFiles = newFileSet();
 
     try {
       ManifestWriter<F> writer = newManifestWriter(reader.spec());
@@ -422,6 +437,7 @@ private ManifestFile filterManifestWithDeletedFiles(
               F file = entry.file();
               boolean markedForDelete =
                   deletePaths.contains(file.path())
+                      || deleteFiles.contains(file)
                      || dropPartitions.contains(file.specId(), file.partition())
                      || (isDelete
                          && entry.isLive()
@@ -436,23 +452,21 @@ private ManifestFile filterManifestWithDeletedFiles(
                       // the expression
                   "Cannot delete file where some, but not all, rows match filter %s: %s",
                   this.deleteExpression,
-                  file.path());
+                  file.location());
 
               if (allRowsMatch) {
                 writer.delete(entry);
 
-                CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
-                if (deletedPaths.contains(wrapper)) {
+                if (deletedFiles.contains(file)) {
                   LOG.warn(
                       "Deleting a duplicate path from manifest {}: {}",
                       manifest.path(),
-                      wrapper.get());
+                      file.location());
                   duplicateDeleteCount += 1;
                 } else {
                   // only add the file to deletes if it is a new delete
                   // this keeps the snapshot summary accurate for non-duplicate data
-                  deletedFiles.add(entry.file().copyWithoutStats());
-                  deletedPaths.add(wrapper);
+                  deletedFiles.add(file.copyWithoutStats());
                 }
               } else {
                 writer.existing(entry);
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 6a4da2abc9b6..2209b348227d 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -50,6 +50,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DataFileSet;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PartitionSet;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -81,8 +83,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   // update data
   private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
-  private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
-  private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
+  private final DataFileSet newDataFiles = DataFileSet.create();
+  private final DeleteFileSet newDeleteFiles = DeleteFileSet.create();
   private Long newDataFilesDataSequenceNumber;
   private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
   private final List<ManifestFile> appendManifests = Lists.newArrayList();
@@ -234,7 +236,7 @@ protected boolean addsDeleteFiles() {
 
   /** Add a data file to the new snapshot. */
   protected void add(DataFile file) {
     Preconditions.checkNotNull(file, "Invalid data file: null");
-    if (newDataFilePaths.add(file.path())) {
+    if (newDataFiles.add(file)) {
       PartitionSpec fileSpec = ops.current().spec(file.specId());
       Preconditions.checkArgument(
           fileSpec != null,
@@ -244,9 +246,9 @@ protected void add(DataFile file) {
       addedFilesSummary.addedFile(fileSpec, file);
       hasNewDataFiles = true;
 
-      List<DataFile> newDataFiles =
+      List<DataFile> dataFiles =
           newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
-      newDataFiles.add(file);
+      dataFiles.add(file);
     }
   }
 
@@ -268,7 +270,7 @@ private void add(DeleteFileHolder fileHolder) {
     List<DeleteFileHolder> deleteFiles =
         newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());
 
-    if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
+    if (newDeleteFiles.add(fileHolder.deleteFile())) {
       deleteFiles.add(fileHolder);
       addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
       hasNewDeleteFiles = true;
@@ -970,9 +972,9 @@ private List<ManifestFile> newDataFilesAsManifests() {
     if (cachedNewDataManifests.isEmpty()) {
       newDataFilesBySpec.forEach(
-          (dataSpec, newDataFiles) -> {
+          (dataSpec, dataFiles) -> {
             List<ManifestFile> newDataManifests =
-                writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
+                writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
             cachedNewDataManifests.addAll(newDataManifests);
           });
       this.hasNewDataFiles = false;
@@ -1032,6 +1034,11 @@ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec)
     protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
       return MergingSnapshotProducer.this.newManifestReader(manifest);
     }
+
+    @Override
+    protected Set<DataFile> newFileSet() {
+      return DataFileSet.create();
+    }
   }
 
   private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
@@ -1085,6 +1092,11 @@ protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpec)
     protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
       return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
     }
+
+    @Override
+    protected Set<DeleteFile> newFileSet() {
+      return DeleteFileSet.create();
+    }
   }
 
   private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index f750e88e86d9..27724f787dd2 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -36,6 +36,7 @@
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -567,17 +568,17 @@ protected boolean cleanupAfterCommit() {
     return true;
   }
 
-  protected List<ManifestFile> writeDataManifests(List<DataFile> files, PartitionSpec spec) {
+  protected List<ManifestFile> writeDataManifests(Collection<DataFile> files, PartitionSpec spec) {
     return writeDataManifests(files, null /* inherit data seq */, spec);
   }
 
   protected List<ManifestFile> writeDataManifests(
-      List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+      Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
     return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec));
   }
 
   private List<ManifestFile> writeDataFileGroup(
-      List<DataFile> files, Long dataSeq, PartitionSpec spec) {
+      Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
     RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
 
     try (RollingManifestWriter<DataFile> closableWriter = writer) {
@@ -594,12 +595,12 @@ private List<ManifestFile> writeDataFileGroup(
   }
 
   protected List<ManifestFile> writeDeleteManifests(
-      List<DeleteFileHolder> files, PartitionSpec spec) {
+      Collection<DeleteFileHolder> files, PartitionSpec spec) {
     return writeManifests(files, group -> writeDeleteFileGroup(group, spec));
   }
 
   private List<ManifestFile> writeDeleteFileGroup(
-      List<DeleteFileHolder> files, PartitionSpec spec) {
+      Collection<DeleteFileHolder> files, PartitionSpec spec) {
     RollingManifestWriter<DeleteFile> writer = newRollingDeleteManifestWriter(spec);
 
     try (RollingManifestWriter<DeleteFile> closableWriter = writer) {
@@ -618,7 +619,7 @@ private List<ManifestFile> writeDeleteFileGroup(
   }
 
   private static <F> List<ManifestFile> writeManifests(
-      List<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
+      Collection<F> files, Function<List<F>, List<ManifestFile>> writeFunc) {
     int parallelism = manifestWriterCount(ThreadPools.WORKER_THREAD_POOL_SIZE, files.size());
     List<List<F>> groups = divide(files, parallelism);
     Queue<ManifestFile> manifests = Queues.newConcurrentLinkedQueue();
@@ -630,7 +631,8 @@ private static <F> List<ManifestFile> writeManifests(
     return ImmutableList.copyOf(manifests);
   }
 
-  private static <F> List<List<F>> divide(List<F> list, int groupCount) {
+  private static <F> List<List<F>> divide(Collection<F> collection, int groupCount) {
+    List<F> list = Lists.newArrayList(collection);
     int groupSize = IntMath.divide(list.size(), groupCount, RoundingMode.CEILING);
     return Lists.partition(list, groupSize);
   }
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 45b4bcf0a4d9..03d23231c0f1 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -20,14 +20,13 @@
 
 import java.util.Map;
 import java.util.Set;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,8 +72,8 @@ public RewriteDataFilesCommitManager(
    * @param fileGroups fileSets to commit
    */
   public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
-    Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
-    Set<DataFile> addedDataFiles = Sets.newHashSet();
+    DataFileSet rewrittenDataFiles = DataFileSet.create();
+    DataFileSet addedDataFiles = DataFileSet.create();
     for (RewriteFileGroup group : fileGroups) {
       rewrittenDataFiles.addAll(group.rewrittenFiles());
       addedDataFiles.addAll(group.addedFiles());
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index dd1358f2ed40..dfc9842780f5 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.actions;
 
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -29,6 +28,7 @@
 import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.DataFileSet;
 
 /**
  * Container class representing a set of files to be rewritten by a RewriteAction and the new files
@@ -38,7 +38,7 @@ public class RewriteFileGroup {
   private final FileGroupInfo info;
   private final List<FileScanTask> fileScanTasks;
 
-  private Set<DataFile> addedFiles = Collections.emptySet();
+  private DataFileSet addedFiles = DataFileSet.create();
 
   public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) {
     this.info = info;
@@ -54,11 +54,13 @@ public List<FileScanTask> fileScans() {
   }
 
   public void setOutputFiles(Set<DataFile> files) {
-    addedFiles = files;
+    addedFiles = DataFileSet.of(files);
   }
 
   public Set<DataFile> rewrittenFiles() {
-    return fileScans().stream().map(FileScanTask::file).collect(Collectors.toSet());
+    return fileScans().stream()
+        .map(FileScanTask::file)
+        .collect(Collectors.toCollection(DataFileSet::create));
   }
 
   public Set<DataFile> addedFiles() {
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
index 2be7145bcd34..d1c688417a64 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.actions;
 
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -30,6 +29,7 @@
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.DeleteFileSet;
 
 /**
  * Container class representing a set of position delete files to be rewritten by a {@link
@@ -40,7 +40,7 @@ public class RewritePositionDeletesGroup {
   private final List<PositionDeletesScanTask> tasks;
   private final long maxRewrittenDataSequenceNumber;
 
-  private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
+  private DeleteFileSet addedDeleteFiles = DeleteFileSet.create();
 
   public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
     Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
@@ -59,7 +59,7 @@ public List<PositionDeletesScanTask> tasks() {
   }
 
   public void setOutputFiles(Set<DeleteFile> files) {
-    addedDeleteFiles = files;
+    addedDeleteFiles = DeleteFileSet.of(files);
   }
 
   public long maxRewrittenDataSequenceNumber() {
@@ -67,7 +67,9 @@ public long maxRewrittenDataSequenceNumber() {
   }
 
   public Set<DeleteFile> rewrittenDeleteFiles() {
-    return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet());
+    return tasks().stream()
+        .map(PositionDeletesScanTask::file)
+        .collect(Collectors.toCollection(DeleteFileSet::create));
   }
 
   public Set<DeleteFile> addedDeleteFiles() {
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 18e3de240170..4928f998f3b1 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -412,7 +412,20 @@ public void testDeleteValidateFileExistence() {
 
     assertThatThrownBy(
             () -> commit(table, table.newDelete().deleteFile(FILE_B).validateFilesExist(), branch))
-        .isInstanceOf(ValidationException.class);
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Missing required files to delete: /path/to/data-b.parquet");
+
+    assertThatThrownBy(
+            () ->
+                commit(
+                    table,
+                    table
+                        .newDelete()
+                        .deleteFile("/path/to/non-existing.parquet")
+                        .validateFilesExist(),
+                    branch))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Missing required files to delete: /path/to/non-existing.parquet");
   }
 
   @TestTemplate
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 9ae3c97db47c..13c459128dec 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -213,7 +213,7 @@ public void testDropTable() throws IOException {
     table.newAppend().appendFile(file1).appendFile(file2).commit();
 
     // delete file2
-    table.newDelete().deleteFile(file2.path()).commit();
+    table.newDelete().deleteFile(file2).commit();
 
     String manifestListLocation =
         table.currentSnapshot().manifestListLocation().replace("file:", "");
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java
index f0f75c842429..ca507eae575a 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java
@@ -407,7 +407,7 @@ public void testDropTable() throws IOException {
     table.newAppend().appendFile(file1).appendFile(file2).commit();
 
     // delete file2
-    table.newDelete().deleteFile(file2.path()).commit();
+    table.newDelete().deleteFile(file2).commit();
 
     String manifestListLocation =
         table.currentSnapshot().manifestListLocation().replace("file:", "");
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index d91779475845..73e6ab01563c 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -36,11 +36,11 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
@@ -148,7 +148,7 @@ public boolean useCommitCoordinator() {
 
     @Override
     public void commit(WriterCommitMessage[] messages) {
       PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
-      coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
+      coordinator.stageRewrite(table, fileSetId, DeleteFileSet.of(files(messages)));
     }
 
     @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e4a0eb700be6..cc3dc592ecee 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -52,12 +52,12 @@
 import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.io.RollingDataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.util.DataFileSet;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -491,7 +491,7 @@ private RewriteFiles(String fileSetID) {
 
     @Override
     public void commit(WriterCommitMessage[] messages) {
       FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
-      coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
+      coordinator.stageRewrite(table, fileSetID, DataFileSet.of(files(messages)));
     }
   }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 3955d0395474..666634a06c02 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.util.DataFileSet;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -93,7 +94,7 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException {
       Set<DataFile> rewrittenFiles =
           taskSetManager.fetchTasks(table, fileSetID).stream()
               .map(t -> t.asFileScanTask().file())
-              .collect(Collectors.toSet());
+              .collect(Collectors.toCollection(DataFileSet::create));
       Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
     }
@@ -165,7 +166,7 @@ public void testSortRewrite() throws NoSuchTableException, IOException {
       Set<DataFile> rewrittenFiles =
           taskSetManager.fetchTasks(table, fileSetID).stream()
               .map(t -> t.asFileScanTask().file())
-              .collect(Collectors.toSet());
+              .collect(Collectors.toCollection(DataFileSet::create));
       Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
     }
@@ -247,7 +248,7 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOException {
       Set<DataFile> addedFiles =
           fileSetIDs.stream()
               .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream())
-              .collect(Collectors.toSet());
+              .collect(Collectors.toCollection(DataFileSet::create));
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
 
       table.refresh();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index c73ef630ac48..4252838d5f53 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -54,11 +54,11 @@
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.orc.storage.serde2.io.DateWritable;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
@@ -787,7 +787,7 @@ public static List<DataFile> dataFiles(Table table, String branch) {
   }
 
   public static Set<DeleteFile> deleteFiles(Table table) {
-    Set<DeleteFile> deleteFiles = Sets.newHashSet();
+    DeleteFileSet deleteFiles = DeleteFileSet.create();
 
     for (FileScanTask task : table.newScan().planFiles()) {
       deleteFiles.addAll(task.deletes());
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index a6573171aa6a..ca934772f6af 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -72,6 +72,7 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.spark.sql.AnalysisException;
@@ -1576,7 +1577,7 @@ private void commit(
     Set<DeleteFile> rewrittenFiles =
         ScanTaskSetManager.get().fetchTasks(posDeletesTable, fileSetID).stream()
             .map(t -> ((PositionDeletesScanTask) t).file())
-            .collect(Collectors.toSet());
+            .collect(Collectors.toCollection(DeleteFileSet::create));
     Set<DeleteFile> addedFiles = rewriteCoordinator.fetchNewFiles(posDeletesTable, fileSetID);
 
     // Assert new files and old files are equal in number but different in paths
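
For context, not part of the patch itself: DataFileSet and DeleteFileSet are Set implementations keyed on file location rather than object identity, which is what lets this change collapse the old two-structure bookkeeping (a CharSequenceSet of paths guarding a List of files) into a single set, and why `if (newFiles.add(file))` in FastAppend.appendFile now both deduplicates and gates the summary update. Below is a minimal sketch of that deduplication behavior; it assumes the DataFiles builder from iceberg-core and an unpartitioned spec, and the path and metric values are made-up illustrations, not values taken from this diff.

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.util.DataFileSet;

    public class DataFileSetDemo {
      public static void main(String[] args) {
        PartitionSpec spec = PartitionSpec.unpartitioned();

        // Two distinct DataFile instances pointing at the same location, e.g. one
        // built by a writer and one read back from a manifest.
        DataFile written =
            DataFiles.builder(spec)
                .withPath("/path/to/data-a.parquet") // hypothetical location
                .withFileSizeInBytes(10)
                .withRecordCount(1)
                .build();
        DataFile readBack =
            DataFiles.builder(spec)
                .withPath("/path/to/data-a.parquet")
                .withFileSizeInBytes(10)
                .withRecordCount(1)
                .build();

        DataFileSet files = DataFileSet.create();
        System.out.println(files.add(written));  // true: first file at this location
        System.out.println(files.add(readBack)); // false: same location, deduplicated
        System.out.println(files.size());        // 1
      }
    }

The same location-based equality is what the stricter assertions in TestDeleteFiles rely on: ManifestFilterManager can compare whole ContentFile instances against the files actually removed and report a missing file's location, instead of tracking raw path strings on the side.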