[KYUUBI #4879] Refactor and promote relection utils and cleanup similar reflection methods

### _Why are the changes needed?_

- apply the usage of `ReflectUtils` and `Dyn*` to the modules of engines and plugins (eg. Spark engine, Authz plugin, lineage plugin, beeline)
- remove similar redundant methods for calling reflected methods or getting field values
- unified reflection helper methods with type casting support, as `getField[T]` for getting field values from `getFields`, `invokeAs[T]` for invoking methods in `getMethods`.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4879 from bowenliang123/reflect-use.

Closes #4879

c685fb67d [liangbowen] bug fix for "Cannot bind static field options" when executing "bin/beeline"
fc1fdf1de [liangbowen] import
59c3dd032 [liangbowen] comment
c435c131d [liangbowen] reflect util usage

Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
This commit is contained in:
liangbowen 2023-06-06 18:59:18 +08:00
parent 408862af72
commit d0675a35a7
24 changed files with 220 additions and 207 deletions

View File

@ -50,7 +50,7 @@ object AccessRequest {
"getRolesFromUserAndGroups",
(classOf[String], userName),
(classOf[JSet[String]], userGroups))
invoke(req, "setUserRoles", (classOf[JSet[String]], roles))
invokeAs[Unit](req, "setUserRoles", (classOf[JSet[String]], roles))
} catch {
case _: Exception =>
}
@ -61,7 +61,7 @@ object AccessRequest {
}
try {
val clusterName = invokeAs[String](SparkRangerAdminPlugin, "getClusterName")
invoke(req, "setClusterName", (classOf[String], clusterName))
invokeAs[Unit](req, "setClusterName", (classOf[String], clusterName))
} catch {
case _: Exception =>
}
@ -74,8 +74,8 @@ object AccessRequest {
private def getUserGroupsFromUserStore(user: UserGroupInformation): Option[JSet[String]] = {
try {
val storeEnricher = invoke(SparkRangerAdminPlugin, "getUserStoreEnricher")
val userStore = invoke(storeEnricher, "getRangerUserStore")
val storeEnricher = invokeAs[AnyRef](SparkRangerAdminPlugin, "getUserStoreEnricher")
val userStore = invokeAs[AnyRef](storeEnricher, "getRangerUserStore")
val userGroupMapping =
invokeAs[JHashMap[String, JSet[String]]](userStore, "getUserGroupMapping")
Some(userGroupMapping.get(user.getShortUserName))

View File

@ -46,7 +46,7 @@ class RuleReplaceShowObjectCommands extends Rule[LogicalPlan] {
case class FilteredShowTablesCommand(delegated: RunnableCommand)
extends FilteredShowObjectCommand(delegated) {
private val isExtended = getFieldVal[Boolean](delegated, "isExtended")
private val isExtended = getField[Boolean](delegated, "isExtended")
override protected def isAllowed(r: Row, ugi: UserGroupInformation): Boolean = {
val database = r.getString(0)

View File

@ -72,7 +72,7 @@ case class ColumnDesc(
fieldName: String,
fieldExtractor: String) extends Descriptor {
override def extract(v: AnyRef): Seq[String] = {
val columnsVal = invoke(v, fieldName)
val columnsVal = invokeAs[AnyRef](v, fieldName)
val columnExtractor = lookupExtractor[ColumnExtractor](fieldExtractor)
columnExtractor(columnsVal)
}
@ -91,7 +91,7 @@ case class DatabaseDesc(
catalogDesc: Option[CatalogDesc] = None,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Database = {
val databaseVal = invoke(v, fieldName)
val databaseVal = invokeAs[AnyRef](v, fieldName)
val databaseExtractor = lookupExtractor[DatabaseExtractor](fieldExtractor)
val db = databaseExtractor(databaseVal)
if (db.catalog.isEmpty && catalogDesc.nonEmpty) {
@ -119,7 +119,7 @@ case class FunctionTypeDesc(
}
def extract(v: AnyRef, spark: SparkSession): FunctionType = {
val functionTypeVal = invoke(v, fieldName)
val functionTypeVal = invokeAs[AnyRef](v, fieldName)
val functionTypeExtractor = lookupExtractor[FunctionTypeExtractor](fieldExtractor)
functionTypeExtractor(functionTypeVal, spark)
}
@ -145,7 +145,7 @@ case class FunctionDesc(
functionTypeDesc: Option[FunctionTypeDesc] = None,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Function = {
val functionVal = invoke(v, fieldName)
val functionVal = invokeAs[AnyRef](v, fieldName)
val functionExtractor = lookupExtractor[FunctionExtractor](fieldExtractor)
var function = functionExtractor(functionVal)
if (function.database.isEmpty) {
@ -170,7 +170,7 @@ case class QueryDesc(
fieldName: String,
fieldExtractor: String = "LogicalPlanQueryExtractor") extends Descriptor {
override def extract(v: AnyRef): Option[LogicalPlan] = {
val queryVal = invoke(v, fieldName)
val queryVal = invokeAs[AnyRef](v, fieldName)
val queryExtractor = lookupExtractor[QueryExtractor](fieldExtractor)
queryExtractor(queryVal)
}
@ -192,7 +192,7 @@ case class TableTypeDesc(
}
def extract(v: AnyRef, spark: SparkSession): TableType = {
val tableTypeVal = invoke(v, fieldName)
val tableTypeVal = invokeAs[AnyRef](v, fieldName)
val tableTypeExtractor = lookupExtractor[TableTypeExtractor](fieldExtractor)
tableTypeExtractor(tableTypeVal, spark)
}
@ -230,7 +230,7 @@ case class TableDesc(
}
def extract(v: AnyRef, spark: SparkSession): Option[Table] = {
val tableVal = invoke(v, fieldName)
val tableVal = invokeAs[AnyRef](v, fieldName)
val tableExtractor = lookupExtractor[TableExtractor](fieldExtractor)
val maybeTable = tableExtractor(spark, tableVal)
maybeTable.map { t =>
@ -257,7 +257,7 @@ case class ActionTypeDesc(
actionType: Option[String] = None) extends Descriptor {
override def extract(v: AnyRef): PrivilegeObjectActionType = {
actionType.map(PrivilegeObjectActionType.withName).getOrElse {
val actionTypeVal = invoke(v, fieldName)
val actionTypeVal = invokeAs[AnyRef](v, fieldName)
val actionTypeExtractor = lookupExtractor[ActionTypeExtractor](fieldExtractor)
actionTypeExtractor(actionTypeVal)
}
@ -274,7 +274,7 @@ case class CatalogDesc(
fieldName: String = "catalog",
fieldExtractor: String = "CatalogPluginCatalogExtractor") extends Descriptor {
override def extract(v: AnyRef): Option[String] = {
val catalogVal = invoke(v, fieldName)
val catalogVal = invokeAs[AnyRef](v, fieldName)
val catalogExtractor = lookupExtractor[CatalogExtractor](fieldExtractor)
catalogExtractor(catalogVal)
}
@ -292,7 +292,7 @@ case class ScanDesc(
val tableVal = if (fieldName == null) {
v
} else {
invoke(v, fieldName)
invokeAs[AnyRef](v, fieldName)
}
val tableExtractor = lookupExtractor[TableExtractor](fieldExtractor)
val maybeTable = tableExtractor(spark, tableVal)

View File

@ -69,9 +69,9 @@ class StringSeqOptionDatabaseExtractor extends DatabaseExtractor {
*/
class ResolvedNamespaceDatabaseExtractor extends DatabaseExtractor {
override def apply(v1: AnyRef): Database = {
val catalogVal = invoke(v1, "catalog")
val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val namespace = getFieldVal[Seq[String]](v1, "namespace")
val namespace = getField[Seq[String]](v1, "namespace")
Database(catalog, quote(namespace))
}
}
@ -81,9 +81,9 @@ class ResolvedNamespaceDatabaseExtractor extends DatabaseExtractor {
*/
class ResolvedDBObjectNameDatabaseExtractor extends DatabaseExtractor {
override def apply(v1: AnyRef): Database = {
val catalogVal = invoke(v1, "catalog")
val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val namespace = getFieldVal[Seq[String]](v1, "nameParts")
val namespace = getField[Seq[String]](v1, "nameParts")
Database(catalog, quote(namespace))
}
}

View File

@ -47,7 +47,7 @@ object TableExtractor {
*/
def getOwner(v: AnyRef): Option[String] = {
// org.apache.spark.sql.connector.catalog.Table
val table = invoke(v, "table")
val table = invokeAs[AnyRef](v, "table")
val properties = invokeAs[JMap[String, String]](table, "properties").asScala
properties.get("owner")
}
@ -97,9 +97,9 @@ class CatalogTableOptionTableExtractor extends TableExtractor {
*/
class ResolvedTableTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val catalogVal = invoke(v1, "catalog")
val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val identifier = invoke(v1, "identifier")
val identifier = invokeAs[AnyRef](v1, "identifier")
val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark, identifier)
val maybeOwner = TableExtractor.getOwner(v1)
maybeTable.map(_.copy(catalog = catalog, owner = maybeOwner))
@ -157,7 +157,7 @@ class LogicalRelationTableExtractor extends TableExtractor {
*/
class ResolvedDbObjectNameTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val catalogVal = invoke(v1, "catalog")
val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val nameParts = invokeAs[Seq[String]](v1, "nameParts")
val namespace = nameParts.init.toArray
@ -173,9 +173,9 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
v1.getClass.getName match {
case "org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier" =>
val catalogVal = invoke(v1, "catalog")
val catalogVal = invokeAs[AnyRef](v1, "catalog")
val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val identifier = invoke(v1, "identifier")
val identifier = invokeAs[AnyRef](v1, "identifier")
val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark, identifier)
maybeTable.map(_.copy(catalog = catalog))
case _ => None

View File

@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.util.ReservedKeys._
import org.apache.kyuubi.util.SemanticVersion
import org.apache.kyuubi.util.reflect.DynConstructors
import org.apache.kyuubi.util.reflect.ReflectUtils._
private[authz] object AuthZUtils {
@ -61,7 +62,7 @@ private[authz] object AuthZUtils {
def hasResolvedPermanentView(plan: LogicalPlan): Boolean = {
plan match {
case view: View if view.resolved && isSparkV31OrGreater =>
!getFieldVal[Boolean](view, "isTempView")
!getField[Boolean](view, "isTempView")
case _ =>
false
}
@ -69,7 +70,12 @@ private[authz] object AuthZUtils {
lazy val isRanger21orGreater: Boolean = {
try {
classOf[RangerBasePlugin].getConstructor(classOf[String], classOf[String], classOf[String])
DynConstructors.builder().impl(
classOf[RangerBasePlugin],
classOf[String],
classOf[String],
classOf[String])
.buildChecked[RangerBasePlugin]()
true
} catch {
case _: NoSuchMethodException =>

View File

@ -34,18 +34,18 @@ trait RangerConfigProvider {
* org.apache.ranger.authorization.hadoop.config.RangerConfiguration
* for Ranger 2.0 and below
*/
def getRangerConf: Configuration = {
val getRangerConf: Configuration = {
if (isRanger21orGreater) {
// for Ranger 2.1+
DynMethods.builder("getConfig")
.impl("org.apache.ranger.plugin.service.RangerBasePlugin")
.build()
.invoke[Configuration](this)
.buildChecked(this)
.invoke[Configuration]()
} else {
// for Ranger 2.0 and below
DynMethods.builder("getInstance")
.impl("org.apache.ranger.authorization.hadoop.config.RangerConfiguration")
.buildStatic()
.buildStaticChecked()
.invoke[Configuration]()
}
}

View File

@ -458,7 +458,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
admin, {
val hiveTableRelation = sql(s"SELECT * FROM $table")
.queryExecution.optimizedPlan.collectLeaves().head.asInstanceOf[HiveTableRelation]
assert(getFieldVal[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty)
assert(getField[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty)
})
}
}

View File

@ -18,7 +18,7 @@
package org.apache.kyuubi.plugin.lineage.helper
import scala.collection.immutable.ListMap
import scala.util.{Failure, Success, Try}
import scala.util.Try
import org.apache.spark.internal.Logging
import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, Data
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
import org.apache.kyuubi.util.reflect.ReflectUtils._
trait LineageParser {
def sparkSession: SparkSession
@ -189,7 +190,7 @@ trait LineageParser {
plan match {
// For command
case p if p.nodeName == "CommandResult" =>
val commandPlan = getPlanField[LogicalPlan]("commandLogicalPlan", plan)
val commandPlan = getField[LogicalPlan](plan, "commandLogicalPlan")
extractColumnsLineage(commandPlan, parentColumnsLineage)
case p if p.nodeName == "AlterViewAsCommand" =>
val query =
@ -198,22 +199,22 @@ trait LineageParser {
} else {
getQuery(plan)
}
val view = getPlanField[TableIdentifier]("name", plan).unquotedString
val view = getField[TableIdentifier](plan, "name").unquotedString
extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) =>
k.withName(s"$view.${k.name}") -> v
}
case p
if p.nodeName == "CreateViewCommand"
&& getPlanField[ViewType]("viewType", plan) == PersistedView =>
val view = getPlanField[TableIdentifier]("name", plan).unquotedString
&& getField[ViewType](plan, "viewType") == PersistedView =>
val view = getField[TableIdentifier](plan, "name").unquotedString
val outputCols =
getPlanField[Seq[(String, Option[String])]]("userSpecifiedColumns", plan).map(_._1)
getField[Seq[(String, Option[String])]](plan, "userSpecifiedColumns").map(_._1)
val query =
if (isSparkVersionAtMost("3.1")) {
sparkSession.sessionState.analyzer.execute(getPlanField[LogicalPlan]("child", plan))
sparkSession.sessionState.analyzer.execute(getField[LogicalPlan](plan, "child"))
} else {
getPlanField[LogicalPlan]("plan", plan)
getField[LogicalPlan](plan, "plan")
}
extractColumnsLineage(query, parentColumnsLineage).zipWithIndex.map {
@ -222,7 +223,7 @@ trait LineageParser {
}
case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" =>
val table = getPlanField[CatalogTable]("table", plan).qualifiedName
val table = getField[CatalogTable](plan, "table").qualifiedName
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -230,7 +231,7 @@ trait LineageParser {
case p
if p.nodeName == "CreateHiveTableAsSelectCommand" ||
p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" =>
val table = getPlanField[CatalogTable]("tableDesc", plan).qualifiedName
val table = getField[CatalogTable](plan, "tableDesc").qualifiedName
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -241,15 +242,15 @@ trait LineageParser {
val (table, namespace, catalog) =
if (isSparkVersionAtMost("3.2")) {
(
getPlanField[Identifier]("tableName", plan).name,
getPlanField[Identifier]("tableName", plan).namespace.mkString("."),
getPlanField[TableCatalog]("catalog", plan).name())
getField[Identifier](plan, "tableName").name,
getField[Identifier](plan, "tableName").namespace.mkString("."),
getField[TableCatalog](plan, "catalog").name())
} else {
(
getPlanMethod[Identifier]("tableName", plan).name(),
getPlanMethod[Identifier]("tableName", plan).namespace().mkString("."),
getCurrentPlanField[CatalogPlugin](
getPlanMethod[LogicalPlan]("left", plan),
invokeAs[Identifier](plan, "tableName").name(),
invokeAs[Identifier](plan, "tableName").namespace().mkString("."),
getField[CatalogPlugin](
invokeAs[LogicalPlan](plan, "left"),
"catalog").name())
}
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
@ -257,7 +258,7 @@ trait LineageParser {
}
case p if p.nodeName == "InsertIntoDataSourceCommand" =>
val logicalRelation = getPlanField[LogicalRelation]("logicalRelation", plan)
val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
val table = logicalRelation.catalogTable.map(_.qualifiedName).getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
@ -266,8 +267,8 @@ trait LineageParser {
case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" =>
val table =
getPlanField[Option[CatalogTable]]("catalogTable", plan).map(_.qualifiedName).getOrElse(
"")
getField[Option[CatalogTable]](plan, "catalogTable").map(_.qualifiedName)
.getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
k.withName(s"$table.${k.name}") -> v
@ -277,15 +278,15 @@ trait LineageParser {
if p.nodeName == "InsertIntoDataSourceDirCommand" ||
p.nodeName == "InsertIntoHiveDirCommand" =>
val dir =
getPlanField[CatalogStorageFormat]("storage", plan).locationUri.map(_.toString).getOrElse(
"")
getField[CatalogStorageFormat](plan, "storage").locationUri.map(_.toString)
.getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if dir.nonEmpty =>
k.withName(s"`$dir`.${k.name}") -> v
}
case p if p.nodeName == "InsertIntoHiveTable" =>
val table = getPlanField[CatalogTable]("table", plan).qualifiedName
val table = getField[CatalogTable](plan, "table").qualifiedName
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@ -297,14 +298,14 @@ trait LineageParser {
if p.nodeName == "AppendData"
|| p.nodeName == "OverwriteByExpression"
|| p.nodeName == "OverwritePartitionsDynamic" =>
val table = getPlanField[NamedRelation]("table", plan).name
val table = getField[NamedRelation](plan, "table").name
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
case p if p.nodeName == "MergeIntoTable" =>
val matchedActions = getPlanField[Seq[MergeAction]]("matchedActions", plan)
val notMatchedActions = getPlanField[Seq[MergeAction]]("notMatchedActions", plan)
val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
val notMatchedActions = getField[Seq[MergeAction]](plan, "notMatchedActions")
val allAssignments = (matchedActions ++ notMatchedActions).collect {
case UpdateAction(_, assignments) => assignments
case InsertAction(_, assignments) => assignments
@ -314,8 +315,8 @@ trait LineageParser {
assignment.key.asInstanceOf[Attribute],
assignment.value.references)
}: _*)
val targetTable = getPlanField[LogicalPlan]("targetTable", plan)
val sourceTable = getPlanField[LogicalPlan]("sourceTable", plan)
val targetTable = getField[LogicalPlan](plan, "targetTable")
val sourceTable = getField[LogicalPlan](plan, "sourceTable")
val targetColumnsLineage = extractColumnsLineage(
targetTable,
nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) })
@ -474,47 +475,7 @@ trait LineageParser {
}
}
private def getPlanField[T](field: String, plan: LogicalPlan): T = {
getFieldVal[T](plan, field)
}
private def getCurrentPlanField[T](curPlan: LogicalPlan, field: String): T = {
getFieldVal[T](curPlan, field)
}
private def getPlanMethod[T](name: String, plan: LogicalPlan): T = {
getMethod[T](plan, name)
}
private def getQuery(plan: LogicalPlan): LogicalPlan = {
getPlanField[LogicalPlan]("query", plan)
}
private def getFieldVal[T](o: Any, name: String): T = {
Try {
val field = o.getClass.getDeclaredField(name)
field.setAccessible(true)
field.get(o)
} match {
case Success(value) => value.asInstanceOf[T]
case Failure(e) =>
val candidates = o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
throw new RuntimeException(s"$name not in $candidates", e)
}
}
private def getMethod[T](o: Any, name: String): T = {
Try {
val method = o.getClass.getDeclaredMethod(name)
method.invoke(o)
} match {
case Success(value) => value.asInstanceOf[T]
case Failure(e) =>
val candidates = o.getClass.getDeclaredMethods.map(_.getName).mkString("[", ",", "]")
throw new RuntimeException(s"$name not in $candidates", e)
}
}
private def getQuery(plan: LogicalPlan): LogicalPlan = getField[LogicalPlan](plan, "query")
}
case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends LineageParser

View File

@ -16,24 +16,25 @@
*/
package org.apache.kyuubi.engine.jdbc.connection
import java.sql.{Connection, DriverManager}
import java.sql.{Connection, Driver, DriverManager}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PROVIDER, ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_DRIVER_CLASS}
import org.apache.kyuubi.util.reflect.DynClasses
import org.apache.kyuubi.util.reflect.ReflectUtils._
abstract class AbstractConnectionProvider extends Logging {
protected val providers = loadProviders()
def getProviderClass(kyuubiConf: KyuubiConf): String = {
val specifiedDriverClass = kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS)
specifiedDriverClass.foreach(Class.forName)
specifiedDriverClass.getOrElse {
val driverClass: Class[_ <: Driver] = Option(
DynClasses.builder().impl(kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS).get)
.orNull().build[Driver]()).getOrElse {
val url = kyuubiConf.get(ENGINE_JDBC_CONNECTION_URL).get
DriverManager.getDriver(url).getClass.getCanonicalName
DriverManager.getDriver(url).getClass
}
driverClass.getCanonicalName
}
def create(kyuubiConf: KyuubiConf): Connection = {

View File

@ -26,7 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.spark.events.EngineEventsStore
import org.apache.kyuubi.service.ServiceState
import org.apache.kyuubi.util.reflect.DynClasses
import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
/**
* Note that [[SparkUITab]] is private for Spark
@ -68,30 +68,29 @@ case class EngineTab(
val sparkServletContextHandlerClz = DynClasses.builder()
.impl("org.sparkproject.jetty.servlet.ServletContextHandler")
.impl("org.eclipse.jetty.servlet.ServletContextHandler")
.build()
val attachHandlerMethod = Class.forName("org.apache.spark.ui.SparkUI")
.getMethod("attachHandler", sparkServletContextHandlerClz)
val createRedirectHandlerMethod = Class.forName("org.apache.spark.ui.JettyUtils")
.getMethod(
"createRedirectHandler",
.buildChecked()
val attachHandlerMethod = DynMethods.builder("attachHandler")
.impl("org.apache.spark.ui.SparkUI", sparkServletContextHandlerClz)
.buildChecked(ui)
val createRedirectHandlerMethod = DynMethods.builder("createRedirectHandler")
.impl(
"org.apache.spark.ui.JettyUtils",
classOf[String],
classOf[String],
classOf[(HttpServletRequest) => Unit],
classOf[HttpServletRequest => Unit],
classOf[String],
classOf[Set[String]])
.buildStaticChecked()
attachHandlerMethod
.invoke(
ui,
createRedirectHandlerMethod
.invoke(null, "/kyuubi/stop", "/kyuubi", handleKillRequest _, "", Set("GET", "POST")))
.invoke("/kyuubi/stop", "/kyuubi", handleKillRequest _, "", Set("GET", "POST")))
attachHandlerMethod
.invoke(
ui,
createRedirectHandlerMethod
.invoke(
null,
"/kyuubi/gracefulstop",
"/kyuubi",
handleGracefulKillRequest _,

View File

@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TStatus, TStatusCode}
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.util.reflect.DynConstructors
/**
* @param reason a description of the exception
@ -139,9 +140,10 @@ object KyuubiSQLException {
}
private def newInstance(className: String, message: String, cause: Throwable): Throwable = {
try {
Class.forName(className)
.getConstructor(classOf[String], classOf[Throwable])
.newInstance(message, cause).asInstanceOf[Throwable]
DynConstructors.builder()
.impl(className, classOf[String], classOf[Throwable])
.buildChecked[Throwable]()
.newInstance(message, cause)
} catch {
case _: Exception => new RuntimeException(className + ":" + message, cause)
}

View File

@ -19,6 +19,7 @@ package org.apache.kyuubi.service.authentication
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.util.reflect.DynConstructors
trait EngineSecuritySecretProvider {
@ -50,9 +51,10 @@ class SimpleEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvide
object EngineSecuritySecretProvider {
def create(conf: KyuubiConf): EngineSecuritySecretProvider = {
val providerClass = Class.forName(conf.get(ENGINE_SECURITY_SECRET_PROVIDER))
val provider = providerClass.getConstructor().newInstance()
.asInstanceOf[EngineSecuritySecretProvider]
val provider = DynConstructors.builder()
.impl(conf.get(ENGINE_SECURITY_SECRET_PROVIDER))
.buildChecked[EngineSecuritySecretProvider]()
.newInstance(conf)
provider.initialize(conf)
provider
}

View File

@ -33,12 +33,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.util.reflect.ReflectUtils._
object KyuubiHadoopUtils extends Logging {
private val tokenMapField = classOf[Credentials].getDeclaredField("tokenMap")
tokenMapField.setAccessible(true)
def newHadoopConf(
conf: KyuubiConf,
loadDefaults: Boolean = true): Configuration = {
@ -76,12 +74,8 @@ object KyuubiHadoopUtils extends Logging {
* Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before
* Hadoop 3.2.1.
*/
def getTokenMap(credentials: Credentials): Map[Text, Token[_ <: TokenIdentifier]] = {
tokenMapField.get(credentials)
.asInstanceOf[JMap[Text, Token[_ <: TokenIdentifier]]]
.asScala
.toMap
}
def getTokenMap(credentials: Credentials): Map[Text, Token[_ <: TokenIdentifier]] =
getField[JMap[Text, Token[_ <: TokenIdentifier]]](credentials, "tokenMap").asScala.toMap
def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Option[Long] = {
token.decodeIdentifier() match {

View File

@ -39,6 +39,7 @@ import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
import org.apache.kyuubi.shaded.zookeeper.data.ACL
import org.apache.kyuubi.util.reflect.DynFields
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT
@ -156,12 +157,13 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
assert(service.getServiceState === ServiceState.STARTED)
stopZk()
val isServerLostM = discovery.getClass.getSuperclass.getDeclaredField("isServerLost")
isServerLostM.setAccessible(true)
val isServerLost = isServerLostM.get(discovery)
val isServerLost = DynFields.builder()
.hiddenImpl(discovery.getClass.getSuperclass, "isServerLost")
.buildChecked[AtomicBoolean]()
.get(discovery)
eventually(timeout(10.seconds), interval(100.millis)) {
assert(isServerLost.asInstanceOf[AtomicBoolean].get())
assert(isServerLost.get())
assert(discovery.getServiceState === ServiceState.STOPPED)
assert(service.getServiceState === ServiceState.STOPPED)
}

View File

@ -40,6 +40,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-beeline</artifactId>

View File

@ -19,8 +19,6 @@ package org.apache.hive.beeline;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.sql.Driver;
import java.util.Arrays;
import java.util.Collections;
@ -29,12 +27,15 @@ import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kyuubi.util.reflect.DynConstructors;
import org.apache.kyuubi.util.reflect.DynFields;
import org.apache.kyuubi.util.reflect.DynMethods;
public class KyuubiBeeLine extends BeeLine {
public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER =
"org.apache.kyuubi.jdbc.KyuubiHiveDriver";
protected KyuubiCommands commands = new KyuubiCommands(this);
private Driver defaultDriver = null;
private Driver defaultDriver;
public KyuubiBeeLine() {
this(true);
@ -44,20 +45,16 @@ public class KyuubiBeeLine extends BeeLine {
public KyuubiBeeLine(boolean isBeeLine) {
super(isBeeLine);
try {
Field commandsField = BeeLine.class.getDeclaredField("commands");
commandsField.setAccessible(true);
commandsField.set(this, commands);
DynFields.builder().hiddenImpl(BeeLine.class, "commands").buildChecked(this).set(commands);
} catch (Throwable t) {
throw new ExceptionInInitializerError("Failed to inject kyuubi commands");
}
try {
defaultDriver =
(Driver)
Class.forName(
KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER,
true,
Thread.currentThread().getContextClassLoader())
.newInstance();
DynConstructors.builder()
.impl(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER)
.<Driver>buildChecked()
.newInstance();
} catch (Throwable t) {
throw new ExceptionInInitializerError(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER + "-missing");
}
@ -115,25 +112,26 @@ public class KyuubiBeeLine extends BeeLine {
BeelineParser beelineParser;
boolean connSuccessful;
boolean exit;
Field exitField;
DynFields.BoundField<Boolean> exitField;
try {
Field optionsField = BeeLine.class.getDeclaredField("options");
optionsField.setAccessible(true);
Options options = (Options) optionsField.get(this);
Options options =
DynFields.builder()
.hiddenImpl(BeeLine.class, "options")
.<Options>buildStaticChecked()
.get();
beelineParser = new BeelineParser();
cl = beelineParser.parse(options, args);
Method connectUsingArgsMethod =
BeeLine.class.getDeclaredMethod(
"connectUsingArgs", BeelineParser.class, CommandLine.class);
connectUsingArgsMethod.setAccessible(true);
connSuccessful = (boolean) connectUsingArgsMethod.invoke(this, beelineParser, cl);
connSuccessful =
DynMethods.builder("connectUsingArgs")
.hiddenImpl(BeeLine.class, BeelineParser.class, CommandLine.class)
.buildChecked(this)
.invoke(beelineParser, cl);
exitField = BeeLine.class.getDeclaredField("exit");
exitField.setAccessible(true);
exit = (boolean) exitField.get(this);
exitField = DynFields.builder().hiddenImpl(BeeLine.class, "exit").buildChecked(this);
exit = exitField.get();
} catch (ParseException e1) {
output(e1.getMessage());
@ -149,10 +147,11 @@ public class KyuubiBeeLine extends BeeLine {
// no-op if the file is not present
if (!connSuccessful && !exit) {
try {
Method defaultBeelineConnectMethod =
BeeLine.class.getDeclaredMethod("defaultBeelineConnect", CommandLine.class);
defaultBeelineConnectMethod.setAccessible(true);
connSuccessful = (boolean) defaultBeelineConnectMethod.invoke(this, cl);
connSuccessful =
DynMethods.builder("defaultBeelineConnect")
.hiddenImpl(BeeLine.class, CommandLine.class)
.buildChecked(this)
.invoke(beelineParser, cl);
} catch (Exception t) {
error(t.getMessage());
@ -184,7 +183,7 @@ public class KyuubiBeeLine extends BeeLine {
}
try {
exit = true;
exitField.set(this, exit);
exitField.set(exit);
} catch (Exception e) {
error(e.getMessage());
return 1;

View File

@ -35,6 +35,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>

View File

@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hive.service.rpc.thrift.TStatus;
import org.apache.kyuubi.util.reflect.DynConstructors;
public class KyuubiSQLException extends SQLException {
@ -186,7 +187,10 @@ public class KyuubiSQLException extends SQLException {
private static Throwable newInstance(String className, String message) {
try {
return (Throwable) Class.forName(className).getConstructor(String.class).newInstance(message);
return DynConstructors.builder()
.impl(className, String.class)
.<Throwable>buildChecked()
.newInstance(message);
} catch (Exception e) {
return new RuntimeException(className + ":" + message);
}

View File

@ -77,6 +77,12 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -17,13 +17,13 @@
package org.apache.kyuubi.client.auth;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.Base64;
import javax.security.auth.Subject;
import org.apache.kyuubi.client.exception.KyuubiRestException;
import org.apache.kyuubi.util.reflect.DynFields;
import org.apache.kyuubi.util.reflect.DynMethods;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@ -61,13 +61,17 @@ public class SpnegoAuthHeaderGenerator implements AuthHeaderGenerator {
private String generateToken(String server) throws Exception {
Subject subject;
try {
Class<?> ugiClz = Class.forName(UGI_CLASS);
Method ugiGetCurrentUserMethod = ugiClz.getDeclaredMethod("getCurrentUser");
Object ugiCurrentUser = ugiGetCurrentUserMethod.invoke(null);
Object ugiCurrentUser =
DynMethods.builder("getCurrentUser")
.hiddenImpl(Class.forName(UGI_CLASS))
.buildStaticChecked()
.invoke();
LOG.debug("The user credential is {}", ugiCurrentUser);
Field ugiSubjectField = ugiCurrentUser.getClass().getDeclaredField("subject");
ugiSubjectField.setAccessible(true);
subject = (Subject) ugiSubjectField.get(ugiCurrentUser);
subject =
DynFields.builder()
.hiddenImpl(ugiCurrentUser.getClass(), "subject")
.<Subject>buildChecked(ugiCurrentUser)
.get();
} catch (ClassNotFoundException e) {
// TODO do kerberos authentication using JDK class directly
LOG.error("Hadoop UGI class {} is required for SPNEGO authentication.", UGI_CLASS);

View File

@ -21,6 +21,7 @@ import scala.util.control.NonFatal
import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.util.reflect.DynConstructors
private[kyuubi] object PluginLoader {
@ -31,8 +32,7 @@ private[kyuubi] object PluginLoader {
}
try {
Class.forName(advisorClass.get).getConstructor().newInstance()
.asInstanceOf[SessionConfAdvisor]
DynConstructors.builder.impl(advisorClass.get).buildChecked[SessionConfAdvisor].newInstance()
} catch {
case _: ClassCastException =>
throw new KyuubiException(
@ -45,8 +45,7 @@ private[kyuubi] object PluginLoader {
def loadGroupProvider(conf: KyuubiConf): GroupProvider = {
val groupProviderClass = conf.get(KyuubiConf.GROUP_PROVIDER)
try {
Class.forName(groupProviderClass).getConstructor().newInstance()
.asInstanceOf[GroupProvider]
DynConstructors.builder().impl(groupProviderClass).buildChecked[GroupProvider]().newInstance()
} catch {
case _: ClassCastException =>
throw new KyuubiException(

View File

@ -21,7 +21,7 @@ import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import scala.util.Try
object ReflectUtils {
/**
@ -37,32 +37,56 @@ object ReflectUtils {
DynClasses.builder().loader(cl).impl(className).buildChecked()
}.isSuccess
def getFieldVal[T](target: Any, fieldName: String): T =
Try {
DynFields.builder().hiddenImpl(target.getClass, fieldName).build[T]().get(target)
} match {
case Success(value) => value
case Failure(e) =>
val candidates = target.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
throw new RuntimeException(s"$fieldName not in ${target.getClass} $candidates", e)
}
def getFieldValOpt[T](target: Any, name: String): Option[T] =
Try(getFieldVal[T](target, name)).toOption
def invoke(target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): AnyRef =
/**
* get the field value of the given object
* @param target the target object
* @param fieldName the field name from declared field names
* @tparam T the expected return class type
* @return
*/
def getField[T](target: Any, fieldName: String): T = {
val targetClass = target.getClass
try {
val (types, values) = args.unzip
DynMethods.builder(methodName).hiddenImpl(target.getClass, types: _*).build()
.invoke(target, values: _*)
DynFields.builder()
.hiddenImpl(targetClass, fieldName)
.buildChecked[T](target)
.get()
} catch {
case e: NoSuchMethodException =>
val candidates = target.getClass.getMethods.map(_.getName).mkString("[", ",", "]")
throw new RuntimeException(s"$methodName not in ${target.getClass} $candidates", e)
case e: Exception =>
val candidates = targetClass.getDeclaredFields.map(_.getName).sorted
throw new RuntimeException(
s"Field $fieldName not in $targetClass [${candidates.mkString(",")}]",
e)
}
}
def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T =
invoke(target, methodName, args: _*).asInstanceOf[T]
/**
* Invoke a method with the given name and arguments on the given target object.
* @param target the target object
* @param methodName the method name from declared field names
* @param args pairs of class and values for the arguments
* @tparam T the expected return class type,
* returning type Nothing if it's not provided or inferable
* @return
*/
def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T = {
val targetClass = target.getClass
val argClasses = args.map(_._1)
try {
DynMethods.builder(methodName)
.hiddenImpl(targetClass, argClasses: _*)
.buildChecked(target)
.invoke[T](args.map(_._2): _*)
} catch {
case e: Exception =>
val candidates = targetClass.getDeclaredMethods.map(_.getName).sorted
val argClassesNames = argClasses.map(_.getClass.getName)
throw new RuntimeException(
s"Method $methodName (${argClassesNames.mkString(",")})" +
s" not found in $targetClass [${candidates.mkString(",")}]",
e)
}
}
/**
* Creates a iterator for with a new service loader for the given service type and class

View File

@ -19,9 +19,11 @@ package org.apache.kyuubi.tools
import java.io.File
import java.nio.file.Files
import java.util.{Map => JMap}
import java.util.UUID
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.util.reflect.ReflectUtils._
class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite {
import KubernetesSparkBlockCleanerConstants._
@ -83,10 +85,7 @@ class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite {
}
private def updateEnv(name: String, value: String): Unit = {
val env = System.getenv
val field = env.getClass.getDeclaredField("m")
field.setAccessible(true)
field.get(env).asInstanceOf[java.util.Map[String, String]].put(name, value)
getField[JMap[String, String]](System.getenv, "m").put(name, value)
}
test("test clean") {