[KYUUBI #5446][AUTHZ] Support Create/Drop/Show/Refresh index command for Hudi

### _Why are the changes needed?_
To close #5446. Kyuubi authz supports Create/Drop/Show/Refresh index based commands

IndexBasedCommand SQL grammar is in https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4

Below command under https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala

- CreateIndexCommand
- DropIndexCommand
- RefreshIndexCommand
- ShowIndexesCommand

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No

Closes #5448 from AngersZhuuuu/KYUUBI-5446.

Closes #5446

1be056127 [Angerszhuuuu] Update ObjectType.scala
c4ae073f0 [Angerszhuuuu] follow comment
ed1544e62 [Angerszhuuuu] Update
214bb86b2 [Angerszhuuuu] Update OperationType.scala
97cb0f860 [Angerszhuuuu] Update PrivilegeObject.scala
bc65abdbe [Angerszhuuuu] update
0e6f6dffe [Angerszhuuuu] Update
4019f45e4 [Angerszhuuuu] update
dc9188ded [Angerszhuuuu] update
f9a398c14 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5446
52f67e942 [Angerszhuuuu] update
bed945e91 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5446
f174ef269 [Angerszhuuuu] Update table_command_spec.json
476453168 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5446
a78425b46 [Angerszhuuuu] Update HudiCommands.scala
81881db6f [Angerszhuuuu] Update HudiCommands.scala
544a4433c [Angerszhuuuu] Update HudiCommands.scala
aed91cd78 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5446
9632886d1 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
c404fd780 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
e70c8f1e3 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
fefb021a1 [Angerszhuuuu] Update table_command_spec.json
ab8b96dfd [Angerszhuuuu] Merge branch 'master' into KYUUBI-5446
fcb3d45a0 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
b1657d931 [Angerszhuuuu] follow comment
0222e0238 [Angerszhuuuu] [KYUUBI-5446][AUTHZ] Support Create/Drop/Show/Reresh index command

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Angerszhuuuu 2023-10-25 13:33:53 +08:00 committed by Kent Yao
parent 5cff4fb98c
commit f4f54666de
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
7 changed files with 153 additions and 7 deletions

View File

@ -1623,6 +1623,20 @@
} ],
"opType" : "CREATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.CreateIndexCommand",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "CatalogTableTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "CREATEINDEX",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
"tableDescs" : [ {
@ -1659,6 +1673,20 @@
} ],
"opType" : "DROPTABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DropIndexCommand",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "CatalogTableTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "DROPINDEX",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand",
"tableDescs" : [ {
@ -1701,6 +1729,20 @@
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RefreshIndexCommand",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "CatalogTableTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "ALTERINDEX_REBUILD",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
"tableDescs" : [ {
@ -1732,6 +1774,20 @@
} ],
"opType" : "SHOWPARTITIONS",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.ShowIndexesCommand",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "CatalogTableTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : true,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "SHOWINDEXES",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.Spark31AlterTableCommand",
"tableDescs" : [ {

View File

@ -23,11 +23,12 @@ object ObjectType extends Enumeration {
type ObjectType = Value
val DATABASE, TABLE, VIEW, COLUMN, FUNCTION = Value
val DATABASE, TABLE, VIEW, COLUMN, FUNCTION, INDEX = Value
def apply(obj: PrivilegeObject, opType: OperationType): ObjectType = {
obj.privilegeObjectType match {
case PrivilegeObjectType.DATABASE => DATABASE
case PrivilegeObjectType.TABLE_OR_VIEW if opType.toString.contains("INDEX") => INDEX
case PrivilegeObjectType.TABLE_OR_VIEW if obj.columns.nonEmpty => COLUMN
case PrivilegeObjectType.TABLE_OR_VIEW if opType.toString.contains("VIEW") => VIEW
case PrivilegeObjectType.TABLE_OR_VIEW => TABLE

View File

@ -20,7 +20,8 @@ package org.apache.kyuubi.plugin.spark.authz
object OperationType extends Enumeration {
type OperationType = Value
// According to https://scalameta.org/scalafmt/docs/known-issues.html
// format: off
val ALTERDATABASE, ALTERDATABASE_LOCATION, ALTERTABLE_ADDCOLS, ALTERTABLE_ADDPARTS,
ALTERTABLE_RENAMECOL, ALTERTABLE_REPLACECOLS, ALTERTABLE_DROPPARTS, ALTERTABLE_RENAMEPART,
ALTERTABLE_RENAME, ALTERTABLE_PROPERTIES, ALTERTABLE_SERDEPROPERTIES, ALTERTABLE_LOCATION,
@ -28,5 +29,7 @@ object OperationType extends Enumeration {
CREATETABLE_AS_SELECT, CREATEFUNCTION, CREATEVIEW, DESCDATABASE, DESCFUNCTION, DESCTABLE,
DROPDATABASE, DROPFUNCTION, DROPTABLE, DROPVIEW, EXPLAIN, LOAD, MSCK, QUERY, RELOADFUNCTION,
SHOWCONF, SHOW_CREATETABLE, SHOWCOLUMNS, SHOWDATABASES, SHOWFUNCTIONS, SHOWPARTITIONS,
SHOWTABLES, SHOW_TBLPROPERTIES, SWITCHDATABASE, TRUNCATETABLE = Value
SHOWTABLES, SHOW_TBLPROPERTIES, SWITCHDATABASE, TRUNCATETABLE,
CREATEINDEX, DROPINDEX, ALTERINDEX_REBUILD, SHOWINDEXES = Value
// format: on
}

View File

@ -57,7 +57,7 @@ object AccessResource {
resource.setValue("database", firstLevelResource)
resource.setValue("table", secondLevelResource)
resource.setValue("column", thirdLevelResource)
case TABLE | VIEW => // fixme spark have added index support
case TABLE | VIEW | INDEX =>
resource.setValue("database", firstLevelResource)
resource.setValue("table", secondLevelResource)
}

View File

@ -25,7 +25,7 @@ object AccessType extends Enumeration {
type AccessType = Value
val NONE, CREATE, ALTER, DROP, SELECT, UPDATE, USE, READ, WRITE, ALL, ADMIN = Value
val NONE, CREATE, ALTER, DROP, SELECT, UPDATE, USE, READ, WRITE, ALL, ADMIN, INDEX = Value
def apply(obj: PrivilegeObject, opType: OperationType, isInput: Boolean): AccessType = {
obj.actionType match {
@ -48,14 +48,16 @@ object AccessType extends Enumeration {
ALTERTABLE_REPLACECOLS |
ALTERTABLE_SERDEPROPERTIES |
ALTERVIEW_RENAME |
MSCK => ALTER
MSCK |
ALTERINDEX_REBUILD => ALTER
case ALTERVIEW_AS => if (isInput) SELECT else ALTER
case DROPDATABASE | DROPTABLE | DROPFUNCTION | DROPVIEW => DROP
case DROPDATABASE | DROPTABLE | DROPFUNCTION | DROPVIEW | DROPINDEX => DROP
case LOAD => if (isInput) SELECT else UPDATE
case QUERY |
SHOW_CREATETABLE |
SHOW_TBLPROPERTIES |
SHOWPARTITIONS |
SHOWINDEXES |
ANALYZE_TABLE => SELECT
case SHOWCOLUMNS | DESCTABLE => SELECT
case SHOWDATABASES |
@ -65,6 +67,7 @@ object AccessType extends Enumeration {
SHOWFUNCTIONS |
DESCFUNCTION => USE
case TRUNCATETABLE => UPDATE
case CREATEINDEX => INDEX
case _ => NONE
}
case PrivilegeObjectActionType.DELETE => DROP

View File

@ -145,6 +145,30 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), SHOW_TBLPROPERTIES)
}
// Authz spec for Hudi's CREATE INDEX command: the indexed table is treated as
// an output resource (default isInput = false) under operation type CREATEINDEX.
// The "table" field of the command is resolved via CatalogTableTableExtractor.
val CreateIndexCommand = {
val cmd = "org.apache.spark.sql.hudi.command.CreateIndexCommand"
val tableDesc = TableDesc("table", classOf[CatalogTableTableExtractor])
TableCommandSpec(cmd, Seq(tableDesc), CREATEINDEX)
}
// Authz spec for Hudi's DROP INDEX command: the indexed table is treated as an
// output resource under operation type DROPINDEX (mapped to the DROP access type).
val DropIndexCommand = {
val cmd = "org.apache.spark.sql.hudi.command.DropIndexCommand"
val tableDesc = TableDesc("table", classOf[CatalogTableTableExtractor])
TableCommandSpec(cmd, Seq(tableDesc), DROPINDEX)
}
// Authz spec for Hudi's SHOW INDEXES command: unlike the other index commands,
// the table is an input resource (isInput = true) because the command only
// reads metadata; operation type SHOWINDEXES maps to the SELECT access type.
val ShowIndexCommand = {
val cmd = "org.apache.spark.sql.hudi.command.ShowIndexesCommand"
val tableDesc = TableDesc("table", classOf[CatalogTableTableExtractor], isInput = true)
TableCommandSpec(cmd, Seq(tableDesc), SHOWINDEXES)
}
// Authz spec for Hudi's REFRESH INDEX command: the indexed table is treated as
// an output resource under ALTERINDEX_REBUILD (mapped to the ALTER access type).
val RefreshIndexCommand = {
val cmd = "org.apache.spark.sql.hudi.command.RefreshIndexCommand"
val tableDesc = TableDesc("table", classOf[CatalogTableTableExtractor])
TableCommandSpec(cmd, Seq(tableDesc), ALTERINDEX_REBUILD)
}
val InsertIntoHoodieTableCommand = {
val cmd = "org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand"
val tableDesc = TableDesc(
@ -228,13 +252,17 @@ object HudiCommands {
CreateHoodieTableAsSelectCommand,
CreateHoodieTableCommand,
CreateHoodieTableLikeCommand,
CreateIndexCommand,
CompactionHoodieTableCommand,
CompactionShowHoodieTableCommand,
DeleteHoodieTableCommand,
DropHoodieTableCommand,
DropIndexCommand,
InsertIntoHoodieTableCommand,
MergeIntoHoodieTableCommand,
RefreshIndexCommand,
RepairHoodieTableCommand,
ShowIndexCommand,
TruncateHoodieTableCommand,
ShowHoodieTablePartitionsCommand,
Spark31AlterTableCommand,

View File

@ -53,6 +53,7 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
val table1 = "table1_hoodie"
val table2 = "table2_hoodie"
val outputTable1 = "outputTable_hoodie"
val index1 = "table_hoodie_index1"
override def withFixture(test: NoArgTest): Outcome = {
assume(isSupportedVersion)
@ -522,4 +523,58 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}
// End-to-end authz check for Hudi index commands (CREATE / REFRESH / SHOW / DROP
// INDEX). For each command, an unprivileged user ("someone") must be rejected
// with an AccessControlException naming the required privilege on the table,
// while "admin" must succeed.
test("IndexBasedCommand") {
// Hudi index DDL is only parsed on older Spark versions, so skip otherwise.
assume(
!isSparkV33OrGreater,
"Hudi index creation not supported on Spark 3.3 or greater currently")
// presumably drops the table and database after the body runs — cleanup helper
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (namespace1, "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
|USING HUDI
|OPTIONS (
| type = 'cow',
| primaryKey = 'id',
| 'hoodie.datasource.hive_sync.enable' = 'false'
|)
|PARTITIONED BY(city)
|""".stripMargin))
// CreateIndexCommand -> CREATEINDEX op type -> [index] access type
val createIndex = s"CREATE INDEX $index1 ON $namespace1.$table1 USING LUCENE (id)"
interceptContains[AccessControlException](
doAs(
someone,
sql(createIndex)))(s"does not have [index] privilege on [$namespace1/$table1]")
doAs(admin, sql(createIndex))
// RefreshIndexCommand -> ALTERINDEX_REBUILD op type -> [alter] access type
val refreshIndex = s"REFRESH INDEX $index1 ON $namespace1.$table1"
interceptContains[AccessControlException](
doAs(
someone,
sql(refreshIndex)))(s"does not have [alter] privilege on [$namespace1/$table1]")
doAs(admin, sql(refreshIndex))
// ShowIndexesCommand -> SHOWINDEXES op type (input table) -> [select] access type
val showIndex = s"SHOW INDEXES FROM TABLE $namespace1.$table1"
interceptContains[AccessControlException](
doAs(
someone,
sql(showIndex)))(s"does not have [select] privilege on [$namespace1/$table1]")
doAs(admin, sql(showIndex))
// DropIndexCommand -> DROPINDEX op type -> [drop] access type
val dropIndex = s"DROP INDEX $index1 ON $namespace1.$table1"
interceptContains[AccessControlException](
doAs(
someone,
sql(dropIndex)))(s"does not have [drop] privilege on [$namespace1/$table1]")
doAs(admin, sql(dropIndex))
}
}
}