From 0c2091cd03c92f4b3e402a2dc84ef35cdc5d5bea Mon Sep 17 00:00:00 2001 From: Bowen Liang Date: Tue, 11 Oct 2022 09:54:50 +0800 Subject: [PATCH] [KYUUBI #3515] [Authz] support checking rewritten Iceberg commands and skip apply Row-filter to output tables ### _Why are the changes needed?_ to close #3515. By replacing mapChildren in `RuleApplyRowFilterAndDataMasking` to skip head of children query as inserted by iceberg in `IcebergSparkSessionExtensions`. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #3520 from bowenliang123/3515-authz-iceberg. Closes #3515 a10fe43a [Bowen Liang] improve mapPlanChildren 5eb7f845 [Bowen Liang] fix problem after merging from master bb1eefbd [Bowen Liang] merge from master 6c6b071c [Bowen Liang] Merge commit 'ae2df990a32c695de6f0345ea7e73e51103e87e5' into 3515-authz-iceberg dbcfe6d8 [Bowen Liang] restrict skipMapchiled to Iceberg command privilege builders and add skipMappedChildren method to IcebergCommands to handling them ae2df990 [Bowen Liang] nit 0c691798 [liangbowen] update mapPlanChildren and passSparkVersionCheck 161215d0 [liangbowen] nit 0006dee3 [liangbowen] generalize passSparkVersionCheck method to AuthZUtils 44163638 [liangbowen] refactor getFieldValOpt to getFieldValOption 5b8aa40b [Bowen Liang] improvements on skippedMapChildren of mapPlanChildren 11c2e637 [liangbowen] nit 5a971945 [liangbowen] unifiying general way for ensure skipped table is in plan's children b161d70f [Bowen Liang] nit 952c1e1c [Bowen Liang] reuse MergeIntoTable of v2Commands to MergeIntoIcebergTable of IcebergCommands 0b25bd1b [Bowen Liang] generalize mapPlanChildren for iceberg commands 04fb651f [Bowen Liang] nit a1f33bc3 [Bowen Liang] update DELETE FROM TABLE ut 
34d65e5a [Bowen Liang] introduce IcebergCommands for access checking iceberg table. skip head child for iceberg commands in RuleApplyRowFilterAndDataMasking to prevent marking output tables. 085bfa31 [liangbowen] repalce mapChildren in RuleApplyRowFilterAndDataMasking with mapPlanChildren method, to skip head child for iceberg UpdateIcebergTable/MergeIntoIcebergTable/DeleteFromIcebergTable 6a245019 [liangbowen] init iceberg ut Lead-authored-by: Bowen Liang Co-authored-by: liangbowen Signed-off-by: Kent Yao --- extensions/spark/kyuubi-spark-authz/pom.xml | 7 + .../plugin/spark/authz/IcebergCommands.scala | 120 +++++++++++++++ .../spark/authz/PrivilegesBuilder.scala | 3 + .../RuleApplyRowFilterAndDataMasking.scala | 15 +- .../plugin/spark/authz/util/AuthZUtils.scala | 15 ++ .../plugin/spark/authz/v2Commands.scala | 5 +- .../spark/authz/SparkSessionProvider.scala | 3 + ...bergCatalogRangerSparkExtensionSuite.scala | 145 ++++++++++++++++++ 8 files changed, 307 insertions(+), 6 deletions(-) create mode 100644 extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala create mode 100644 extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 35fea54c5..cf61281d4 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -283,6 +283,13 @@ guava test + + + org.apache.iceberg + iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version} + test + + diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala new file mode 100644 index 000000000..fae9e6a02 --- /dev/null +++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.spark.authz + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import org.apache.kyuubi.plugin.spark.authz.OperationType._ +import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType.PrivilegeObjectActionType +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ +import org.apache.kyuubi.plugin.spark.authz.v2Commands.CommandType.CommandType + +/** + * Building privilege objects + * for Iceberg commands rewritten by extension + */ +object IcebergCommands extends Enumeration { + + import scala.language.implicitConversions + + implicit def valueToCmdPrivilegeBuilder(x: Value): CmdPrivilegeBuilder = + x.asInstanceOf[CmdPrivilegeBuilder] + + /** + * check whether commandName is implemented with supported privilege builders + * and pass the requirement checks (e.g. 
Spark version) + * + * @param commandName name of command + * @return true if so, false else + */ + def accept(commandName: String): Boolean = { + try { + val command = IcebergCommands.withName(commandName) + + // check spark version requirements + passSparkVersionCheck(command.mostVer, command.leastVer) + } catch { + case _: NoSuchElementException => false + } + } + + def skipMappedChildren(plan: LogicalPlan): Seq[LogicalPlan] = { + Seq( + getFieldValOpt[LogicalPlan](plan, "table"), + getFieldValOpt[LogicalPlan](plan, "targetTable"), + getFieldValOpt[LogicalPlan](plan, "sourceTable")) + .flatten intersect plan.children + } + + /** + * Command privilege builder + * + * @param operationType OperationType for converting accessType + * @param leastVer minimum Spark version required + * @param mostVer maximum Spark version supported + * @param commandTypes Seq of [[CommandType]] hinting privilege building + * @param buildInput input [[PrivilegeObject]] for privilege check + * @param buildOutput output [[PrivilegeObject]] for privilege check + * @param outputActionType [[PrivilegeObjectActionType]] for output [[PrivilegeObject]] + */ + case class CmdPrivilegeBuilder( + operationType: OperationType = QUERY, + leastVer: Option[String] = None, + mostVer: Option[String] = None, + commandTypes: Seq[CommandType] = Seq.empty, + buildInput: (LogicalPlan, ArrayBuffer[PrivilegeObject], Seq[CommandType]) => Unit = + v2Commands.defaultBuildInput, + buildOutput: ( + LogicalPlan, + ArrayBuffer[PrivilegeObject], + Seq[CommandType], + PrivilegeObjectActionType) => Unit = v2Commands.defaultBuildOutput, + outputActionType: PrivilegeObjectActionType = PrivilegeObjectActionType.OTHER) + extends super.Val { + + def buildPrivileges( + plan: LogicalPlan, + inputObjs: ArrayBuffer[PrivilegeObject], + outputObjs: ArrayBuffer[PrivilegeObject]): Unit = { + this.buildInput(plan, inputObjs, commandTypes) + this.buildOutput(plan, outputObjs, commandTypes, outputActionType) + } + } + + // dml commands 
+ + val DeleteFromIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder( + operationType = ALTERTABLE_DROPPARTS, + leastVer = Some("3.2"), + commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption), + outputActionType = PrivilegeObjectActionType.UPDATE) + + val UpdateIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder( + operationType = ALTERTABLE_ADDPARTS, + leastVer = Some("3.2"), + commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption), + outputActionType = PrivilegeObjectActionType.UPDATE) + + val UnresolvedMergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder() + + val MergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder( + buildInput = v2Commands.MergeIntoTable.buildInput, + buildOutput = v2Commands.MergeIntoTable.buildOutput) +} diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala index d57cd2b0f..49fbe6042 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala @@ -212,6 +212,9 @@ object PrivilegesBuilder { case v2Cmd if v2Commands.accept(v2Cmd) => v2Commands.withName(v2Cmd).buildPrivileges(plan, inputObjs, outputObjs) + case icebergCmd if IcebergCommands.accept(icebergCmd) => + IcebergCommands.withName(icebergCmd).buildPrivileges(plan, inputObjs, outputObjs) + case "AlterDatabasePropertiesCommand" | "AlterDatabaseSetLocationCommand" | "CreateDatabaseCommand" | diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala index 7b3135cec..a30e2e4ff 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala @@ -24,16 +24,27 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType} +import org.apache.kyuubi.plugin.spark.authz.{IcebergCommands, ObjectType, OperationType} import org.apache.kyuubi.plugin.spark.authz.util.{PermanentViewMarker, RowFilterAndDataMaskingMarker} import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] { + private def mapPlanChildren(plan: LogicalPlan)(f: LogicalPlan => LogicalPlan): LogicalPlan = { + val newChildren = plan match { + case _ if IcebergCommands.accept(plan.nodeName) => + val skipped = IcebergCommands.skipMappedChildren(plan) + skipped ++ (plan.children diff skipped).map(f) + case _ => + plan.children.map(f) + } + plan.withNewChildren(newChildren) + } + override def apply(plan: LogicalPlan): LogicalPlan = { // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation/DataSourceV2Relation with // RowFilterAndDataMaskingMarker if it is not wrapped yet. 
- plan mapChildren { + mapPlanChildren(plan) { case p: RowFilterAndDataMaskingMarker => p case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) => val table = getHiveTable(hiveTableRelation) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala index e157a8314..270acb7d5 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala @@ -44,6 +44,8 @@ private[authz] object AuthZUtils { } } + def getFieldValOpt[T](o: Any, name: String): Option[T] = Try(getFieldVal[T](o, name)).toOption + def invoke( obj: AnyRef, methodName: String, @@ -128,6 +130,19 @@ private[authz] object AuthZUtils { SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString) } + /** + * check if spark version satisfied + * first param is option of supported most spark version, + * and second param is option of supported least spark version + * + * @return + */ + def passSparkVersionCheck: (Option[String], Option[String]) => Boolean = + (mostSparkVersion, leastSparkVersion) => { + mostSparkVersion.forall(isSparkVersionAtMost) && + leastSparkVersion.forall(isSparkVersionAtLeast) + } + def quoteIfNeeded(part: String): String = { if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) { part diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala index 90035d1f5..8ed6ca161 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala +++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala @@ -64,11 +64,8 @@ object v2Commands extends Enumeration { val command = v2Commands.withName(commandName) // check spark version requirements - def passSparkVersionCheck: Boolean = - (command.mostVer.isEmpty || isSparkVersionAtMost(command.mostVer.get)) && - (command.leastVer.isEmpty || isSparkVersionAtLeast(command.leastVer.get)) + passSparkVersionCheck(command.mostVer, command.leastVer) - passSparkVersionCheck } catch { case _: NoSuchElementException => false } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala index ae8032f5a..eb2b77959 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala @@ -32,6 +32,8 @@ trait SparkSessionProvider { protected val isSparkV33OrGreater: Boolean = isSparkVersionAtLeast("3.3") protected val extension: SparkSessionExtensions => Unit = _ => Unit + protected val sqlExtensions: String = "" + protected lazy val spark: SparkSession = { val metastore = { val path = Files.createTempDirectory("hms") @@ -46,6 +48,7 @@ trait SparkSessionProvider { .config( "spark.sql.warehouse.dir", Files.createTempDirectory("spark-warehouse").toString) + .config("spark.sql.extensions", sqlExtensions) .withExtensions(extension) .getOrCreate() } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala new file mode 100644 index 
000000000..fe2902ada --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.plugin.spark.authz.ranger + +// scalastyle:off +import java.nio.file.Files + +import org.apache.kyuubi.plugin.spark.authz.AccessControlException + +/** + * Tests for RangerSparkExtensionSuite + * on Iceberg catalog with DataSource V2 API. 
+ */ +class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { + override protected val catalogImpl: String = "hive" + override protected val sqlExtensions: String = + if (isSparkV32OrGreater) + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + else "" + + val catalogV2 = "local" + val namespace1 = "ns1" + val table1 = "table1" + val outputTable1 = "outputTable1" + + override def beforeAll(): Unit = { + if (isSparkV32OrGreater) { + spark.conf.set( + s"spark.sql.catalog.$catalogV2", + "org.apache.iceberg.spark.SparkCatalog") + spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop") + spark.conf.set( + s"spark.sql.catalog.$catalogV2.warehouse", + Files.createTempDirectory("iceberg-hadoop").toString) + + super.beforeAll() + + doAs("admin", sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1")) + doAs( + "admin", + sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table1" + + " (id int, name string, city string) USING iceberg")) + + doAs( + "admin", + sql(s"INSERT INTO $catalogV2.$namespace1.$table1" + + " (id , name , city ) VALUES (1, 'liangbowen','Guangzhou')")) + doAs( + "admin", + sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1" + + " (id int, name string, city string) USING iceberg")) + } + } + + override def afterAll(): Unit = { + super.afterAll() + spark.sessionState.catalog.reset() + spark.sessionState.conf.clear() + } + + test("[KYUUBI #3515] MERGE INTO") { + assume(isSparkV32OrGreater) + + val mergeIntoSql = + s""" + |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target + |USING $catalogV2.$namespace1.$table1 AS source + |ON target.id = source.id + |WHEN MATCHED AND (target.name='delete') THEN DELETE + |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city + """.stripMargin + + // MergeIntoTable: Using a MERGE INTO Statement + val e1 = intercept[AccessControlException]( + doAs( + "someone", + sql(mergeIntoSql))) + 
assert(e1.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/id]")) + + try { + SparkRangerAdminPlugin.getRangerConf.setBoolean( + s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call", + true) + val e2 = intercept[AccessControlException]( + doAs( + "someone", + sql(mergeIntoSql))) + assert(e2.getMessage.contains(s"does not have" + + s" [select] privilege" + + s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," + + s" [update] privilege on [$namespace1/$outputTable1]")) + } finally { + SparkRangerAdminPlugin.getRangerConf.setBoolean( + s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call", + false) + } + + doAs("admin", sql(mergeIntoSql)) + } + + test("[KYUUBI #3515] UPDATE TABLE") { + assume(isSparkV32OrGreater) + + // UpdateTable + val e1 = intercept[AccessControlException]( + doAs( + "someone", + sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " + + " WHERE id=1"))) + assert(e1.getMessage.contains(s"does not have [update] privilege" + + s" on [$namespace1/$table1]")) + + doAs( + "admin", + sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " + + " WHERE id=1")) + } + + test("[KYUUBI #3515] DELETE FROM TABLE") { + assume(isSparkV32OrGreater) + + // DeleteFromTable + val e6 = intercept[AccessControlException]( + doAs("someone", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))) + assert(e6.getMessage.contains(s"does not have [update] privilege" + + s" on [$namespace1/$table1]")) + + doAs("admin", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")) + } +}