[KYUUBI #5690][AUTHZ] Support insert into/overwrite path-based table for Delta Lake in Authz #5691

Status: Closed · 3 commits
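In short, this change teaches the Authz plugin to authorize INSERT INTO / INSERT OVERWRITE against Delta Lake path-based tables as path (URI) write access rather than as catalog-table access. A minimal illustration of the statements involved (the path and source table below are placeholders, not taken from this PR):

import org.apache.spark.sql.SparkSession

// Placeholders only: /tmp/delta_tbl and ns1.src are illustrative.
def pathBasedWrites(spark: SparkSession): Unit = {
  // Both statements resolve to a DataSourceV2Relation whose identifier is the
  // path itself, so Authz must check a [write] privilege on that path.
  spark.sql("INSERT INTO delta.`/tmp/delta_tbl` SELECT * FROM ns1.src")
  spark.sql("INSERT OVERWRITE delta.`/tmp/delta_tbl` SELECT * FROM ns1.src")
}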
File 1 — URIExtractor service registrations:

@@ -18,6 +18,7 @@
 org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
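These service entries are how extractors are discovered at runtime; adding DataSourceV2RelationURIExtractor here makes it loadable by name. A minimal sketch of that discovery pattern, assuming a ServiceLoader-style registry keyed by simple class name (the trait and method below are illustrative stand-ins, not the plugin's actual API):

import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

// Illustrative stand-in for the plugin's URIExtractor trait.
trait UriExtractorLike

// Build a simpleName -> instance map from registrations like the file above;
// the JSON specs can then reference "DataSourceV2RelationURIExtractor" as a string.
def loadUriExtractors(loader: ClassLoader): Map[String, UriExtractorLike] =
  ServiceLoader.load(classOf[UriExtractorLike], loader).asScala
    .map(e => e.getClass.getSimpleName -> e)
    .toMap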
File 2 — table command spec JSON:

@@ -79,7 +79,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.CacheTable",
   "tableDescs" : [ ],
@@ -365,7 +369,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic",
   "tableDescs" : [ {
@@ -387,7 +395,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.RefreshTable",
   "tableDescs" : [ {
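For readers unfamiliar with the spec format: each uriDescs entry points the privilege builder at a plan field and an extractor, with isInput distinguishing read from write access. A hedged sketch of the descriptor those JSON entries deserialize into (field names mirror the JSON keys; the plugin's real UriDesc class may carry more fields):

// Sketch only -- mirrors the JSON keys above, not necessarily the real class.
case class UriDescSketch(
    fieldName: String,      // which field of the logical plan to read, e.g. "table"
    fieldExtractor: String, // simple class name of the URIExtractor to apply
    isInput: Boolean)       // false here: the extracted URI is a write target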
File 3 — table extractor implementations:

@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String

@@ -186,18 +187,18 @@ class ExpressionSeqTableExtractor extends TableExtractor {
 class DataSourceV2RelationTableExtractor extends TableExtractor {
   override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
     val plan = v1.asInstanceOf[LogicalPlan]
-    val maybeV2Relation = plan.find(_.getClass.getSimpleName == "DataSourceV2Relation")
-    maybeV2Relation match {
-      case None => None
-      case Some(v2Relation) =>
-        val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, "catalog")
-        val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier == None ||
+            !isPathIdentifier(v2Relation.identifier.get.name(), spark) =>
+        val maybeCatalog = v2Relation.catalog.flatMap(catalogPlugin =>
           lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
-        lookupExtractor[TableTableExtractor].apply(spark, invokeAs[AnyRef](v2Relation, "table"))
+        lookupExtractor[TableTableExtractor].apply(spark, v2Relation.table)
           .map { table =>
             val maybeOwner = TableExtractor.getOwner(v2Relation)
             table.copy(catalog = maybeCatalog, owner = maybeOwner)
           }
+      case _ => None
     }
   }
 }
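The key behavioral change: DataSourceV2RelationTableExtractor now bails out (returns None) when the relation's identifier is really a path, leaving such relations to the new URI extractor. The real check is PathIdentifier.isPathIdentifier; purely as an illustration of the idea, not the actual implementation:

// Illustrative predicate only. For `delta.`/tmp/delta_tbl``, Spark surfaces an
// Identifier whose name() is the literal path, so a path-shaped name is the
// signal that there is no catalog table to authorize here.
def looksLikePath(identifierName: String): Boolean =
  identifierName.startsWith("/") || identifierName.contains("://")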
File 4 — URI extractor implementations:

@@ -19,9 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.serde

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

 import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

@@ -115,3 +116,16 @@ class SubqueryAliasURIExtractor extends URIExtractor {
     Seq(identifier.name).map(Uri)
   }
 }
+
+class DataSourceV2RelationURIExtractor extends URIExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
+    val plan = v1.asInstanceOf[LogicalPlan]
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier != None &&
+            isPathIdentifier(v2Relation.identifier.get.name, spark) =>
+        Seq(v2Relation.identifier.get.name).map(Uri)
+      case _ => Nil
+    }
+  }
+}
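Note that the two guards are exact complements, so every matched DataSourceV2Relation is claimed by exactly one of the extractors. Writing p for "identifier is defined and isPathIdentifier holds", the table extractor fires on !p and the URI extractor on p; in Option terms (sketch):

// Equivalent formulations of the two guards above:
def uriGuard(identifier: Option[String], isPath: String => Boolean): Boolean =
  identifier.exists(isPath)     // identifier != None && isPathIdentifier(...)
def tableGuard(identifier: Option[String], isPath: String => Boolean): Boolean =
  !identifier.exists(isPath)    // identifier == None || !isPathIdentifier(...)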
File 5 — TableCommands command specs:

@@ -270,7 +270,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
       "table",
       classOf[DataSourceV2RelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), uriDescs = uriDescs)
   }

   val ReplaceData = {
@@ -308,7 +309,8 @@
       "table",
       classOf[DataSourceV2RelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), uriDescs = uriDescs)
   }

   val OverwritePartitionsDynamic = {
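With these specs in place, the affected commands carry a uriDescs entry alongside their tableDescs (the generated JSON above shows the same entry on AppendData, OverwriteByExpression, and OverwritePartitionsDynamic). Judging by the expected error messages in the test below, the write check covers the path both with and without a trailing slash; a hedged sketch of that fan-out (the function name is illustrative):

// Sketch: one extracted URI yields both spellings of the directory, matching
// the "[[/some/path, /some/path/]]" form asserted in the test.
def writeResources(uri: String): Seq[String] = {
  val base = uri.stripSuffix("/")
  Seq(base, base + "/")
}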
File 6 — DeltaCatalogRangerSparkExtensionSuite tests:

@@ -357,6 +357,36 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(updateTableSql))
     })
   }
+
+  test("insert path-based table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq((s"$namespace1.$table2", "table"), (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+        withTempDir(path => {
+          doAs(admin, sql(createPathBasedTableSql(path)))
+          // insert into
+          val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertIntoSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(insertIntoSql))
+
+          // insert overwrite
+          val insertOverwriteSql =
+            s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertOverwriteSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(insertOverwriteSql))
+        })
+      }
+    }
+  }
 }

 object DeltaCatalogRangerSparkExtensionSuite {
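createPathBasedTableSql(path) is defined in the collapsed companion object, so its body is not shown in this diff. A plausible shape, purely hypothetical and included only to make the test readable (the columns match those asserted in the error messages; the real helper may differ):

// Hypothetical reconstruction -- NOT the code from this PR.
def createPathBasedTableSql(path: java.nio.file.Path): String =
  s"""
     |CREATE TABLE IF NOT EXISTS delta.`$path` (
     |  id INT, name STRING, gender STRING, birthDate TIMESTAMP
     |) USING DELTA
     |""".stripMargin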