Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Oct 9, 2024
1 parent d01295c commit 83c6052
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 83 deletions.
110 changes: 27 additions & 83 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -41,8 +40,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -72,7 +69,7 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final FilesToDeleteHolder filesToDelete = new FilesToDeleteHolder();
private final Set<F> deleteFiles = newFileSet();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean failAnyDelete = false;
Expand All @@ -84,7 +81,7 @@ public String partition() {
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, FilesToDeleteHolder> filteredManifestToDeletedFiles =
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();

private final Supplier<ExecutorService> workerPoolSupplier;
Expand All @@ -103,6 +100,8 @@ protected ManifestFilterManager(

protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);

protected abstract Set<F> newFileSet();

protected void failAnyDelete() {
this.failAnyDelete = true;
}
Expand Down Expand Up @@ -154,7 +153,7 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
filesToDelete.delete(file);
deleteFiles.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -167,7 +166,7 @@ void delete(CharSequence path) {

boolean containsDeletes() {
return !deletePaths.isEmpty()
|| filesToDelete.containsDeletes()
|| !deleteFiles.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -212,10 +211,11 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {

for (ManifestFile manifest : manifests) {
PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
manifestDeletes.dataFiles.forEach(file -> summaryBuilder.deletedFile(manifestSpec, file));
manifestDeletes.deleteFiles.forEach(file -> summaryBuilder.deletedFile(manifestSpec, file));
for (F file : manifestDeletes) {
summaryBuilder.deletedFile(manifestSpec, file);
}
}
}

Expand All @@ -232,20 +232,23 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
*/
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
FilesToDeleteHolder deletedFiles = deletedFiles(manifests);
deletedFiles.validateRequiredDeletes(filesToDelete);
deletedFiles.validateRequiredDeletes(deletePaths);
CharSequenceSet deletedFiles = deletedFiles(manifests);
ValidationException.check(
deletedFiles.containsAll(deletePaths),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
}
}

private FilesToDeleteHolder deletedFiles(ManifestFile[] manifests) {
FilesToDeleteHolder deletedFiles = new FilesToDeleteHolder();
private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
if (manifests != null) {
for (ManifestFile manifest : manifests) {
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
deletedFiles.dataFiles.addAll(manifestDeletes.dataFiles);
deletedFiles.deleteFiles.addAll(manifestDeletes.deleteFiles);
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
}
}
}
}
Expand Down Expand Up @@ -342,7 +345,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedFiles;
if (!deletePaths.isEmpty()) {
canContainDroppedFiles = true;
} else if (filesToDelete.containsDeletes()) {
} else if (!deleteFiles.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -369,7 +372,7 @@ private boolean manifestHasDeletedFiles(
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
|| filesToDelete.markedForDelete(file)
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -405,7 +408,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
FilesToDeleteHolder deletedFiles = new FilesToDeleteHolder();
Set<F> deletedFiles = newFileSet();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -417,7 +420,7 @@ private ManifestFile filterManifestWithDeletedFiles(
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
|| filesToDelete.markedForDelete(file)
|| deletedFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -437,7 +440,7 @@ private ManifestFile filterManifestWithDeletedFiles(
if (allRowsMatch) {
writer.delete(entry);

if (deletedFiles.markedForDelete(file)) {
if (deletedFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
Expand All @@ -446,7 +449,7 @@ private ManifestFile filterManifestWithDeletedFiles(
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.delete(file.copyWithoutStats());
deletedFiles.add(file.copyWithoutStats());
}
} else {
writer.existing(entry);
Expand Down Expand Up @@ -527,63 +530,4 @@ private Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> metricsEvaluator
return metricsEvaluators.get(partition);
}
}

private class FilesToDeleteHolder {
private final DataFileSet dataFiles = DataFileSet.create();
private final DeleteFileSet deleteFiles = DeleteFileSet.create();

private FilesToDeleteHolder() {}

private void delete(F file) {
if (file instanceof DataFile) {
dataFiles.add((DataFile) file);
} else {
deleteFiles.add((DeleteFile) file);
}
}

private boolean containsDeletes() {
return !dataFiles.isEmpty() || !deleteFiles.isEmpty();
}

private boolean markedForDelete(F file) {
if (file instanceof DataFile) {
return dataFiles.contains((DataFile) file);
} else {
return deleteFiles.contains((DeleteFile) file);
}
}

private void validateRequiredDeletes(FilesToDeleteHolder filesToBeDeleted) {
ValidationException.check(
dataFiles.containsAll(filesToBeDeleted.dataFiles),
"Missing required files to delete: %s",
COMMA.join(
filesToBeDeleted.dataFiles.stream()
.filter(f -> !dataFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));

ValidationException.check(
deleteFiles.containsAll(filesToBeDeleted.deleteFiles),
"Missing required files to delete: %s",
COMMA.join(
filesToBeDeleted.deleteFiles.stream()
.filter(f -> !deleteFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));
}

@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(CharSequenceSet filePathsToBeDeleted) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
dataFiles.stream().map(ContentFile::path).forEach(deletedFiles::add);
deleteFiles.stream().map(ContentFile::path).forEach(deletedFiles::add);

ValidationException.check(
deletedFiles.containsAll(filePathsToBeDeleted),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(filePathsToBeDeleted, path -> !deletedFiles.contains(path))));
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,11 @@ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec)
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newManifestReader(manifest);
}

@Override
protected Set<DataFile> newFileSet() {
return DataFileSet.create();
}
}

private class DataFileMergeManager extends ManifestMergeManager<DataFile> {
Expand Down Expand Up @@ -1087,6 +1092,11 @@ protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpe
protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
}

@Override
protected Set<DeleteFile> newFileSet() {
return DeleteFileSet.create();
}
}

private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> {
Expand Down

0 comments on commit 83c6052

Please sign in to comment.