Spark: Follow name mapping when importing ORC tables (#1399)
edgarRd authored and shardulm94 committed Oct 28, 2020
1 parent 17121a1 commit db051bc
Showing 3 changed files with 319 additions and 247 deletions.
39 changes: 29 additions & 10 deletions orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
@@ -73,43 +74,60 @@ public static Metrics fromInputFile(InputFile file) {
   }
 
   public static Metrics fromInputFile(InputFile file, MetricsConfig metricsConfig) {
+    return fromInputFile(file, metricsConfig, null);
+  }
+
+  public static Metrics fromInputFile(InputFile file, MetricsConfig metricsConfig, NameMapping mapping) {
     final Configuration config = (file instanceof HadoopInputFile) ?
         ((HadoopInputFile) file).getConf() : new Configuration();
-    return fromInputFile(file, config, metricsConfig);
+    return fromInputFile(file, config, metricsConfig, mapping);
   }
 
-  static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig metricsConfig) {
+  static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig metricsConfig, NameMapping mapping) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
       return buildOrcMetrics(orcReader.getNumberOfRows(), orcReader.getSchema(), orcReader.getStatistics(),
-          metricsConfig);
+          metricsConfig, mapping);
     } catch (IOException ioe) {
       throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
     }
   }
 
   static Metrics fromWriter(Writer writer, MetricsConfig metricsConfig) {
     try {
-      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(), metricsConfig);
+      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(), metricsConfig, null);
     } catch (IOException ioe) {
       throw new RuntimeIOException(ioe, "Failed to get statistics from writer");
     }
   }
 
-  private static Metrics buildOrcMetrics(long numOfRows, TypeDescription orcSchema,
-                                         ColumnStatistics[] colStats, MetricsConfig metricsConfig) {
-    final Schema schema = ORCSchemaUtil.convert(orcSchema);
-    final Set<Integer> statsColumns = statsColumns(orcSchema);
+  private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
+                                         final ColumnStatistics[] colStats, final MetricsConfig metricsConfig,
+                                         final NameMapping mapping) {
+    final TypeDescription orcSchemaWithIds = (!ORCSchemaUtil.hasIds(orcSchema) && mapping != null) ?
+        ORCSchemaUtil.applyNameMapping(orcSchema, mapping) : orcSchema;
+    final Set<Integer> statsColumns = statsColumns(orcSchemaWithIds);
     final MetricsConfig effectiveMetricsConfig = Optional.ofNullable(metricsConfig)
         .orElseGet(MetricsConfig::getDefault);
     Map<Integer, Long> columnSizes = Maps.newHashMapWithExpectedSize(colStats.length);
     Map<Integer, Long> valueCounts = Maps.newHashMapWithExpectedSize(colStats.length);
     Map<Integer, Long> nullCounts = Maps.newHashMapWithExpectedSize(colStats.length);
+
+    if (!ORCSchemaUtil.hasIds(orcSchemaWithIds)) {
+      return new Metrics(numOfRows,
+          columnSizes,
+          valueCounts,
+          nullCounts,
+          null,
+          null);
+    }
+
+    final Schema schema = ORCSchemaUtil.convert(orcSchemaWithIds);
     Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
     Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
 
     for (int i = 0; i < colStats.length; i++) {
       final ColumnStatistics colStat = colStats[i];
-      final TypeDescription orcCol = orcSchema.findSubtype(i);
+      final TypeDescription orcCol = orcSchemaWithIds.findSubtype(i);
       final Optional<Types.NestedField> icebergColOpt = ORCSchemaUtil.icebergID(orcCol)
           .map(schema::findField);
 
@@ -261,7 +279,8 @@ private static class StatsColumnsVisitor extends OrcSchemaVisitor<Set<Integer>>
     public Set<Integer> record(TypeDescription record, List<String> names, List<Set<Integer>> fields) {
       ImmutableSet.Builder<Integer> result = ImmutableSet.builder();
       fields.stream().filter(Objects::nonNull).forEach(result::addAll);
-      record.getChildren().stream().map(ORCSchemaUtil::fieldId).forEach(result::add);
+      record.getChildren().stream().map(ORCSchemaUtil::icebergID).filter(Optional::isPresent)
+          .map(Optional::get).forEach(result::add);
       return result.build();
     }
   }
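For reference, a minimal usage sketch of the new mapping-aware entry point (not part of this commit): it collects metrics from an ORC file written without Iceberg field IDs by resolving ORC column names to field IDs through a NameMapping. The schema, field IDs, and file path below are hypothetical; MappingUtil.create is Iceberg's helper for deriving a mapping from a table schema.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.types.Types;

public class OrcMetricsWithMappingExample {
  public static void main(String[] args) {
    // Hypothetical table schema; the IDs here are what the mapping will
    // assign to the unannotated ORC columns "id" and "data".
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    NameMapping mapping = MappingUtil.create(schema);

    Configuration conf = new Configuration();
    Metrics metrics = OrcMetrics.fromInputFile(
        HadoopInputFile.fromLocation("file:/tmp/warehouse/t/data/00000.orc", conf), // hypothetical path
        MetricsConfig.getDefault(),
        mapping);

    // Without a mapping (or when no column matches), buildOrcMetrics now
    // falls back to row count only instead of mis-assigning column stats.
    System.out.println("records: " + metrics.recordCount());
  }
}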
spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -322,8 +322,7 @@ public static List<DataFile> listPartition(Map<String, String> partition, String
     } else if (format.contains("parquet")) {
       return listParquetPartition(partition, uri, spec, conf, metricsConfig, mapping);
     } else if (format.contains("orc")) {
-      // TODO: use NameMapping in listOrcPartition
-      return listOrcPartition(partition, uri, spec, conf, metricsConfig);
+      return listOrcPartition(partition, uri, spec, conf, metricsConfig, mapping);
     } else {
       throw new UnsupportedOperationException("Unknown partition format: " + format);
     }
@@ -396,15 +395,16 @@ private static List<DataFile> listParquetPartition(Map<String, String> partition
 
   private static List<DataFile> listOrcPartition(Map<String, String> partitionPath, String partitionUri,
                                                  PartitionSpec spec, Configuration conf,
-                                                 MetricsConfig metricsSpec) {
+                                                 MetricsConfig metricsSpec, NameMapping mapping) {
     try {
       Path partition = new Path(partitionUri);
       FileSystem fs = partition.getFileSystem(conf);
 
       return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
           .filter(FileStatus::isFile)
           .map(stat -> {
-            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf), metricsSpec);
+            Metrics metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf),
+                metricsSpec, mapping);
             String partitionKey = spec.fields().stream()
                 .map(PartitionField::name)
                 .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
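The mapping threaded through listOrcPartition has to come from somewhere; a sketch, assuming the caller holds the target Iceberg Table and reads its default name mapping from the schema.name-mapping.default property (TableProperties.DEFAULT_NAME_MAPPING). The helper class and method names are invented for illustration.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

public class NameMappingSupport {
  // Hypothetical helper: read the table's default name mapping, if any.
  // A null result preserves the old behavior for ORC imports: files without
  // field IDs contribute only a row count, no per-column metrics.
  static NameMapping defaultNameMapping(Table table) {
    String json = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
    return json == null ? null : NameMappingParser.fromJson(json);
  }
}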