[KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified

### _Why are the changes needed?_
Fix #2918

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2967 from zhouyifan279/2918.

Closes #2918

79800d5b [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
b279d2f5 [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
fcb1f8a3 [Min Zhao] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
3cab67b1 [Min Zhao] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
cff04d1c [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
24aaf81e [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
306508f8 [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified

Lead-authored-by: zhouyifan279 <zhouyifan279@gmail.com>
Co-authored-by: Min Zhao <zhaomin1423@163.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
zhouyifan279 2022-06-30 15:20:44 +08:00 committed by Kent Yao
parent a9908a1b81
commit 8d4d00feb3
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
6 changed files with 81 additions and 11 deletions

View File

@ -30,7 +30,10 @@ import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker
class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation with
// RowFilterAndDataMaskingMarker if it is not wrapped yet.
plan mapChildren {
case p: RowFilterAndDataMaskingMarker => p
case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
val table = getHiveTable(hiveTableRelation)
applyFilterAndMasking(hiveTableRelation, table, spark)
@ -41,6 +44,7 @@ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[Logical
} else {
applyFilterAndMasking(logicalRelation, table.get, spark)
}
case other => apply(other)
}
}

View File

@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{RunnableCommand, ShowColumnsCommand}
import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
import org.apache.kyuubi.plugin.spark.authz.util.{AuthZUtils, ObjectFilterPlaceHolder, WithInternalChild}
import org.apache.kyuubi.plugin.spark.authz.util.{AuthZUtils, ObjectFilterPlaceHolder, WithInternalChildren}
class RuleReplaceShowObjectCommands extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
@ -76,7 +76,7 @@ case class FilteredShowDatabasesCommand(delegated: RunnableCommand)
}
abstract class FilteredShowObjectCommand(delegated: RunnableCommand)
extends RunnableCommand with WithInternalChild {
extends RunnableCommand with WithInternalChildren {
override val output: Seq[Attribute] = delegated.output
@ -92,7 +92,7 @@ abstract class FilteredShowObjectCommand(delegated: RunnableCommand)
}
case class FilteredShowFunctionsCommand(delegated: RunnableCommand)
extends FilteredShowObjectCommand(delegated) with WithInternalChild {
extends FilteredShowObjectCommand(delegated) with WithInternalChildren {
override protected def isAllowed(r: Row, ugi: UserGroupInformation): Boolean = {
val functionName = r.getString(0)
@ -110,7 +110,7 @@ case class FilteredShowFunctionsCommand(delegated: RunnableCommand)
}
case class FilteredShowColumnsCommand(delegated: RunnableCommand)
extends FilteredShowObjectCommand(delegated) with WithInternalChild {
extends FilteredShowObjectCommand(delegated) with WithInternalChildren {
override val output: Seq[Attribute] = delegated.output

View File

@ -18,8 +18,14 @@
package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode
with WithInternalChild {
override def output: Seq[Attribute] = child.output
override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)
case class RowFilterAndDataMaskingMarker(table: LogicalPlan) extends LeafNode {
override def output: Seq[Attribute] = table.output
}

View File

@ -22,6 +22,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
class RuleEliminateMarker extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.table }
plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child }
}
}

View File

@ -19,6 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
trait WithInternalChild {
trait WithInternalChildren {
def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan
}
trait WithInternalChild {
def withNewChildInternal(newChild: LogicalPlan): LogicalPlan
}

View File

@ -25,18 +25,22 @@ import scala.util.Try
import org.apache.commons.codec.digest.DigestUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.{Row, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.scalatest.BeforeAndAfterAll
// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite
import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.getFieldVal
abstract class RangerSparkExtensionSuite extends AnyFunSuite
with SparkSessionProvider with BeforeAndAfterAll {
// scalastyle:on
override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension
private def doAs[T](user: String, f: => T): T = {
protected def doAs[T](user: String, f: => T): T = {
UserGroupInformation.createRemoteUser(user).doAs[T](
new PrivilegedExceptionAction[T] {
override def run(): T = f
@ -380,4 +384,56 @@ class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
test("table stats must be specified") {
val table = "hive_src"
try {
doAs("admin", sql(s"CREATE TABLE IF NOT EXISTS $table (id int)"))
doAs(
"admin", {
val hiveTableRelation = sql(s"SELECT * FROM $table")
.queryExecution.optimizedPlan.collectLeaves().head.asInstanceOf[HiveTableRelation]
assert(getFieldVal[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty)
})
} finally {
doAs("admin", sql(s"DROP TABLE IF EXISTS $table"))
}
}
test("HiveTableRelation should be able to be converted to LogicalRelation") {
val table = "hive_src"
try {
doAs("admin", sql(s"CREATE TABLE IF NOT EXISTS $table (id int) STORED AS PARQUET"))
doAs(
"admin", {
val relation = sql(s"SELECT * FROM $table")
.queryExecution.optimizedPlan.collectLeaves().head
assert(relation.isInstanceOf[LogicalRelation])
})
} finally {
doAs("admin", sql(s"DROP TABLE IF EXISTS $table"))
}
}
test("Pass through JoinSelection") {
val db = "test"
val table1 = "table1"
val table2 = "table2"
doAs(
"admin",
try {
sql(s"CREATE DATABASE IF NOT EXISTS $db")
sql(s"CREATE TABLE IF NOT EXISTS $db.$table1(id int) STORED AS PARQUET")
sql(s"INSERT INTO $db.$table1 SELECT 1")
sql(s"CREATE TABLE IF NOT EXISTS $db.$table2(id int, name string) STORED AS PARQUET")
sql(s"INSERT INTO $db.$table2 SELECT 1, 'a'")
val join = s"SELECT a.id, b.name FROM $db.$table1 a JOIN $db.$table2 b ON a.id=b.id"
assert(sql(join).collect().length == 1)
} finally {
sql(s"DROP TABLE IF EXISTS $db.$table2")
sql(s"DROP TABLE IF EXISTS $db.$table1")
sql(s"DROP DATABASE IF EXISTS $db")
})
}
}