diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 85d1a99950f8..5e244c2b1212 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -403,9 +403,9 @@ private boolean initializeFromFilesystem(String initializationTime, List> fileGroupCountAndRecordsPair; try { @@ -433,7 +433,7 @@ private boolean initializeFromFilesystem(String initializationTime, List 0, "FileGroup count for MDT partition " + partitionTypeName + " should be > 0"); - initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount, partitionName); + initializeFileGroups(dataMetaClient, partitionType, instantTimeForPartition, fileGroupCount, partitionName); // Perform the commit using bulkCommit HoodieData records = fileGroupCountAndRecordsPair.getValue(); - bulkCommit(commitTimeForPartition, partitionName, records, fileGroupCount); + bulkCommit(instantTimeForPartition, partitionName, records, fileGroupCount); metadataMetaClient.reloadActiveTimeline(); dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionName, true); @@ -495,7 +495,7 @@ private boolean initializeFromFilesystem(String initializationTime, List> initializeBloomFiltersPartition( return Pair.of(fileGroupCount, records); } - protected abstract HoodieData getFunctionalIndexRecords(List> partitionFileSlicePairs, + protected abstract HoodieData getFunctionalIndexRecords(List>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, - StorageConfiguration storageConf); + StorageConfiguration storageConf, + String instantTime); protected abstract EngineType getEngineType(); @@ -549,15 +550,28 @@ public abstract HoodieData getDeletedSecondaryRecordMapping(Hoodie Map recordKeySecondaryKeyMap, HoodieIndexDefinition indexDefinition); - private Pair> initializeFunctionalIndexPartition(String indexName) throws Exception { + private Pair> initializeFunctionalIndexPartition(String indexName, String instantTime) throws Exception { HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexName); ValidationUtils.checkState(indexDefinition != null, "Functional Index definition is not present for index " + indexName); List> partitionFileSlicePairs = getPartitionFileSlicePairs(); + List>> partitionFilePathSizeTriplet = new ArrayList<>(); + partitionFileSlicePairs.forEach(entry -> { + if (entry.getValue().getBaseFile().isPresent()) { + partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(entry.getValue().getBaseFile().get().getPath(), entry.getValue().getBaseFile().get().getFileLen()))); + } + entry.getValue().getLogFiles().forEach(hoodieLogFile -> { + if (entry.getValue().getLogFiles().count() > 0) { + entry.getValue().getLogFiles().forEach(logfile -> { + partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(logfile.getPath().toString(), logfile.getFileSize()))); + }); + } + }); + }); int fileGroupCount = dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount(); - int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism()); + int parallelism = 
Math.min(partitionFilePathSizeTriplet.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism()); Schema readerSchema = getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient); - return Pair.of(fileGroupCount, getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf)); + return Pair.of(fileGroupCount, getFunctionalIndexRecords(partitionFilePathSizeTriplet, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime)); } private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) { @@ -570,6 +584,9 @@ private HoodieIndexDefinition getFunctionalIndexDefinition(String indexName) { } private Set getIndexPartitionsToInit(MetadataPartitionType partitionType) { + if (dataMetaClient.getIndexMetadata().isEmpty()) { + return Collections.emptySet(); + } Set secondaryIndexPartitions = dataMetaClient.getIndexMetadata().get().getIndexDefinitions().values().stream() .map(HoodieIndexDefinition::getIndexName) .filter(indexName -> indexName.startsWith(partitionType.getPartitionPath())) @@ -1106,28 +1123,12 @@ private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, */ private HoodieData getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception { HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition); - List> partitionFileSlicePairs = new ArrayList<>(); - HoodieTableFileSystemView fsv = HoodieTableMetadataUtil.getFileSystemView(dataMetaClient); - commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> { - // collect list of FileIDs touched as part of this commit. - Set fileIds = writeStats.stream().map(writeStat -> writeStat.getFileId()).collect(Collectors.toSet()); - List fileSlices = getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.of(fsv), dataPartition) - .stream().filter(fileSlice -> fileIds.contains(fileSlice.getFileId())).collect(Collectors.toList()); - // process only the fileSlices touched in this commit meta - // data. - fileSlices.forEach(fileSlice -> { - // Filter log files for the instant time and add to this partition fileSlice pairs - List logFilesForInstant = fileSlice.getLogFiles() - .filter(logFile -> logFile.getDeltaCommitTime().equals(instantTime)) - .collect(Collectors.toList()); - Option baseFileOpt = fileSlice.getBaseInstantTime().equals(instantTime) ? 
fileSlice.getBaseFile() : Option.empty(); - partitionFileSlicePairs.add(Pair.of(dataPartition, new FileSlice( - fileSlice.getFileGroupId(), fileSlice.getBaseInstantTime(), baseFileOpt.orElse(null), logFilesForInstant))); - }); - }); - int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism()); + List>> partitionFilePathPairs = new ArrayList<>(); + commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add( + Pair.of(writeStat.getPartitionPath(), Pair.of(new StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileSizeInBytes()))))); + int parallelism = Math.min(partitionFilePathPairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism()); Schema readerSchema = getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient); - return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf); + return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime); } private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, HoodieData writeStatus) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 6a79b1f1bb1d..36d19b98de53 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -24,7 +24,6 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieRecord; @@ -189,8 +188,9 @@ protected void preWrite(String instantTime) { } @Override - protected HoodieData getFunctionalIndexRecords(List> partitionFileSlicePairs, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, - int parallelism, Schema readerSchema, StorageConfiguration storageConf) { + protected HoodieData getFunctionalIndexRecords(List>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, + HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration storageConf, + String instantTime) { throw new HoodieNotSupportedException("Flink metadata table does not support functional index yet."); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java index 64c8ace922a0..93c8ee2b2bba 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.data.HoodieData; import 
org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieRecord; @@ -124,8 +123,9 @@ protected void preWrite(String instantTime) { } @Override - protected HoodieData getFunctionalIndexRecords(List> partitionFileSlicePairs, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, - int parallelism, Schema readerSchema, StorageConfiguration storageConf) { + protected HoodieData getFunctionalIndexRecords(List>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, + HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema, StorageConfiguration storageConf, + String instantTime) { throw new HoodieNotSupportedException("Functional index not supported for Java metadata table writer yet."); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index 48bd0f9a64c3..e1de397ff5d7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -21,8 +21,9 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineType; -import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -31,36 +32,42 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; import org.apache.hudi.common.util.HoodieRecordUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.functional.HoodieFunctionalIndex; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.io.storage.HoodieIOFactory; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.util.JavaScalaConverters; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapGroupsFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.functions; - -import javax.annotation.Nullable; +import org.apache.spark.sql.types.StructType; +import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import 
java.util.stream.Collectors; +import scala.Function1; + import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE; -import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS; import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord; @@ -72,97 +79,103 @@ */ public class SparkMetadataWriterUtils { - public static List getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient, - Schema readerSchema, - List fileSlices, - String partition, - HoodieFunctionalIndex functionalIndex, - String columnToIndex, - SQLContext sqlContext) { - List> columnRangeMetadataList = new ArrayList<>(); - for (FileSlice fileSlice : fileSlices) { - if (fileSlice.getBaseFile().isPresent()) { - HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); - long fileSize = baseFile.getFileSize(); - buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFile.getStoragePath(), true); - } - // Handle log files - fileSlice.getLogFiles().forEach(logFile -> { - long fileSize = logFile.getFileSize(); - buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFile.getPath(), false); - }); - } - return createColumnStatsRecords(partition, columnRangeMetadataList, false, functionalIndex.getIndexName(), COLUMN_STATS.getRecordType()).collect(Collectors.toList()); + public static Column[] getFunctionalIndexColumns() { + return new Column[] { + functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION), + functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH), + functions.col(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE) + }; } - public static List getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient, - Schema readerSchema, - List fileSlices, - String partition, - HoodieFunctionalIndex functionalIndex, - String columnToIndex, - SQLContext sqlContext, - HoodieWriteConfig metadataWriteConfig) { - List bloomFilterMetadataList = new ArrayList<>(); - for (FileSlice fileSlice : fileSlices) { - if (fileSlice.getBaseFile().isPresent()) { - HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); - buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), metadataWriteConfig, partition, - baseFile.getCommitTime(), true); - } - // Handle log files - fileSlice.getLogFiles().forEach( - logFile -> buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, logFile.getPath(), metadataWriteConfig, partition, - logFile.getDeltaCommitTime(), false)); - } - return bloomFilterMetadataList; + public static String[] getFunctionalIndexColumnNames() { + return new String[] { + HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, + HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, + HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE + }; } - private static void buildColumnRangeMetadata(HoodieTableMetaClient metaClient, - Schema readerSchema, - HoodieFunctionalIndex functionalIndex, - String columnToIndex, - SQLContext sqlContext, - List> columnRangeMetadataList, - long fileSize, - StoragePath filePath, - boolean isBaseFile) { - Dataset fileDf = 
readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile); - Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex))); - fileDf = fileDf.withColumn(columnToIndex, indexedColumn); - HoodieColumnRangeMetadata columnRangeMetadata = computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), fileSize); - columnRangeMetadataList.add(columnRangeMetadata); + @NotNull + public static List getRowsWithFunctionalIndexMetadata(List rowsForFilePath, String partition, String filePath, long fileSize) { + return rowsForFilePath.stream().map(row -> { + scala.collection.immutable.Seq indexMetadata = JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition, filePath, fileSize)); + Row functionalIndexRow = Row.fromSeq(indexMetadata); + List rows = new ArrayList<>(2); + rows.add(row); + rows.add(functionalIndexRow); + scala.collection.immutable.Seq rowSeq = JavaScalaConverters.convertJavaListToScalaList(rows); + return Row.merge(rowSeq); + }).collect(Collectors.toList()); } - private static void buildBloomFilterMetadata(HoodieTableMetaClient metaClient, - Schema readerSchema, - HoodieFunctionalIndex functionalIndex, - String columnToIndex, - SQLContext sqlContext, - List bloomFilterMetadataList, - StoragePath filePath, - HoodieWriteConfig writeConfig, - String partitionName, - String instantTime, - boolean isBaseFile) { - Dataset fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile); - Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex))); - fileDf = fileDf.withColumn(columnToIndex, indexedColumn); - BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(writeConfig); - fileDf.foreach(row -> { - byte[] key = row.getAs(columnToIndex).toString().getBytes(); - bloomFilter.add(key); - }); - ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString())); - bloomFilterMetadataList.add(createBloomFilterMetadataRecord(partitionName, filePath.toString(), instantTime, writeConfig.getBloomFilterType(), bloomByteBuffer, false)); + public static HoodieData getFunctionalIndexRecordsUsingColumnStats(Dataset dataset, + HoodieFunctionalIndex functionalIndex, + String columnToIndex) { + // Aggregate col stats related data for the column to index + Dataset columnRangeMetadataDataset = dataset + .select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames()) + .groupBy(SparkMetadataWriterUtils.getFunctionalIndexColumns()) + .agg(functions.count(functions.when(functions.col(columnToIndex).isNull(), 1)).alias("nullCount"), + functions.min(columnToIndex).alias("minValue"), + functions.max(columnToIndex).alias("maxValue"), + functions.count(columnToIndex).alias("valueCount")); + // Generate column stat records using the aggregated data + return HoodieJavaRDD.of(columnRangeMetadataDataset.javaRDD()).flatMap((SerializableFunction>) + row -> { + int baseAggregatePosition = SparkMetadataWriterUtils.getFunctionalIndexColumnNames().length; + long nullCount = row.getLong(baseAggregatePosition); + Comparable minValue = (Comparable) row.get(baseAggregatePosition + 1); + Comparable maxValue = (Comparable) row.get(baseAggregatePosition + 2); + long valueCount = row.getLong(baseAggregatePosition + 3); + + String partitionName = row.getString(0); + String filePath = row.getString(1); + long totalFileSize = row.getLong(2); + // Total uncompressed size is harder to get directly. 
This is just an approximation to maintain the order. + long totalUncompressedSize = totalFileSize * 2; + + HoodieColumnRangeMetadata rangeMetadata = HoodieColumnRangeMetadata.create( + filePath, + columnToIndex, + minValue, + maxValue, + nullCount, + valueCount, + totalFileSize, + totalUncompressedSize + ); + return createColumnStatsRecords(partitionName, Collections.singletonList(rangeMetadata), false, functionalIndex.getIndexName(), + COLUMN_STATS.getRecordType()).collect(Collectors.toList()).iterator(); + }); } - private static Dataset readRecordsAsRow(StoragePath[] paths, SQLContext sqlContext, - HoodieTableMetaClient metaClient, Schema schema, - boolean isBaseFile) { + public static HoodieData getFunctionalIndexRecordsUsingBloomFilter(Dataset dataset, String columnToIndex, + HoodieWriteConfig metadataWriteConfig, String instantTime) { + // Group data using functional index metadata and then create bloom filter on the group + Dataset bloomFilterRecords = dataset.select(columnToIndex, SparkMetadataWriterUtils.getFunctionalIndexColumnNames()) + // row.get(0) refers to partition path value and row.get(1) refers to file name. + .groupByKey((MapFunction) row -> Pair.of(row.getString(0), row.getString(1)), Encoders.kryo(Pair.class)) + .flatMapGroups((FlatMapGroupsFunction) ((pair, iterator) -> { + String partition = pair.getLeft().toString(); + String fileName = pair.getRight().toString(); + BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(metadataWriteConfig); + iterator.forEachRemaining(row -> { + byte[] key = row.getAs(columnToIndex).toString().getBytes(); + bloomFilter.add(key); + }); + ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(bloomFilter.serializeToString())); + HoodieRecord bloomFilterRecord = createBloomFilterMetadataRecord(partition, fileName, instantTime, metadataWriteConfig.getBloomFilterType(), bloomByteBuffer, false); + return Collections.singletonList(bloomFilterRecord).iterator(); + }), Encoders.kryo(HoodieRecord.class)); + return HoodieJavaRDD.of(bloomFilterRecords.javaRDD()); + } + + public static List readRecordsAsRows(StoragePath[] paths, SQLContext sqlContext, + HoodieTableMetaClient metaClient, Schema schema, + HoodieWriteConfig dataWriteConfig, boolean isBaseFile) { List records = isBaseFile ? 
getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema) : getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema); - return dropMetaFields(toDataset(records, schema, sqlContext, isBaseFile)); + return toRows(records, schema, dataWriteConfig, sqlContext, paths[0].toString()); } private static List getUnmergedLogFileRecords(List logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) { @@ -194,66 +207,20 @@ private static List getBaseFileRecords(HoodieBaseFile baseFile, Ho } } - private static Dataset toDataset(List records, Schema schema, SQLContext sqlContext, boolean isBaseFile) { - List avroRecords = records.stream() + private static List toRows(List records, Schema schema, HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) { + StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema); + Function1 converterToRow = AvroConversionUtils.createConverterToRow(schema, structType); + List avroRecords = records.stream() .map(r -> { - if (isBaseFile) { - return (GenericRecord) r.getData(); - } - HoodieRecordPayload payload = (HoodieRecordPayload) r.getData(); try { - return (GenericRecord) payload.getInsertValue(schema).get(); + return (GenericRecord) (r.getData() instanceof GenericRecord ? r.getData() + : ((HoodieRecordPayload) r.getData()).getInsertValue(schema, dataWriteConfig.getProps()).get()); } catch (IOException e) { - throw new HoodieIOException("Failed to extract Avro payload", e); + throw new HoodieIOException("Could not fetch record payload"); } }) + .map(converterToRow::apply) .collect(Collectors.toList()); - if (avroRecords.isEmpty()) { - return sqlContext.emptyDataFrame().toDF(); - } - JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext()); - JavaRDD javaRDD = jsc.parallelize(avroRecords); - return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession()); - } - - private static > HoodieColumnRangeMetadata computeColumnRangeMetadata(Dataset rowDataset, - String columnName, - String filePath, - long fileSize) { - long totalSize = fileSize; - // Get nullCount, minValue, and maxValue - Dataset aggregated = rowDataset.agg( - functions.count(functions.when(functions.col(columnName).isNull(), 1)).alias("nullCount"), - functions.min(columnName).alias("minValue"), - functions.max(columnName).alias("maxValue"), - functions.count(columnName).alias("valueCount") - ); - - Row result = aggregated.collectAsList().get(0); - long nullCount = result.getLong(0); - @Nullable T minValue = (T) result.get(1); - @Nullable T maxValue = (T) result.get(2); - long valueCount = result.getLong(3); - - // Total uncompressed size is harder to get directly. This is just an approximation to maintain the order. 
- long totalUncompressedSize = totalSize * 2; - - return HoodieColumnRangeMetadata.create( - filePath, - columnName, - minValue, - maxValue, - nullCount, - valueCount, - totalSize, - totalUncompressedSize - ); - } - - private static Dataset dropMetaFields(Dataset df) { - return df.select( - Arrays.stream(df.columns()) - .filter(c -> !HOODIE_META_COLUMNS.contains(c)) - .map(df::col).toArray(Column[]::new)); + return avroRecords; } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 08638526de4d..f5078fd3f8cd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -18,15 +18,18 @@ package org.apache.hudi.metadata; +import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieSparkFunctionalIndex; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.utils.SparkMetadataWriterUtils; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.metrics.Registry; -import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieRecord; @@ -42,24 +45,31 @@ import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.client.utils.SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter; -import static org.apache.hudi.client.utils.SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats; +import static org.apache.hudi.client.utils.SparkMetadataWriterUtils.readRecordsAsRows; import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; @@ -159,10 +169,11 @@ public void deletePartitions(String instantTime, List par } @Override - protected HoodieData getFunctionalIndexRecords(List> partitionFileSlicePairs, 
+ protected HoodieData getFunctionalIndexRecords(List>> partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition, HoodieTableMetaClient metaClient, int parallelism, - Schema readerSchema, StorageConfiguration storageConf) { + Schema readerSchema, StorageConfiguration storageConf, + String instantTime) { HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; if (indexDefinition.getSourceFields().isEmpty()) { // In case there are no columns to index, bail @@ -174,24 +185,40 @@ protected HoodieData getFunctionalIndexRecords(List> partitionToFileSlicesMap = partitionFileSlicePairs.stream() - .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList()))); + // Read records and append functional index metadata to every row + HoodieData rowData = sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism) + .flatMap((SerializableFunction>, Iterator>) entry -> { + String partition = entry.getKey(); + Pair filePathSizePair = entry.getValue(); + String filePath = filePathSizePair.getKey(); + long fileSize = filePathSizePair.getValue(); + List rowsForFilePath = readRecordsAsRows(new StoragePath[] {new StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig, + FSUtils.isBaseFile(new StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1)))); + List rowsWithIndexMetadata = SparkMetadataWriterUtils.getRowsWithFunctionalIndexMetadata(rowsForFilePath, partition, filePath, fileSize); + return rowsWithIndexMetadata.iterator(); + }); + + // Generate dataset with functional index metadata + StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema) + .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_PARTITION, DataTypes.StringType, false, Metadata.empty())) + .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_PATH, DataTypes.StringType, false, Metadata.empty())) + .add(StructField.apply(HoodieFunctionalIndex.HOODIE_FUNCTIONAL_INDEX_FILE_SIZE, DataTypes.LongType, false, Metadata.empty())); + Dataset rowDataset = sparkEngineContext.getSqlContext().createDataFrame(HoodieJavaRDD.getJavaRDD(rowData).rdd(), structType); + + // Apply functional index and generate the column to index HoodieFunctionalIndex functionalIndex = new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions()); - List allRecords = new ArrayList<>(); - for (Map.Entry> entry : partitionToFileSlicesMap.entrySet()) { - String partition = entry.getKey(); - List fileSlices = entry.getValue(); - List recordsForPartition = Collections.emptyList(); - if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) { - recordsForPartition = getFunctionalIndexRecordsUsingColumnStats(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext); - } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) { - recordsForPartition = getFunctionalIndexRecordsUsingBloomFilter(metaClient, readerSchema, fileSlices, partition, functionalIndex, columnToIndex, sqlContext, metadataWriteConfig); - } - allRecords.addAll(recordsForPartition); + Column indexedColumn = functionalIndex.apply(Collections.singletonList(rowDataset.col(columnToIndex))); + rowDataset = rowDataset.withColumn(columnToIndex, indexedColumn); + + // Generate functional index records + if 
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) { + return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingColumnStats(rowDataset, functionalIndex, columnToIndex); + } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) { + return SparkMetadataWriterUtils.getFunctionalIndexRecordsUsingBloomFilter(rowDataset, columnToIndex, metadataWriteConfig, instantTime); + } else { + throw new UnsupportedOperationException(indexDefinition.getIndexType() + " is not yet supported"); } - return HoodieJavaRDD.of(allRecords, sparkEngineContext, parallelism); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java index 0ca71cf3aac8..4edb1e4c090a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/index/functional/HoodieFunctionalIndex.java @@ -31,29 +31,32 @@ */ public interface HoodieFunctionalIndex extends Serializable { - public static final String SPARK_DATE_FORMAT = "date_format"; - public static final String SPARK_DAY = "day"; - public static final String SPARK_MONTH = "month"; - public static final String SPARK_YEAR = "year"; - public static final String SPARK_HOUR = "hour"; - public static final String SPARK_FROM_UNIXTIME = "from_unixtime"; - public static final String SPARK_UNIX_TIMESTAMP = "unix_timestamp"; - public static final String SPARK_TO_DATE = "to_date"; - public static final String SPARK_TO_TIMESTAMP = "to_timestamp"; - public static final String SPARK_DATE_ADD = "date_add"; - public static final String SPARK_DATE_SUB = "date_sub"; - public static final String SPARK_CONCAT = "concat"; - public static final String SPARK_SUBSTRING = "substring"; - public static final String SPARK_UPPER = "upper"; - public static final String SPARK_LOWER = "lower"; - public static final String SPARK_TRIM = "trim"; - public static final String SPARK_LTRIM = "ltrim"; - public static final String SPARK_RTRIM = "rtrim"; - public static final String SPARK_LENGTH = "length"; - public static final String SPARK_REGEXP_REPLACE = "regexp_replace"; - public static final String SPARK_REGEXP_EXTRACT = "regexp_extract"; - public static final String SPARK_SPLIT = "split"; - public static final String SPARK_IDENTITY = "identity"; + String HOODIE_FUNCTIONAL_INDEX_FILE_PATH = "_hoodie_functional_index_file_path"; + String HOODIE_FUNCTIONAL_INDEX_PARTITION = "_hoodie_functional_index_partition"; + String HOODIE_FUNCTIONAL_INDEX_FILE_SIZE = "_hoodie_functional_index_file_size"; + String SPARK_DATE_FORMAT = "date_format"; + String SPARK_DAY = "day"; + String SPARK_MONTH = "month"; + String SPARK_YEAR = "year"; + String SPARK_HOUR = "hour"; + String SPARK_FROM_UNIXTIME = "from_unixtime"; + String SPARK_UNIX_TIMESTAMP = "unix_timestamp"; + String SPARK_TO_DATE = "to_date"; + String SPARK_TO_TIMESTAMP = "to_timestamp"; + String SPARK_DATE_ADD = "date_add"; + String SPARK_DATE_SUB = "date_sub"; + String SPARK_CONCAT = "concat"; + String SPARK_SUBSTRING = "substring"; + String SPARK_UPPER = "upper"; + String SPARK_LOWER = "lower"; + String SPARK_TRIM = "trim"; + String SPARK_LTRIM = "ltrim"; + String SPARK_RTRIM = "rtrim"; + String SPARK_LENGTH = "length"; + String SPARK_REGEXP_REPLACE = "regexp_replace"; + String SPARK_REGEXP_EXTRACT = "regexp_extract"; + String SPARK_SPLIT = "split"; + String SPARK_IDENTITY = "identity"; /** * 
Get the name of the index.

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index de20cfb6fa21..5a302304277b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -31,7 +31,6 @@ import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
 import org.apache.hudi.metadata.MetadataPartitionType
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
 import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.parser.ParserInterface
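
Note on initializeFunctionalIndexPartition: the writer now flattens each (partition, FileSlice) pair into (partition, (filePath, fileSize)) entries covering the base file and every log file, which is the shape the new getFunctionalIndexRecords signature consumes. The sketch below shows a compact equivalent of that flattening using the same Hudi types the hunk above already uses; it is illustrative only, and it also avoids the nested getLogFiles() iteration in the posted hunk, which would add each log file once per log file in the slice.

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.util.collection.Pair;

import java.util.ArrayList;
import java.util.List;

public class PartitionFilePathSizeSketch {

  // Flattens (partition, FileSlice) pairs into (partition, (filePath, fileSize)) entries,
  // one per base file and one per log file.
  public static List<Pair<String, Pair<String, Long>>> toTriplets(List<Pair<String, FileSlice>> partitionFileSlicePairs) {
    List<Pair<String, Pair<String, Long>>> result = new ArrayList<>();
    partitionFileSlicePairs.forEach(entry -> {
      String partition = entry.getKey();
      FileSlice fileSlice = entry.getValue();
      if (fileSlice.getBaseFile().isPresent()) {
        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
        result.add(Pair.of(partition, Pair.of(baseFile.getPath(), baseFile.getFileLen())));
      }
      fileSlice.getLogFiles().forEach(logFile ->
          result.add(Pair.of(partition, Pair.of(logFile.getPath().toString(), logFile.getFileSize()))));
    });
    return result;
  }
}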
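
The Spark writer now appends three metadata columns to every row read from a base or log file and then builds a Dataset with an extended schema. The column names below are the constants added to HoodieFunctionalIndex in this diff; the SparkSession setup, the "ts" data schema, and the sample values are assumptions made only to keep the sketch self-contained.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FunctionalIndexRowSketch {
  // Column names match the constants added to HoodieFunctionalIndex in this diff.
  static final String PARTITION_COL = "_hoodie_functional_index_partition";
  static final String FILE_PATH_COL = "_hoodie_functional_index_file_path";
  static final String FILE_SIZE_COL = "_hoodie_functional_index_file_size";

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("fi-sketch").getOrCreate();

    // Illustrative data schema standing in for the projected reader schema.
    StructType dataSchema = new StructType()
        .add("ts", DataTypes.LongType, false, Metadata.empty());

    // Extend the schema with the three functional-index metadata columns,
    // mirroring what SparkHoodieBackedTableMetadataWriter does to the reader schema.
    StructType withIndexMetadata = dataSchema
        .add(new StructField(PARTITION_COL, DataTypes.StringType, false, Metadata.empty()))
        .add(new StructField(FILE_PATH_COL, DataTypes.StringType, false, Metadata.empty()))
        .add(new StructField(FILE_SIZE_COL, DataTypes.LongType, false, Metadata.empty()));

    // Pretend these rows came from one file of one partition (all values illustrative).
    String partition = "2024/01/01";
    String filePath = "/tmp/table/2024/01/01/f1.parquet";
    long fileSize = 1024L;
    List<Row> dataRows = Arrays.asList(RowFactory.create(1700000000L), RowFactory.create(1700000500L));

    // Append the metadata values to every data row (the diff does the same with Row.merge).
    List<Row> rowsWithMetadata = dataRows.stream()
        .map(r -> RowFactory.create(r.getLong(0), partition, filePath, fileSize))
        .collect(Collectors.toList());

    Dataset<Row> dataset = spark.createDataFrame(rowsWithMetadata, withIndexMetadata);
    dataset.show(false);
    spark.stop();
  }
}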
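
getFunctionalIndexRecordsUsingColumnStats replaces the earlier per-file collectAsList() aggregation with a single distributed groupBy over those metadata columns. The minimal sketch below shows only that aggregation shape on an arbitrary Dataset<Row>; in the diff each aggregated row is then mapped into a HoodieColumnRangeMetadata and fed to createColumnStatsRecords.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.min;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ColumnStatsAggSketch {
  // One output row per (partition, file path, file size) group, i.e. per indexed file,
  // carrying the stats needed for a column range metadata entry.
  public static Dataset<Row> aggregateColumnStats(Dataset<Row> rows, String columnToIndex) {
    return rows
        .groupBy(
            col("_hoodie_functional_index_partition"),
            col("_hoodie_functional_index_file_path"),
            col("_hoodie_functional_index_file_size"))
        .agg(
            count(when(col(columnToIndex).isNull(), 1)).alias("nullCount"),
            min(columnToIndex).alias("minValue"),
            max(columnToIndex).alias("maxValue"),
            count(columnToIndex).alias("valueCount"));
  }
}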
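
For the bloom filter index type, rows are grouped by (partition, file path) and each group is folded into one serialized bloom filter record. The sketch below reproduces only that groupByKey/flatMapGroups shape with plain Spark encoders and a HashSet standing in for the filter; the actual hunk uses Encoders.kryo, HoodieFileWriterFactory.createBloomFilter, and createBloomFilterMetadataRecord, and the column names are the constants from the diff.

import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import scala.Tuple2;

public class BloomFilterGroupingSketch {
  // Groups rows by (partition, file path) and emits one summary string per group.
  // The real writer builds a Hudi BloomFilter per group instead of a HashSet.
  public static Dataset<String> buildPerFileSummaries(Dataset<Row> rows, String columnToIndex) {
    return rows
        .groupByKey((MapFunction<Row, Tuple2<String, String>>) row -> {
          String partition = row.getAs("_hoodie_functional_index_partition");
          String filePath = row.getAs("_hoodie_functional_index_file_path");
          return new Tuple2<>(partition, filePath);
        }, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .flatMapGroups((FlatMapGroupsFunction<Tuple2<String, String>, Row, String>) (key, it) -> {
          Set<String> keys = new HashSet<>();  // stand-in for the bloom filter
          it.forEachRemaining(row -> keys.add(String.valueOf(row.getAs(columnToIndex))));
          String summary = key._1() + "/" + key._2() + " -> " + keys.size() + " distinct keys";
          return Collections.singletonList(summary).iterator();
        }, Encoders.STRING());
  }
}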