Skip to content

Commit

Permalink
[hive] Fix no longer possible to drop the hms table issue when table …
Browse files Browse the repository at this point in the history
…not in fs in hive catalog (apache#4853)
  • Loading branch information
xiaohongbo committed Jan 9, 2025
1 parent c8b5e1f commit 4975635
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,10 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier)
identifier),
metastoreClientFactory(identifier).orElse(null)));
} catch (TableNotExistException ignore) {
// do not drop format table, since non paimon table throws TableNotExistException too
if (isPaimonTable(table)) {
dropHmsTableWhenPaimonTableNotExist(identifier);
}
}

if (formatTableDisabled()) {
Expand Down Expand Up @@ -1464,4 +1468,20 @@ public int getBatchGetTableSize() {
return DEFAULT_TABLE_BATCH_SIZE;
}
}

private void dropHmsTableWhenPaimonTableNotExist(Identifier identifier) {
LOG.warn(
"Paimon Table {}.{} exists in HMS but not in the file system during get table; dropping it from HMS.",
identifier.getDatabaseName(),
identifier.getTableName());
try {
dropTableImpl(identifier);
} catch (Exception dropTableException) {
LOG.warn(
"exception happen when trying to drop paimon table {}.{} in HMS during get table which not exists in file system",
identifier.getDatabaseName(),
identifier.getTableName(),
dropTableException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.junit.jupiter.api.io.TempDir;

import java.lang.reflect.Field;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -59,6 +61,7 @@
import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -407,6 +410,37 @@ public void testListTables() throws Exception {
catalog.dropDatabase(databaseName, true, true);
}

@Test
public void testDropTable() throws Exception {
String databaseName = "drop_table_test_db";
String tableName = "drop_table_test_table";
catalog.dropDatabase(databaseName, true, true);
catalog.createDatabase(databaseName, true);
Identifier identifier = Identifier.create(databaseName, tableName);

// test ignore if exists
catalog.createTable(
identifier, Schema.newBuilder().column("col", DataTypes.INT()).build(), true);
Path path = Paths.get(catalog.warehouse(), databaseName.concat(".db"), tableName);
catalog.fileIO().delete(new org.apache.paimon.fs.Path(path.toString()), true);
List<String> tables = catalog.listTables(databaseName);
assertEquals(1, tables.size());
catalog.dropTable(identifier, true);
List<String> newTables = catalog.listTables(databaseName);
assertEquals(0, newTables.size());

// test not ignore if exists
catalog.createTable(
identifier, Schema.newBuilder().column("col", DataTypes.INT()).build(), true);
catalog.fileIO().delete(new org.apache.paimon.fs.Path(path.toString()), true);
tables = catalog.listTables(databaseName);
assertEquals(1, tables.size());
assertThrows(
Catalog.TableNotExistException.class, () -> catalog.dropTable(identifier, false));

catalog.dropDatabase(databaseName, true, true);
}

@Override
protected boolean supportsView() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.catalog.Identifier
import org.apache.paimon.fs.Path
import org.apache.paimon.hive.HiveMetastoreClient
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.spark.catalog.WithPaimonCatalog
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
Expand Down Expand Up @@ -621,6 +623,50 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

test("Paimon DDL with hive catalog: drop table which location has been deleted") {
spark.close()

Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
Seq(false, true).foreach {
cacheEnabled =>
val customSpark = SparkSession
.builder()
.master("local[2]")
.config(sparkConf)
.config(s"spark.sql.catalog.$catalogName.cache-enabled", cacheEnabled)
.getOrCreate()

customSpark.sql(s"USE $catalogName")
customSpark.sql("CREATE DATABASE IF NOT EXISTS paimon_db")

withDatabase("paimon_db") {
customSpark.sql("USE paimon_db")
customSpark.sql("CREATE TABLE t USING paimon")
val currentCatalog = customSpark.sessionState.catalogManager.currentCatalog
.asInstanceOf[WithPaimonCatalog]
.paimonCatalog()
val table = currentCatalog
.getTable(Identifier.create("paimon_db", "t"))
.asInstanceOf[FileStoreTable]
table.fileIO().delete(table.location(), true)
if (catalogName.equals("paimon")) {
// Filesystem catalog determines whether a table exists based on table location
assert(customSpark.sql("SHOW TABLES").count() == 0)
} else {
// Hive catalog determines whether a table exists based on metadata in hms
assert(customSpark.sql("SHOW TABLES").count() == 1)
}
customSpark.sql("DROP TABLE IF EXISTS t")
assert(customSpark.sql("SHOW TABLES").count() == 0)
}

customSpark.close()
}
}
reset()
}

def getDatabaseProp(dbName: String, propertyName: String): String = {
spark
.sql(s"DESC DATABASE EXTENDED $dbName")
Expand Down

0 comments on commit 4975635

Please sign in to comment.