[KYUUBI #5447][AUTHZ] Support Hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
### _Why are the changes needed?_ To close #5447. Kyuubi authz Support hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand - DeleteHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala - UpdateHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala - MergeIntoHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5482 from AngersZhuuuu/KYUUBI-5447. Closes #5447 2598af203 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala 08be589b7 [Angerszhuuuu] Update org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor 19497d12c [Angerszhuuuu] Update tableExtractors.scala df6e244e2 [Angerszhuuuu] update 1a72f1323 [Angerszhuuuu] update f7ca6846c [Angerszhuuuu] Merge branch 'master' into KYUUBI-5447 37006869b [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: liangbowen <liangbowen@gf.com.cn>
This commit is contained in:
parent
59c25b9851
commit
abaa3698cb
@ -15,5 +15,6 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor
|
||||
|
||||
@ -19,6 +19,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
|
||||
|
||||
@ -1604,6 +1604,27 @@
|
||||
} ],
|
||||
"opType" : "CREATETABLE",
|
||||
"queryDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
|
||||
"tableDescs" : [ {
|
||||
"fieldName" : "dft",
|
||||
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
|
||||
"columnDesc" : null,
|
||||
"actionTypeDesc" : {
|
||||
"fieldName" : null,
|
||||
"fieldExtractor" : null,
|
||||
"actionType" : "UPDATE"
|
||||
},
|
||||
"tableTypeDesc" : null,
|
||||
"catalogDesc" : null,
|
||||
"isInput" : false,
|
||||
"setCurrentDatabaseIfMissing" : false
|
||||
} ],
|
||||
"opType" : "QUERY",
|
||||
"queryDescs" : [ {
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand",
|
||||
"tableDescs" : [ {
|
||||
@ -1643,6 +1664,27 @@
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
|
||||
"tableDescs" : [ {
|
||||
"fieldName" : "mergeInto",
|
||||
"fieldExtractor" : "HudiMergeIntoTargetTableExtractor",
|
||||
"columnDesc" : null,
|
||||
"actionTypeDesc" : {
|
||||
"fieldName" : null,
|
||||
"fieldExtractor" : null,
|
||||
"actionType" : "UPDATE"
|
||||
},
|
||||
"tableTypeDesc" : null,
|
||||
"catalogDesc" : null,
|
||||
"isInput" : false,
|
||||
"setCurrentDatabaseIfMissing" : false
|
||||
} ],
|
||||
"opType" : "QUERY",
|
||||
"queryDescs" : [ {
|
||||
"fieldName" : "mergeInto",
|
||||
"fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
|
||||
"tableDescs" : [ {
|
||||
@ -1705,4 +1747,25 @@
|
||||
} ],
|
||||
"opType" : "TRUNCATETABLE",
|
||||
"queryDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
|
||||
"tableDescs" : [ {
|
||||
"fieldName" : "ut",
|
||||
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
|
||||
"columnDesc" : null,
|
||||
"actionTypeDesc" : {
|
||||
"fieldName" : null,
|
||||
"fieldExtractor" : null,
|
||||
"actionType" : "UPDATE"
|
||||
},
|
||||
"tableTypeDesc" : null,
|
||||
"catalogDesc" : null,
|
||||
"isInput" : false,
|
||||
"setCurrentDatabaseIfMissing" : false
|
||||
} ],
|
||||
"opType" : "QUERY",
|
||||
"queryDescs" : [ {
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
} ]
|
||||
@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde
|
||||
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
|
||||
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
|
||||
|
||||
trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor
|
||||
|
||||
object QueryExtractor {
|
||||
@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor {
|
||||
v1.asInstanceOf[Option[LogicalPlan]]
|
||||
}
|
||||
}
|
||||
|
||||
class HudiMergeIntoSourceTableExtractor extends QueryExtractor {
|
||||
override def apply(v1: AnyRef): Option[LogicalPlan] = {
|
||||
new LogicalPlanQueryExtractor().apply(invokeAs[LogicalPlan](v1, "sourceTable"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTable
|
||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
|
||||
import org.apache.kyuubi.util.reflect.ReflectUtils._
|
||||
@ -80,7 +80,9 @@ class TableIdentifierTableExtractor extends TableExtractor {
|
||||
val catalogTable = spark.sessionState.catalog.getTableMetadata(identifier)
|
||||
Option(catalogTable.owner).filter(_.nonEmpty)
|
||||
} catch {
|
||||
case _: Exception => None
|
||||
case e: Exception =>
|
||||
e.printStackTrace()
|
||||
None
|
||||
}
|
||||
Some(Table(None, identifier.database, identifier.table, owner))
|
||||
}
|
||||
@ -240,3 +242,29 @@ class TableTableExtractor extends TableExtractor {
|
||||
lookupExtractor[StringTableExtractor].apply(spark, tableName)
|
||||
}
|
||||
}
|
||||
|
||||
class HudiDataSourceV2RelationTableExtractor extends TableExtractor {
|
||||
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
|
||||
invokeAs[LogicalPlan](v1, "table") match {
|
||||
// Match multipartIdentifier with tableAlias
|
||||
case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
|
||||
new StringTableExtractor().apply(spark, identifier.toString())
|
||||
// Match multipartIdentifier without tableAlias
|
||||
case SubqueryAlias(identifier, _) =>
|
||||
new StringTableExtractor().apply(spark, identifier.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class HudiMergeIntoTargetTableExtractor extends TableExtractor {
|
||||
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
|
||||
invokeAs[LogicalPlan](v1, "targetTable") match {
|
||||
// Match multipartIdentifier with tableAlias
|
||||
case SubqueryAlias(_, SubqueryAlias(identifier, relation)) =>
|
||||
new StringTableExtractor().apply(spark, identifier.toString())
|
||||
// Match multipartIdentifier without tableAlias
|
||||
case SubqueryAlias(identifier, _) =>
|
||||
new StringTableExtractor().apply(spark, identifier.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.kyuubi.plugin.spark.authz.gen
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.OperationType._
|
||||
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde._
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde.TableType._
|
||||
|
||||
@ -165,6 +166,40 @@ object HudiCommands {
|
||||
TableCommandSpec(cmd, Seq(tableDesc), SHOWPARTITIONS)
|
||||
}
|
||||
|
||||
val DeleteHoodieTableCommand = {
|
||||
val cmd = "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand"
|
||||
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
|
||||
val tableDesc =
|
||||
TableDesc(
|
||||
"dft",
|
||||
classOf[HudiDataSourceV2RelationTableExtractor],
|
||||
actionTypeDesc = Some(actionTypeDesc))
|
||||
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
|
||||
}
|
||||
|
||||
val UpdateHoodieTableCommand = {
|
||||
val cmd = "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand"
|
||||
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
|
||||
val tableDesc =
|
||||
TableDesc(
|
||||
"ut",
|
||||
classOf[HudiDataSourceV2RelationTableExtractor],
|
||||
actionTypeDesc = Some(actionTypeDesc))
|
||||
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
|
||||
}
|
||||
|
||||
val MergeIntoHoodieTableCommand = {
|
||||
val cmd = "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand"
|
||||
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
|
||||
val tableDesc =
|
||||
TableDesc(
|
||||
"mergeInto",
|
||||
classOf[HudiMergeIntoTargetTableExtractor],
|
||||
actionTypeDesc = Some(actionTypeDesc))
|
||||
val queryDescs = QueryDesc("mergeInto", classOf[HudiMergeIntoSourceTableExtractor])
|
||||
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDescs))
|
||||
}
|
||||
|
||||
val data: Array[TableCommandSpec] = Array(
|
||||
AlterHoodieTableAddColumnsCommand,
|
||||
AlterHoodieTableChangeColumnCommand,
|
||||
@ -176,10 +211,13 @@ object HudiCommands {
|
||||
CreateHoodieTableLikeCommand,
|
||||
CompactionHoodieTableCommand,
|
||||
CompactionShowHoodieTableCommand,
|
||||
DeleteHoodieTableCommand,
|
||||
DropHoodieTableCommand,
|
||||
InsertIntoHoodieTableCommand,
|
||||
MergeIntoHoodieTableCommand,
|
||||
RepairHoodieTableCommand,
|
||||
TruncateHoodieTableCommand,
|
||||
ShowHoodieTablePartitionsCommand,
|
||||
Spark31AlterTableCommand)
|
||||
Spark31AlterTableCommand,
|
||||
UpdateHoodieTableCommand)
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ import org.apache.kyuubi.util.AssertionUtils.interceptContains
|
||||
*/
|
||||
@HudiTest
|
||||
class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
override protected val catalogImpl: String = "hive"
|
||||
override protected val catalogImpl: String = "in-memory"
|
||||
// TODO: Apache Hudi not support Spark 3.5 and Scala 2.13 yet,
|
||||
// should change after Apache Hudi support Spark 3.5 and Scala 2.13.
|
||||
private def isSupportedVersion = !isSparkV35OrGreater && !isScalaV213
|
||||
@ -407,4 +407,69 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand") {
|
||||
withSingleCallEnabled {
|
||||
withCleanTmpResources(Seq(
|
||||
(s"$namespace1.$table1", "table"),
|
||||
(s"$namespace1.$table2", "table"),
|
||||
(namespace1, "database"))) {
|
||||
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
|
||||
doAs(
|
||||
admin,
|
||||
sql(
|
||||
s"""
|
||||
|CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
|
||||
|USING HUDI
|
||||
|OPTIONS (
|
||||
| type = 'cow',
|
||||
| primaryKey = 'id',
|
||||
| 'hoodie.datasource.hive_sync.enable' = 'false'
|
||||
|)
|
||||
|PARTITIONED BY(city)
|
||||
|""".stripMargin))
|
||||
|
||||
doAs(
|
||||
admin,
|
||||
sql(
|
||||
s"""
|
||||
|CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
|
||||
|USING HUDI
|
||||
|OPTIONS (
|
||||
| type = 'cow',
|
||||
| primaryKey = 'id',
|
||||
| 'hoodie.datasource.hive_sync.enable' = 'false'
|
||||
|)
|
||||
|PARTITIONED BY(city)
|
||||
|""".stripMargin))
|
||||
|
||||
val deleteFrom = s"DELETE FROM $namespace1.$table1 WHERE id = 10"
|
||||
interceptContains[AccessControlException] {
|
||||
doAs(someone, sql(deleteFrom))
|
||||
}(s"does not have [update] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, sql(deleteFrom))
|
||||
|
||||
val updateSql = s"UPDATE $namespace1.$table1 SET name = 'test' WHERE id > 10"
|
||||
interceptContains[AccessControlException] {
|
||||
doAs(someone, sql(updateSql))
|
||||
}(s"does not have [update] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, sql(updateSql))
|
||||
|
||||
val mergeIntoSQL =
|
||||
s"""
|
||||
|MERGE INTO $namespace1.$table1 target
|
||||
|USING $namespace1.$table2 source
|
||||
|ON target.id = source.id
|
||||
|WHEN MATCHED
|
||||
|AND target.name == 'test'
|
||||
| THEN UPDATE SET id = source.id, name = source.name, city = source.city
|
||||
|""".stripMargin
|
||||
interceptContains[AccessControlException] {
|
||||
doAs(someone, sql(mergeIntoSQL))
|
||||
}(s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]")
|
||||
doAs(admin, sql(mergeIntoSQL))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user