[KYUUBI #3515] [Authz] Support checking rewritten Iceberg commands and skip applying Row-filter to output tables
### _Why are the changes needed?_

To close #3515. This replaces `mapChildren` in `RuleApplyRowFilterAndDataMasking` with a `mapPlanChildren` helper that skips the head child inserted by Iceberg's `IcebergSparkSessionExtensions`, so the Row-filter is not applied to output tables.

### _How was this patch tested?_

- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3520 from bowenliang123/3515-authz-iceberg.

Closes #3515

- a10fe43a [Bowen Liang] improve mapPlanChildren
- 5eb7f845 [Bowen Liang] fix problem after merging from master
- bb1eefbd [Bowen Liang] merge from master
- 6c6b071c [Bowen Liang] Merge commit 'ae2df990a32c695de6f0345ea7e73e51103e87e5' into 3515-authz-iceberg
- dbcfe6d8 [Bowen Liang] restrict skipped-children mapping to Iceberg command privilege builders and add a skipMappedChildren method to IcebergCommands to handle them
- ae2df990 [Bowen Liang] nit
- 0c691798 [liangbowen] update mapPlanChildren and passSparkVersionCheck
- 161215d0 [liangbowen] nit
- 0006dee3 [liangbowen] generalize passSparkVersionCheck method to AuthZUtils
- 44163638 [liangbowen] refactor getFieldValOpt to getFieldValOption
- 5b8aa40b [Bowen Liang] improvements on skippedMapChildren of mapPlanChildren
- 11c2e637 [liangbowen] nit
- 5a971945 [liangbowen] unify the general way to ensure a skipped table is among the plan's children
- b161d70f [Bowen Liang] nit
- 952c1e1c [Bowen Liang] reuse MergeIntoTable of v2Commands for MergeIntoIcebergTable of IcebergCommands
- 0b25bd1b [Bowen Liang] generalize mapPlanChildren for Iceberg commands
- 04fb651f [Bowen Liang] nit
- a1f33bc3 [Bowen Liang] update DELETE FROM TABLE ut
- 34d65e5a [Bowen Liang] introduce IcebergCommands for access checking on Iceberg tables; skip the head child for Iceberg commands in RuleApplyRowFilterAndDataMasking to prevent marking output tables
- 085bfa31 [liangbowen] replace mapChildren in RuleApplyRowFilterAndDataMasking with the mapPlanChildren method, to skip the head child for Iceberg UpdateIcebergTable/MergeIntoIcebergTable/DeleteFromIcebergTable
- 6a245019 [liangbowen] init iceberg ut

Lead-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: Kent Yao <yao@apache.org>
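Below is a minimal runnable sketch of the skip-then-map strategy this change introduces in `mapPlanChildren`. Plain strings stand in for `LogicalPlan` children, and `skipped` plays the role of `IcebergCommands.skipMappedChildren(plan)`; the names are illustrative, not the real plan nodes:

```scala
// The write-side child inserted by the Iceberg extension is kept as-is,
// while the row-filter transformation f is applied only to the remaining
// (input) children, mirroring: skipped ++ (plan.children diff skipped).map(f)
val children = Seq("targetTableRelation", "sourceQuery")
val skipped = Seq("targetTableRelation") // output table; must not be row-filtered
val f: String => String = c => s"RowFilterAndDataMaskingMarker($c)"

val newChildren = skipped ++ (children diff skipped).map(f)
assert(newChildren == Seq("targetTableRelation", "RowFilterAndDataMaskingMarker(sourceQuery)"))
```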
parent e80a7db146
commit 0c2091cd03
@@ -283,6 +283,13 @@
             <artifactId>guava</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>

     <build>
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.plugin.spark.authz

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType.PrivilegeObjectActionType
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.plugin.spark.authz.v2Commands.CommandType.CommandType

/**
 * Building privilege objects for Iceberg commands rewritten by the Iceberg extension
 */
object IcebergCommands extends Enumeration {

  import scala.language.implicitConversions

  implicit def valueToCmdPrivilegeBuilder(x: Value): CmdPrivilegeBuilder =
    x.asInstanceOf[CmdPrivilegeBuilder]

  /**
   * Check whether the command is implemented with a supported privilege builder
   * and passes the requirement checks (e.g. Spark version)
   *
   * @param commandName name of the command
   * @return true if so, false otherwise
   */
  def accept(commandName: String): Boolean = {
    try {
      val command = IcebergCommands.withName(commandName)

      // check spark version requirements
      passSparkVersionCheck(command.mostVer, command.leastVer)
    } catch {
      case _: NoSuchElementException => false
    }
  }

  def skipMappedChildren(plan: LogicalPlan): Seq[LogicalPlan] = {
    Seq(
      getFieldValOpt[LogicalPlan](plan, "table"),
      getFieldValOpt[LogicalPlan](plan, "targetTable"),
      getFieldValOpt[LogicalPlan](plan, "sourceTable"))
      .flatten intersect plan.children
  }

  /**
   * Command privilege builder
   *
   * @param operationType OperationType for converting accessType
   * @param leastVer minimum Spark version required
   * @param mostVer maximum Spark version supported
   * @param commandTypes Seq of [[CommandType]] hinting privilege building
   * @param buildInput builds the input [[PrivilegeObject]]s for the privilege check
   * @param buildOutput builds the output [[PrivilegeObject]]s for the privilege check
   * @param outputActionType [[PrivilegeObjectActionType]] for the output [[PrivilegeObject]]
   */
  case class CmdPrivilegeBuilder(
      operationType: OperationType = QUERY,
      leastVer: Option[String] = None,
      mostVer: Option[String] = None,
      commandTypes: Seq[CommandType] = Seq.empty,
      buildInput: (LogicalPlan, ArrayBuffer[PrivilegeObject], Seq[CommandType]) => Unit =
        v2Commands.defaultBuildInput,
      buildOutput: (
          LogicalPlan,
          ArrayBuffer[PrivilegeObject],
          Seq[CommandType],
          PrivilegeObjectActionType) => Unit = v2Commands.defaultBuildOutput,
      outputActionType: PrivilegeObjectActionType = PrivilegeObjectActionType.OTHER)
    extends super.Val {

    def buildPrivileges(
        plan: LogicalPlan,
        inputObjs: ArrayBuffer[PrivilegeObject],
        outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
      this.buildInput(plan, inputObjs, commandTypes)
      this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
    }
  }

  // dml commands

  val DeleteFromIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
    operationType = ALTERTABLE_DROPPARTS,
    leastVer = Some("3.2"),
    commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption),
    outputActionType = PrivilegeObjectActionType.UPDATE)

  val UpdateIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
    operationType = ALTERTABLE_ADDPARTS,
    leastVer = Some("3.2"),
    commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption),
    outputActionType = PrivilegeObjectActionType.UPDATE)

  val UnresolvedMergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder()

  val MergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
    buildInput = v2Commands.MergeIntoTable.buildInput,
    buildOutput = v2Commands.MergeIntoTable.buildOutput)
}
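As a usage illustration of `skipMappedChildren`, here is a self-contained sketch of the field-name probing it performs through `getFieldValOpt`. The `FakeMergeCmd` case class is hypothetical; the real nodes are Iceberg's rewritten commands such as `MergeIntoIcebergTable`, which exposes `targetTable` and `sourceTable` fields:

```scala
import scala.util.Try

// Hypothetical stand-in for a rewritten Iceberg command node.
case class FakeMergeCmd(targetTable: String, sourceTable: String)

// Same shape as AuthZUtils.getFieldVal wrapped in Try: reflective field access,
// so probing a command that lacks the field simply yields None.
def fieldValOpt[T](o: Any, name: String): Option[T] = Try {
  val field = o.getClass.getDeclaredField(name)
  field.setAccessible(true)
  field.get(o).asInstanceOf[T]
}.toOption

val cmd = FakeMergeCmd("target", "source")
assert(fieldValOpt[String](cmd, "targetTable").contains("target"))
assert(fieldValOpt[String](cmd, "table").isEmpty) // absent field, nothing to skip
```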
@@ -212,6 +212,9 @@ object PrivilegesBuilder {
       case v2Cmd if v2Commands.accept(v2Cmd) =>
         v2Commands.withName(v2Cmd).buildPrivileges(plan, inputObjs, outputObjs)

+      case icebergCmd if IcebergCommands.accept(icebergCmd) =>
+        IcebergCommands.withName(icebergCmd).buildPrivileges(plan, inputObjs, outputObjs)
+
       case "AlterDatabasePropertiesCommand" |
           "AlterDatabaseSetLocationCommand" |
           "CreateDatabaseCommand" |
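The new case above dispatches on the command's node name rather than its class, so the authz plugin needs no compile-time dependency on Iceberg. A toy mirror of that design choice (the set below lists the three Iceberg DML commands registered in `IcebergCommands`):

```scala
// String-based dispatch on plan.nodeName, as PrivilegesBuilder does.
val icebergCommandNames =
  Set("DeleteFromIcebergTable", "UpdateIcebergTable", "MergeIntoIcebergTable")

def route(nodeName: String): String = nodeName match {
  case cmd if icebergCommandNames(cmd) => s"IcebergCommands handles $cmd"
  case other => s"fall through to other builders for $other"
}

assert(route("UpdateIcebergTable") == "IcebergCommands handles UpdateIcebergTable")
```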
@@ -24,16 +24,27 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier

-import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
+import org.apache.kyuubi.plugin.spark.authz.{IcebergCommands, ObjectType, OperationType}
 import org.apache.kyuubi.plugin.spark.authz.util.{PermanentViewMarker, RowFilterAndDataMaskingMarker}
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

 class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {

+  private def mapPlanChildren(plan: LogicalPlan)(f: LogicalPlan => LogicalPlan): LogicalPlan = {
+    val newChildren = plan match {
+      case _ if IcebergCommands.accept(plan.nodeName) =>
+        val skipped = IcebergCommands.skipMappedChildren(plan)
+        skipped ++ (plan.children diff skipped).map(f)
+      case _ =>
+        plan.children.map(f)
+    }
+    plan.withNewChildren(newChildren)
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation/DataSourceV2Relation with
     // RowFilterAndDataMaskingMarker if it is not wrapped yet.
-    plan mapChildren {
+    mapPlanChildren(plan) {
       case p: RowFilterAndDataMaskingMarker => p
       case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
         val table = getHiveTable(hiveTableRelation)
@@ -44,6 +44,8 @@ private[authz] object AuthZUtils {
     }
   }

+  def getFieldValOpt[T](o: Any, name: String): Option[T] = Try(getFieldVal[T](o, name)).toOption
+
   def invoke(
       obj: AnyRef,
       methodName: String,
@@ -128,6 +130,19 @@ private[authz] object AuthZUtils {
     SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
   }

+  /**
+   * Check whether the Spark version requirement is satisfied:
+   * the first parameter is the optional maximum supported Spark version,
+   * and the second parameter is the optional minimum supported Spark version
+   *
+   * @return true if the current Spark version is within the given bounds
+   */
+  def passSparkVersionCheck: (Option[String], Option[String]) => Boolean =
+    (mostSparkVersion, leastSparkVersion) => {
+      mostSparkVersion.forall(isSparkVersionAtMost) &&
+      leastSparkVersion.forall(isSparkVersionAtLeast)
+    }
+
   def quoteIfNeeded(part: String): String = {
     if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
       part
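A short worked example of the bound semantics. The `versionPair`/`passCheck` helpers are simplified stand-ins for illustration; the real check delegates to `SemanticVersion` comparisons:

```scala
// A missing bound (None) always passes, mirroring Option.forall above.
def versionPair(v: String): (Int, Int) =
  v.split('.') match { case Array(maj, min, _*) => (maj.toInt, min.toInt) }

def passCheck(current: String)(mostVer: Option[String], leastVer: Option[String]): Boolean = {
  import scala.math.Ordering.Implicits._
  val cur = versionPair(current)
  mostVer.forall(m => cur <= versionPair(m)) && leastVer.forall(l => cur >= versionPair(l))
}

// e.g. DeleteFromIcebergTable declares leastVer = Some("3.2"):
assert(passCheck("3.3")(None, Some("3.2")))  // Spark 3.3 is accepted
assert(!passCheck("3.1")(None, Some("3.2"))) // Spark 3.1 is rejected
```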
@@ -64,11 +64,8 @@ object v2Commands extends Enumeration {
       val command = v2Commands.withName(commandName)

       // check spark version requirements
-      def passSparkVersionCheck: Boolean =
-        (command.mostVer.isEmpty || isSparkVersionAtMost(command.mostVer.get)) &&
-          (command.leastVer.isEmpty || isSparkVersionAtLeast(command.leastVer.get))
-
-      passSparkVersionCheck
+      passSparkVersionCheck(command.mostVer, command.leastVer)
     } catch {
       case _: NoSuchElementException => false
     }
@@ -32,6 +32,8 @@ trait SparkSessionProvider {
   protected val isSparkV33OrGreater: Boolean = isSparkVersionAtLeast("3.3")

   protected val extension: SparkSessionExtensions => Unit = _ => Unit
+  protected val sqlExtensions: String = ""

   protected lazy val spark: SparkSession = {
     val metastore = {
       val path = Files.createTempDirectory("hms")
@@ -46,6 +48,7 @@ trait SparkSessionProvider {
       .config(
         "spark.sql.warehouse.dir",
         Files.createTempDirectory("spark-warehouse").toString)
+      .config("spark.sql.extensions", sqlExtensions)
       .withExtensions(extension)
       .getOrCreate()
   }
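For context, a minimal local-mode sketch of the session the provider ends up building when a suite overrides `sqlExtensions` with the Iceberg extension (the `master` value here is an assumption for illustration):

```scala
import org.apache.spark.sql.SparkSession

// SQL extensions are injected by class name through configuration, alongside
// any programmatic extensions passed to withExtensions in the provider.
val session = SparkSession.builder()
  .master("local[1]")
  .config(
    "spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()
```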
@@ -0,0 +1,145 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kyuubi.plugin.spark.authz.ranger

// scalastyle:off
import java.nio.file.Files

import org.apache.kyuubi.plugin.spark.authz.AccessControlException

/**
 * Tests for RangerSparkExtensionSuite on an Iceberg catalog
 * with the DataSource V2 API.
 */
class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
  override protected val catalogImpl: String = "hive"
  override protected val sqlExtensions: String =
    if (isSparkV32OrGreater)
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    else ""

  val catalogV2 = "local"
  val namespace1 = "ns1"
  val table1 = "table1"
  val outputTable1 = "outputTable1"

  override def beforeAll(): Unit = {
    if (isSparkV32OrGreater) {
      spark.conf.set(
        s"spark.sql.catalog.$catalogV2",
        "org.apache.iceberg.spark.SparkCatalog")
      spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop")
      spark.conf.set(
        s"spark.sql.catalog.$catalogV2.warehouse",
        Files.createTempDirectory("iceberg-hadoop").toString)

      super.beforeAll()

      doAs("admin", sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1"))
      doAs(
        "admin",
        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table1" +
          " (id int, name string, city string) USING iceberg"))

      doAs(
        "admin",
        sql(s"INSERT INTO $catalogV2.$namespace1.$table1" +
          " (id, name, city) VALUES (1, 'liangbowen', 'Guangzhou')"))
      doAs(
        "admin",
        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1" +
          " (id int, name string, city string) USING iceberg"))
    }
  }

  override def afterAll(): Unit = {
    super.afterAll()
    spark.sessionState.catalog.reset()
    spark.sessionState.conf.clear()
  }

  test("[KYUUBI #3515] MERGE INTO") {
    assume(isSparkV32OrGreater)

    val mergeIntoSql =
      s"""
         |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
         |USING $catalogV2.$namespace1.$table1 AS source
         |ON target.id = source.id
         |WHEN MATCHED AND (target.name='delete') THEN DELETE
         |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
       """.stripMargin

    // MergeIntoTable: Using a MERGE INTO Statement
    val e1 = intercept[AccessControlException](
      doAs(
        "someone",
        sql(mergeIntoSql)))
    assert(e1.getMessage.contains(s"does not have [select] privilege" +
      s" on [$namespace1/$table1/id]"))

    try {
      SparkRangerAdminPlugin.getRangerConf.setBoolean(
        s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call",
        true)
      val e2 = intercept[AccessControlException](
        doAs(
          "someone",
          sql(mergeIntoSql)))
      assert(e2.getMessage.contains(s"does not have" +
        s" [select] privilege" +
        s" on [$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," +
        s" [update] privilege on [$namespace1/$outputTable1]"))
    } finally {
      SparkRangerAdminPlugin.getRangerConf.setBoolean(
        s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call",
        false)
    }

    doAs("admin", sql(mergeIntoSql))
  }

  test("[KYUUBI #3515] UPDATE TABLE") {
    assume(isSparkV32OrGreater)

    // UpdateTable
    val e1 = intercept[AccessControlException](
      doAs(
        "someone",
        sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
          " WHERE id=1")))
    assert(e1.getMessage.contains(s"does not have [update] privilege" +
      s" on [$namespace1/$table1]"))

    doAs(
      "admin",
      sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
        " WHERE id=1"))
  }

  test("[KYUUBI #3515] DELETE FROM TABLE") {
    assume(isSparkV32OrGreater)

    // DeleteFromTable
    val e6 = intercept[AccessControlException](
      doAs("someone", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")))
    assert(e6.getMessage.contains(s"does not have [update] privilege" +
      s" on [$namespace1/$table1]"))

    doAs("admin", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
  }
}