-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Switch usage to DataFileSet / DeleteFileSet #11158
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,8 @@ | |
import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
import org.apache.iceberg.util.CharSequenceSet; | ||
import org.apache.iceberg.util.DataFileSet; | ||
import org.apache.iceberg.util.DeleteFileSet; | ||
import org.apache.iceberg.util.Pair; | ||
import org.apache.iceberg.util.PartitionSet; | ||
import org.apache.iceberg.util.SnapshotUtil; | ||
|
@@ -81,8 +83,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> { | |
|
||
// update data | ||
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap(); | ||
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty(); | ||
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty(); | ||
private final DataFileSet newDataFiles = DataFileSet.create(); | ||
private final DeleteFileSet newDeleteFiles = DeleteFileSet.create(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need these extra collections? Can't we use sets in [inline code reference lost in extraction; presumably the `newDataFilesBySpec` / `newDeleteFilesBySpec` maps]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm handling this already in 4d42f18. I just didn't want to introduce too many changes/refactorings, as the PR is already quite large. |
||
private Long newDataFilesDataSequenceNumber; | ||
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap(); | ||
private final List<ManifestFile> appendManifests = Lists.newArrayList(); | ||
|
@@ -234,7 +236,7 @@ protected boolean addsDeleteFiles() { | |
/** Add a data file to the new snapshot. */ | ||
protected void add(DataFile file) { | ||
Preconditions.checkNotNull(file, "Invalid data file: null"); | ||
if (newDataFilePaths.add(file.path())) { | ||
if (newDataFiles.add(file)) { | ||
PartitionSpec fileSpec = ops.current().spec(file.specId()); | ||
Preconditions.checkArgument( | ||
fileSpec != null, | ||
|
@@ -244,9 +246,9 @@ protected void add(DataFile file) { | |
|
||
addedFilesSummary.addedFile(fileSpec, file); | ||
hasNewDataFiles = true; | ||
List<DataFile> newDataFiles = | ||
List<DataFile> dataFiles = | ||
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList()); | ||
newDataFiles.add(file); | ||
dataFiles.add(file); | ||
} | ||
} | ||
|
||
|
@@ -268,7 +270,7 @@ private void add(DeleteFileHolder fileHolder) { | |
List<DeleteFileHolder> deleteFiles = | ||
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); | ||
|
||
if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) { | ||
if (newDeleteFiles.add(fileHolder.deleteFile())) { | ||
deleteFiles.add(fileHolder); | ||
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile()); | ||
hasNewDeleteFiles = true; | ||
|
@@ -970,9 +972,9 @@ private List<ManifestFile> newDataFilesAsManifests() { | |
|
||
if (cachedNewDataManifests.isEmpty()) { | ||
newDataFilesBySpec.forEach( | ||
(dataSpec, newDataFiles) -> { | ||
(dataSpec, dataFiles) -> { | ||
List<ManifestFile> newDataManifests = | ||
writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec); | ||
writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec); | ||
cachedNewDataManifests.addAll(newDataManifests); | ||
}); | ||
this.hasNewDataFiles = false; | ||
|
@@ -1032,6 +1034,11 @@ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) | |
protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) { | ||
return MergingSnapshotProducer.this.newManifestReader(manifest); | ||
} | ||
|
||
@Override | ||
protected Set<DataFile> newFileSet() { | ||
return DataFileSet.create(); | ||
} | ||
} | ||
|
||
private class DataFileMergeManager extends ManifestMergeManager<DataFile> { | ||
|
@@ -1085,6 +1092,11 @@ protected ManifestWriter<DeleteFile> newManifestWriter(PartitionSpec manifestSpe | |
protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) { | ||
return MergingSnapshotProducer.this.newDeleteManifestReader(manifest); | ||
} | ||
|
||
@Override | ||
protected Set<DeleteFile> newFileSet() { | ||
return DeleteFileSet.create(); | ||
} | ||
} | ||
|
||
private class DeleteFileMergeManager extends ManifestMergeManager<DeleteFile> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we co-locate `deleteFiles` and `deleteFilePartitions`, as `deleteFilePartitions` essentially contains a set of partitions derived from `deleteFiles`?