From 8d4d00feb3cd7e6ef5e942531856de532ed3a74f Mon Sep 17 00:00:00 2001 From: zhouyifan279 Date: Thu, 30 Jun 2022 15:20:44 +0800 Subject: [PATCH] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified ### _Why are the changes needed?_ Fix #2918 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2967 from zhouyifan279/2918. Closes #2918 79800d5b [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified b279d2f5 [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified fcb1f8a3 [Min Zhao] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified 3cab67b1 [Min Zhao] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified cff04d1c [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified 24aaf81e [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified 306508f8 [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified Lead-authored-by: zhouyifan279 Co-authored-by: Min Zhao Signed-off-by: Kent Yao --- .../RuleApplyRowFilterAndDataMasking.scala | 6 +- .../RuleReplaceShowObjectCommands.scala | 8 +-- .../util/RowFilterAndDataMaskingMarker.scala | 12 +++- .../authz/util/RuleEliminateMarker.scala | 2 +- ...Child.scala => WithInternalChildren.scala} | 6 +- .../ranger/RangerSparkExtensionSuite.scala | 58 ++++++++++++++++++- 6 files changed, 81 insertions(+), 11 deletions(-) rename extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/{WithInternalChild.scala => WithInternalChildren.scala} (91%) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala index 5ea1d2737..78081b555 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala @@ -30,7 +30,10 @@ import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { - plan transformUp { + // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation with + // RowFilterAndDataMaskingMarker if it is not wrapped yet. + plan mapChildren { + case p: RowFilterAndDataMaskingMarker => p case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) => val table = getHiveTable(hiveTableRelation) applyFilterAndMasking(hiveTableRelation, table, spark) @@ -41,6 +44,7 @@ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[Logical } else { applyFilterAndMasking(logicalRelation, table.get, spark) } + case other => apply(other) } } diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala index 22f91249b..08d2b4fd0 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{RunnableCommand, ShowColumnsCommand} import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType} -import org.apache.kyuubi.plugin.spark.authz.util.{AuthZUtils, ObjectFilterPlaceHolder, WithInternalChild} +import org.apache.kyuubi.plugin.spark.authz.util.{AuthZUtils, ObjectFilterPlaceHolder, WithInternalChildren} class RuleReplaceShowObjectCommands extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan match { @@ -76,7 +76,7 @@ case class FilteredShowDatabasesCommand(delegated: RunnableCommand) } abstract class FilteredShowObjectCommand(delegated: RunnableCommand) - extends RunnableCommand with WithInternalChild { + extends RunnableCommand with WithInternalChildren { override val output: Seq[Attribute] = delegated.output @@ -92,7 +92,7 @@ abstract class FilteredShowObjectCommand(delegated: RunnableCommand) } case class FilteredShowFunctionsCommand(delegated: RunnableCommand) - extends FilteredShowObjectCommand(delegated) with WithInternalChild { + extends FilteredShowObjectCommand(delegated) with WithInternalChildren { override protected def isAllowed(r: Row, ugi: UserGroupInformation): Boolean = { val functionName = r.getString(0) @@ -110,7 +110,7 @@ case class FilteredShowFunctionsCommand(delegated: RunnableCommand) } case class FilteredShowColumnsCommand(delegated: RunnableCommand) - extends FilteredShowObjectCommand(delegated) with WithInternalChild { + extends FilteredShowObjectCommand(delegated) with WithInternalChildren { override val output: Seq[Attribute] = delegated.output diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala index b6da24217..357e9bfc2 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala @@ -18,8 +18,14 @@ package org.apache.kyuubi.plugin.spark.authz.util import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} + +case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode + with WithInternalChild { + + override def output: Seq[Attribute] = child.output + + override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(child = newChild) -case class RowFilterAndDataMaskingMarker(table: LogicalPlan) extends LeafNode { - override def output: Seq[Attribute] = table.output } diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala index 77e4083ab..d2da72570 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala @@ -22,6 +22,6 @@ import org.apache.spark.sql.catalyst.rules.Rule class RuleEliminateMarker extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { - plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.table } + plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child } } } diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChild.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChildren.scala similarity index 91% rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChild.scala rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChildren.scala index f10e72e28..bbce1dff8 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChild.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChildren.scala @@ -19,6 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.util import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -trait WithInternalChild { +trait WithInternalChildren { def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan } + +trait WithInternalChild { + def withNewChildInternal(newChild: LogicalPlan): LogicalPlan +} diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 30f1a9f9c..ec48c6587 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -25,18 +25,22 @@ import scala.util.Try import org.apache.commons.codec.digest.DigestUtils import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.sql.{Row, SparkSessionExtensions} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.plans.logical.Statistics +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.scalatest.BeforeAndAfterAll // scalastyle:off import org.scalatest.funsuite.AnyFunSuite import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.getFieldVal abstract class RangerSparkExtensionSuite extends AnyFunSuite with SparkSessionProvider with BeforeAndAfterAll { // scalastyle:on override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension - private def doAs[T](user: String, f: => T): T = { + protected def doAs[T](user: String, f: => T): T = { UserGroupInformation.createRemoteUser(user).doAs[T]( new PrivilegedExceptionAction[T] { override def run(): T = f @@ -380,4 +384,56 @@ class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { override protected val catalogImpl: String = "hive" + + test("table stats must be specified") { + val table = "hive_src" + try { + doAs("admin", sql(s"CREATE TABLE IF NOT EXISTS $table (id int)")) + doAs( + "admin", { + val hiveTableRelation = sql(s"SELECT * FROM $table") + .queryExecution.optimizedPlan.collectLeaves().head.asInstanceOf[HiveTableRelation] + assert(getFieldVal[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty) + }) + } finally { + doAs("admin", sql(s"DROP TABLE IF EXISTS $table")) + } + } + + test("HiveTableRelation should be able to be converted to LogicalRelation") { + val table = "hive_src" + try { + doAs("admin", sql(s"CREATE TABLE IF NOT EXISTS $table (id int) STORED AS PARQUET")) + doAs( + "admin", { + val relation = sql(s"SELECT * FROM $table") + .queryExecution.optimizedPlan.collectLeaves().head + assert(relation.isInstanceOf[LogicalRelation]) + }) + } finally { + doAs("admin", sql(s"DROP TABLE IF EXISTS $table")) + } + } + + test("Pass through JoinSelection") { + val db = "test" + val table1 = "table1" + val table2 = "table2" + + doAs( + "admin", + try { + sql(s"CREATE DATABASE IF NOT EXISTS $db") + sql(s"CREATE TABLE IF NOT EXISTS $db.$table1(id int) STORED AS PARQUET") + sql(s"INSERT INTO $db.$table1 SELECT 1") + sql(s"CREATE TABLE IF NOT EXISTS $db.$table2(id int, name string) STORED AS PARQUET") + sql(s"INSERT INTO $db.$table2 SELECT 1, 'a'") + val join = s"SELECT a.id, b.name FROM $db.$table1 a JOIN $db.$table2 b ON a.id=b.id" + assert(sql(join).collect().length == 1) + } finally { + sql(s"DROP TABLE IF EXISTS $db.$table2") + sql(s"DROP TABLE IF EXISTS $db.$table1") + sql(s"DROP DATABASE IF EXISTS $db") + }) + } }