[KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input
# 🔍 Description ## Issue References 🔗 This pull request fixes #5594 ## Describe Your Solution 🔧 For case ``` def filter_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] df = spark.read.table("test_mapinpandas") execute_result = df.mapInPandas(filter_func, df.schema).show() ``` The logical plan is ``` GlobalLimit 21 +- LocalLimit 21 +- Project [cast(id#5 as string) AS id#11, name#6] +- MapInPandas filter_func(id#0, name#1), [id#5, name#6] +- HiveTableRelation [`default`.`test_mapinpandas`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#0, name#1], Partition Cols: []] ``` When handle `MapInPandas`, we didn't match its input with `HiveTableRelation`, cause we miss input table's columns. This pr fix this In this pr, we remove the branch of each project such as `Project`, `Aggregate` etc, handle it together. ## Types of changes 🔖 - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ For case ``` def filter_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] df = spark.read.table("test_mapinpandas") execute_result = df.mapInPandas(filter_func, df.schema).show() ``` We miss column info of table `test_mapinpandas` #### Behavior With This Pull Request 🎉 We got privilege object of table `test_mapinpandas` with it's column info. #### Related Unit Tests --- # Checklists ## 📝 Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [x] I have commented my code, particularly in hard-to-understand areas - [x] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [x] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5787 from AngersZhuuuu/KYUUBI-5594-approach2. Closes #5594 e08545599 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 49f09fb0a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 4781f75b9 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 9e9208d38 [Angerszhuuuu] Update V2JdbcTableCatalogRangerSparkExtensionSuite.scala 626d3dd88 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 3d69997de [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 6eb4b8e1a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 61efb8ae3 [Angerszhuuuu] update 794ebb7be [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 a236da86b [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 74bd3f4d5 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 4acbc4276 [Angerszhuuuu] Merge branch 'KYUUBI-5594-approach2' of https://github.com/AngersZhuuuu/incubator-kyuubi into KYUUBI-5594-approach2 266f7e877 [Angerszhuuuu] update a6c784546 [Angerszhuuuu] Update PrivilegesBuilder.scala d785d5fdf [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 014ef3b84 [Angerszhuuuu] Update PrivilegesBuilder.scala 7e1cd37a1 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 71d266162 [Angerszhuuuu] update db9594170 [Angerszhuuuu] update 490eb95c2 [Angerszhuuuu] update 70d110e89 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 e6a587718 [Angerszhuuuu] Update PrivilegesBuilder.scala 5ff22b103 [Angerszhuuuu] Update PrivilegesBuilder.scala e6843014b [Angerszhuuuu] Update PrivilegesBuilder.scala 594b202f7 [Angerszhuuuu] Update PrivilegesBuilder.scala 2f87c61e1 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 1de8c1c68 [Angerszhuuuu] Update PrivilegesBuilder.scala ad17255d7 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 4f5e8505f [Angerszhuuuu] update 64349ed97 [Angerszhuuuu] Update PrivilegesBuilder.scala 11b7a4c13 [Angerszhuuuu] Update PrivilegesBuilder.scala 9a58fb0c4 [Angerszhuuuu] update d0b022ec9 [Angerszhuuuu] Update RuleApplyPermanentViewMarker.scala e0f28a640 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 0ebdd5de5 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 8e53236ac [Angerszhuuuu] update 3bafa7ca5 [Angerszhuuuu] update d6e984e07 [Angerszhuuuu] update b00bf5e20 [Angerszhuuuu] Update PrivilegesBuilder.scala 821422852 [Angerszhuuuu] update 93fc6892b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 04184e39d [Angerszhuuuu] update 0bb762467 [Angerszhuuuu] Revert "Revert "Update PrivilegesBuilder.scala"" f481283ae [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala" 9f871822f [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala" 29b67c457 [Angerszhuuuu] Update PrivilegesBuilder.scala 8785ad1ab [Angerszhuuuu] Update PrivilegesBuilder.scala 270f21dcc [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 60872efcb [Angerszhuuuu] Update RangerSparkExtensionSuite.scala c34f32ea2 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 86fc4756a [Angerszhuuuu] Update PrivilegesBuilder.scala 404f1ea4c [Angerszhuuuu] Update PrivilegesBuilder.scala dcca394e0 [Angerszhuuuu] Update PrivilegesBuilder.scala c2c6fa447 [Angerszhuuuu] Update PrivilegesBuilder.scala 6f6a36e5b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594]-AUTH]BuildQuery-should-respect-normal-node's-input 4dd47a124 [Angerszhuuuu] update c549b6a1a [Angerszhuuuu] update 80013b981 [Angerszhuuuu] Update PrivilegesBuilder.scala 3cbba422a [Angerszhuuuu] Update PrivilegesBuilder.scala Authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
a2179cc599
commit
f67140e650
@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
|
||||
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, NamedExpression}
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.execution.command.ExplainCommand
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -69,44 +69,20 @@ object PrivilegesBuilder {
|
||||
if (projectionList.isEmpty) {
|
||||
privilegeObjects += PrivilegeObject(table, plan.output.map(_.name))
|
||||
} else {
|
||||
val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
|
||||
.filter(plan.outputSet.contains).map(_.name).distinct
|
||||
privilegeObjects += PrivilegeObject(table, cols)
|
||||
val cols = columnPrune(projectionList ++ conditionList, plan.outputSet)
|
||||
privilegeObjects += PrivilegeObject(table, cols.map(_.name).distinct)
|
||||
}
|
||||
}
|
||||
|
||||
def columnPrune(projectionList: Seq[Expression], output: AttributeSet): Seq[NamedExpression] = {
|
||||
(projectionList ++ conditionList)
|
||||
.flatMap(collectLeaves)
|
||||
.filter(output.contains)
|
||||
}
|
||||
|
||||
plan match {
|
||||
case p if p.getTagValue(KYUUBI_AUTHZ_TAG).nonEmpty =>
|
||||
|
||||
case p: Project => buildQuery(p.child, privilegeObjects, p.projectList, conditionList, spark)
|
||||
|
||||
case j: Join =>
|
||||
val cols =
|
||||
conditionList ++ j.condition.map(expr => collectLeaves(expr)).getOrElse(Nil)
|
||||
buildQuery(j.left, privilegeObjects, projectionList, cols, spark)
|
||||
buildQuery(j.right, privilegeObjects, projectionList, cols, spark)
|
||||
|
||||
case f: Filter =>
|
||||
val cols = conditionList ++ collectLeaves(f.condition)
|
||||
buildQuery(f.child, privilegeObjects, projectionList, cols, spark)
|
||||
|
||||
case w: Window =>
|
||||
val orderCols = w.orderSpec.flatMap(orderSpec => collectLeaves(orderSpec))
|
||||
val partitionCols = w.partitionSpec.flatMap(partitionSpec => collectLeaves(partitionSpec))
|
||||
val cols = conditionList ++ orderCols ++ partitionCols
|
||||
buildQuery(w.child, privilegeObjects, projectionList, cols, spark)
|
||||
|
||||
case s: Sort =>
|
||||
val sortCols = s.order.flatMap(sortOrder => collectLeaves(sortOrder))
|
||||
val cols = conditionList ++ sortCols
|
||||
buildQuery(s.child, privilegeObjects, projectionList, cols, spark)
|
||||
|
||||
case a: Aggregate =>
|
||||
val aggCols =
|
||||
(a.aggregateExpressions ++ a.groupingExpressions).flatMap(e => collectLeaves(e))
|
||||
val cols = conditionList ++ aggCols
|
||||
buildQuery(a.child, privilegeObjects, projectionList, cols, spark)
|
||||
|
||||
case scan if isKnownScan(scan) && scan.resolved =>
|
||||
val tables = getScanSpec(scan).tables(scan, spark)
|
||||
// If the the scan is table-based, we check privileges on the table we found
|
||||
@ -125,7 +101,33 @@ object PrivilegesBuilder {
|
||||
|
||||
case p =>
|
||||
for (child <- p.children) {
|
||||
buildQuery(child, privilegeObjects, projectionList, conditionList, spark)
|
||||
// If current plan's references don't have relation to it's input, have two cases
|
||||
// 1. `MapInPandas`, `ScriptTransformation`
|
||||
// 2. `Project` output only have constant value
|
||||
if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) {
|
||||
// If plan is project and output don't have relation to input, can ignore.
|
||||
if (!p.isInstanceOf[Project]) {
|
||||
buildQuery(
|
||||
child,
|
||||
privilegeObjects,
|
||||
p.inputSet.map(_.toAttribute).toSeq,
|
||||
Nil,
|
||||
spark)
|
||||
}
|
||||
} else {
|
||||
buildQuery(
|
||||
child,
|
||||
privilegeObjects,
|
||||
// Here we use `projectList ++ p.reference` do column prune.
|
||||
// For `Project`, `Aggregate`, plan's output is contained by plan's referenced
|
||||
// For `Filter`, `Sort` etc... it rely on upper `Project` node,
|
||||
// since we wrap a `Project` before call `buildQuery()`.
|
||||
// So here we use upper node's projectionList and current's references
|
||||
// to do column pruning can get the correct column.
|
||||
columnPrune(projectionList ++ p.references.toSeq, p.inputSet).distinct,
|
||||
conditionList ++ p.references,
|
||||
spark)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -221,7 +223,26 @@ object PrivilegesBuilder {
|
||||
LOG.debug(ud.error(plan, e))
|
||||
}
|
||||
}
|
||||
spec.queries(plan).foreach(buildQuery(_, inputObjs, spark = spark))
|
||||
spec.queries(plan).foreach { p =>
|
||||
if (p.resolved) {
|
||||
buildQuery(Project(p.output, p), inputObjs, spark = spark)
|
||||
} else {
|
||||
try {
|
||||
// For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved,
|
||||
// Before this pr, we just ignore it, now we support this.
|
||||
val analyzed = spark.sessionState.analyzer.execute(p)
|
||||
buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark)
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
LOG.debug(
|
||||
s"""
|
||||
|Failed to analyze unresolved
|
||||
|$p
|
||||
|due to ${e.getMessage}""".stripMargin,
|
||||
e)
|
||||
}
|
||||
}
|
||||
}
|
||||
spec.operationType
|
||||
|
||||
case classname if FUNCTION_COMMAND_SPECS.contains(classname) =>
|
||||
@ -315,7 +336,7 @@ object PrivilegesBuilder {
|
||||
case cmd: Command => buildCommand(cmd, inputObjs, outputObjs, spark)
|
||||
// Queries
|
||||
case _ =>
|
||||
buildQuery(plan, inputObjs, spark = spark)
|
||||
buildQuery(Project(plan.output, plan), inputObjs, spark = spark)
|
||||
OperationType.QUERY
|
||||
}
|
||||
(inputObjs, outputObjs, opType)
|
||||
|
||||
@ -59,11 +59,15 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
protected def checkColumns(plan: LogicalPlan, cols: Seq[String]): Unit = {
|
||||
val (in, out, _) = PrivilegesBuilder.build(plan, spark)
|
||||
assert(out.isEmpty, "Queries shall not check output privileges")
|
||||
val po = in.head
|
||||
assert(po.actionType === PrivilegeObjectActionType.OTHER)
|
||||
assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
|
||||
assert(po.columns === cols)
|
||||
checkTableOwner(po)
|
||||
if (in.nonEmpty) {
|
||||
val po = in.head
|
||||
assert(po.actionType === PrivilegeObjectActionType.OTHER)
|
||||
assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
|
||||
assert(po.columns === cols)
|
||||
checkTableOwner(po)
|
||||
} else {
|
||||
assert(cols.isEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
protected def checkColumns(query: String, cols: Seq[String]): Unit = {
|
||||
@ -365,7 +369,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
|
||||
if (isSparkV32OrGreater) {
|
||||
// Query in AlterViewAsCommand can not be resolved before SPARK-34698
|
||||
assert(po0.columns === Seq("key", "value", "pid"))
|
||||
assert(po0.columns === Seq("key", "pid", "value"))
|
||||
checkTableOwner(po0)
|
||||
}
|
||||
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
|
||||
@ -526,12 +530,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
|
||||
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
|
||||
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
|
||||
if (isSparkV32OrGreater) {
|
||||
assert(po0.columns.head === "key")
|
||||
checkTableOwner(po0)
|
||||
} else {
|
||||
assert(po0.columns.isEmpty)
|
||||
}
|
||||
assert(po0.columns.head === "key")
|
||||
checkTableOwner(po0)
|
||||
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
|
||||
assert(accessType0 === AccessType.SELECT)
|
||||
|
||||
@ -549,12 +549,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
|
||||
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
|
||||
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
|
||||
if (isSparkV32OrGreater) {
|
||||
assert(po0.columns === Seq("key", "value"))
|
||||
checkTableOwner(po0)
|
||||
} else {
|
||||
assert(po0.columns.isEmpty)
|
||||
}
|
||||
assert(po0.columns === Seq("key", "value"))
|
||||
checkTableOwner(po0)
|
||||
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
|
||||
assert(accessType0 === AccessType.SELECT)
|
||||
|
||||
@ -1050,7 +1046,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
assertEqualsIgnoreCase(reusedDb)(po.dbname)
|
||||
assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
|
||||
assert(
|
||||
po.columns === Seq("value", "pid", "key"),
|
||||
po.columns === Seq("value", "key", "pid"),
|
||||
s"$reusedPartTable both 'key', 'value' and 'pid' should be authenticated")
|
||||
checkTableOwner(po)
|
||||
val accessType = ranger.AccessType(po, operationType, isInput = true)
|
||||
@ -1107,7 +1103,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
assertEqualsIgnoreCase(reusedDb)(po.dbname)
|
||||
assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
|
||||
assert(
|
||||
po.columns === Seq("key", "value"),
|
||||
po.columns.sorted === Seq("key", "value").sorted,
|
||||
s"$reusedPartTable 'key' is the join key and 'pid' is omitted")
|
||||
checkTableOwner(po)
|
||||
val accessType = ranger.AccessType(po, operationType, isInput = true)
|
||||
@ -1218,7 +1214,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
|
||||
assertEqualsIgnoreCase(reusedDb)(po.dbname)
|
||||
assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
|
||||
assert(
|
||||
po.columns === Seq("key", "value", "pid"),
|
||||
po.columns === Seq("key", "pid", "value"),
|
||||
s"$reusedPartTable both 'key', 'value' and 'pid' should be authenticated")
|
||||
checkTableOwner(po)
|
||||
val accessType = ranger.AccessType(po, operationType, isInput = true)
|
||||
@ -1625,7 +1621,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
|
||||
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
|
||||
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
|
||||
assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
|
||||
assert(po0.columns === Seq("key", "value", "pid"))
|
||||
assert(po0.columns === Seq("key", "pid", "value"))
|
||||
checkTableOwner(po0)
|
||||
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
|
||||
assert(accessType0 === AccessType.SELECT)
|
||||
@ -1721,7 +1717,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
|
||||
assert(out1.isEmpty)
|
||||
val pi1 = in1.head
|
||||
assert(pi1.columns.size === 3)
|
||||
assert(pi1.columns === Seq("key", "value", "pid"))
|
||||
assert(pi1.columns === Seq("key", "pid", "value"))
|
||||
|
||||
// case2: Some columns are involved, and the group column is not selected.
|
||||
val plan2 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid")
|
||||
@ -1741,7 +1737,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
|
||||
assert(out3.isEmpty)
|
||||
val pi3 = in3.head
|
||||
assert(pi3.columns.size === 2)
|
||||
assert(pi3.columns === Seq("key", "pid"))
|
||||
assert(pi3.columns === Seq("pid", "key"))
|
||||
|
||||
// case4: HAVING & GROUP clause
|
||||
val plan4 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid HAVING MAX(key) > 1000")
|
||||
|
||||
@ -127,7 +127,7 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
|
||||
assert(po0.catalog.isEmpty)
|
||||
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
|
||||
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
|
||||
assert(po0.columns.take(2) === Seq("key", "value"))
|
||||
assert(po0.columns === Seq("a", "key", "value"))
|
||||
checkTableOwner(po0)
|
||||
|
||||
assert(outputs.size === 1)
|
||||
@ -186,7 +186,7 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
|
||||
assert(po0.catalog.isEmpty)
|
||||
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
|
||||
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
|
||||
assert(po0.columns.take(2) === Seq("key", "value"))
|
||||
assert(po0.columns === Seq("a", "key", "value"))
|
||||
checkTableOwner(po0)
|
||||
|
||||
assert(outputs.size === 1)
|
||||
|
||||
@ -215,8 +215,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
s" SELECT * FROM $namespace1.$table2"
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(insertIntoSql)))(
|
||||
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
|
||||
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
|
||||
s"$namespace1/$table2/id,$namespace1/$table2/name]," +
|
||||
s" [update] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, sql(insertIntoSql))
|
||||
|
||||
@ -225,8 +226,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
s" SELECT * FROM $namespace1.$table2"
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(insertOverwriteSql)))(
|
||||
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
|
||||
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
|
||||
s"$namespace1/$table2/id,$namespace1/$table2/name]," +
|
||||
s" [update] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, sql(insertOverwriteSql))
|
||||
}
|
||||
@ -283,8 +285,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
|""".stripMargin
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(mergeIntoSql)))(
|
||||
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
|
||||
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
|
||||
s"$namespace1/$table2/id,$namespace1/$table2/name]," +
|
||||
s" [update] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, sql(mergeIntoSql))
|
||||
}
|
||||
@ -378,9 +381,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM $namespace1.$table2"
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(insertIntoSql)))(
|
||||
s"does not have [select] privilege on [$namespace1/$table2/id," +
|
||||
s"$namespace1/$table2/name,$namespace1/$table2/gender," +
|
||||
s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
|
||||
s"does not have [select] privilege on [$namespace1/$table2/birthDate," +
|
||||
s"$namespace1/$table2/gender,$namespace1/$table2/id," +
|
||||
s"$namespace1/$table2/name], [write] privilege on [[$path, $path/]]")
|
||||
doAs(admin, sql(insertIntoSql))
|
||||
|
||||
// insert overwrite
|
||||
@ -388,9 +391,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(insertOverwriteSql)))(
|
||||
s"does not have [select] privilege on [$namespace1/$table2/id," +
|
||||
s"$namespace1/$table2/name,$namespace1/$table2/gender," +
|
||||
s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
|
||||
s"does not have [select] privilege on [$namespace1/$table2/birthDate," +
|
||||
s"$namespace1/$table2/gender,$namespace1/$table2/id," +
|
||||
s"$namespace1/$table2/name], [write] privilege on [[$path, $path/]]")
|
||||
doAs(admin, sql(insertOverwriteSql))
|
||||
})
|
||||
}
|
||||
@ -433,9 +436,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
|""".stripMargin
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(mergeIntoSql)))(
|
||||
s"does not have [select] privilege on [$namespace1/$table2/id," +
|
||||
s"$namespace1/$table2/name,$namespace1/$table2/gender," +
|
||||
s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
|
||||
s"does not have [select] privilege on [$namespace1/$table2/birthDate," +
|
||||
s"$namespace1/$table2/gender,$namespace1/$table2/id," +
|
||||
s"$namespace1/$table2/name], [write] privilege on [[$path, $path/]]")
|
||||
doAs(admin, sql(mergeIntoSql))
|
||||
})
|
||||
}
|
||||
|
||||
@ -507,7 +507,7 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(someone, sql(mergeIntoSQL))
|
||||
}(s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city], " +
|
||||
s"[$namespace1/$table2/city,$namespace1/$table2/id,$namespace1/$table2/name], " +
|
||||
s"[update] privilege on [$namespace1/$table1]")
|
||||
doAs(admin, sql(mergeIntoSQL))
|
||||
}
|
||||
|
||||
@ -93,73 +93,87 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
|
||||
}
|
||||
|
||||
test("[KYUUBI #3515] MERGE INTO") {
|
||||
val mergeIntoSql =
|
||||
s"""
|
||||
|MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target
|
||||
|USING $catalogV2.$namespace1.$table1 AS source
|
||||
|ON target.id = source.id
|
||||
|WHEN MATCHED AND (target.name='delete') THEN DELETE
|
||||
|WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
|
||||
withSingleCallEnabled {
|
||||
val mergeIntoSql =
|
||||
s"""
|
||||
|MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target
|
||||
|USING $catalogV2.$namespace1.$table1 AS source
|
||||
|ON target.id = source.id
|
||||
|WHEN MATCHED AND (target.name='delete') THEN DELETE
|
||||
|WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
|
||||
""".stripMargin
|
||||
|
||||
// MergeIntoTable: Using a MERGE INTO Statement
|
||||
val e1 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(mergeIntoSql)))
|
||||
assert(e1.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/id]"))
|
||||
// MergeIntoTable: Using a MERGE INTO Statement
|
||||
val e1 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(mergeIntoSql)))
|
||||
assert(e1.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
|
||||
|
||||
withSingleCallEnabled {
|
||||
interceptEndsWith[AccessControlException](doAs(someone, sql(mergeIntoSql)))(
|
||||
if (isSparkV35OrGreater) {
|
||||
s"does not have [select] privilege on [$namespace1/table1/id" +
|
||||
s",$namespace1/$table1/name,$namespace1/$table1/city]"
|
||||
} else {
|
||||
"does not have " +
|
||||
s"[select] privilege on [$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," +
|
||||
s" [update] privilege on [$bobNamespace/$bobSelectTable]"
|
||||
})
|
||||
withSingleCallEnabled {
|
||||
interceptEndsWith[AccessControlException](doAs(someone, sql(mergeIntoSql)))(
|
||||
if (isSparkV35OrGreater) {
|
||||
s"does not have [select] privilege on [$namespace1/table1/city" +
|
||||
s",$namespace1/$table1/id,$namespace1/$table1/name]"
|
||||
} else {
|
||||
"does not have " +
|
||||
s"[select] privilege on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]," +
|
||||
s" [update] privilege on [$bobNamespace/$bobSelectTable]"
|
||||
})
|
||||
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(bob, sql(mergeIntoSql))
|
||||
}(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(bob, sql(mergeIntoSql))
|
||||
}(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
|
||||
}
|
||||
|
||||
doAs(admin, sql(mergeIntoSql))
|
||||
}
|
||||
|
||||
doAs(admin, sql(mergeIntoSql))
|
||||
}
|
||||
|
||||
test("[KYUUBI #3515] UPDATE TABLE") {
|
||||
// UpdateTable
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(someone, sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' WHERE id=1"))
|
||||
}(if (isSparkV35OrGreater) {
|
||||
s"does not have [select] privilege on [$namespace1/$table1/id]"
|
||||
} else {
|
||||
s"does not have [update] privilege on [$namespace1/$table1]"
|
||||
})
|
||||
withSingleCallEnabled {
|
||||
// UpdateTable
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' WHERE id=1"))
|
||||
}(if (isSparkV35OrGreater) {
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," +
|
||||
s"$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city], " +
|
||||
s"[update] privilege on [$namespace1/$table1]"
|
||||
} else {
|
||||
s"does not have [update] privilege on [$namespace1/$table1]"
|
||||
})
|
||||
|
||||
doAs(
|
||||
admin,
|
||||
sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
|
||||
" WHERE id=1"))
|
||||
doAs(
|
||||
admin,
|
||||
sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
|
||||
" WHERE id=1"))
|
||||
}
|
||||
}
|
||||
|
||||
test("[KYUUBI #3515] DELETE FROM TABLE") {
|
||||
// DeleteFromTable
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
|
||||
}(if (isSparkV34OrGreater) {
|
||||
s"does not have [select] privilege on [$namespace1/$table1/id]"
|
||||
} else {
|
||||
s"does not have [update] privilege on [$namespace1/$table1]"
|
||||
})
|
||||
withSingleCallEnabled {
|
||||
// DeleteFromTable
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
|
||||
}(if (isSparkV34OrGreater) {
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," +
|
||||
s"$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name], " +
|
||||
s"[update] privilege on [$namespace1/$table1]"
|
||||
} else {
|
||||
s"does not have [update] privilege on [$namespace1/$table1]"
|
||||
})
|
||||
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2"))
|
||||
}(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
|
||||
interceptEndsWith[AccessControlException] {
|
||||
doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2"))
|
||||
}(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
|
||||
|
||||
doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
|
||||
doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
|
||||
}
|
||||
}
|
||||
|
||||
test("[KYUUBI #3666] Support {OWNER} variable for queries run on CatalogV2") {
|
||||
|
||||
@ -23,12 +23,15 @@ import java.nio.file.Path
|
||||
import scala.util.Try
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.spark.sql.{Row, SparkSessionExtensions}
|
||||
import org.apache.spark.sql.{DataFrame, Row, SparkSessionExtensions}
|
||||
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
|
||||
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
|
||||
import org.apache.spark.sql.catalyst.expressions.PythonUDF
|
||||
import org.apache.spark.sql.catalyst.plans.logical.Statistics
|
||||
import org.apache.spark.sql.execution.columnar.InMemoryRelation
|
||||
import org.apache.spark.sql.execution.datasources.LogicalRelation
|
||||
import org.apache.spark.sql.functions.col
|
||||
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
// scalastyle:off
|
||||
import org.scalatest.funsuite.AnyFunSuite
|
||||
@ -555,11 +558,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
|
||||
val e2 = intercept[AccessControlException](
|
||||
doAs(someone, sql(s"CREATE VIEW $permView AS SELECT * FROM $table")))
|
||||
if (isSparkV32OrGreater) {
|
||||
assert(e2.getMessage.contains(s"does not have [select] privilege on [default/$table/id]"))
|
||||
} else {
|
||||
assert(e2.getMessage.contains(s"does not have [select] privilege on [$table]"))
|
||||
}
|
||||
assert(e2.getMessage.contains(s"does not have [select] privilege on [default/$table/id]"))
|
||||
}
|
||||
}
|
||||
|
||||
@ -638,14 +637,12 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
s" FROM $db1.$srcTable1 as tb1" +
|
||||
s" JOIN $db1.$srcTable2 as tb2" +
|
||||
s" on tb1.id = tb2.id"
|
||||
val e1 = intercept[AccessControlException](doAs(someone, sql(insertSql1)))
|
||||
assert(e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]"))
|
||||
|
||||
withSingleCallEnabled {
|
||||
val e2 = intercept[AccessControlException](doAs(someone, sql(insertSql1)))
|
||||
assert(e2.getMessage.contains(s"does not have" +
|
||||
val e = intercept[AccessControlException](doAs(someone, sql(insertSql1)))
|
||||
assert(e.getMessage.contains(s"does not have" +
|
||||
s" [select] privilege on" +
|
||||
s" [$db1/$srcTable1/id,$db1/$srcTable1/name,$db1/$srcTable1/city," +
|
||||
s" [$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name," +
|
||||
s"$db1/$srcTable2/age,$db1/$srcTable2/id]," +
|
||||
s" [update] privilege on [$db1/$sinkTable1/id,$db1/$sinkTable1/age," +
|
||||
s"$db1/$sinkTable1/name,$db1/$sinkTable1/city]"))
|
||||
@ -675,11 +672,13 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" +
|
||||
s" (id int, name string, city string)"))
|
||||
|
||||
val e1 = intercept[AccessControlException](
|
||||
doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1")))
|
||||
assert(
|
||||
e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]"))
|
||||
|
||||
withSingleCallEnabled {
|
||||
val e1 = intercept[AccessControlException](
|
||||
doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1")))
|
||||
assert(
|
||||
e1.getMessage.contains(s"does not have [select] privilege on " +
|
||||
s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]"))
|
||||
}
|
||||
doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b "))
|
||||
doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b "))
|
||||
}
|
||||
@ -888,9 +887,15 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"SELECT id as new_id, name, max_scope FROM $db1.$view1".stripMargin).show()))
|
||||
assert(e2.getMessage.contains(
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$db1/$view1/id,$db1/$view1/name,$db1/$view1/max_scope]"))
|
||||
if (isSparkV35OrGreater) {
|
||||
assert(e2.getMessage.contains(
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$db1/$view1/id,$db1/$view1/max_scope,$db1/$view1/name]"))
|
||||
} else {
|
||||
assert(e2.getMessage.contains(
|
||||
s"does not have [select] privilege on " +
|
||||
s"[$db1/$view1/name,$db1/$view1/id,$db1/$view1/max_scope]"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -927,17 +932,11 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
|AS
|
||||
|SELECT count(*) as cnt, sum(id) as sum_id FROM $db1.$table1
|
||||
""".stripMargin))
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(s"SELECT count(*) FROM $db1.$table1").show()))(
|
||||
s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope]")
|
||||
checkAnswer(someone, s"SELECT count(*) FROM $db1.$table1", Row(0) :: Nil)
|
||||
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(s"SELECT count(*) FROM $db1.$view1").show()))(
|
||||
s"does not have [select] privilege on [$db1/$view1/id,$db1/$view1/scope]")
|
||||
checkAnswer(someone, s"SELECT count(*) FROM $db1.$view1", Row(0) :: Nil)
|
||||
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(s"SELECT count(*) FROM $db1.$view2").show()))(
|
||||
s"does not have [select] privilege on [$db1/$view2/cnt,$db1/$view2/sum_id]")
|
||||
checkAnswer(someone, s"SELECT count(*) FROM $db1.$view2", Row(1) :: Nil)
|
||||
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(someone, sql(s"SELECT count(id) FROM $db1.$table1 WHERE id > 10").show()))(
|
||||
@ -1321,7 +1320,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"SELECT typeof(id), typeof(typeof(day)) FROM $db1.$table1").collect()))(
|
||||
s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/day]")
|
||||
s"does not have [select] privilege on [$db1/$table1/day,$db1/$table1/id]")
|
||||
interceptEndsWith[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
@ -1331,7 +1330,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
|typeof(cast(id as string)),
|
||||
|typeof(substring(day, 1, 3))
|
||||
|FROM $db1.$table1""".stripMargin).collect()))(
|
||||
s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/day]")
|
||||
s"does not have [select] privilege on [$db1/$table1/day,$db1/$table1/id]")
|
||||
checkAnswer(
|
||||
admin,
|
||||
s"""
|
||||
@ -1414,60 +1413,76 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|
||||
}
|
||||
}
|
||||
|
||||
test("[KYUUBI #5884] PVM should inherit MultiInstance and wrap with new exprId") {
|
||||
test("[KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input ") {
|
||||
assume(!isSparkV35OrGreater, "mapInPandas not supported after spark 3.5")
|
||||
val db1 = defaultDb
|
||||
val table1 = "table1"
|
||||
val perm_view = "perm_view"
|
||||
val view1 = "view1"
|
||||
val view2 = "view2"
|
||||
val view3 = "view3"
|
||||
withSingleCallEnabled {
|
||||
withCleanTmpResources(Seq.empty) {
|
||||
sql("set spark.sql.legacy.storeAnalyzedPlanForView=true")
|
||||
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1(id int, scope int)"))
|
||||
doAs(admin, sql(s"CREATE VIEW $db1.$perm_view AS SELECT * FROM $db1.$table1"))
|
||||
|
||||
doAs(
|
||||
admin,
|
||||
sql(
|
||||
s"""
|
||||
|CREATE OR REPLACE TEMPORARY VIEW $view1 AS
|
||||
|SELECT *
|
||||
|FROM $db1.$perm_view
|
||||
|WHERE id > 10
|
||||
|""".stripMargin))
|
||||
|
||||
doAs(
|
||||
admin,
|
||||
sql(
|
||||
s"""
|
||||
|CREATE OR REPLACE TEMPORARY VIEW $view2 AS
|
||||
|SELECT *
|
||||
|FROM $view1
|
||||
|WHERE scope < 10
|
||||
|""".stripMargin))
|
||||
|
||||
doAs(
|
||||
admin,
|
||||
sql(
|
||||
s"""
|
||||
|CREATE OR REPLACE TEMPORARY VIEW $view3 AS
|
||||
|SELECT *
|
||||
|FROM $view1
|
||||
|WHERE scope is not null
|
||||
|""".stripMargin))
|
||||
withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) {
|
||||
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
|
||||
doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1"))
|
||||
|
||||
val table = spark.read.table(s"$db1.$table1")
|
||||
val mapTableInPandasUDF = PythonUDF(
|
||||
"mapInPandasUDF",
|
||||
null,
|
||||
StructType(Seq(StructField("id", IntegerType), StructField("scope", IntegerType))),
|
||||
table.queryExecution.analyzed.output,
|
||||
205,
|
||||
true)
|
||||
interceptContains[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(
|
||||
s"""
|
||||
|SELECT a.*, b.scope as new_scope
|
||||
|FROM $view2 a
|
||||
|JOIN $view3 b
|
||||
|ON a.id == b.id
|
||||
|""".stripMargin).collect()))(s"does not have [select] privilege on " +
|
||||
s"[$db1/$perm_view/id,$db1/$perm_view/scope,$db1/$perm_view/scope,$db1/$perm_view/id]")
|
||||
invokeAs(
|
||||
table,
|
||||
"mapInPandas",
|
||||
(classOf[PythonUDF], mapTableInPandasUDF))
|
||||
.asInstanceOf[DataFrame].select(col("id"), col("scope")).limit(1).show(true)))(
|
||||
s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope]")
|
||||
|
||||
val view = spark.read.table(s"$db1.$view1")
|
||||
val mapViewInPandasUDF = PythonUDF(
|
||||
"mapInPandasUDF",
|
||||
null,
|
||||
StructType(Seq(StructField("id", IntegerType), StructField("scope", IntegerType))),
|
||||
view.queryExecution.analyzed.output,
|
||||
205,
|
||||
true)
|
||||
interceptContains[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
invokeAs(
|
||||
view,
|
||||
"mapInPandas",
|
||||
(classOf[PythonUDF], mapViewInPandasUDF))
|
||||
.asInstanceOf[DataFrame].select(col("id"), col("scope")).limit(1).show(true)))(
|
||||
s"does not have [select] privilege on [$db1/$view1/id,$db1/$view1/scope]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("[KYUUBI #5594][AUTHZ] BuildQuery should respect sort agg input") {
|
||||
val db1 = defaultDb
|
||||
val table1 = "table1"
|
||||
val view1 = "view1"
|
||||
withSingleCallEnabled {
|
||||
withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) {
|
||||
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
|
||||
doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1"))
|
||||
checkAnswer(
|
||||
someone,
|
||||
s"SELECT count(*) FROM $db1.$table1 WHERE id > 1",
|
||||
Row(0) :: Nil)
|
||||
checkAnswer(
|
||||
someone,
|
||||
s"SELECT count(*) FROM $db1.$view1 WHERE id > 1",
|
||||
Row(0) :: Nil)
|
||||
interceptContains[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"SELECT count(id) FROM $db1.$view1 WHERE id > 1").collect()))(
|
||||
s"does not have [select] privilege on [$db1/$view1/id]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,20 +107,23 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
|
||||
}
|
||||
|
||||
test("[KYUUBI #3424] CREATE TABLE") {
|
||||
// CreateTable
|
||||
val e2 = intercept[AccessControlException](
|
||||
doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2")))
|
||||
assert(e2.getMessage.contains(s"does not have [create] privilege" +
|
||||
s" on [$namespace1/$table2]"))
|
||||
withSingleCallEnabled {
|
||||
// CreateTable
|
||||
val e2 = intercept[AccessControlException](
|
||||
doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2")))
|
||||
assert(e2.getMessage.contains(s"does not have [create] privilege" +
|
||||
s" on [$namespace1/$table2]"))
|
||||
|
||||
// CreateTableAsSelect
|
||||
val e21 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
|
||||
s" AS select * from $catalogV2.$namespace1.$table1")))
|
||||
assert(e21.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/id]"))
|
||||
// CreateTableAsSelect
|
||||
|
||||
val e21 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
|
||||
s" AS select * from $catalogV2.$namespace1.$table1")))
|
||||
assert(e21.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
|
||||
}
|
||||
}
|
||||
|
||||
test("[KYUUBI #3424] DROP TABLE") {
|
||||
@ -133,69 +136,74 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
|
||||
|
||||
test("[KYUUBI #3424] INSERT TABLE") {
|
||||
// AppendData: Insert Using a VALUES Clause
|
||||
val e4 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
|
||||
assert(e4.getMessage.contains(s"does not have [update] privilege" +
|
||||
s" on [$namespace1/$outputTable1]"))
|
||||
withSingleCallEnabled {
|
||||
|
||||
// AppendData: Insert Using a TABLE Statement
|
||||
val e42 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" TABLE $catalogV2.$namespace1.$table1")))
|
||||
assert(e42.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/id]"))
|
||||
val e4 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
|
||||
assert(e4.getMessage.contains(s"does not have [update] privilege" +
|
||||
s" on [$namespace1/$outputTable1]"))
|
||||
|
||||
// AppendData: Insert Using a SELECT Statement
|
||||
val e43 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" SELECT * from $catalogV2.$namespace1.$table1")))
|
||||
assert(e43.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/id]"))
|
||||
// AppendData: Insert Using a TABLE Statement
|
||||
val e42 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" TABLE $catalogV2.$namespace1.$table1")))
|
||||
assert(e42.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
|
||||
|
||||
// OverwriteByExpression: Insert Overwrite
|
||||
val e44 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
|
||||
assert(e44.getMessage.contains(s"does not have [update] privilege" +
|
||||
s" on [$namespace1/$outputTable1]"))
|
||||
// AppendData: Insert Using a SELECT Statement
|
||||
val e43 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" SELECT * from $catalogV2.$namespace1.$table1")))
|
||||
assert(e43.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
|
||||
|
||||
// OverwriteByExpression: Insert Overwrite
|
||||
val e44 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
|
||||
s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
|
||||
assert(e44.getMessage.contains(s"does not have [update] privilege" +
|
||||
s" on [$namespace1/$outputTable1]"))
|
||||
}
|
||||
}
|
||||
|
||||
test("[KYUUBI #3424] MERGE INTO") {
|
||||
val mergeIntoSql =
|
||||
s"""
|
||||
|MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
|
||||
|USING $catalogV2.$namespace1.$table1 AS source
|
||||
|ON target.id = source.id
|
||||
|WHEN MATCHED AND (target.name='delete') THEN DELETE
|
||||
|WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
|
||||
withSingleCallEnabled {
|
||||
val mergeIntoSql =
|
||||
s"""
|
||||
|MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
|
||||
|USING $catalogV2.$namespace1.$table1 AS source
|
||||
|ON target.id = source.id
|
||||
|WHEN MATCHED AND (target.name='delete') THEN DELETE
|
||||
|WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
|
||||
""".stripMargin
|
||||
|
||||
// MergeIntoTable: Using a MERGE INTO Statement
|
||||
val e1 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(mergeIntoSql)))
|
||||
assert(e1.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/id]"))
|
||||
|
||||
withSingleCallEnabled {
|
||||
val e2 = intercept[AccessControlException](
|
||||
// MergeIntoTable: Using a MERGE INTO Statement
|
||||
val e1 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(mergeIntoSql)))
|
||||
assert(e2.getMessage.contains(s"does not have" +
|
||||
s" [select] privilege" +
|
||||
s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
|
||||
s" [update] privilege on [$namespace1/$outputTable1]"))
|
||||
assert(e1.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
|
||||
|
||||
withSingleCallEnabled {
|
||||
val e2 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(mergeIntoSql)))
|
||||
assert(e2.getMessage.contains(s"does not have" +
|
||||
s" [select] privilege" +
|
||||
s" on [$namespace1/$table1/city,$namespace1/table1/id,$namespace1/$table1/name]," +
|
||||
s" [update] privilege on [$namespace1/$outputTable1]"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,17 +228,14 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
|
||||
|
||||
test("[KYUUBI #3424] CACHE TABLE") {
|
||||
// CacheTable
|
||||
val e7 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"CACHE TABLE $cacheTable1" +
|
||||
s" AS select * from $catalogV2.$namespace1.$table1")))
|
||||
if (isSparkV32OrGreater) {
|
||||
withSingleCallEnabled {
|
||||
val e7 = intercept[AccessControlException](
|
||||
doAs(
|
||||
someone,
|
||||
sql(s"CACHE TABLE $cacheTable1" +
|
||||
s" AS select * from $catalogV2.$namespace1.$table1")))
|
||||
assert(e7.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$namespace1/$table1/id]"))
|
||||
} else {
|
||||
assert(e7.getMessage.contains(s"does not have [select] privilege" +
|
||||
s" on [$catalogV2.$namespace1/$table1]"))
|
||||
s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user