From fe08b6fecbcc34fcc2a3c6a5cdca6b8ebf527252 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Wed, 29 May 2024 09:52:30 -0700
Subject: [PATCH] [HUDI-7807] Fixing spark-sql for pk less tables (#11354)

---
 .../org/apache/hudi/keygen/KeyGenUtils.java   |  4 +-
 .../HoodieSparkKeyGeneratorFactory.java       |  3 +
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  4 +-
 .../spark/sql/hudi/dml/TestDeleteTable.scala  | 16 +++-
 .../spark/sql/hudi/dml/TestUpdateTable.scala  | 91 ++++++++++---------
 5 files changed, 69 insertions(+), 49 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 4d7c83a7794d..34af55fd85a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -268,6 +268,8 @@ public static List<String> getRecordKeyFields(TypedProperties props) {
    * @return true if record keys need to be auto generated. false otherwise.
    */
   public static boolean isAutoGeneratedRecordKeysEnabled(TypedProperties props) {
-    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        || props.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).equals(StringUtils.EMPTY_STRING);
+    // spark-sql sets the record key config to an empty string for update and a couple of other statements.
   }
 }
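Note on the KeyGenUtils change above: a missing record key config and an empty-string record key config are now treated the same way, and both enable record key auto-generation. A minimal sketch of the resulting behavior (illustrative only; the property literal shown is assumed to be the value behind KeyGeneratorOptions.RECORDKEY_FIELD_NAME):

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.keygen.KeyGenUtils

    val props = new TypedProperties()
    // No record key configured at all: auto-generation is on.
    assert(KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props))

    // spark-sql can pass an empty string; before this patch that disabled
    // auto-generation and broke UPDATE/DELETE on pk-less tables.
    props.setProperty("hoodie.datasource.write.recordkey.field", "")
    assert(KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props))

    // An explicit record key still disables auto-generation.
    props.setProperty("hoodie.datasource.write.recordkey.field", "id")
    assert(!KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props))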
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index c655bf625433..2b3315fefb47 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -88,6 +88,9 @@ public static KeyGenerator createKeyGenerator(String keyGeneratorClass, TypedProperties props)
         //Need to prevent overwriting the keygen for spark sql merge into because we need to extract
         //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
         && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
+    if (autoRecordKeyGen) {
+      props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    }
     KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
     if (autoRecordKeyGen) {
       return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) keyGenerator);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5b9b57cf10c9..1a8031b9fe2b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -228,8 +228,8 @@ class HoodieSparkSqlWriterInternal {
       originKeyGeneratorClassName, paramsWithoutDefaults)
 
     // Validate datasource and tableconfig keygen are the same
-    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
     asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
     asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
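Taken together with the KeyGenUtils change, the factory now strips the (possibly empty) record key config before the key generator class is instantiated via reflection, then applies the auto-key wrapper afterwards. A rough Scala paraphrase of the resulting flow (simplified, not the patch verbatim; symbols as in the Java above):

    val autoRecordKeyGen = KeyGenUtils.isAutoGeneratedRecordKeysEnabled(props) &&
      !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false)
    if (autoRecordKeyGen) {
      // an empty "" record key config must not reach the reflective constructor
      props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
    }
    val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClass, props).asInstanceOf[KeyGenerator]
    if (autoRecordKeyGen) {
      new AutoRecordGenWrapperKeyGenerator(props, keyGenerator.asInstanceOf[BuiltinKeyGenerator])
    } else {
      keyGenerator
    }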
spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") // insert data to table - spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 1, 10, 1000, 'a1'") checkAnswer(s"select id, name, price, ts from $tableName")( Seq(1, "a1", 10.0, 1000) ) @@ -112,7 +119,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { Seq(0) ) - spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000") + spark.sql(s"insert into $tableName select 2, 10, 1000, 'a2'") spark.sql(s"delete from $tableName where id = 1") checkAnswer(s"select id, name, price, ts from $tableName")( Seq(2, "a2", 10.0, 1000) @@ -124,6 +131,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { ) } } + } } test("Test Delete Table On Non-PK Condition") { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala index 8bdfe258bb7f..5162b6648804 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala @@ -77,54 +77,61 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test Update Table Without Primary Key") { withRecordType()(withTempDir { tmp => Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - // create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location '${tmp.getCanonicalPath}/$tableName' - | tblproperties ( - | type = '$tableType', - | preCombineField = 'ts' - | ) - """.stripMargin) - - // insert data to table - spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) - ) + Seq(true, false).foreach { isPartitioned => + val tableName = generateTableName + val partitionedClause = if (isPartitioned) { + "PARTITIONED BY (name)" + } else { + "" + } + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | price double, + | ts long, + | name string + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | preCombineField = 'ts' + | ) + | $partitionedClause + """.stripMargin) - // test with optimized sql writes enabled. - spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") + // insert data to table + spark.sql(s"insert into $tableName select 1,10, 1000, 'a1'") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) - // update data - spark.sql(s"update $tableName set price = 20 where id = 1") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 20.0, 1000) - ) + // test with optimized sql writes enabled. 
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") - // update data - spark.sql(s"update $tableName set price = price * 2 where id = 1") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 40.0, 1000) - ) + // update data + spark.sql(s"update $tableName set price = 20 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 20.0, 1000) + ) - // verify default compaction w/ MOR - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - spark.sql(s"update $tableName set price = price * 2 where id = 1") - spark.sql(s"update $tableName set price = price * 2 where id = 1") + // update data spark.sql(s"update $tableName set price = price * 2 where id = 1") - // verify compaction is complete - val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" + tableName) - assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit") - } + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 40.0, 1000) + ) + // verify default compaction w/ MOR + if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { + spark.sql(s"update $tableName set price = price * 2 where id = 1") + spark.sql(s"update $tableName set price = price * 2 where id = 1") + spark.sql(s"update $tableName set price = price * 2 where id = 1") + // verify compaction is complete + val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" + tableName) + assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction, "commit") + } + } } }) }