[HUDI-8340] Fix functional index record generation using spark distributed computation (#12127)

Co-authored-by: sivabalan <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
3 people authored Oct 21, 2024
1 parent 4b619f0 commit e0b12fb
Showing 7 changed files with 221 additions and 224 deletions.
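At a high level, the patch changes getFunctionalIndexRecords to take a flat list of (partition, (filePath, fileSize)) tuples plus the commit instant time instead of (partition, FileSlice) pairs, so that each engine (Spark in particular) can distribute functional index record generation per file. The self-contained Java sketch below only illustrates that flattening shape; FileSliceLike and flattenToFileTuples are hypothetical stand-ins for illustration, not Hudi APIs.

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a Hudi FileSlice: an optional base file plus log files,
// each represented as a (path, size) entry.
class FileSliceLike {
  final Optional<Map.Entry<String, Long>> baseFile;
  final List<Map.Entry<String, Long>> logFiles;

  FileSliceLike(Optional<Map.Entry<String, Long>> baseFile, List<Map.Entry<String, Long>> logFiles) {
    this.baseFile = baseFile;
    this.logFiles = logFiles;
  }
}

public class FunctionalIndexFlattenSketch {

  // Flatten (partition, fileSlice) pairs into (partition, (filePath, fileSize)) tuples,
  // mirroring the shape now passed to getFunctionalIndexRecords so that record
  // generation can be parallelized per file instead of per file slice.
  static List<Map.Entry<String, Map.Entry<String, Long>>> flattenToFileTuples(
      List<Map.Entry<String, FileSliceLike>> partitionFileSlicePairs) {
    List<Map.Entry<String, Map.Entry<String, Long>>> tuples = new ArrayList<>();
    for (Map.Entry<String, FileSliceLike> entry : partitionFileSlicePairs) {
      String partition = entry.getKey();
      FileSliceLike slice = entry.getValue();
      slice.baseFile.ifPresent(base -> tuples.add(new SimpleImmutableEntry<>(partition, base)));
      slice.logFiles.forEach(log -> tuples.add(new SimpleImmutableEntry<>(partition, log)));
    }
    return tuples;
  }
}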
@@ -403,9 +403,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat

for (MetadataPartitionType partitionType : partitionsToInit) {
// Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time.
String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime);
String instantTimeForPartition = generateUniqueInstantTime(initializationTime);
String partitionTypeName = partitionType.name();
LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, commitTimeForPartition);
LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, instantTimeForPartition);
String partitionName;
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
try {
@@ -433,7 +433,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
}
ValidationUtils.checkState(functionalIndexPartitionsToInit.size() == 1, "Only one functional index at a time is supported for now");
partitionName = functionalIndexPartitionsToInit.iterator().next();
fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(partitionName);
fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(partitionName, instantTimeForPartition);
break;
case PARTITION_STATS:
fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
@@ -469,11 +469,11 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
// Generate the file groups
final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionTypeName + " should be > 0");
initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount, partitionName);
initializeFileGroups(dataMetaClient, partitionType, instantTimeForPartition, fileGroupCount, partitionName);

// Perform the commit using bulkCommit
HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
bulkCommit(commitTimeForPartition, partitionName, records, fileGroupCount);
bulkCommit(instantTimeForPartition, partitionName, records, fileGroupCount);
metadataMetaClient.reloadActiveTimeline();

dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionName, true);
@@ -495,7 +495,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
* @param initializationTime Timestamp from dataset to use for initialization
* @return a unique timestamp for MDT
*/
private String generateUniqueCommitInstantTime(String initializationTime) {
private String generateUniqueInstantTime(String initializationTime) {
// If it's initialized via Async indexer, we don't need to alter the init time.
// otherwise yields the timestamp on the fly.
// This function would be called multiple times in a single application if multiple indexes are being
@@ -537,27 +537,41 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(
return Pair.of(fileGroupCount, records);
}

protected abstract HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, FileSlice>> partitionFileSlicePairs,
protected abstract HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet,
HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient,
int parallelism, Schema readerSchema,
StorageConfiguration<?> storageConf);
StorageConfiguration<?> storageConf,
String instantTime);

protected abstract EngineType getEngineType();

public abstract HoodieData<HoodieRecord> getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext,
Map<String, String> recordKeySecondaryKeyMap,
HoodieIndexDefinition indexDefinition);

private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartition(String indexName) throws Exception {
private Pair<Integer, HoodieData<HoodieRecord>> initializeFunctionalIndexPartition(String indexName, String instantTime) throws Exception {
HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexName);
ValidationUtils.checkState(indexDefinition != null, "Functional Index definition is not present for index " + indexName);
List<Pair<String, FileSlice>> partitionFileSlicePairs = getPartitionFileSlicePairs();
List<Pair<String, Pair<String, Long>>> partitionFilePathSizeTriplet = new ArrayList<>();
partitionFileSlicePairs.forEach(entry -> {
if (entry.getValue().getBaseFile().isPresent()) {
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(entry.getValue().getBaseFile().get().getPath(), entry.getValue().getBaseFile().get().getFileLen())));
}
entry.getValue().getLogFiles().forEach(hoodieLogFile -> {
if (entry.getValue().getLogFiles().count() > 0) {
entry.getValue().getLogFiles().forEach(logfile -> {
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize())));
});
}
});
});

int fileGroupCount = dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount();
int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
int parallelism = Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
Schema readerSchema = getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient);
return Pair.of(fileGroupCount, getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf));
return Pair.of(fileGroupCount, getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime));
}

private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) {
@@ -570,6 +584,9 @@ private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) {
}

private Set<String> getIndexPartitionsToInit(MetadataPartitionType partitionType) {
if (dataMetaClient.getIndexMetadata().isEmpty()) {
return Collections.emptySet();
}
Set<String> secondaryIndexPartitions = dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream()
.map(HoodieIndexDefinition::getIndexName)
.filter(indexName -> indexName.startsWith(partitionType.getPartitionPath()))
@@ -1106,28 +1123,12 @@ private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata,
*/
private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition);
List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
HoodieTableFileSystemView fsv = HoodieTableMetadataUtil.getFileSystemView(dataMetaClient);
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
// collect list of FileIDs touched as part of this commit.
Set<String> fileIds = writeStats.stream().map(writeStat -> writeStat.getFileId()).collect(Collectors.toSet());
List<FileSlice> fileSlices = getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.of(fsv), dataPartition)
.stream().filter(fileSlice -> fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList());
// process only the fileSlices touched in this commit meta
// data.
fileSlices.forEach(fileSlice -> {
// Filter log files for the instant time and add to this partition fileSlice pairs
List<HoodieLogFile> logFilesForInstant = fileSlice.getLogFiles()
.filter(logFile -> logFile.getDeltaCommitTime().equals(instantTime))
.collect(Collectors.toList());
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseInstantTime().equals(instantTime) ? fileSlice.getBaseFile() : Option.empty();
partitionFileSlicePairs.add(Pair.of(dataPartition, new FileSlice(
fileSlice.getFileGroupId(), fileSlice.getBaseInstantTime(), baseFileOpt.orElse(null), logFilesForInstant)));
});
});
int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
List<Pair<String, Pair<String, Long>>> partitionFilePathPairs = new ArrayList<>();
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add(
Pair.of(writeStat.getPartitionPath(), Pair.of(new StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileSizeInBytes())))));
int parallelism = Math.min(partitionFilePathPairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
Schema readerSchema = getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient);
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf);
return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime);
}

private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
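For the update path shown above, getFunctionalIndexUpdates no longer rebuilds file slices from the file system view; it simply pairs each write stat's path (relative to the table base path) with its size. Below is a minimal sketch of that pairing, using plain strings and a hypothetical WriteStatLike type in place of Hudi's HoodieWriteStat and StoragePath.

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WriteStatPairingSketch {

  // Hypothetical stand-in for HoodieWriteStat: partition, path relative to the base path, file size.
  static final class WriteStatLike {
    final String partitionPath;
    final String relativePath;
    final long fileSizeInBytes;

    WriteStatLike(String partitionPath, String relativePath, long fileSizeInBytes) {
      this.partitionPath = partitionPath;
      this.relativePath = relativePath;
      this.fileSizeInBytes = fileSizeInBytes;
    }
  }

  // Pair each file touched by the commit with its absolute path and size, keyed by partition.
  static List<Map.Entry<String, Map.Entry<String, Long>>> toPartitionFilePathPairs(
      String basePath, List<WriteStatLike> writeStats) {
    List<Map.Entry<String, Map.Entry<String, Long>>> pairs = new ArrayList<>();
    for (WriteStatLike stat : writeStats) {
      String absolutePath = basePath + "/" + stat.relativePath;
      pairs.add(new SimpleImmutableEntry<>(stat.partitionPath,
          new SimpleImmutableEntry<>(absolutePath, stat.fileSizeInBytes)));
    }
    return pairs;
  }
}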
@@ -24,7 +24,6 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
@@ -189,8 +188,9 @@ protected void preWrite(String instantTime) {
}

@Override
protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, FileSlice>> partitionFileSlicePairs, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient,
int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf) {
protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf,
String instantTime) {
throw new HoodieNotSupportedException("Flink metadata table does not support functional index yet.");
}

@@ -23,7 +23,6 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieRecord;
@@ -124,8 +123,9 @@ protected void preWrite(String instantTime) {
}

@Override
protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, FileSlice>> partitionFileSlicePairs, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient,
int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf) {
protected HoodieData<HoodieRecord> getFunctionalIndexRecords(List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration<?> storageConf,
String instantTime) {
throw new HoodieNotSupportedException("Functional index not supported for Java metadata table writer yet.");
}

(Diffs for the remaining 4 changed files are not expanded here.)
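The Spark-side override of getFunctionalIndexRecords is among the changed files not expanded above, so the following is only a rough sketch, under stated assumptions, of the distributed pattern the commit title describes: parallelize the (partition, (filePath, fileSize)) tuples and let each executor task read its file and emit index records. extractIndexRecordsForFile is a hypothetical placeholder; none of this is taken from the actual Hudi Spark metadata table writer.

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DistributedIndexSketch {

  // Hypothetical per-file extraction: read the file at 'filePath' (bounded by 'fileSize')
  // and emit one index record per matching row. Placeholder only; not a Hudi API.
  static Iterator<String> extractIndexRecordsForFile(String partition, String filePath,
                                                     long fileSize, String instantTime) {
    throw new UnsupportedOperationException("placeholder for per-file record extraction");
  }

  static JavaRDD<String> buildIndexRecords(JavaSparkContext jsc,
                                           List<Map.Entry<String, Map.Entry<String, Long>>> fileTuples,
                                           int parallelism, String instantTime) {
    // Only lightweight (partition, (path, size)) tuples are shipped to the executors;
    // each task reads its file independently and emits functional index records.
    return jsc.parallelize(fileTuples, Math.max(1, parallelism))
        .flatMap(tuple -> extractIndexRecordsForFile(
            tuple.getKey(), tuple.getValue().getKey(), tuple.getValue().getValue(), instantTime));
  }
}

The design point this illustrates is that shipping paths and sizes rather than FileSlice objects keeps the driver-side payload small and lets the per-file read happen where the work runs.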
