From 2e111a179b5e51071d40923a422df54cc82fabf4 Mon Sep 17 00:00:00 2001
From: Walaa Eldin Moustafa
Date: Thu, 26 Nov 2020 17:13:08 -0800
Subject: [PATCH] Hive Metadata Scan: Do not throw an exception on dangling partitions; log warning message (#50)

---
NOTE(review): this copy of the patch was recovered from a whitespace-collapsed
paste. Line breaks, indentation and blank context lines were reconstructed from
the hunk line counts and the diffstat. Text inside angle brackets appears to
have been stripped upstream (e.g. "Iterable> getFilesByFilter", bare "List" and
"Map" types, the author e-mail), so context lines may not match the target
files byte-for-byte. Verify against the repository before applying.

 .../iceberg/hive/legacy/FileSystemUtils.java | 10 +++++++++
 .../legacy/LegacyHiveTableOperations.java    | 16 +++++++++++---
 .../hive/legacy/TestLegacyHiveTableScan.java | 22 +++++++++++++++++++
 3 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java
index 81cad5f5b..6c223143f 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java
@@ -50,6 +50,16 @@ static List listFiles(String directory, Configuration conf) {
     return Arrays.asList(files);
   }
 
+  static boolean exists(String file, Configuration conf) {
+    final Path filePath = new Path(file);
+    try {
+      FileSystem fs = filePath.getFileSystem(conf);
+      return fs.exists(filePath);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Error determining if file or directory exists: " + file);
+    }
+  }
+
   private enum HiddenPathFilter implements PathFilter {
     INSTANCE;
 
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
index c7507acf7..c55e486f5 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
@@ -114,9 +114,19 @@ Iterable> getFilesByFilter(Expression expression) {
 
     Iterable> filesPerDirectory = Iterables.transform(
         matchingDirectories,
-        directory -> Iterables.transform(
-            FileSystemUtils.listFiles(directory.location(), conf),
-            file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())));
+        directory -> {
+          List files;
+          if (FileSystemUtils.exists(directory.location(), conf)) {
+            files = FileSystemUtils.listFiles(directory.location(), conf);
+          } else {
+            LOG.warn("Cannot find directory: {}. Skipping.", directory.location());
+            files = ImmutableList.of();
+          }
+          return Iterables.transform(
+              files,
+              file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())
+          );
+        });
 
     // Note that we return an Iterable of Iterables here so that the TableScan can process iterables of individual
     // directories in parallel hence resulting in a parallel file listing
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
index dc060d0ee..55667089a 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.hive.legacy;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
@@ -29,6 +30,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -107,6 +109,17 @@ public void testHiveScanMultiPartition() throws Exception {
     filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
   }
 
+  @Test
+  public void testHiveScanDanglingPartitions() throws Exception {
+    String tableName = "dangling_partition";
+    Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS);
+    addPartition(table, ImmutableList.of("ds", 1), AVRO, "A");
+    addPartition(table, ImmutableList.of("ds", 2), AVRO, "B");
+    addPartition(table, ImmutableList.of("ds", 3), AVRO, "C");
+    makePartitionDangling(table, ImmutableList.of("ds", 3));
+    filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
+  }
+
   @Test
   public void testHiveScanNoAvroSchema() throws Exception {
     String tableName = "hive_scan_no_avro_schema";
@@ -256,6 +269,15 @@ private void addPartition(Table table, List partitionValues, FileFormat
     }
   }
 
+  private void makePartitionDangling(Table table, List partitionValues) throws Exception {
+    String partitionLocation = metastoreClient.getPartition(
+        table.getDbName(),
+        table.getTableName(),
+        Lists.transform(partitionValues, Object::toString)
+    ).getSd().getLocation();
+    FileUtils.deleteDirectory(new File(new URI(partitionLocation)));
+  }
+
   private Map hiveScan(Table table) {
     return hiveScan(table, Expressions.alwaysTrue());
   }