Skip to content

Commit

Permalink
Hive Metadata Scan: Do not throw an exception on dangling partitions; log warning message (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
wmoustafa authored Nov 27, 2020
1 parent b85a34d commit 2e111a1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ static List<FileStatus> listFiles(String directory, Configuration conf) {
return Arrays.asList(files);
}

/**
 * Checks whether the given file or directory exists on its filesystem.
 *
 * @param file path of the file or directory to check
 * @param conf Hadoop configuration used to resolve the filesystem
 * @return {@code true} if the path exists, {@code false} otherwise
 * @throws RuntimeIOException if the filesystem cannot be queried
 */
static boolean exists(String file, Configuration conf) {
  final Path path = new Path(file);
  try {
    // Resolve the filesystem from the path itself so scheme-qualified
    // locations (hdfs://, s3://, file://, ...) are handled uniformly.
    return path.getFileSystem(conf).exists(path);
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Error determining if file or directory exists: " + file);
  }
}

private enum HiddenPathFilter implements PathFilter {
INSTANCE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,19 @@ Iterable<Iterable<DataFile>> getFilesByFilter(Expression expression) {

// Lazily map each matching partition directory to the data files it contains.
// A "dangling" partition — one registered in the metastore whose directory no
// longer exists on the filesystem — is skipped with a warning instead of
// failing the whole scan with an exception.
Iterable<Iterable<DataFile>> filesPerDirectory = Iterables.transform(
matchingDirectories,
directory -> {
List<FileStatus> files;
if (FileSystemUtils.exists(directory.location(), conf)) {
files = FileSystemUtils.listFiles(directory.location(), conf);
} else {
// Dangling partition: log and substitute an empty listing so the
// downstream transform yields no DataFiles for this directory.
LOG.warn("Cannot find directory: {}. Skipping.", directory.location());
files = ImmutableList.of();
}
return Iterables.transform(
files,
file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())
);
});

// Note that we return an Iterable of Iterables here so that the TableScan can process iterables of individual
// directories in parallel hence resulting in a parallel file listing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.hive.legacy;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
Expand All @@ -29,6 +30,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand Down Expand Up @@ -107,6 +109,17 @@ public void testHiveScanMultiPartition() throws Exception {
filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
}

@Test
public void testHiveScanDanglingPartitions() throws Exception {
  // A partition whose directory was removed from the filesystem ("dangling")
  // must be skipped by the scan rather than causing it to throw.
  Table table = createTable("dangling_partition", DATA_COLUMNS, PARTITION_COLUMNS);
  addPartition(table, ImmutableList.of("ds", 1), AVRO, "A");
  addPartition(table, ImmutableList.of("ds", 2), AVRO, "B");
  addPartition(table, ImmutableList.of("ds", 3), AVRO, "C");
  // Remove the third partition's directory on disk, leaving only its metastore entry.
  makePartitionDangling(table, ImmutableList.of("ds", 3));
  // Only the two partitions still present on disk should appear in the scan result.
  filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
}

@Test
public void testHiveScanNoAvroSchema() throws Exception {
String tableName = "hive_scan_no_avro_schema";
Expand Down Expand Up @@ -256,6 +269,15 @@ private void addPartition(Table table, List<Object> partitionValues, FileFormat
}
}

/**
 * Turns a partition into a "dangling" one: looks up its storage location in the
 * metastore and deletes that directory from the filesystem, so the metastore
 * entry remains but the data directory does not.
 *
 * @param table table owning the partition
 * @param partitionValues partition key values identifying the partition
 */
private void makePartitionDangling(Table table, List<Object> partitionValues) throws Exception {
  List<String> valueStrings = Lists.transform(partitionValues, Object::toString);
  Partition partition = metastoreClient.getPartition(table.getDbName(), table.getTableName(), valueStrings);
  String location = partition.getSd().getLocation();
  FileUtils.deleteDirectory(new File(new URI(location)));
}

// Convenience overload: scans the table with no filter (always-true expression),
// delegating to the filtered hiveScan variant.
private Map<String, FileFormat> hiveScan(Table table) {
return hiveScan(table, Expressions.alwaysTrue());
}
Expand Down

0 comments on commit 2e111a1

Please sign in to comment.