From d4dd1cb0e3c5561c1586498596e9c9d16f3c9f50 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Mon, 20 Nov 2023 16:42:06 +0800 Subject: [PATCH] [KYUUBI #5707][AUTHZ] Support merge into path-based table for Delta Lake in Authz MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— This pull request fixes #5707 ## Describe Your Solution ๐Ÿ”ง `org.apache.spark.sql.delta.commands.MergeIntoCommand` add uriDescs. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite.test("merge into path-based table") --- # Checklists ## ๐Ÿ“ Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## ๐Ÿ“ Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [x] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5708 from zml1206/KYUUBI-5707. Closes #5707 45ab4d44e [zml1206] fix 679f735e1 [zml1206] Support merge into path-based table for Delta Lake in Authz Authored-by: zml1206 Signed-off-by: Kent Yao --- .../main/resources/table_command_spec.json | 6 ++- .../spark/authz/serde/tableExtractors.scala | 9 ++-- .../spark/authz/serde/uriExtractors.scala | 10 +++-- .../spark/authz/gen/DeltaCommands.scala | 7 +-- ...eltaCatalogRangerSparkExtensionSuite.scala | 45 +++++++++++++++++++ 5 files changed, 64 insertions(+), 13 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index 4442e6868a6..583ad29b4e6 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -2198,7 +2198,11 @@ "fieldName" : "source", "fieldExtractor" : "LogicalPlanQueryExtractor" } ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "target", + "fieldExtractor" : "SubqueryAliasURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.delta.commands.OptimizeTableCommand", "tableDescs" : [ { diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala index 890b36959ce..adff934740b 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala @@ -257,9 +257,12 @@ class ResolvedIdentifierTableExtractor extends TableExtractor { class SubqueryAliasTableExtractor extends TableExtractor { override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { v1.asInstanceOf[SubqueryAlias] match { - case SubqueryAlias(_, SubqueryAlias(identifier, _)) - if !isPathIdentifier(identifier.name, spark) => - lookupExtractor[StringTableExtractor].apply(spark, identifier.toString()) + case SubqueryAlias(_, SubqueryAlias(identifier, _)) => + if (isPathIdentifier(identifier.name, spark)) { + None + } else { + lookupExtractor[StringTableExtractor].apply(spark, identifier.toString()) + } case SubqueryAlias(identifier, _) if !isPathIdentifier(identifier.name, spark) => lookupExtractor[StringTableExtractor].apply(spark, identifier.toString()) case _ => None diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala index 07b2408ba6d..fd0b19420cd 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala @@ -109,11 +109,15 @@ class IdentifierURIExtractor extends URIExtractor { class SubqueryAliasURIExtractor extends URIExtractor { override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = v1 match { - case SubqueryAlias(_, SubqueryAlias(identifier, _)) - if isPathIdentifier(identifier.name, spark) => - Seq(identifier.name).map(Uri) + case SubqueryAlias(_, SubqueryAlias(identifier, _)) => + if (isPathIdentifier(identifier.name, spark)) { + Seq(identifier.name).map(Uri) + } else { + Nil + } case SubqueryAlias(identifier, _) if isPathIdentifier(identifier.name, spark) => Seq(identifier.name).map(Uri) + case _ => Nil } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala index 82627a0bf6d..db72cde83e5 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala @@ -41,13 +41,8 @@ object DeltaCommands extends CommandSpecs[TableCommandSpec] { val MergeIntoCommand = { val cmd = "org.apache.spark.sql.delta.commands.MergeIntoCommand" - val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE)) - val tableDesc = TableDesc( - "target", - classOf[SubqueryAliasTableExtractor], - actionTypeDesc = Some(actionTypeDesc)) val queryDesc = QueryDesc("source") - TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc)) + DeleteCommand.copy(classname = cmd, queryDescs = Seq(queryDesc)) } val OptimizeTableCommand = { diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala index 331bd380df0..801f9f7458c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala @@ -387,6 +387,51 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } } } + + test("merge into path-based table") { + withSingleCallEnabled { + withCleanTmpResources(Seq( + (s"$namespace1.$table2", "table"), + (s"$namespace1", "database"))) { + doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) + doAs(admin, sql(createTableSql(namespace1, table2))) + withTempDir(path => { + doAs(admin, sql(createPathBasedTableSql(path))) + val mergeIntoSql = + s""" + |MERGE INTO delta.`$path` AS target + |USING $namespace1.$table2 AS source + |ON target.id = source.id + |WHEN MATCHED THEN + | UPDATE SET + | id = source.id, + | name = source.name, + | gender = source.gender, + | birthDate = source.birthDate + |WHEN NOT MATCHED + | THEN INSERT ( + | id, + | name, + | gender, + | birthDate + | ) + | VALUES ( + | source.id, + | source.name, + | source.gender, + | source.birthDate + | ) + |""".stripMargin + interceptContains[AccessControlException]( + doAs(someone, sql(mergeIntoSql)))( + s"does not have [select] privilege on [$namespace1/$table2/id," + + s"$namespace1/$table2/name,$namespace1/$table2/gender," + + s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]") + doAs(admin, sql(mergeIntoSql)) + }) + } + } + } } object DeltaCatalogRangerSparkExtensionSuite {