diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 35fea54c5..cf61281d4 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -283,6 +283,13 @@ guava test + + + org.apache.iceberg + iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version} + test + + diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala new file mode 100644 index 000000000..fae9e6a02 --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.spark.authz + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import org.apache.kyuubi.plugin.spark.authz.OperationType._ +import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType.PrivilegeObjectActionType +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ +import org.apache.kyuubi.plugin.spark.authz.v2Commands.CommandType.CommandType + +/** + * Building privilege objects + * for Iceberg commands rewritten by extension + */ +object IcebergCommands extends Enumeration { + + import scala.language.implicitConversions + + implicit def valueToCmdPrivilegeBuilder(x: Value): CmdPrivilegeBuilder = + x.asInstanceOf[CmdPrivilegeBuilder] + + /** + * check whether commandName is implemented with supported privilege builders + * and pass the requirement checks (e.g. Spark version) + * + * @param commandName name of command + * @return true if so, false else + */ + def accept(commandName: String): Boolean = { + try { + val command = IcebergCommands.withName(commandName) + + // check spark version requirements + passSparkVersionCheck(command.mostVer, command.leastVer) + } catch { + case _: NoSuchElementException => false + } + } + + def skipMappedChildren(plan: LogicalPlan): Seq[LogicalPlan] = { + Seq( + getFieldValOpt[LogicalPlan](plan, "table"), + getFieldValOpt[LogicalPlan](plan, "targetTable"), + getFieldValOpt[LogicalPlan](plan, "sourceTable")) + .flatten intersect plan.children + } + + /** + * Command privilege builder + * + * @param operationType OperationType for converting accessType + * @param leastVer minimum Spark version required + * @param mostVer maximum Spark version supported + * @param commandTypes Seq of [[CommandType]] hinting privilege building + * @param buildInput input [[PrivilegeObject]] for privilege check + * @param buildOutput output [[PrivilegeObject]] for privilege check + * @param outputActionType [[PrivilegeObjectActionType]] for output [[PrivilegeObject]] + */ + case class CmdPrivilegeBuilder( + operationType: OperationType = QUERY, + leastVer: Option[String] = None, + mostVer: Option[String] = None, + commandTypes: Seq[CommandType] = Seq.empty, + buildInput: (LogicalPlan, ArrayBuffer[PrivilegeObject], Seq[CommandType]) => Unit = + v2Commands.defaultBuildInput, + buildOutput: ( + LogicalPlan, + ArrayBuffer[PrivilegeObject], + Seq[CommandType], + PrivilegeObjectActionType) => Unit = v2Commands.defaultBuildOutput, + outputActionType: PrivilegeObjectActionType = PrivilegeObjectActionType.OTHER) + extends super.Val { + + def buildPrivileges( + plan: LogicalPlan, + inputObjs: ArrayBuffer[PrivilegeObject], + outputObjs: ArrayBuffer[PrivilegeObject]): Unit = { + this.buildInput(plan, inputObjs, commandTypes) + this.buildOutput(plan, outputObjs, commandTypes, outputActionType) + } + } + + // dml commands + + val DeleteFromIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder( + operationType = ALTERTABLE_DROPPARTS, + leastVer = Some("3.2"), + commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption), + outputActionType = PrivilegeObjectActionType.UPDATE) + + val UpdateIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder( + operationType = ALTERTABLE_ADDPARTS, + leastVer = Some("3.2"), + commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption), + outputActionType = PrivilegeObjectActionType.UPDATE) + + val UnresolvedMergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder() + + val MergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder( + buildInput = v2Commands.MergeIntoTable.buildInput, + buildOutput = v2Commands.MergeIntoTable.buildOutput) +} diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala index d57cd2b0f..49fbe6042 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala @@ -212,6 +212,9 @@ object PrivilegesBuilder { case v2Cmd if v2Commands.accept(v2Cmd) => v2Commands.withName(v2Cmd).buildPrivileges(plan, inputObjs, outputObjs) + case icebergCmd if IcebergCommands.accept(icebergCmd) => + IcebergCommands.withName(icebergCmd).buildPrivileges(plan, inputObjs, outputObjs) + case "AlterDatabasePropertiesCommand" | "AlterDatabaseSetLocationCommand" | "CreateDatabaseCommand" | diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala index 7b3135cec..a30e2e4ff 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala @@ -24,16 +24,27 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType} +import org.apache.kyuubi.plugin.spark.authz.{IcebergCommands, ObjectType, OperationType} import org.apache.kyuubi.plugin.spark.authz.util.{PermanentViewMarker, RowFilterAndDataMaskingMarker} import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] { + private def mapPlanChildren(plan: LogicalPlan)(f: LogicalPlan => LogicalPlan): LogicalPlan = { + val newChildren = plan match { + case _ if IcebergCommands.accept(plan.nodeName) => + val skipped = IcebergCommands.skipMappedChildren(plan) + skipped ++ (plan.children diff skipped).map(f) + case _ => + plan.children.map(f) + } + plan.withNewChildren(newChildren) + } + override def apply(plan: LogicalPlan): LogicalPlan = { // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation/DataSourceV2Relation with // RowFilterAndDataMaskingMarker if it is not wrapped yet. - plan mapChildren { + mapPlanChildren(plan) { case p: RowFilterAndDataMaskingMarker => p case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) => val table = getHiveTable(hiveTableRelation) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala index e157a8314..270acb7d5 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala @@ -44,6 +44,8 @@ private[authz] object AuthZUtils { } } + def getFieldValOpt[T](o: Any, name: String): Option[T] = Try(getFieldVal[T](o, name)).toOption + def invoke( obj: AnyRef, methodName: String, @@ -128,6 +130,19 @@ private[authz] object AuthZUtils { SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString) } + /** + * check if spark version satisfied + * first param is option of supported most spark version, + * and secont param is option of supported least spark version + * + * @return + */ + def passSparkVersionCheck: (Option[String], Option[String]) => Boolean = + (mostSparkVersion, leastSparkVersion) => { + mostSparkVersion.forall(isSparkVersionAtMost) && + leastSparkVersion.forall(isSparkVersionAtLeast) + } + def quoteIfNeeded(part: String): String = { if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) { part diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala index 90035d1f5..8ed6ca161 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala @@ -64,11 +64,8 @@ object v2Commands extends Enumeration { val command = v2Commands.withName(commandName) // check spark version requirements - def passSparkVersionCheck: Boolean = - (command.mostVer.isEmpty || isSparkVersionAtMost(command.mostVer.get)) && - (command.leastVer.isEmpty || isSparkVersionAtLeast(command.leastVer.get)) + passSparkVersionCheck(command.mostVer, command.leastVer) - passSparkVersionCheck } catch { case _: NoSuchElementException => false } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala index ae8032f5a..eb2b77959 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala @@ -32,6 +32,8 @@ trait SparkSessionProvider { protected val isSparkV33OrGreater: Boolean = isSparkVersionAtLeast("3.3") protected val extension: SparkSessionExtensions => Unit = _ => Unit + protected val sqlExtensions: String = "" + protected lazy val spark: SparkSession = { val metastore = { val path = Files.createTempDirectory("hms") @@ -46,6 +48,7 @@ trait SparkSessionProvider { .config( "spark.sql.warehouse.dir", Files.createTempDirectory("spark-warehouse").toString) + .config("spark.sql.extensions", sqlExtensions) .withExtensions(extension) .getOrCreate() } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala new file mode 100644 index 000000000..fe2902ada --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.plugin.spark.authz.ranger + +// scalastyle:off +import java.nio.file.Files + +import org.apache.kyuubi.plugin.spark.authz.AccessControlException + +/** + * Tests for RangerSparkExtensionSuite + * on Iceberg catalog with DataSource V2 API. + */ +class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { + override protected val catalogImpl: String = "hive" + override protected val sqlExtensions: String = + if (isSparkV32OrGreater) + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + else "" + + val catalogV2 = "local" + val namespace1 = "ns1" + val table1 = "table1" + val outputTable1 = "outputTable1" + + override def beforeAll(): Unit = { + if (isSparkV32OrGreater) { + spark.conf.set( + s"spark.sql.catalog.$catalogV2", + "org.apache.iceberg.spark.SparkCatalog") + spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop") + spark.conf.set( + s"spark.sql.catalog.$catalogV2.warehouse", + Files.createTempDirectory("iceberg-hadoop").toString) + + super.beforeAll() + + doAs("admin", sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1")) + doAs( + "admin", + sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table1" + + " (id int, name string, city string) USING iceberg")) + + doAs( + "admin", + sql(s"INSERT INTO $catalogV2.$namespace1.$table1" + + " (id , name , city ) VALUES (1, 'liangbowen','Guangzhou')")) + doAs( + "admin", + sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1" + + " (id int, name string, city string) USING iceberg")) + } + } + + override def afterAll(): Unit = { + super.afterAll() + spark.sessionState.catalog.reset() + spark.sessionState.conf.clear() + } + + test("[KYUUBI #3515] MERGE INTO") { + assume(isSparkV32OrGreater) + + val mergeIntoSql = + s""" + |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target + |USING $catalogV2.$namespace1.$table1 AS source + |ON target.id = source.id + |WHEN MATCHED AND (target.name='delete') THEN DELETE + |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city + """.stripMargin + + // MergeIntoTable: Using a MERGE INTO Statement + val e1 = intercept[AccessControlException]( + doAs( + "someone", + sql(mergeIntoSql))) + assert(e1.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/id]")) + + try { + SparkRangerAdminPlugin.getRangerConf.setBoolean( + s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call", + true) + val e2 = intercept[AccessControlException]( + doAs( + "someone", + sql(mergeIntoSql))) + assert(e2.getMessage.contains(s"does not have" + + s" [select] privilege" + + s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," + + s" [update] privilege on [$namespace1/$outputTable1]")) + } finally { + SparkRangerAdminPlugin.getRangerConf.setBoolean( + s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call", + false) + } + + doAs("admin", sql(mergeIntoSql)) + } + + test("[KYUUBI #3515] UPDATE TABLE") { + assume(isSparkV32OrGreater) + + // UpdateTable + val e1 = intercept[AccessControlException]( + doAs( + "someone", + sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " + + " WHERE id=1"))) + assert(e1.getMessage.contains(s"does not have [update] privilege" + + s" on [$namespace1/$table1]")) + + doAs( + "admin", + sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " + + " WHERE id=1")) + } + + test("[KYUUBI #3515] DELETE FROM TABLE") { + assume(isSparkV32OrGreater) + + // DeleteFromTable + val e6 = intercept[AccessControlException]( + doAs("someone", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))) + assert(e6.getMessage.contains(s"does not have [update] privilege" + + s" on [$namespace1/$table1]")) + + doAs("admin", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")) + } +}