[KYUUBI #5529][AUTHZ] Support create table command for Delta Lake
### _Why are the changes needed?_ To close #5529. Support create table command for Delta Lake in Authz. https://docs.delta.io/latest/delta-batch.html#create-a-table ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5530 from zml1206/KYUUBI-5529. Closes #5529 b8ed2a464 [zml1206] update f12ff8ff5 [zml1206] Merge branch 'master' into KYUUBI-5529 c02523062 [Bowen Liang] resolve conflicts 036154b04 [Bowen Liang] Merge branch 'master' into KYUUBI-5529 fb4223b2c [zml1206] fix check spec json files 10138d83a [zml1206] Merge branch 'master' into KYUUBI-5529 facd8f7c6 [zml1206] fix show databases ut b2a9543ec [zml1206] update ut 23f6b8172 [zml1206] Support create table command for Delta Lake Lead-authored-by: zml1206 <zhuml1206@gmail.com> Co-authored-by: Bowen Liang <bowenliang@apache.org> Co-authored-by: Bowen Liang <liangbowen@gf.com.cn> Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
This commit is contained in:
parent
b99a21658a
commit
cfd90e0e2f
@ -329,6 +329,12 @@
|
||||
<artifactId>paimon-spark-${paimon.spark.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.delta</groupId>
|
||||
<artifactId>${delta.artifact}_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@ -1953,4 +1953,19 @@
|
||||
"opType" : "QUERY",
|
||||
"queryDescs" : [ ],
|
||||
"uriDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.delta.commands.CreateDeltaTableCommand",
|
||||
"tableDescs" : [ {
|
||||
"fieldName" : "table",
|
||||
"fieldExtractor" : "CatalogTableTableExtractor",
|
||||
"columnDesc" : null,
|
||||
"actionTypeDesc" : null,
|
||||
"tableTypeDesc" : null,
|
||||
"catalogDesc" : null,
|
||||
"isInput" : false,
|
||||
"setCurrentDatabaseIfMissing" : false
|
||||
} ],
|
||||
"opType" : "CREATETABLE",
|
||||
"queryDescs" : [ ],
|
||||
"uriDescs" : [ ]
|
||||
} ]
|
||||
@ -41,6 +41,7 @@ object RangerTestNamespace {
|
||||
val sparkCatalog = "spark_catalog"
|
||||
val icebergNamespace = "iceberg_ns"
|
||||
val hudiNamespace = "hudi_ns"
|
||||
val deltaNamespace = "delta_ns"
|
||||
val namespace1 = "ns1"
|
||||
val namespace2 = "ns2"
|
||||
}
|
||||
|
||||
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.plugin.spark.authz.gen
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.OperationType._
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde._
|
||||
|
||||
object DeltaCommands extends CommandSpecs[TableCommandSpec] {
|
||||
|
||||
val CreateDeltaTableCommand = {
|
||||
val cmd = "org.apache.spark.sql.delta.commands.CreateDeltaTableCommand"
|
||||
val tableDesc = TableDesc("table", classOf[CatalogTableTableExtractor])
|
||||
TableCommandSpec(cmd, Seq(tableDesc), CREATETABLE)
|
||||
}
|
||||
|
||||
override def specs: Seq[TableCommandSpec] = Seq(
|
||||
CreateDeltaTableCommand)
|
||||
}
|
||||
@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
|
||||
// scalastyle:on
|
||||
test("check spec json files") {
|
||||
writeCommandSpecJson("database", Seq(DatabaseCommands))
|
||||
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands))
|
||||
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands))
|
||||
writeCommandSpecJson("function", Seq(FunctionCommands))
|
||||
writeCommandSpecJson("scan", Seq(Scans))
|
||||
}
|
||||
|
||||
@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kyuubi.plugin.spark.authz.ranger
|
||||
|
||||
import org.scalatest.Outcome
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
|
||||
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
|
||||
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
|
||||
import org.apache.kyuubi.tags.DeltaTest
|
||||
import org.apache.kyuubi.util.AssertionUtils._
|
||||
|
||||
/**
|
||||
* Tests for RangerSparkExtensionSuite on Delta Lake
|
||||
*/
|
||||
@DeltaTest
|
||||
class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
override protected val catalogImpl: String = "hive"
|
||||
|
||||
val namespace1 = deltaNamespace
|
||||
val table1 = "table1_delta"
|
||||
val table2 = "table2_delta"
|
||||
|
||||
override def withFixture(test: NoArgTest): Outcome = {
|
||||
test()
|
||||
}
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
spark.conf.set(
|
||||
s"spark.sql.catalog.$sparkCatalog",
|
||||
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
|
||||
spark.conf.set(
|
||||
s"spark.sql.catalog.$sparkCatalog.warehouse",
|
||||
Utils.createTempDir("delta-hadoop").toString)
|
||||
super.beforeAll()
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
super.afterAll()
|
||||
spark.sessionState.catalog.reset()
|
||||
spark.sessionState.conf.clear()
|
||||
}
|
||||
|
||||
test("create table") {
|
||||
withCleanTmpResources(Seq(
|
||||
(s"$namespace1.$table1", "table"),
|
||||
(s"$namespace1.$table2", "table"),
|
||||
(s"$namespace1", "database"))) {
|
||||
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
|
||||
val createNonPartitionTableSql =
|
||||
s"""
|
||||
|CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
|
||||
| id INT,
|
||||
| firstName STRING,
|
||||
| middleName STRING,
|
||||
| lastName STRING,
|
||||
| gender STRING,
|
||||
| birthDate TIMESTAMP,
|
||||
| ssn STRING,
|
||||
| salary INT
|
||||
|) USING DELTA
|
||||
|""".stripMargin
|
||||
interceptContains[AccessControlException] {
|
||||
doAs(someone, sql(createNonPartitionTableSql))
|
||||
}(s"does not have [create] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, createNonPartitionTableSql)
|
||||
|
||||
val createPartitionTableSql =
|
||||
s"""
|
||||
|CREATE TABLE IF NOT EXISTS $namespace1.$table2 (
|
||||
| id INT,
|
||||
| firstName STRING,
|
||||
| middleName STRING,
|
||||
| lastName STRING,
|
||||
| gender STRING,
|
||||
| birthDate TIMESTAMP,
|
||||
| ssn STRING,
|
||||
| salary INT
|
||||
|)
|
||||
|USING DELTA
|
||||
|PARTITIONED BY (gender)
|
||||
|""".stripMargin
|
||||
interceptContains[AccessControlException] {
|
||||
doAs(someone, sql(createPartitionTableSql))
|
||||
}(s"does not have [create] privilege on [$namespace1/$table2]")
|
||||
doAs(admin, createPartitionTableSql)
|
||||
}
|
||||
}
|
||||
|
||||
test("create or replace table") {
|
||||
withCleanTmpResources(
|
||||
Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
|
||||
val createOrReplaceTableSql =
|
||||
s"""
|
||||
|CREATE OR REPLACE TABLE $namespace1.$table1 (
|
||||
| id INT,
|
||||
| firstName STRING,
|
||||
| middleName STRING,
|
||||
| lastName STRING,
|
||||
| gender STRING,
|
||||
| birthDate TIMESTAMP,
|
||||
| ssn STRING,
|
||||
| salary INT
|
||||
|) USING DELTA
|
||||
|""".stripMargin
|
||||
interceptContains[AccessControlException] {
|
||||
doAs(someone, sql(createOrReplaceTableSql))
|
||||
}(s"does not have [create] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, createOrReplaceTableSql)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user