[KYUUBI #4712] Bump Spark from 3.2.3 to 3.2.4
### _Why are the changes needed?_

Fixes #4712.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run tests](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4718 from anurag-rajawat/upgrade-spark.
Closes #4712

79dcf1b79 [Anurag Rajawat] Bump Spark from 3.2.3 to 3.2.4

Authored-by: Anurag Rajawat <anuragsinghrajawat22@gmail.com>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
This commit is contained in:
parent
db46b5b320
commit
93ba8f762f
2
.github/workflows/master.yml
vendored
2
.github/workflows/master.yml
vendored
@ -61,7 +61,7 @@ jobs:
|
||||
comment: 'verify-on-spark-3.1-binary'
|
||||
- java: 8
|
||||
spark: '3.3'
|
||||
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.3 -Dspark.archive.name=spark-3.2.3-bin-hadoop3.2.tgz'
|
||||
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz'
|
||||
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.IcebergTest'
|
||||
comment: 'verify-on-spark-3.2-binary'
|
||||
env:
|
||||
|
||||
@ -17,4 +17,5 @@
|
||||
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionInfoFunctionExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.FunctionIdentifierFunctionExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.QualifiedNameStringFunctionExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.StringFunctionExtractor
|
||||
|
||||
@ -17,4 +17,5 @@
|
||||
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionInfoFunctionTypeExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.FunctionIdentifierFunctionTypeExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.FunctionNameFunctionTypeExtractor
|
||||
org.apache.kyuubi.plugin.spark.authz.serde.TempMarkerFunctionTypeExtractor
|
||||
|
||||
@ -1,29 +0,0 @@
|
||||
[ {
|
||||
"classname" : "org.apache.kyuubi.plugin.spark.authz.util.PermanentViewMarker",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : "catalogTable",
|
||||
"fieldExtractor" : "CatalogTableTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : "tableMeta",
|
||||
"fieldExtractor" : "CatalogTableTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : "catalogTable",
|
||||
"fieldExtractor" : "CatalogTableOptionTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : null,
|
||||
"fieldExtractor" : "DataSourceV2RelationTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ]
|
||||
} ]
|
||||
@ -0,0 +1,89 @@
|
||||
[ {
|
||||
"classname" : "org.apache.kyuubi.plugin.spark.authz.util.PermanentViewMarker",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : "catalogTable",
|
||||
"fieldExtractor" : "CatalogTableTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ],
|
||||
"functionDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : "tableMeta",
|
||||
"fieldExtractor" : "CatalogTableTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ],
|
||||
"functionDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : "catalogTable",
|
||||
"fieldExtractor" : "CatalogTableOptionTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ],
|
||||
"functionDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation",
|
||||
"scanDescs" : [ {
|
||||
"fieldName" : null,
|
||||
"fieldExtractor" : "DataSourceV2RelationTableExtractor",
|
||||
"catalogDesc" : null
|
||||
} ],
|
||||
"functionDescs" : [ ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.HiveGenericUDF",
|
||||
"scanDescs" : [ ],
|
||||
"functionDescs" : [ {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "QualifiedNameStringFunctionExtractor",
|
||||
"databaseDesc" : null,
|
||||
"functionTypeDesc" : {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "FunctionNameFunctionTypeExtractor",
|
||||
"skipTypes" : [ "TEMP", "SYSTEM" ]
|
||||
},
|
||||
"isInput" : true
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.HiveGenericUDTF",
|
||||
"scanDescs" : [ ],
|
||||
"functionDescs" : [ {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "QualifiedNameStringFunctionExtractor",
|
||||
"databaseDesc" : null,
|
||||
"functionTypeDesc" : {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "FunctionNameFunctionTypeExtractor",
|
||||
"skipTypes" : [ "TEMP", "SYSTEM" ]
|
||||
},
|
||||
"isInput" : true
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.HiveSimpleUDF",
|
||||
"scanDescs" : [ ],
|
||||
"functionDescs" : [ {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "QualifiedNameStringFunctionExtractor",
|
||||
"databaseDesc" : null,
|
||||
"functionTypeDesc" : {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "FunctionNameFunctionTypeExtractor",
|
||||
"skipTypes" : [ "TEMP", "SYSTEM" ]
|
||||
},
|
||||
"isInput" : true
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.HiveUDAFFunction",
|
||||
"scanDescs" : [ ],
|
||||
"functionDescs" : [ {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "QualifiedNameStringFunctionExtractor",
|
||||
"databaseDesc" : null,
|
||||
"functionTypeDesc" : {
|
||||
"fieldName" : "name",
|
||||
"fieldExtractor" : "FunctionNameFunctionTypeExtractor",
|
||||
"skipTypes" : [ "TEMP", "SYSTEM" ]
|
||||
},
|
||||
"isInput" : true
|
||||
} ]
|
||||
} ]
|
||||
@ -1243,14 +1243,6 @@
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand",
|
||||
"tableDescs" : [ ],
|
||||
"opType" : "QUERY",
|
||||
"queryDescs" : [ {
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.execution.datasources.RefreshTable",
|
||||
"tableDescs" : [ {
|
||||
@ -1293,6 +1285,14 @@
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand",
|
||||
"tableDescs" : [ ],
|
||||
"opType" : "QUERY",
|
||||
"queryDescs" : [ {
|
||||
"fieldName" : "query",
|
||||
"fieldExtractor" : "LogicalPlanQueryExtractor"
|
||||
} ]
|
||||
}, {
|
||||
"classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveTable",
|
||||
"tableDescs" : [ {
|
||||
|
||||
@ -235,4 +235,27 @@ object PrivilegesBuilder {
|
||||
}
|
||||
(inputObjs, outputObjs, opType)
|
||||
}
|
||||
|
||||
/**
 * Build input privilege objects from a Spark LogicalPlan for Hive permanent functions.
 *
 * Walks every expression in the plan; each expression whose class has a registered
 * function spec contributes one input privilege object per extractable function,
 * unless every matching descriptor's type descriptor says to skip it (e.g. TEMP/SYSTEM
 * functions).
 *
 * @param plan  a Spark LogicalPlan
 * @param spark the active SparkSession, used to resolve function types via the catalog
 * @return (input privilege objects, empty outputs, OperationType.QUERY)
 */
def buildFunctionPrivileges(
    plan: LogicalPlan,
    spark: SparkSession): PrivilegesAndOpType = {
  val inputObjs = new ArrayBuffer[PrivilegeObject]
  // transformAllExpressions is used purely for traversal; each case returns the
  // expression unchanged and only accumulates into inputObjs as a side effect.
  plan transformAllExpressions {
    case hiveFunction: Expression if isKnowFunction(hiveFunction) =>
      val functionSpec: ScanSpec = getFunctionSpec(hiveFunction)
      // A function is checked when at least one descriptor does NOT mark it as skippable.
      // Guard against descriptors without a functionTypeDesc instead of calling
      // Option.get, which would throw NoSuchElementException on None; a missing
      // type descriptor means "no skip rule", i.e. the function is checked.
      val shouldCheck = functionSpec.functionDescs.exists { fd =>
        !fd.functionTypeDesc.exists(_.skip(hiveFunction, spark))
      }
      if (shouldCheck) {
        functionSpec.functions(hiveFunction).foreach(func => inputObjs += PrivilegeObject(func))
      }
      hiveFunction
  }
  (inputObjs, Seq.empty, OperationType.QUERY)
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.kyuubi.plugin.spark.authz.serde
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
@ -94,7 +95,8 @@ case class TableCommandSpec(
|
||||
|
||||
case class ScanSpec(
|
||||
classname: String,
|
||||
scanDescs: Seq[ScanDesc]) extends CommandSpec {
|
||||
scanDescs: Seq[ScanDesc],
|
||||
functionDescs: Seq[FunctionDesc] = Seq.empty) extends CommandSpec {
|
||||
override def opType: String = OperationType.QUERY.toString
|
||||
def tables: (LogicalPlan, SparkSession) => Seq[Table] = (plan, spark) => {
|
||||
scanDescs.flatMap { td =>
|
||||
@ -107,4 +109,16 @@ case class ScanSpec(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Extracts every [[Function]] referenced by the given expression, one attempt per
 * registered function descriptor. Extraction failures are logged at debug level and
 * the failing descriptor is simply skipped.
 */
def functions: (Expression) => Seq[Function] = expr =>
  functionDescs.flatMap { desc =>
    try Some(desc.extract(expr))
    catch {
      case e: Exception =>
        LOG.debug(desc.error(expr, e))
        None
    }
  }
|
||||
}
|
||||
|
||||
@ -20,12 +20,23 @@ package org.apache.kyuubi.plugin.spark.authz.serde
|
||||
import org.apache.spark.sql.catalyst.FunctionIdentifier
|
||||
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor.buildFunctionIdentFromQualifiedName
|
||||
|
||||
trait FunctionExtractor extends (AnyRef => Function) with Extractor
|
||||
|
||||
object FunctionExtractor {
  // All FunctionExtractor implementations, discovered and keyed via loadExtractorsToMap.
  val functionExtractors: Map[String, FunctionExtractor] =
    loadExtractorsToMap[FunctionExtractor]

  /**
   * Splits a possibly database-qualified function name into (function name, database).
   * Only the first '.' is treated as the separator:
   * "db.fn" -> ("fn", Some("db")); "fn" -> ("fn", None).
   */
  def buildFunctionIdentFromQualifiedName(qualifiedName: String): (String, Option[String]) = {
    qualifiedName.split("\\.", 2) match {
      case Array(bare) => (bare, None)
      case Array(db, name) => (name, Some(db))
    }
  }
}
|
||||
|
||||
/**
|
||||
@ -37,6 +48,17 @@ class StringFunctionExtractor extends FunctionExtractor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Extracts a [[Function]] from a possibly database-qualified function name given as a
 * String, e.g. "db.fn" or "fn".
 */
class QualifiedNameStringFunctionExtractor extends FunctionExtractor {
  override def apply(v1: AnyRef): Function = {
    val (funcName, database) =
      buildFunctionIdentFromQualifiedName(v1.asInstanceOf[String])
    Function(database, funcName)
  }
}
|
||||
|
||||
/**
|
||||
* org.apache.spark.sql.catalyst.FunctionIdentifier
|
||||
*/
|
||||
|
||||
@ -19,8 +19,11 @@ package org.apache.kyuubi.plugin.spark.authz.serde
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.FunctionIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor.buildFunctionIdentFromQualifiedName
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde.FunctionType.{FunctionType, PERMANENT, SYSTEM, TEMP}
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde.FunctionTypeExtractor.getFunctionType
|
||||
|
||||
object FunctionType extends Enumeration {
|
||||
type FunctionType = Value
|
||||
@ -33,6 +36,17 @@ object FunctionTypeExtractor {
|
||||
val functionTypeExtractors: Map[String, FunctionTypeExtractor] = {
|
||||
loadExtractorsToMap[FunctionTypeExtractor]
|
||||
}
|
||||
|
||||
/**
 * Classifies a function identifier against the session catalog, in priority order:
 * persistent (catalog-backed) -> PERMANENT, registered -> SYSTEM, otherwise TEMP.
 */
def getFunctionType(fi: FunctionIdentifier, catalog: SessionCatalog): FunctionType = {
  if (catalog.isPersistentFunction(fi)) {
    PERMANENT
  } else if (catalog.isRegisteredFunction(fi)) {
    SYSTEM
  } else {
    TEMP
  }
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -66,14 +80,18 @@ class FunctionIdentifierFunctionTypeExtractor extends FunctionTypeExtractor {
|
||||
override def apply(v1: AnyRef, spark: SparkSession): FunctionType = {
|
||||
val catalog = spark.sessionState.catalog
|
||||
val fi = v1.asInstanceOf[FunctionIdentifier]
|
||||
if (catalog.isTemporaryFunction(fi)) {
|
||||
TEMP
|
||||
} else if (catalog.isPersistentFunction(fi)) {
|
||||
PERMANENT
|
||||
} else if (catalog.isRegisteredFunction(fi)) {
|
||||
SYSTEM
|
||||
} else {
|
||||
TEMP
|
||||
}
|
||||
getFunctionType(fi, catalog)
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Resolves the [[FunctionType]] of a function referenced by a possibly
 * database-qualified name given as a String.
 */
class FunctionNameFunctionTypeExtractor extends FunctionTypeExtractor {
  override def apply(v1: AnyRef, spark: SparkSession): FunctionType = {
    val (funcName, database) =
      buildFunctionIdentFromQualifiedName(v1.asInstanceOf[String])
    getFunctionType(FunctionIdentifier(funcName, database), spark.sessionState.catalog)
  }
}
|
||||
|
||||
@ -66,9 +66,10 @@ package object serde {
|
||||
}
|
||||
|
||||
final private lazy val SCAN_SPECS: Map[String, ScanSpec] = {
|
||||
val is = getClass.getClassLoader.getResourceAsStream("scan_command_spec.json")
|
||||
val is = getClass.getClassLoader.getResourceAsStream("scan_spec.json")
|
||||
mapper.readValue(is, new TypeReference[Array[ScanSpec]] {})
|
||||
.map(e => (e.classname, e)).toMap
|
||||
.map(e => (e.classname, e))
|
||||
.filter(t => t._2.scanDescs.nonEmpty).toMap
|
||||
}
|
||||
|
||||
def isKnownScan(r: AnyRef): Boolean = {
|
||||
@ -79,6 +80,21 @@ package object serde {
|
||||
SCAN_SPECS(r.getClass.getName)
|
||||
}
|
||||
|
||||
// Specs from scan_spec.json that declare at least one function descriptor, keyed by
// target class name. Loaded lazily on first function-privilege lookup.
// NOTE(review): the resource stream is handed straight to Jackson — presumably it is
// closed by the parser after readValue; confirm, otherwise wrap in a try/finally.
final private lazy val FUNCTION_SPECS: Map[String, ScanSpec] = {
  val is = getClass.getClassLoader.getResourceAsStream("scan_spec.json")
  mapper.readValue(is, new TypeReference[Array[ScanSpec]] {})
    .filter(_.functionDescs.nonEmpty)
    .map(spec => spec.classname -> spec)
    .toMap
}
|
||||
|
||||
// Whether r's runtime class has a registered function spec.
// (Name spelling "isKnowFunction" kept as-is: it is part of the public API.)
def isKnowFunction(r: AnyRef): Boolean = FUNCTION_SPECS.contains(r.getClass.getName)
|
||||
|
||||
// Looks up the function spec for r's runtime class. Throws NoSuchElementException for
// unknown classes (Map.apply), so callers should gate on isKnowFunction first.
def getFunctionSpec(r: AnyRef): ScanSpec = FUNCTION_SPECS(r.getClass.getName)
|
||||
|
||||
def operationType(plan: LogicalPlan): OperationType = {
|
||||
val classname = plan.getClass.getName
|
||||
TABLE_COMMAND_SPECS.get(classname)
|
||||
|
||||
@ -0,0 +1,196 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.plugin.spark.authz
|
||||
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
|
||||
// scalastyle:off
|
||||
import org.scalatest.funsuite.AnyFunSuite
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY
|
||||
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
|
||||
|
||||
abstract class FunctionPrivilegesBuilderSuite extends AnyFunSuite
  with SparkSessionProvider with BeforeAndAfterAll with BeforeAndAfterEach {
  // scalastyle:on

  /** Runs `f` with the given table name, dropping the table afterwards. */
  protected def withTable(t: String)(f: String => Unit): Unit = {
    try f(t)
    finally sql(s"DROP TABLE IF EXISTS $t")
  }

  /** Runs `f` with the given database name, dropping the database afterwards. */
  protected def withDatabase(t: String)(f: String => Unit): Unit = {
    try f(t)
    finally sql(s"DROP DATABASE IF EXISTS $t")
  }

  /** Asserts that `plan` reads exactly `cols` from a table/view and writes nothing. */
  protected def checkColumns(plan: LogicalPlan, cols: Seq[String]): Unit = {
    val (in, out, _) = PrivilegesBuilder.build(plan, spark)
    assert(out.isEmpty, "Queries shall not check output privileges")
    val po = in.head
    assert(po.actionType === PrivilegeObjectActionType.OTHER)
    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
    assert(po.columns === cols)
  }

  /** Convenience overload: optimizes `query` then delegates to the plan-based check. */
  protected def checkColumns(query: String, cols: Seq[String]): Unit =
    checkColumns(sql(query).queryExecution.optimizedPlan, cols)

  // Reused fixture names, derived from the suite class name so concurrent suites
  // do not collide.
  protected val reusedDb: String = getClass.getSimpleName
  protected val reusedDb2: String = getClass.getSimpleName + "2"
  protected val reusedTable: String = reusedDb + "." + getClass.getSimpleName
  protected val reusedTableShort: String = reusedTable.split("\\.").last
  protected val reusedPartTable: String = reusedTable + "_part"
  protected val reusedPartTableShort: String = reusedPartTable.split("\\.").last
  // Number of permanent functions created per database (and temp functions) in beforeAll.
  protected val functionCount = 3
  protected val functionNamePrefix = "kyuubi_fun_"
  protected val tempFunNamePrefix = "kyuubi_temp_fun_"

  override def beforeAll(): Unit = {
    sql(s"CREATE DATABASE IF NOT EXISTS $reusedDb")
    sql(s"CREATE DATABASE IF NOT EXISTS $reusedDb2")
    sql(s"CREATE TABLE IF NOT EXISTS $reusedTable" +
      s" (key int, value string) USING parquet")
    sql(s"CREATE TABLE IF NOT EXISTS $reusedPartTable" +
      s" (key int, value string, pid string) USING parquet" +
      s" PARTITIONED BY(pid)")
    // scalastyle:off
    (0 until functionCount).foreach { index =>
      sql(s"CREATE FUNCTION ${reusedDb}.${functionNamePrefix}${index} AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash'")
      sql(s"CREATE FUNCTION ${reusedDb2}.${functionNamePrefix}${index} AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash'")
      sql(s"CREATE TEMPORARY FUNCTION ${tempFunNamePrefix}${index} AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash'")
    }
    sql(s"USE ${reusedDb2}")
    // scalastyle:on
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    Seq(reusedTable, reusedPartTable).foreach { t =>
      sql(s"DROP TABLE IF EXISTS $t")
    }

    Seq(reusedDb, reusedDb2).foreach { db =>
      (0 until functionCount).foreach { index =>
        sql(s"DROP FUNCTION ${db}.${functionNamePrefix}${index}")
      }
      sql(s"DROP DATABASE IF EXISTS ${db}")
    }

    spark.stop()
    super.afterAll()
  }
}
|
||||
|
||||
class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite {

  override protected val catalogImpl: String = "hive"

  /**
   * Shared assertions: builds function privileges for `plan` and checks that exactly
   * `expectedCount` inputs were produced, each a SELECT-able FUNCTION object whose
   * database/name come from the reused fixtures (temp functions must be filtered out).
   */
  private def assertFunctionPrivileges(plan: LogicalPlan, expectedCount: Int): Unit = {
    val (inputs, _, _) = PrivilegesBuilder.buildFunctionPrivileges(plan, spark)
    assert(inputs.size === expectedCount)
    inputs.foreach { po =>
      assert(po.actionType === PrivilegeObjectActionType.OTHER)
      assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
      assert(po.dbname startsWith reusedDb.toLowerCase)
      assert(po.objectName startsWith functionNamePrefix.toLowerCase)
      val accessType = ranger.AccessType(po, QUERY, isInput = true)
      assert(accessType === AccessType.SELECT)
    }
  }

  test("Function Call Query") {
    // 3 permanent functions counted; the 2 temp functions are skipped.
    val plan = sql(s"SELECT kyuubi_fun_1('data'), " +
      s"kyuubi_fun_2(value), " +
      s"${reusedDb}.kyuubi_fun_0(value), " +
      s"kyuubi_temp_fun_1('data2')," +
      s"kyuubi_temp_fun_2(key) " +
      s"FROM $reusedTable").queryExecution.analyzed
    assertFunctionPrivileges(plan, 3)
  }

  test("Function Call Query with Quoted Name") {
    // Same as above but with backquoted identifiers; resolution must be unaffected.
    val plan = sql(s"SELECT `kyuubi_fun_1`('data'), " +
      s"`kyuubi_fun_2`(value), " +
      s"`${reusedDb}`.`kyuubi_fun_0`(value), " +
      s"`kyuubi_temp_fun_1`('data2')," +
      s"`kyuubi_temp_fun_2`(key) " +
      s"FROM $reusedTable").queryExecution.analyzed
    assertFunctionPrivileges(plan, 3)
  }

  test("Simple Function Call Query") {
    // No FROM clause: 4 permanent function references, temp functions skipped.
    val plan = sql(s"SELECT kyuubi_fun_1('data'), " +
      s"kyuubi_fun_0('value'), " +
      s"${reusedDb}.kyuubi_fun_0('value'), " +
      s"${reusedDb}.kyuubi_fun_2('value'), " +
      s"kyuubi_temp_fun_1('data2')," +
      s"kyuubi_temp_fun_2('key') ").queryExecution.analyzed
    assertFunctionPrivileges(plan, 4)
  }

  test("Function Call In CAST Command") {
    val table = "castTable"
    withTable(table) { table =>
      // CTAS whose SELECT list and WHERE clause reference 7 permanent functions
      // (temp functions in col7/col8 are skipped).
      val plan = sql(s"CREATE TABLE ${table} " +
        s"SELECT kyuubi_fun_1('data') col1, " +
        s"${reusedDb2}.kyuubi_fun_2(value) col2, " +
        s"kyuubi_fun_0(value) col3, " +
        s"kyuubi_fun_2('value') col4, " +
        s"${reusedDb}.kyuubi_fun_2('value') col5, " +
        s"${reusedDb}.kyuubi_fun_1('value') col6, " +
        s"kyuubi_temp_fun_1('data2') col7, " +
        s"kyuubi_temp_fun_2(key) col8 " +
        s"FROM ${reusedTable} WHERE ${reusedDb2}.kyuubi_fun_1(key)='123'").queryExecution.analyzed
      assertFunctionPrivileges(plan, 7)
    }
  }

}
|
||||
@ -34,13 +34,16 @@ object JsonSpecFileGenerator {
|
||||
writeCommandSpecJson("database", DatabaseCommands.data)
|
||||
writeCommandSpecJson("table", TableCommands.data ++ IcebergCommands.data)
|
||||
writeCommandSpecJson("function", FunctionCommands.data)
|
||||
writeCommandSpecJson("scan", Scans.data)
|
||||
writeCommandSpecJson("scan", Scans.data, isScanResource = true)
|
||||
}
|
||||
|
||||
def writeCommandSpecJson[T <: CommandSpec](commandType: String, specArr: Array[T]): Unit = {
|
||||
def writeCommandSpecJson[T <: CommandSpec](
|
||||
commandType: String,
|
||||
specArr: Array[T],
|
||||
isScanResource: Boolean = false): Unit = {
|
||||
val pluginHome = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
|
||||
.split("target").head
|
||||
val filename = s"${commandType}_command_spec.json"
|
||||
val filename = s"${commandType}${if (isScanResource) "" else "_command"}_spec.json"
|
||||
val writer = {
|
||||
val p = Paths.get(pluginHome, "src", "main", "resources", filename)
|
||||
Files.newBufferedWriter(p, StandardCharsets.UTF_8)
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.kyuubi.plugin.spark.authz.gen
|
||||
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde._
|
||||
import org.apache.kyuubi.plugin.spark.authz.serde.FunctionType._
|
||||
|
||||
object Scans {
|
||||
|
||||
@ -57,9 +58,34 @@ object Scans {
|
||||
ScanSpec(r, Seq(tableDesc))
|
||||
}
|
||||
|
||||
val HiveSimpleUDF = {
|
||||
ScanSpec(
|
||||
"org.apache.spark.sql.hive.HiveSimpleUDF",
|
||||
Seq.empty,
|
||||
Seq(FunctionDesc(
|
||||
"name",
|
||||
classOf[QualifiedNameStringFunctionExtractor],
|
||||
functionTypeDesc = Some(FunctionTypeDesc(
|
||||
"name",
|
||||
classOf[FunctionNameFunctionTypeExtractor],
|
||||
Seq(TEMP, SYSTEM))),
|
||||
isInput = true)))
|
||||
}
|
||||
|
||||
val HiveGenericUDF = HiveSimpleUDF.copy(classname = "org.apache.spark.sql.hive.HiveGenericUDF")
|
||||
|
||||
val HiveUDAFFunction = HiveSimpleUDF.copy(classname =
|
||||
"org.apache.spark.sql.hive.HiveUDAFFunction")
|
||||
|
||||
val HiveGenericUDTF = HiveSimpleUDF.copy(classname = "org.apache.spark.sql.hive.HiveGenericUDTF")
|
||||
|
||||
val data: Array[ScanSpec] = Array(
|
||||
HiveTableRelation,
|
||||
LogicalRelation,
|
||||
DataSourceV2Relation,
|
||||
PermanentViewMarker)
|
||||
PermanentViewMarker,
|
||||
HiveSimpleUDF,
|
||||
HiveGenericUDF,
|
||||
HiveUDAFFunction,
|
||||
HiveGenericUDTF)
|
||||
}
|
||||
|
||||
@ -637,7 +637,7 @@ object TableCommands {
|
||||
"org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand"),
|
||||
InsertIntoHadoopFsRelationCommand,
|
||||
InsertIntoDataSourceDir.copy(classname =
|
||||
"org.apache.spark.sql.execution.datasources.InsertIntoDataSourceDirCommand"),
|
||||
"org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"),
|
||||
InsertIntoHiveTable,
|
||||
LoadData,
|
||||
MergeIntoTable,
|
||||
|
||||
2
pom.xml
2
pom.xml
@ -2337,7 +2337,7 @@
|
||||
<module>extensions/spark/kyuubi-extension-spark-3-2</module>
|
||||
</modules>
|
||||
<properties>
|
||||
<spark.version>3.2.3</spark.version>
|
||||
<spark.version>3.2.4</spark.version>
|
||||
<spark.binary.version>3.2</spark.binary.version>
|
||||
<delta.version>2.0.2</delta.version>
|
||||
<spark.archive.name>spark-${spark.version}-bin-hadoop3.2.tgz</spark.archive.name>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user