add kyuubi submit

This commit is contained in:
Kent Yao 2018-05-20 21:42:25 +08:00
parent 756e9b7f21
commit f9d6188f39
13 changed files with 455 additions and 285 deletions

View File

@ -17,20 +17,7 @@
# limitations under the License.
#
# Runs a Spark command as a daemon.
#
# Environment Variables
#
# SPARK_CONF_DIR Alternate conf dir. Default is ${SPARK_HOME}/conf.
# SPARK_LOG_DIR Where log files are stored. ${SPARK_HOME}/logs by default.
# SPARK_MASTER host:path where spark code should be rsync'd from
# SPARK_PID_DIR The pid files are stored. /tmp by default.
# SPARK_IDENT_STRING A string representing this instance of spark. $USER by default
# SPARK_NICENESS The scheduling priority for daemons. Defaults to 0.
# SPARK_NO_DAEMONIZE If set, will run the proposed command in the foreground. It will not output a PID file.
##
usage="Usage: spark-daemon.sh (start|stop) <spark-command> <spark-instance-number> <args...>"
usage="Usage: kyuubi-daemon.sh (start|stop) <kyuubi main class> <instance-number> <args...>"
. "${SPARK_HOME}/sbin/spark-config.sh"
@ -107,7 +94,7 @@ case ${option} in
rotate_log "$log"
echo "starting $command, logging to $log"
execute_command bash "${KYUUBI_HOME}"/bin/kyuubi-class.sh org.apache.spark.KyuubiSubmit --class "$command" "$@"
execute_command bash "${KYUUBI_HOME}"/bin/kyuubi-class.sh --class "$command" "$@"
;;
(stop)

View File

@ -1,117 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher;
import java.util.*;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
public class KyuubiMain {
/**
* Usage: Main [class] [class args]
*/
public static void main(String[] argsArray) throws Exception {
checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
List<String> args = new ArrayList<>(Arrays.asList(argsArray));
String className = args.remove(0);
AbstractCommandBuilder builder;
try {
builder = new SparkSubmitCommandBuilder(args);
} catch (IllegalArgumentException e) {
System.err.println("Error: " + e.getMessage());
System.err.println();
MainClassOptionParser parser = new MainClassOptionParser();
try {
parser.parse(args);
} catch (Exception ignored) {
// Ignore parsing exceptions.
}
List<String> help = new ArrayList<>();
if (parser.className != null) {
help.add(parser.CLASS);
help.add(parser.className);
}
help.add(parser.USAGE_ERROR);
builder = new SparkSubmitCommandBuilder(help);
}
Map<String, String> env = new HashMap<>();
List<String> cmd = builder.buildCommand(env);
System.err.println("Spark Command: " + join(" ", cmd));
System.err.println("========================================");
// In bash, use NULL as the arg separator since it cannot be used in an argument.
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
}
/**
* Prepare the command for execution from a bash script. The final command will have commands to
* set up any needed environment variables needed by the child process.
*/
private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
if (childEnv.isEmpty()) {
return cmd;
}
List<String> newCmd = new ArrayList<>();
newCmd.add("env");
for (Map.Entry<String, String> e : childEnv.entrySet()) {
newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
}
newCmd.addAll(cmd);
return newCmd;
}
/**
* A parser used when command line parsing fails for spark-submit. It's used as a best-effort
* at trying to identify the class the user wanted to invoke, since that may require special
* usage strings (handled by SparkSubmitArguments).
*/
private static class MainClassOptionParser extends SparkSubmitOptionParser {
String className;
@Override
protected boolean handle(String opt, String value) {
if (CLASS.equals(opt)) {
className = value;
}
return false;
}
@Override
protected boolean handleUnknown(String opt) {
return false;
}
@Override
protected void handleExtraArgs(List<String> extra) {
}
}
}

View File

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.net.URL
import java.util.Enumeration
import scala.collection.JavaConverters._
import org.apache.spark.util.{MutableURLClassLoader, ParentClassLoader}
/**
* A Copy of ChildFirstURLClassLoader from Spark, renamed for avoiding patten match
* It is used to overwrite some Spark class in runtime.
*/
private[spark] class KyuubiFirstClassLoader(urls: Array[URL], parent: ClassLoader)
extends MutableURLClassLoader(urls, null) {
private val parentClassLoader = new ParentClassLoader(parent)
override def loadClass(name: String, resolve: Boolean): Class[_] = {
try {
super.loadClass(name, resolve)
} catch {
case _: ClassNotFoundException =>
parentClassLoader.loadClass(name, resolve)
}
}
override def getResource(name: String): URL = {
val url = super.findResource(name)
val res = if (url != null) url else parentClassLoader.getResource(name)
res
}
override def getResources(name: String): Enumeration[URL] = {
val childUrls = super.findResources(name).asScala
val parentUrls = parentClassLoader.getResources(name).asScala
(childUrls ++ parentUrls).asJavaEnumeration
}
override def addURL(url: URL) {
super.addURL(url)
}
}

View File

@ -27,7 +27,6 @@ import org.apache.spark.util.{ShutdownHookManager, Utils, VersionUtils}
import org.slf4j.Logger
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.utils.ReflectUtils
/**
* Wrapper for [[Utils]] and [[SparkHadoopUtil]]
@ -72,18 +71,6 @@ object KyuubiSparkUtil extends Logging {
// Runtime Spark Version
val SPARK_VERSION = org.apache.spark.SPARK_VERSION
lazy val kyuubiFirstClassLoader: KyuubiFirstClassLoader = {
// get kyuubi jar
val url = this.getClass.getProtectionDomain.getCodeSource.getLocation
info(s"Initializing KyuubiFirstClassLoader instance with url $url as first class members")
val classLoader = new KyuubiFirstClassLoader(Array(url), getContextOrSparkClassLoader())
Thread.currentThread().setContextClassLoader(classLoader)
val envCls = classLoader.loadClass("org.apache.spark.SparkEnv", true)
ReflectUtils.invokeStaticMethod(envCls, "get")
val contextCls = classLoader.loadClass("org.apache.spark.SparkContext", true)
classLoader
}
def addShutdownHook(f: () => Unit): Unit = {
ShutdownHookManager.addShutdownHook(f)
}

View File

@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io.{File, PrintStream}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
/**
* Kyuubi version of SparkSubmit
*/
object KyuubiSubmit {
// Cluster managers
private val YARN = 1
// Deploy modes
private val CLIENT = 1
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// scalastyle:off println
// Exposed for testing
private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
private[spark] var printStream: PrintStream = System.err
private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
private[spark] def printErrorAndExit(str: String): Unit = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}
private[spark] def printVersionAndExit(): Unit = {
printStream.println("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
printStream.println("Using Scala %s, %s, %s".format(
Properties.versionString, Properties.javaVmName, Properties.javaVersion))
printStream.println("Branch %s".format(SPARK_BRANCH))
printStream.println("Compiled by user %s on %s".format(SPARK_BUILD_USER, SPARK_BUILD_DATE))
printStream.println("Revision %s".format(SPARK_REVISION))
printStream.println("Url %s".format(SPARK_REPO_URL))
printStream.println("Type --help for more information.")
exitFn(0)
}
// scalastyle:on println
def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
submit(appArgs)
}
/**
* Submit the application using the provided parameters.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
/**
* Prepare the environment for submitting an application.
* This returns a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
val childMainClass = args.mainClass
args.master match {
case "yarn" =>
case "yarn-client" =>
printWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
args.master = "yarn"
case _ => printErrorAndExit("Kyuubi only supports yarn as master.")
}
args.deployMode match {
case "client" =>
case _ => printWarning("Kyuubi only supports client mode.")
args.deployMode = "client"
}
// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client")) {
printErrorAndExit(
"Could not load YARN classes. Spark may not have been compiled with YARN support.")
}
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
Seq[OptionAssigner](
OptionAssigner(args.master, YARN, CLIENT, sysProp = "spark.master"),
OptionAssigner(args.deployMode, YARN, CLIENT, sysProp = "spark.submit.deployMode"),
OptionAssigner(args.name, YARN, CLIENT, sysProp = "spark.app.name"),
OptionAssigner(args.driverMemory, YARN, CLIENT, sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, YARN, CLIENT,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, YARN, CLIENT,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, YARN, CLIENT,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, CLIENT,
sysProp = "spark.executor.instances"),
OptionAssigner(args.jars, YARN, CLIENT, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, YARN, CLIENT, sysProp = "spark.executor.memory")
).foreach(opt => sysProps.put(opt.sysProp, opt.value))
childClasspath += args.primaryResource
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
if (args.childArgs != null) { childArgs ++= args.childArgs }
val jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) ++
Seq(args.primaryResource)
sysProps.put("spark.jars", jars.mkString(","))
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sysProps.get(config).foreach { oldValue =>
sysProps(config) = Utils.resolveURIs(oldValue)
}
}
(childArgs, childClasspath, sysProps, childMainClass)
}
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
val url = this.getClass.getProtectionDomain.getCodeSource.getLocation
val loader = new ChildFirstURLClassLoader(Array(url),
Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause != null) findCause(e.getCause) else e
case e: InvocationTargetException =>
if (e.getCause != null) findCause(e.getCause) else e
case e: Throwable =>
e
}
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
printWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
printWarning(s"Skip remote jar $uri.")
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher
import java.util.{ArrayList => JAList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import org.apache.spark.launcher.CommandBuilderUtils._
object KyuubiMain {
@throws[Exception]
def main(args: Array[String]): Unit = {
val argAsJava = args.toList.asJava
val printStream = System.err
val builder = Try { new KyuubiSubmitCommandBuilder(argAsJava) } match {
case Success(b) => b
case Failure(e) =>
// scalastyle:off
printStream.println("Error: " + e.getMessage)
printStream.println()
// scalastyle:on
val parser = new MainClassOptionParser
try {
parser.parse(argAsJava)
} catch {
case _: Throwable => // ignored
}
val help = new ArrayBuffer[String]
if (parser.className != null) {
help += parser.CLASS
help += parser.className
}
help += parser.USAGE_ERROR
new SparkSubmitCommandBuilder(help.toList.asJava)
}
val env = Map.empty[String, String].asJava
val cmd = builder.buildCommand(env)
// scalastyle:off
printStream.println("Kyuubi Command: " + join(" ", cmd))
printStream.println("========================================")
// scalastyle:on
prepareBashCommand(cmd, env).asScala.foreach { c =>
print(c)
print('\u0000')
}
}
private[this] def prepareBashCommand(
cmd: JList[String],
childEnv: JMap[String, String]): JList[String] = {
if (childEnv.isEmpty) return cmd
val newCmd = new JAList[String]
newCmd.add("env")
childEnv.asScala.foreach(e => newCmd.add(String.format("%s=%s", e._1, e._2)))
newCmd.addAll(cmd)
newCmd
}
private class MainClassOptionParser extends SparkSubmitOptionParser {
private[launcher] var className: String = _
override protected def handle(opt: String, value: String): Boolean = {
if (CLASS == opt) {
className = value
}
false
}
override protected def handleUnknown(opt: String) = false
override protected def handleExtraArgs(extra: JList[String]): Unit = {}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.launcher
import java.util.{List => JList, Map => JMap}
import org.apache.spark.launcher.CommandBuilderUtils._
class KyuubiSubmitCommandBuilder(args: JList[String]) extends SparkSubmitCommandBuilder(args) {
override def buildCommand(env: JMap[String, String]): JList[String] = {
val config = getEffectiveConfig
val extraClassPath = config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH)
val cmd = buildJavaCommand(extraClassPath)
val driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS)
if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
val msg =
s"""
| Not allowed to specify max heap(Xmx) memory settings through java options:
| $driverExtraJavaOptions
| Use the corresponding --driver-memory or spark.driver.memory configuration instead.
""".stripMargin
throw new IllegalArgumentException(msg)
}
val memory = firstNonEmpty(
config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"),
DEFAULT_MEM)
cmd.add("-Xmx" + memory)
addOptionString(cmd, driverExtraJavaOptions)
mergeEnvPathList(env, getLibPathEnvName, config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH))
addPermGenSizeOpt(cmd)
cmd.add("org.apache.spark.deploy.KyuubiSubmit")
cmd.addAll(buildSparkSubmitArgs)
cmd
}
}

View File

@ -308,7 +308,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
statementId,
session.getUserName)
}
Thread.currentThread().setContextClassLoader(KyuubiSparkUtil.kyuubiFirstClassLoader)
session.sparkSession.sparkContext.setJobGroup(statementId, statement)
result = session.sparkSession.sql(statement)
KyuubiServerMonitor.getListener(session.getUserName).foreach {

View File

@ -35,7 +35,7 @@ package object kyuubi {
repo_url,
build_date) = {
val buildFile = "kyuubi-version-info.properties"
Option(KyuubiSparkUtil.kyuubiFirstClassLoader.getResourceAsStream(buildFile)) match {
Option(Thread.currentThread().getContextClassLoader.getResourceAsStream(buildFile)) match {
case Some(res) =>
try {
val unknown = "<unknown>"

View File

@ -52,7 +52,6 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
override def run(): Unit = {
try {
promisedSparkContext.trySuccess {
Thread.currentThread().setContextClassLoader(KyuubiSparkUtil.kyuubiFirstClassLoader)
new SparkContext(conf)
}
} catch {

View File

@ -110,7 +110,11 @@ object ReflectUtils extends Logging {
c: Class[_],
name: String,
argTypes: Seq[Class[_]], params: Seq[AnyRef]): Any = {
Try { c.getMethod(name, argTypes: _*).invoke(null, params: _*) } match {
Try {
val method = c.getMethod(name, argTypes: _*)
method.setAccessible(true)
method.invoke(null, params: _*)
} match {
case Success(value) => value
case Failure(exception) => throw exception
}
@ -159,4 +163,15 @@ object ReflectUtils extends Logging {
case Failure(exception) => throw exception
}
}
def setAncestorField(obj: AnyRef, level: Int, fieldName: String, fieldValue: AnyRef) {
val ancestor = Iterator.iterate[Class[_]](obj.getClass)(_.getSuperclass).drop(level).next()
val field = ancestor.getDeclaredField(fieldName)
field.setAccessible(true)
field.set(obj, fieldValue)
}
def setSuperField(obj : Object, fieldName: String, fieldValue: Object) {
setAncestorField(obj, 1, fieldName, fieldValue)
}
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import java.net.URLClassLoader
class KyuubiFirstClassLoaderSuite extends SparkFunSuite {
val urls1 = List(TestUtils.createJarWithClasses(
classNames = Seq("FakeClass1", "FakeClass2", "FakeClass3"),
toStringValue = "2")).toArray
val urls2 = List(TestUtils.createJarWithClasses(
classNames = Seq("FakeClass1"),
classNamesWithBase = Seq(("FakeClass2", "FakeClass3")), // FakeClass3 is in parent
toStringValue = "1",
classpathUrls = urls1)).toArray
test("kyuubi class loader first") {
val parentLoader = new URLClassLoader(urls1, null)
val classLoader = new KyuubiFirstClassLoader(urls2, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
val fakeClass2 = classLoader.loadClass("FakeClass2").newInstance()
assert(fakeClass.getClass === fakeClass2.getClass)
classLoader.close()
parentLoader.close()
}
test("kyuubi class loader first can fall back") {
val parentLoader = new URLClassLoader(urls1, null)
val classLoader = new KyuubiFirstClassLoader(urls2, parentLoader)
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
classLoader.close()
parentLoader.close()
}
test("kyuubi class loader first can fail") {
val parentLoader = new URLClassLoader(urls1, null)
val classLoader = new KyuubiFirstClassLoader(urls2, parentLoader)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
}
classLoader.close()
parentLoader.close()
}
}

View File

@ -54,15 +54,4 @@ class KyuubiSparkUtilSuite extends SparkFunSuite {
assert(KyuubiSparkUtil.minorVersion("2.1.2-SNAPSHOT") === 1)
assert(KyuubiSparkUtil.minorVersion("2.3") === 3)
}
test("get Kyuubi first classloader") {
val classloader = KyuubiSparkUtil.kyuubiFirstClassLoader
assert(classloader.isInstanceOf[MutableURLClassLoader])
assert(classloader !== this.getClass.getClassLoader)
assert(classloader.getURLs().length === 1)
assert(classloader.loadClass(classOf[Test1].getCanonicalName).getClassLoader ===
this.getClass.getClassLoader)
}
}
class Test1