[KYUUBI #5360] Support Hudi InsertIntoHoodieTableCommand

### _Why are the changes needed?_
To close #5360
Support Compaction table/path related command. The SQL grammar is https://github.com/apache/hudi/blob/release-0.14.0/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4

- InsertIntoHoodieTableCommand :https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No

Closes #5456 from AngersZhuuuu/KYUUBI-5360.

Closes #5360

ebeb3f270 [Angerszhuuuu] Update table_command_spec.json
fcdc51880 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5360
d1281ca66 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala
5a6ce81fb [Angerszhuuuu] Merge branch 'master' into KYUUBI-5360
ead082e39 [Angerszhuuuu] [KYUUBI #5360 ]  Support Hudi InsertIntoHoodieTableCommand

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Angerszhuuuu 2023-10-19 10:13:12 +08:00 committed by Kent Yao
parent bdc28acf41
commit 411df60431
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
3 changed files with 78 additions and 0 deletions

View File

@ -1622,6 +1622,27 @@
} ],
"opType" : "DROPTABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "logicalRelation",
"fieldExtractor" : "LogicalRelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : "overwrite",
"fieldExtractor" : "OverwriteOrInsertActionTypeExtractor",
"actionType" : null
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
"tableDescs" : [ {

View File

@ -144,6 +144,16 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), SHOW_TBLPROPERTIES)
}
val InsertIntoHoodieTableCommand = {
val cmd = "org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand"
val tableDesc = TableDesc(
"logicalRelation",
classOf[LogicalRelationTableExtractor],
actionTypeDesc =
Some(ActionTypeDesc("overwrite", classOf[OverwriteOrInsertActionTypeExtractor])))
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
}
val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
@ -156,6 +166,7 @@ object HudiCommands {
CompactionHoodieTableCommand,
CompactionShowHoodieTableCommand,
DropHoodieTableCommand,
InsertIntoHoodieTableCommand,
RepairHoodieTableCommand,
TruncateHoodieTableCommand,
Spark31AlterTableCommand)

View File

@ -324,4 +324,50 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(admin, sql(showCompactionTable))
}
}
test("InsertIntoHoodieTableCommand") {
withSingleCallEnabled {
withCleanTmpResources(Seq(
(s"$namespace1.$table1", "table"),
(s"$namespace1.$table2", "table"),
(namespace1, "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
|USING HUDI
|OPTIONS (
| type = 'cow',
| primaryKey = 'id',
| 'hoodie.datasource.hive_sync.enable' = 'false'
|)
|PARTITIONED BY(city)
|""".stripMargin))
doAs(
admin,
sql(
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
|USING $format
|""".stripMargin))
val insertIntoHoodieTableSql =
s"""
|INSERT INTO $namespace1.$table1
|PARTITION(city = 'hangzhou')
|SELECT id, name
|FROM $namespace1.$table2
|WHERE city = 'hangzhou'
|""".stripMargin
interceptContains[AccessControlException] {
doAs(someone, sql(insertIntoHoodieTableSql))
}(s"does not have [select] privilege on " +
s"[$namespace1/$table2/id,$namespace1/$table2/name,hudi_ns/$table2/city], " +
s"[update] privilege on [$namespace1/$table1]")
}
}
}
}