[KYUUBI #7068] Iceberg ranger support check branch and tag ddl

### Why are the changes needed?

Iceberg ranger check support branch and tag ddl

### How was this patch tested?

- [x] create branch
- [x] replace branch
- [x] drop branch
- [x] create tag
- [x] replace tag
- [x] drop tag

issue #7068
### Was this patch authored or co-authored using generative AI tooling?

Closes #7069 from davidyuan1223/iceberg_branch_check.

Closes #7068

d060a24e1 [davidyuan] update
1e05018d1 [davidyuan] Merge branch 'master' into iceberg_branch_check
be2684671 [davidyuan] update
231ed3356 [davidyuan] sort spi file
6d2a5bf20 [davidyuan] sort spi file
bc21310cc [davidyuan] update
52ca367f1 [davidyuan] update

Authored-by: davidyuan <yuanfuyuan@mafengwo.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
davidyuan 2025-05-29 13:04:43 +08:00 committed by Cheng Pan
parent 6d99b20e04
commit 1af6647132
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 190 additions and 0 deletions

View File

@ -1841,6 +1841,38 @@
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "ArrayBufferTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false,
"comment" : "Iceberg"
} ],
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "ArrayBufferTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false,
"comment" : "Iceberg"
} ],
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable",
"tableDescs" : [ {
@ -1862,6 +1894,22 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.DropBranch",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "ArrayBufferTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false,
"comment" : "Iceberg"
} ],
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields",
"tableDescs" : [ {
@ -1894,6 +1942,22 @@
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.DropTag",
"tableDescs" : [ {
"fieldName" : "table",
"fieldExtractor" : "ArrayBufferTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : null,
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false,
"comment" : "Iceberg"
} ],
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable",
"tableDescs" : [ {

View File

@ -93,6 +93,26 @@ object IcebergCommands extends CommandSpecs[TableCommandSpec] {
AddPartitionFiled.copy(cmd)
}
val CreateOrReplaceBranch = {
val cmd = "org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch"
AddPartitionFiled.copy(cmd)
}
val CreateOrReplaceTag = {
val cmd = "org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag"
AddPartitionFiled.copy(cmd)
}
val DropBranch = {
val cmd = "org.apache.spark.sql.catalyst.plans.logical.DropBranch"
AddPartitionFiled.copy(cmd)
}
val DropTag = {
val cmd = "org.apache.spark.sql.catalyst.plans.logical.DropTag"
AddPartitionFiled.copy(cmd)
}
override def specs: Seq[TableCommandSpec] = Seq(
CallProcedure,
DeleteFromIcebergTable,
@ -104,6 +124,10 @@ object IcebergCommands extends CommandSpecs[TableCommandSpec] {
WriteDistributionAndOrdering,
SetIdentifierFields,
DropIdentifierFields,
CreateOrReplaceBranch,
CreateOrReplaceTag,
DropBranch,
DropTag,
MergeIntoIcebergTable.copy(classname =
"org.apache.spark.sql.catalyst.plans.logical.UnresolvedMergeIntoIcebergTable"))
}

View File

@ -484,4 +484,106 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
doAs(admin, sql(dropIdentifierSql))
}
}
test("CREATE BRANCH for Iceberg") {
val table = s"$catalogV2.$namespace1.partitioned_table"
withCleanTmpResources(Seq((table, "table"))) {
doAs(
admin,
sql(
s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"))
doAs(admin, sql(s"INSERT INTO $table VALUES (1, 'test', 'city')"))
val createBranchSql = s"ALTER TABLE $table CREATE BRANCH test_branch"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(createBranchSql))
}(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
doAs(admin, sql(createBranchSql))
}
}
test("CREATE TAG for Iceberg") {
val table = s"$catalogV2.$namespace1.partitioned_table"
withCleanTmpResources(Seq((table, "table"))) {
doAs(
admin,
sql(
s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"))
doAs(admin, sql(s"INSERT INTO $table VALUES (1, 'test', 'city')"))
val createTagSql = s"ALTER TABLE $table CREATE TAG test_tag"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(createTagSql))
}(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
doAs(admin, sql(createTagSql))
}
}
test("REPLACE BRANCH for Iceberg") {
val table = s"$catalogV2.$namespace1.partitioned_table"
withCleanTmpResources(Seq((table, "table"))) {
doAs(
admin,
sql(
s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"))
doAs(admin, sql(s"INSERT INTO $table VALUES (1, 'test', 'city')"))
doAs(admin, sql(s"ALTER TABLE $table CREATE BRANCH test_branch"))
doAs(admin, sql(s"INSERT INTO $table VALUES (2, 'test2', 'city2')"))
val replaceBranchSql = s"ALTER TABLE $table REPLACE BRANCH test_branch"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(replaceBranchSql))
}(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
doAs(admin, sql(replaceBranchSql))
}
}
test("REPLACE TAG for Iceberg") {
val table = s"$catalogV2.$namespace1.partitioned_table"
withCleanTmpResources(Seq((table, "table"))) {
doAs(
admin,
sql(
s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"))
doAs(admin, sql(s"INSERT INTO $table VALUES (1, 'test', 'city')"))
doAs(admin, sql(s"ALTER TABLE $table CREATE TAG test_tag"))
doAs(admin, sql(s"INSERT INTO $table VALUES (2, 'test2', 'city2')"))
val replaceTagSql = s"ALTER TABLE $table REPLACE TAG test_tag"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(replaceTagSql))
}(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
doAs(admin, sql(replaceTagSql))
}
}
test("DROP BRANCH for Iceberg") {
val table = s"$catalogV2.$namespace1.partitioned_table"
withCleanTmpResources(Seq((table, "table"))) {
doAs(
admin,
sql(
s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"))
doAs(admin, sql(s"INSERT INTO $table VALUES (1, 'test', 'city')"))
doAs(admin, sql(s"ALTER TABLE $table CREATE BRANCH test_branch"))
val dropBranchSql = s"ALTER TABLE $table DROP BRANCH test_branch"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(dropBranchSql))
}(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
doAs(admin, sql(dropBranchSql))
}
}
test("DROP TAG for Iceberg") {
val table = s"$catalogV2.$namespace1.partitioned_table"
withCleanTmpResources(Seq((table, "table"))) {
doAs(
admin,
sql(
s"CREATE TABLE $table (id int NOT NULL, name string, city string) USING iceberg"))
doAs(admin, sql(s"INSERT INTO $table VALUES (1, 'test', 'city')"))
doAs(admin, sql(s"ALTER TABLE $table CREATE TAG test_tag"))
val dropTagSql = s"ALTER TABLE $table DROP TAG test_tag"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(dropTagSql))
}(s"does not have [alter] privilege on [$namespace1/partitioned_table]")
doAs(admin, sql(dropTagSql))
}
}
}