diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala index 7d4999fde..8fc8028e6 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala @@ -50,7 +50,7 @@ object AccessRequest { "getRolesFromUserAndGroups", (classOf[String], userName), (classOf[JSet[String]], userGroups)) - invoke(req, "setUserRoles", (classOf[JSet[String]], roles)) + invokeAs[Unit](req, "setUserRoles", (classOf[JSet[String]], roles)) } catch { case _: Exception => } @@ -61,7 +61,7 @@ object AccessRequest { } try { val clusterName = invokeAs[String](SparkRangerAdminPlugin, "getClusterName") - invoke(req, "setClusterName", (classOf[String], clusterName)) + invokeAs[Unit](req, "setClusterName", (classOf[String], clusterName)) } catch { case _: Exception => } @@ -74,8 +74,8 @@ object AccessRequest { private def getUserGroupsFromUserStore(user: UserGroupInformation): Option[JSet[String]] = { try { - val storeEnricher = invoke(SparkRangerAdminPlugin, "getUserStoreEnricher") - val userStore = invoke(storeEnricher, "getRangerUserStore") + val storeEnricher = invokeAs[AnyRef](SparkRangerAdminPlugin, "getUserStoreEnricher") + val userStore = invokeAs[AnyRef](storeEnricher, "getRangerUserStore") val userGroupMapping = invokeAs[JHashMap[String, JSet[String]]](userStore, "getUserGroupMapping") Some(userGroupMapping.get(user.getShortUserName)) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala index 6e86ab9fb..bf762109c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala @@ -46,7 +46,7 @@ class RuleReplaceShowObjectCommands extends Rule[LogicalPlan] { case class FilteredShowTablesCommand(delegated: RunnableCommand) extends FilteredShowObjectCommand(delegated) { - private val isExtended = getFieldVal[Boolean](delegated, "isExtended") + private val isExtended = getField[Boolean](delegated, "isExtended") override protected def isAllowed(r: Row, ugi: UserGroupInformation): Boolean = { val database = r.getString(0) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala index 3eb3fecea..fc660ce14 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala @@ -72,7 +72,7 @@ case class ColumnDesc( fieldName: String, fieldExtractor: String) extends Descriptor { override def extract(v: AnyRef): Seq[String] = { - val columnsVal = invoke(v, fieldName) + val columnsVal = invokeAs[AnyRef](v, fieldName) val columnExtractor = lookupExtractor[ColumnExtractor](fieldExtractor) columnExtractor(columnsVal) } @@ -91,7 +91,7 @@ case class DatabaseDesc( catalogDesc: Option[CatalogDesc] = None, isInput: Boolean = false) extends Descriptor { override def extract(v: AnyRef): Database = { - val databaseVal = invoke(v, fieldName) + val databaseVal = invokeAs[AnyRef](v, fieldName) val databaseExtractor = lookupExtractor[DatabaseExtractor](fieldExtractor) val db = databaseExtractor(databaseVal) if (db.catalog.isEmpty && catalogDesc.nonEmpty) { @@ -119,7 +119,7 @@ case class FunctionTypeDesc( } def extract(v: AnyRef, spark: SparkSession): FunctionType = { - val functionTypeVal = invoke(v, fieldName) + val functionTypeVal = invokeAs[AnyRef](v, fieldName) val functionTypeExtractor = lookupExtractor[FunctionTypeExtractor](fieldExtractor) functionTypeExtractor(functionTypeVal, spark) } @@ -145,7 +145,7 @@ case class FunctionDesc( functionTypeDesc: Option[FunctionTypeDesc] = None, isInput: Boolean = false) extends Descriptor { override def extract(v: AnyRef): Function = { - val functionVal = invoke(v, fieldName) + val functionVal = invokeAs[AnyRef](v, fieldName) val functionExtractor = lookupExtractor[FunctionExtractor](fieldExtractor) var function = functionExtractor(functionVal) if (function.database.isEmpty) { @@ -170,7 +170,7 @@ case class QueryDesc( fieldName: String, fieldExtractor: String = "LogicalPlanQueryExtractor") extends Descriptor { override def extract(v: AnyRef): Option[LogicalPlan] = { - val queryVal = invoke(v, fieldName) + val queryVal = invokeAs[AnyRef](v, fieldName) val queryExtractor = lookupExtractor[QueryExtractor](fieldExtractor) queryExtractor(queryVal) } @@ -192,7 +192,7 @@ case class TableTypeDesc( } def extract(v: AnyRef, spark: SparkSession): TableType = { - val tableTypeVal = invoke(v, fieldName) + val tableTypeVal = invokeAs[AnyRef](v, fieldName) val tableTypeExtractor = lookupExtractor[TableTypeExtractor](fieldExtractor) tableTypeExtractor(tableTypeVal, spark) } @@ -230,7 +230,7 @@ case class TableDesc( } def extract(v: AnyRef, spark: SparkSession): Option[Table] = { - val tableVal = invoke(v, fieldName) + val tableVal = invokeAs[AnyRef](v, fieldName) val tableExtractor = lookupExtractor[TableExtractor](fieldExtractor) val maybeTable = tableExtractor(spark, tableVal) maybeTable.map { t => @@ -257,7 +257,7 @@ case class ActionTypeDesc( actionType: Option[String] = None) extends Descriptor { override def extract(v: AnyRef): PrivilegeObjectActionType = { actionType.map(PrivilegeObjectActionType.withName).getOrElse { - val actionTypeVal = invoke(v, fieldName) + val actionTypeVal = invokeAs[AnyRef](v, fieldName) val actionTypeExtractor = lookupExtractor[ActionTypeExtractor](fieldExtractor) actionTypeExtractor(actionTypeVal) } @@ -274,7 +274,7 @@ case class CatalogDesc( fieldName: String = "catalog", fieldExtractor: String = "CatalogPluginCatalogExtractor") extends Descriptor { override def extract(v: AnyRef): Option[String] = { - val catalogVal = invoke(v, fieldName) + val catalogVal = invokeAs[AnyRef](v, fieldName) val catalogExtractor = lookupExtractor[CatalogExtractor](fieldExtractor) catalogExtractor(catalogVal) } @@ -292,7 +292,7 @@ case class ScanDesc( val tableVal = if (fieldName == null) { v } else { - invoke(v, fieldName) + invokeAs[AnyRef](v, fieldName) } val tableExtractor = lookupExtractor[TableExtractor](fieldExtractor) val maybeTable = tableExtractor(spark, tableVal) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala index f952c816e..713d3e3fb 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/databaseExtractors.scala @@ -69,9 +69,9 @@ class StringSeqOptionDatabaseExtractor extends DatabaseExtractor { */ class ResolvedNamespaceDatabaseExtractor extends DatabaseExtractor { override def apply(v1: AnyRef): Database = { - val catalogVal = invoke(v1, "catalog") + val catalogVal = invokeAs[AnyRef](v1, "catalog") val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal) - val namespace = getFieldVal[Seq[String]](v1, "namespace") + val namespace = getField[Seq[String]](v1, "namespace") Database(catalog, quote(namespace)) } } @@ -81,9 +81,9 @@ class ResolvedNamespaceDatabaseExtractor extends DatabaseExtractor { */ class ResolvedDBObjectNameDatabaseExtractor extends DatabaseExtractor { override def apply(v1: AnyRef): Database = { - val catalogVal = invoke(v1, "catalog") + val catalogVal = invokeAs[AnyRef](v1, "catalog") val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal) - val namespace = getFieldVal[Seq[String]](v1, "nameParts") + val namespace = getField[Seq[String]](v1, "nameParts") Database(catalog, quote(namespace)) } } diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala index 4579349ee..a8a08ed93 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala @@ -47,7 +47,7 @@ object TableExtractor { */ def getOwner(v: AnyRef): Option[String] = { // org.apache.spark.sql.connector.catalog.Table - val table = invoke(v, "table") + val table = invokeAs[AnyRef](v, "table") val properties = invokeAs[JMap[String, String]](table, "properties").asScala properties.get("owner") } @@ -97,9 +97,9 @@ class CatalogTableOptionTableExtractor extends TableExtractor { */ class ResolvedTableTableExtractor extends TableExtractor { override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { - val catalogVal = invoke(v1, "catalog") + val catalogVal = invokeAs[AnyRef](v1, "catalog") val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal) - val identifier = invoke(v1, "identifier") + val identifier = invokeAs[AnyRef](v1, "identifier") val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark, identifier) val maybeOwner = TableExtractor.getOwner(v1) maybeTable.map(_.copy(catalog = catalog, owner = maybeOwner)) @@ -157,7 +157,7 @@ class LogicalRelationTableExtractor extends TableExtractor { */ class ResolvedDbObjectNameTableExtractor extends TableExtractor { override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { - val catalogVal = invoke(v1, "catalog") + val catalogVal = invokeAs[AnyRef](v1, "catalog") val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal) val nameParts = invokeAs[Seq[String]](v1, "nameParts") val namespace = nameParts.init.toArray @@ -173,9 +173,9 @@ class ResolvedIdentifierTableExtractor extends TableExtractor { override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { v1.getClass.getName match { case "org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier" => - val catalogVal = invoke(v1, "catalog") + val catalogVal = invokeAs[AnyRef](v1, "catalog") val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal) - val identifier = invoke(v1, "identifier") + val identifier = invokeAs[AnyRef](v1, "identifier") val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark, identifier) maybeTable.map(_.copy(catalog = catalog)) case _ => None diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala index e13968e2e..9ac3bfef3 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View} import org.apache.kyuubi.plugin.spark.authz.AccessControlException import org.apache.kyuubi.plugin.spark.authz.util.ReservedKeys._ import org.apache.kyuubi.util.SemanticVersion +import org.apache.kyuubi.util.reflect.DynConstructors import org.apache.kyuubi.util.reflect.ReflectUtils._ private[authz] object AuthZUtils { @@ -61,7 +62,7 @@ private[authz] object AuthZUtils { def hasResolvedPermanentView(plan: LogicalPlan): Boolean = { plan match { case view: View if view.resolved && isSparkV31OrGreater => - !getFieldVal[Boolean](view, "isTempView") + !getField[Boolean](view, "isTempView") case _ => false } @@ -69,7 +70,12 @@ private[authz] object AuthZUtils { lazy val isRanger21orGreater: Boolean = { try { - classOf[RangerBasePlugin].getConstructor(classOf[String], classOf[String], classOf[String]) + DynConstructors.builder().impl( + classOf[RangerBasePlugin], + classOf[String], + classOf[String], + classOf[String]) + .buildChecked[RangerBasePlugin]() true } catch { case _: NoSuchMethodException => diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala index cb3a2371b..806914286 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RangerConfigProvider.scala @@ -34,18 +34,18 @@ trait RangerConfigProvider { * org.apache.ranger.authorization.hadoop.config.RangerConfiguration * for Ranger 2.0 and below */ - def getRangerConf: Configuration = { + val getRangerConf: Configuration = { if (isRanger21orGreater) { // for Ranger 2.1+ DynMethods.builder("getConfig") .impl("org.apache.ranger.plugin.service.RangerBasePlugin") - .build() - .invoke[Configuration](this) + .buildChecked(this) + .invoke[Configuration]() } else { // for Ranger 2.0 and below DynMethods.builder("getInstance") .impl("org.apache.ranger.authorization.hadoop.config.RangerConfiguration") - .buildStatic() + .buildStaticChecked() .invoke[Configuration]() } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 89b81ccee..b5dcf63cb 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -458,7 +458,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { admin, { val hiveTableRelation = sql(s"SELECT * FROM $table") .queryExecution.optimizedPlan.collectLeaves().head.asInstanceOf[HiveTableRelation] - assert(getFieldVal[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty) + assert(getField[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty) }) } } diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala index f060cc994..f2806f216 100644 --- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala +++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.plugin.lineage.helper import scala.collection.immutable.ListMap -import scala.util.{Failure, Success, Try} +import scala.util.Try import org.apache.spark.internal.Logging import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper} @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, Data import org.apache.kyuubi.plugin.lineage.Lineage import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost +import org.apache.kyuubi.util.reflect.ReflectUtils._ trait LineageParser { def sparkSession: SparkSession @@ -189,7 +190,7 @@ trait LineageParser { plan match { // For command case p if p.nodeName == "CommandResult" => - val commandPlan = getPlanField[LogicalPlan]("commandLogicalPlan", plan) + val commandPlan = getField[LogicalPlan](plan, "commandLogicalPlan") extractColumnsLineage(commandPlan, parentColumnsLineage) case p if p.nodeName == "AlterViewAsCommand" => val query = @@ -198,22 +199,22 @@ trait LineageParser { } else { getQuery(plan) } - val view = getPlanField[TableIdentifier]("name", plan).unquotedString + val view = getField[TableIdentifier](plan, "name").unquotedString extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) => k.withName(s"$view.${k.name}") -> v } case p if p.nodeName == "CreateViewCommand" - && getPlanField[ViewType]("viewType", plan) == PersistedView => - val view = getPlanField[TableIdentifier]("name", plan).unquotedString + && getField[ViewType](plan, "viewType") == PersistedView => + val view = getField[TableIdentifier](plan, "name").unquotedString val outputCols = - getPlanField[Seq[(String, Option[String])]]("userSpecifiedColumns", plan).map(_._1) + getField[Seq[(String, Option[String])]](plan, "userSpecifiedColumns").map(_._1) val query = if (isSparkVersionAtMost("3.1")) { - sparkSession.sessionState.analyzer.execute(getPlanField[LogicalPlan]("child", plan)) + sparkSession.sessionState.analyzer.execute(getField[LogicalPlan](plan, "child")) } else { - getPlanField[LogicalPlan]("plan", plan) + getField[LogicalPlan](plan, "plan") } extractColumnsLineage(query, parentColumnsLineage).zipWithIndex.map { @@ -222,7 +223,7 @@ trait LineageParser { } case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" => - val table = getPlanField[CatalogTable]("table", plan).qualifiedName + val table = getField[CatalogTable](plan, "table").qualifiedName extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) => k.withName(s"$table.${k.name}") -> v } @@ -230,7 +231,7 @@ trait LineageParser { case p if p.nodeName == "CreateHiveTableAsSelectCommand" || p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" => - val table = getPlanField[CatalogTable]("tableDesc", plan).qualifiedName + val table = getField[CatalogTable](plan, "tableDesc").qualifiedName extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) => k.withName(s"$table.${k.name}") -> v } @@ -241,15 +242,15 @@ trait LineageParser { val (table, namespace, catalog) = if (isSparkVersionAtMost("3.2")) { ( - getPlanField[Identifier]("tableName", plan).name, - getPlanField[Identifier]("tableName", plan).namespace.mkString("."), - getPlanField[TableCatalog]("catalog", plan).name()) + getField[Identifier](plan, "tableName").name, + getField[Identifier](plan, "tableName").namespace.mkString("."), + getField[TableCatalog](plan, "catalog").name()) } else { ( - getPlanMethod[Identifier]("tableName", plan).name(), - getPlanMethod[Identifier]("tableName", plan).namespace().mkString("."), - getCurrentPlanField[CatalogPlugin]( - getPlanMethod[LogicalPlan]("left", plan), + invokeAs[Identifier](plan, "tableName").name(), + invokeAs[Identifier](plan, "tableName").namespace().mkString("."), + getField[CatalogPlugin]( + invokeAs[LogicalPlan](plan, "left"), "catalog").name()) } extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) => @@ -257,7 +258,7 @@ trait LineageParser { } case p if p.nodeName == "InsertIntoDataSourceCommand" => - val logicalRelation = getPlanField[LogicalRelation]("logicalRelation", plan) + val logicalRelation = getField[LogicalRelation](plan, "logicalRelation") val table = logicalRelation.catalogTable.map(_.qualifiedName).getOrElse("") extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) if table.nonEmpty => @@ -266,8 +267,8 @@ trait LineageParser { case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" => val table = - getPlanField[Option[CatalogTable]]("catalogTable", plan).map(_.qualifiedName).getOrElse( - "") + getField[Option[CatalogTable]](plan, "catalogTable").map(_.qualifiedName) + .getOrElse("") extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) if table.nonEmpty => k.withName(s"$table.${k.name}") -> v @@ -277,15 +278,15 @@ trait LineageParser { if p.nodeName == "InsertIntoDataSourceDirCommand" || p.nodeName == "InsertIntoHiveDirCommand" => val dir = - getPlanField[CatalogStorageFormat]("storage", plan).locationUri.map(_.toString).getOrElse( - "") + getField[CatalogStorageFormat](plan, "storage").locationUri.map(_.toString) + .getOrElse("") extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) if dir.nonEmpty => k.withName(s"`$dir`.${k.name}") -> v } case p if p.nodeName == "InsertIntoHiveTable" => - val table = getPlanField[CatalogTable]("table", plan).qualifiedName + val table = getField[CatalogTable](plan, "table").qualifiedName extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) => k.withName(s"$table.${k.name}") -> v } @@ -297,14 +298,14 @@ trait LineageParser { if p.nodeName == "AppendData" || p.nodeName == "OverwriteByExpression" || p.nodeName == "OverwritePartitionsDynamic" => - val table = getPlanField[NamedRelation]("table", plan).name + val table = getField[NamedRelation](plan, "table").name extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) => k.withName(s"$table.${k.name}") -> v } case p if p.nodeName == "MergeIntoTable" => - val matchedActions = getPlanField[Seq[MergeAction]]("matchedActions", plan) - val notMatchedActions = getPlanField[Seq[MergeAction]]("notMatchedActions", plan) + val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions") + val notMatchedActions = getField[Seq[MergeAction]](plan, "notMatchedActions") val allAssignments = (matchedActions ++ notMatchedActions).collect { case UpdateAction(_, assignments) => assignments case InsertAction(_, assignments) => assignments @@ -314,8 +315,8 @@ trait LineageParser { assignment.key.asInstanceOf[Attribute], assignment.value.references) }: _*) - val targetTable = getPlanField[LogicalPlan]("targetTable", plan) - val sourceTable = getPlanField[LogicalPlan]("sourceTable", plan) + val targetTable = getField[LogicalPlan](plan, "targetTable") + val sourceTable = getField[LogicalPlan](plan, "sourceTable") val targetColumnsLineage = extractColumnsLineage( targetTable, nextColumnsLlineage.map { case (k, _) => (k, AttributeSet(k)) }) @@ -474,47 +475,7 @@ trait LineageParser { } } - private def getPlanField[T](field: String, plan: LogicalPlan): T = { - getFieldVal[T](plan, field) - } - - private def getCurrentPlanField[T](curPlan: LogicalPlan, field: String): T = { - getFieldVal[T](curPlan, field) - } - - private def getPlanMethod[T](name: String, plan: LogicalPlan): T = { - getMethod[T](plan, name) - } - - private def getQuery(plan: LogicalPlan): LogicalPlan = { - getPlanField[LogicalPlan]("query", plan) - } - - private def getFieldVal[T](o: Any, name: String): T = { - Try { - val field = o.getClass.getDeclaredField(name) - field.setAccessible(true) - field.get(o) - } match { - case Success(value) => value.asInstanceOf[T] - case Failure(e) => - val candidates = o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]") - throw new RuntimeException(s"$name not in $candidates", e) - } - } - - private def getMethod[T](o: Any, name: String): T = { - Try { - val method = o.getClass.getDeclaredMethod(name) - method.invoke(o) - } match { - case Success(value) => value.asInstanceOf[T] - case Failure(e) => - val candidates = o.getClass.getDeclaredMethods.map(_.getName).mkString("[", ",", "]") - throw new RuntimeException(s"$name not in $candidates", e) - } - } - + private def getQuery(plan: LogicalPlan): LogicalPlan = getField[LogicalPlan](plan, "query") } case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends LineageParser diff --git a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala index 0dea6a2c1..cb6e4b6c5 100644 --- a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala +++ b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/connection/ConnectionProvider.scala @@ -16,24 +16,25 @@ */ package org.apache.kyuubi.engine.jdbc.connection -import java.sql.{Connection, DriverManager} +import java.sql.{Connection, Driver, DriverManager} import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PROVIDER, ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_DRIVER_CLASS} +import org.apache.kyuubi.util.reflect.DynClasses import org.apache.kyuubi.util.reflect.ReflectUtils._ abstract class AbstractConnectionProvider extends Logging { protected val providers = loadProviders() def getProviderClass(kyuubiConf: KyuubiConf): String = { - val specifiedDriverClass = kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS) - specifiedDriverClass.foreach(Class.forName) - - specifiedDriverClass.getOrElse { + val driverClass: Class[_ <: Driver] = Option( + DynClasses.builder().impl(kyuubiConf.get(ENGINE_JDBC_DRIVER_CLASS).get) + .orNull().build[Driver]()).getOrElse { val url = kyuubiConf.get(ENGINE_JDBC_CONNECTION_URL).get - DriverManager.getDriver(url).getClass.getCanonicalName + DriverManager.getDriver(url).getClass } + driverClass.getCanonicalName } def create(kyuubiConf: KyuubiConf): Connection = { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala index 14ef3d3fd..52edcf220 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala @@ -26,7 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.SparkSQLEngine import org.apache.kyuubi.engine.spark.events.EngineEventsStore import org.apache.kyuubi.service.ServiceState -import org.apache.kyuubi.util.reflect.DynClasses +import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods} /** * Note that [[SparkUITab]] is private for Spark @@ -68,30 +68,29 @@ case class EngineTab( val sparkServletContextHandlerClz = DynClasses.builder() .impl("org.sparkproject.jetty.servlet.ServletContextHandler") .impl("org.eclipse.jetty.servlet.ServletContextHandler") - .build() - val attachHandlerMethod = Class.forName("org.apache.spark.ui.SparkUI") - .getMethod("attachHandler", sparkServletContextHandlerClz) - val createRedirectHandlerMethod = Class.forName("org.apache.spark.ui.JettyUtils") - .getMethod( - "createRedirectHandler", + .buildChecked() + val attachHandlerMethod = DynMethods.builder("attachHandler") + .impl("org.apache.spark.ui.SparkUI", sparkServletContextHandlerClz) + .buildChecked(ui) + val createRedirectHandlerMethod = DynMethods.builder("createRedirectHandler") + .impl( + "org.apache.spark.ui.JettyUtils", classOf[String], classOf[String], - classOf[(HttpServletRequest) => Unit], + classOf[HttpServletRequest => Unit], classOf[String], classOf[Set[String]]) + .buildStaticChecked() attachHandlerMethod .invoke( - ui, createRedirectHandlerMethod - .invoke(null, "/kyuubi/stop", "/kyuubi", handleKillRequest _, "", Set("GET", "POST"))) + .invoke("/kyuubi/stop", "/kyuubi", handleKillRequest _, "", Set("GET", "POST"))) attachHandlerMethod .invoke( - ui, createRedirectHandlerMethod .invoke( - null, "/kyuubi/gracefulstop", "/kyuubi", handleGracefulKillRequest _, diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala index a9e486fb2..c9fd8203c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import org.apache.hive.service.rpc.thrift.{TStatus, TStatusCode} import org.apache.kyuubi.Utils.stringifyException +import org.apache.kyuubi.util.reflect.DynConstructors /** * @param reason a description of the exception @@ -139,9 +140,10 @@ object KyuubiSQLException { } private def newInstance(className: String, message: String, cause: Throwable): Throwable = { try { - Class.forName(className) - .getConstructor(classOf[String], classOf[Throwable]) - .newInstance(message, cause).asInstanceOf[Throwable] + DynConstructors.builder() + .impl(className, classOf[String], classOf[Throwable]) + .buildChecked[Throwable]() + .newInstance(message, cause) } catch { case _: Exception => new RuntimeException(className + ":" + message, cause) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala index 2bcfe9a67..3216a43be 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.service.authentication import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.util.reflect.DynConstructors trait EngineSecuritySecretProvider { @@ -50,9 +51,10 @@ class SimpleEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvide object EngineSecuritySecretProvider { def create(conf: KyuubiConf): EngineSecuritySecretProvider = { - val providerClass = Class.forName(conf.get(ENGINE_SECURITY_SECRET_PROVIDER)) - val provider = providerClass.getConstructor().newInstance() - .asInstanceOf[EngineSecuritySecretProvider] + val provider = DynConstructors.builder() + .impl(conf.get(ENGINE_SECURITY_SECRET_PROVIDER)) + .buildChecked[EngineSecuritySecretProvider]() + .newInstance(conf) provider.initialize(conf) provider } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala index 28806e915..4959c845d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala @@ -33,12 +33,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.util.reflect.ReflectUtils._ object KyuubiHadoopUtils extends Logging { - private val tokenMapField = classOf[Credentials].getDeclaredField("tokenMap") - tokenMapField.setAccessible(true) - def newHadoopConf( conf: KyuubiConf, loadDefaults: Boolean = true): Configuration = { @@ -76,12 +74,8 @@ object KyuubiHadoopUtils extends Logging { * Get [[Credentials#tokenMap]] by reflection as [[Credentials#getTokenMap]] is not present before * Hadoop 3.2.1. */ - def getTokenMap(credentials: Credentials): Map[Text, Token[_ <: TokenIdentifier]] = { - tokenMapField.get(credentials) - .asInstanceOf[JMap[Text, Token[_ <: TokenIdentifier]]] - .asScala - .toMap - } + def getTokenMap(credentials: Credentials): Map[Text, Token[_ <: TokenIdentifier]] = + getField[JMap[Text, Token[_ <: TokenIdentifier]]](credentials, "tokenMap").asScala.toMap def getTokenIssueDate(token: Token[_ <: TokenIdentifier]): Option[Long] = { token.decodeIdentifier() match { diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala index de849f83e..0a5db3b43 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala @@ -39,6 +39,7 @@ import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry import org.apache.kyuubi.shaded.zookeeper.ZooDefs import org.apache.kyuubi.shaded.zookeeper.data.ACL +import org.apache.kyuubi.util.reflect.DynFields import org.apache.kyuubi.zookeeper.EmbeddedZookeeper import org.apache.kyuubi.zookeeper.ZookeeperConf.ZK_CLIENT_PORT @@ -156,12 +157,13 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests assert(service.getServiceState === ServiceState.STARTED) stopZk() - val isServerLostM = discovery.getClass.getSuperclass.getDeclaredField("isServerLost") - isServerLostM.setAccessible(true) - val isServerLost = isServerLostM.get(discovery) + val isServerLost = DynFields.builder() + .hiddenImpl(discovery.getClass.getSuperclass, "isServerLost") + .buildChecked[AtomicBoolean]() + .get(discovery) eventually(timeout(10.seconds), interval(100.millis)) { - assert(isServerLost.asInstanceOf[AtomicBoolean].get()) + assert(isServerLost.get()) assert(discovery.getServiceState === ServiceState.STOPPED) assert(service.getServiceState === ServiceState.STOPPED) } diff --git a/kyuubi-hive-beeline/pom.xml b/kyuubi-hive-beeline/pom.xml index beacba438..6c5f255dc 100644 --- a/kyuubi-hive-beeline/pom.xml +++ b/kyuubi-hive-beeline/pom.xml @@ -40,6 +40,12 @@ ${project.version} + + org.apache.kyuubi + kyuubi-util + ${project.version} + + org.apache.hive hive-beeline diff --git a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java index 7ca767148..b3a2fa307 100644 --- a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java +++ b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java @@ -19,8 +19,6 @@ package org.apache.hive.beeline; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.sql.Driver; import java.util.Arrays; import java.util.Collections; @@ -29,12 +27,15 @@ import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.kyuubi.util.reflect.DynConstructors; +import org.apache.kyuubi.util.reflect.DynFields; +import org.apache.kyuubi.util.reflect.DynMethods; public class KyuubiBeeLine extends BeeLine { public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER = "org.apache.kyuubi.jdbc.KyuubiHiveDriver"; protected KyuubiCommands commands = new KyuubiCommands(this); - private Driver defaultDriver = null; + private Driver defaultDriver; public KyuubiBeeLine() { this(true); @@ -44,20 +45,16 @@ public class KyuubiBeeLine extends BeeLine { public KyuubiBeeLine(boolean isBeeLine) { super(isBeeLine); try { - Field commandsField = BeeLine.class.getDeclaredField("commands"); - commandsField.setAccessible(true); - commandsField.set(this, commands); + DynFields.builder().hiddenImpl(BeeLine.class, "commands").buildChecked(this).set(commands); } catch (Throwable t) { throw new ExceptionInInitializerError("Failed to inject kyuubi commands"); } try { defaultDriver = - (Driver) - Class.forName( - KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER, - true, - Thread.currentThread().getContextClassLoader()) - .newInstance(); + DynConstructors.builder() + .impl(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER) + .buildChecked() + .newInstance(); } catch (Throwable t) { throw new ExceptionInInitializerError(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER + "-missing"); } @@ -115,25 +112,26 @@ public class KyuubiBeeLine extends BeeLine { BeelineParser beelineParser; boolean connSuccessful; boolean exit; - Field exitField; + DynFields.BoundField exitField; try { - Field optionsField = BeeLine.class.getDeclaredField("options"); - optionsField.setAccessible(true); - Options options = (Options) optionsField.get(this); + Options options = + DynFields.builder() + .hiddenImpl(BeeLine.class, "options") + .buildStaticChecked() + .get(); beelineParser = new BeelineParser(); cl = beelineParser.parse(options, args); - Method connectUsingArgsMethod = - BeeLine.class.getDeclaredMethod( - "connectUsingArgs", BeelineParser.class, CommandLine.class); - connectUsingArgsMethod.setAccessible(true); - connSuccessful = (boolean) connectUsingArgsMethod.invoke(this, beelineParser, cl); + connSuccessful = + DynMethods.builder("connectUsingArgs") + .hiddenImpl(BeeLine.class, BeelineParser.class, CommandLine.class) + .buildChecked(this) + .invoke(beelineParser, cl); - exitField = BeeLine.class.getDeclaredField("exit"); - exitField.setAccessible(true); - exit = (boolean) exitField.get(this); + exitField = DynFields.builder().hiddenImpl(BeeLine.class, "exit").buildChecked(this); + exit = exitField.get(); } catch (ParseException e1) { output(e1.getMessage()); @@ -149,10 +147,11 @@ public class KyuubiBeeLine extends BeeLine { // no-op if the file is not present if (!connSuccessful && !exit) { try { - Method defaultBeelineConnectMethod = - BeeLine.class.getDeclaredMethod("defaultBeelineConnect", CommandLine.class); - defaultBeelineConnectMethod.setAccessible(true); - connSuccessful = (boolean) defaultBeelineConnectMethod.invoke(this, cl); + connSuccessful = + DynMethods.builder("defaultBeelineConnect") + .hiddenImpl(BeeLine.class, CommandLine.class) + .buildChecked(this) + .invoke(beelineParser, cl); } catch (Exception t) { error(t.getMessage()); @@ -184,7 +183,7 @@ public class KyuubiBeeLine extends BeeLine { } try { exit = true; - exitField.set(this, exit); + exitField.set(exit); } catch (Exception e) { error(e.getMessage()); return 1; diff --git a/kyuubi-hive-jdbc/pom.xml b/kyuubi-hive-jdbc/pom.xml index d6889e365..36c27efe0 100644 --- a/kyuubi-hive-jdbc/pom.xml +++ b/kyuubi-hive-jdbc/pom.xml @@ -35,6 +35,11 @@ + + org.apache.kyuubi + kyuubi-util + ${project.version} + org.apache.arrow diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java index 1ac0adf04..7d26f8078 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiSQLException.java @@ -21,6 +21,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import org.apache.hive.service.rpc.thrift.TStatus; +import org.apache.kyuubi.util.reflect.DynConstructors; public class KyuubiSQLException extends SQLException { @@ -186,7 +187,10 @@ public class KyuubiSQLException extends SQLException { private static Throwable newInstance(String className, String message) { try { - return (Throwable) Class.forName(className).getConstructor(String.class).newInstance(message); + return DynConstructors.builder() + .impl(className, String.class) + .buildChecked() + .newInstance(message); } catch (Exception e) { return new RuntimeException(className + ":" + message); } diff --git a/kyuubi-rest-client/pom.xml b/kyuubi-rest-client/pom.xml index a9ceb9bb3..176051deb 100644 --- a/kyuubi-rest-client/pom.xml +++ b/kyuubi-rest-client/pom.xml @@ -77,6 +77,12 @@ true + + org.apache.kyuubi + kyuubi-util + ${project.version} + + org.slf4j slf4j-api diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java index 435a85014..c66c6465e 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/auth/SpnegoAuthHeaderGenerator.java @@ -17,13 +17,13 @@ package org.apache.kyuubi.client.auth; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.util.Base64; import javax.security.auth.Subject; import org.apache.kyuubi.client.exception.KyuubiRestException; +import org.apache.kyuubi.util.reflect.DynFields; +import org.apache.kyuubi.util.reflect.DynMethods; import org.ietf.jgss.GSSContext; import org.ietf.jgss.GSSException; import org.ietf.jgss.GSSManager; @@ -61,13 +61,17 @@ public class SpnegoAuthHeaderGenerator implements AuthHeaderGenerator { private String generateToken(String server) throws Exception { Subject subject; try { - Class ugiClz = Class.forName(UGI_CLASS); - Method ugiGetCurrentUserMethod = ugiClz.getDeclaredMethod("getCurrentUser"); - Object ugiCurrentUser = ugiGetCurrentUserMethod.invoke(null); + Object ugiCurrentUser = + DynMethods.builder("getCurrentUser") + .hiddenImpl(Class.forName(UGI_CLASS)) + .buildStaticChecked() + .invoke(); LOG.debug("The user credential is {}", ugiCurrentUser); - Field ugiSubjectField = ugiCurrentUser.getClass().getDeclaredField("subject"); - ugiSubjectField.setAccessible(true); - subject = (Subject) ugiSubjectField.get(ugiCurrentUser); + subject = + DynFields.builder() + .hiddenImpl(ugiCurrentUser.getClass(), "subject") + .buildChecked(ugiCurrentUser) + .get(); } catch (ClassNotFoundException e) { // TODO do kerberos authentication using JDK class directly LOG.error("Hadoop UGI class {} is required for SPNEGO authentication.", UGI_CLASS); diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala index 17ad69524..da4c8e4a9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal import org.apache.kyuubi.KyuubiException import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.util.reflect.DynConstructors private[kyuubi] object PluginLoader { @@ -31,8 +32,7 @@ private[kyuubi] object PluginLoader { } try { - Class.forName(advisorClass.get).getConstructor().newInstance() - .asInstanceOf[SessionConfAdvisor] + DynConstructors.builder.impl(advisorClass.get).buildChecked[SessionConfAdvisor].newInstance() } catch { case _: ClassCastException => throw new KyuubiException( @@ -45,8 +45,7 @@ private[kyuubi] object PluginLoader { def loadGroupProvider(conf: KyuubiConf): GroupProvider = { val groupProviderClass = conf.get(KyuubiConf.GROUP_PROVIDER) try { - Class.forName(groupProviderClass).getConstructor().newInstance() - .asInstanceOf[GroupProvider] + DynConstructors.builder().impl(groupProviderClass).buildChecked[GroupProvider]().newInstance() } catch { case _: ClassCastException => throw new KyuubiException( diff --git a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala index 6e306e371..5ded0af2c 100644 --- a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala +++ b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/reflect/ReflectUtils.scala @@ -21,7 +21,7 @@ import java.util.ServiceLoader import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import scala.util.{Failure, Success, Try} +import scala.util.Try object ReflectUtils { /** @@ -37,32 +37,56 @@ object ReflectUtils { DynClasses.builder().loader(cl).impl(className).buildChecked() }.isSuccess - def getFieldVal[T](target: Any, fieldName: String): T = - Try { - DynFields.builder().hiddenImpl(target.getClass, fieldName).build[T]().get(target) - } match { - case Success(value) => value - case Failure(e) => - val candidates = target.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]") - throw new RuntimeException(s"$fieldName not in ${target.getClass} $candidates", e) - } - - def getFieldValOpt[T](target: Any, name: String): Option[T] = - Try(getFieldVal[T](target, name)).toOption - - def invoke(target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): AnyRef = + /** + * get the field value of the given object + * @param target the target object + * @param fieldName the field name from declared field names + * @tparam T the expected return class type + * @return + */ + def getField[T](target: Any, fieldName: String): T = { + val targetClass = target.getClass try { - val (types, values) = args.unzip - DynMethods.builder(methodName).hiddenImpl(target.getClass, types: _*).build() - .invoke(target, values: _*) + DynFields.builder() + .hiddenImpl(targetClass, fieldName) + .buildChecked[T](target) + .get() } catch { - case e: NoSuchMethodException => - val candidates = target.getClass.getMethods.map(_.getName).mkString("[", ",", "]") - throw new RuntimeException(s"$methodName not in ${target.getClass} $candidates", e) + case e: Exception => + val candidates = targetClass.getDeclaredFields.map(_.getName).sorted + throw new RuntimeException( + s"Field $fieldName not in $targetClass [${candidates.mkString(",")}]", + e) } + } - def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T = - invoke(target, methodName, args: _*).asInstanceOf[T] + /** + * Invoke a method with the given name and arguments on the given target object. + * @param target the target object + * @param methodName the method name from declared field names + * @param args pairs of class and values for the arguments + * @tparam T the expected return class type, + * returning type Nothing if it's not provided or inferable + * @return + */ + def invokeAs[T](target: AnyRef, methodName: String, args: (Class[_], AnyRef)*): T = { + val targetClass = target.getClass + val argClasses = args.map(_._1) + try { + DynMethods.builder(methodName) + .hiddenImpl(targetClass, argClasses: _*) + .buildChecked(target) + .invoke[T](args.map(_._2): _*) + } catch { + case e: Exception => + val candidates = targetClass.getDeclaredMethods.map(_.getName).sorted + val argClassesNames = argClasses.map(_.getClass.getName) + throw new RuntimeException( + s"Method $methodName (${argClassesNames.mkString(",")})" + + s" not found in $targetClass [${candidates.mkString(",")}]", + e) + } + } /** * Creates a iterator for with a new service loader for the given service type and class diff --git a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala index dfaa1f412..ae4651fe2 100644 --- a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala +++ b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala @@ -19,9 +19,11 @@ package org.apache.kyuubi.tools import java.io.File import java.nio.file.Files +import java.util.{Map => JMap} import java.util.UUID import org.apache.kyuubi.{KyuubiFunSuite, Utils} +import org.apache.kyuubi.util.reflect.ReflectUtils._ class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite { import KubernetesSparkBlockCleanerConstants._ @@ -83,10 +85,7 @@ class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite { } private def updateEnv(name: String, value: String): Unit = { - val env = System.getenv - val field = env.getClass.getDeclaredField("m") - field.setAccessible(true) - field.get(env).asInstanceOf[java.util.Map[String, String]].put(name, value) + getField[JMap[String, String]](System.getenv, "m").put(name, value) } test("test clean") {