diff --git a/build/kyuubi-build-info b/build/kyuubi-build-info
index 197271286..3917c701c 100755
--- a/build/kyuubi-build-info
+++ b/build/kyuubi-build-info
@@ -25,7 +25,7 @@ echo_build_properties() {
echo kyuubi_version=$1
echo spark_version=$2
echo user="$USER"
- echo jar=kyuubi-$1.jar
+ echo jar=kyuubi-server-$1.jar
echo revision=$(git rev-parse HEAD)
echo branch=$(git rev-parse --abbrev-ref HEAD)
echo date=$(date -u +%Y-%m-%dT%H:%M:%SZ)
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 9c900b00d..29cad4c2a 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -166,6 +166,20 @@
run
+
+ fake-jar-test
+ generate-test-resources
+
+
+
+
+
+
+
+
+ run
+
+
@@ -288,7 +302,7 @@
-Xmx3g -Xss4m -XX:ReservedCodeCacheSize=512m
- ${project.build.testOutputDirectory}/kyuubi-fake.jar
+ ${project.build.testOutputDirectory}/kyuubi-server-${version}.jar
${project.build.directory}/tmp
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala b/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala
index 0fd0a47be..0aa4754b7 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala
@@ -54,6 +54,6 @@ object KyuubiDistributedCacheManager {
link: String,
statCache: Map[URI, FileStatus]): Unit = {
cacheManager.addResource(fs, conf, destPath,
- localResources, resourceType, link, statCache, true)
+ localResources, resourceType, link, statCache, appMasterOnly = true)
}
}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala
index 33006d2dc..2ae6930c7 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala
@@ -39,6 +39,9 @@ import yaooqinn.kyuubi.ha.HighAvailabilityUtils
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.service.CompositeService
+/**
+ * The ApplicationMaster which runs Kyuubi Server inside it.
+ */
class KyuubiAppMaster private(name: String) extends CompositeService(name)
with Logging {
@@ -206,4 +209,4 @@ object KyuubiAppMaster extends Logging {
System.exit(-1)
}
}
-}
\ No newline at end of file
+}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala
index 44350fa8c..94511df07 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala
@@ -42,15 +42,15 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.deploy.yarn.KyuubiDistributedCacheManager
import yaooqinn.kyuubi.{Logging, _}
-import yaooqinn.kyuubi.service.ServiceException
/**
- * A Yarn Client for submiting Kyuubi On Yarn,
+ * A Yarn Client for submitting Kyuubi to Yarn
* @param conf SparkConf
*/
private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
@@ -62,8 +62,8 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
yarnClient.init(hadoopConf)
yarnClient.start()
- private[this] val memory = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM, "1024m").toInt
- private[this] val memoryOverhead =
+ private[this] var memory = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM, "1024m").toInt
+ private[this] var memoryOverhead =
conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM_OVERHEAD, (memory * 0.1).toInt + "m").toInt
private[this] val cores = conf.getInt(KyuubiSparkUtil.DRIVER_CORES, 1)
private[this] val principal = conf.get(KyuubiSparkUtil.PRINCIPAL, "")
@@ -80,10 +80,21 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
private[this] var appId: ApplicationId = _
private[this] var appStagingDir: Path = _
+ @throws[IOException]
+ @throws[YarnException]
def submit(): Unit = {
try {
val app = yarnClient.createApplication()
val appRes = app.getNewApplicationResponse
+ val maxMemory = appRes.getMaximumResourceCapability.getMemory
+ if (memory + memoryOverhead > maxMemory) {
+ warn(s"Requesting AM memory(${memory}m + ${memoryOverhead}m) exceeds cluster's limit")
+ memory = (maxMemory * 0.9).toInt
+ memoryOverhead = (maxMemory * 0.1).toInt
+ warn(s"Adjusting Kyuubi container to ${memory}m heap and ${memoryOverhead}m overhead.")
+ } else {
+ info(s"Will allocate Kyuubi container with ${memory}m heap and ${memoryOverhead}m overhead")
+ }
appId = appRes.getApplicationId
appStagingDir = new Path(stagingHome, buildPath(KYUUBI_STAGING, appId.toString))
val containerContext = createContainerLaunchContext()
@@ -126,6 +137,12 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
}
}
+ /**
+ * Describe container information and create the context for launching Application Master
+ *
+ * @return ContainerLaunchContext which contains all information for NodeManager to launch
+ * Application Master
+ */
private[this] def createContainerLaunchContext(): ContainerLaunchContext = {
val launchEnv = setupLaunchEnv()
val localResources = prepareLocalResources()
@@ -188,16 +205,18 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
appContext.setResource(capability)
appContext
}
+
/**
- * Set up the environment for launching our AM container runs Kyuubi Server.
+ * Set up the environment for launching the ApplicationMaster which wraps Kyuubi Server.
*/
- private[this] def setupLaunchEnv(): ENV = {
+ private[this] def setupLaunchEnv(): EnvMap = {
info("Setting up the launch environment for our Kyuubi Server container")
- val env = new ENV
+ val env = new EnvMap
populateClasspath(hadoopConf, env)
env("KYUUBI_YARN_STAGING_DIR") = appStagingDir.toString
+ // Point to Localized Spark jars
env("KYUUBI_YARN_SPARK_JARS") = buildPath(Environment.PWD.$$(), SPARK_LIB_DIR, "*")
- val amEnvPrefix = "spark.yarn.appMasterEnv."
+ val amEnvPrefix = "spark.kyuubi.yarn.appMasterEnv."
conf.getAll
.filter(_._1.startsWith(amEnvPrefix))
.map { case (k, v) => (k.stripPrefix(amEnvPrefix), v) }
@@ -205,35 +224,32 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
env
}
- private[this] def prepareLocalResources(): RESOURCES = {
+ @throws[IOException]
+ private[this] def prepareLocalResources(): HashMap[String, LocalResource] = {
info("Preparing resources for our AM container to start Kyuubi Server")
val fs = appStagingDir.getFileSystem(hadoopConf)
val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
debug("Delegation token renewer is: " + tokenRenewer)
-
if (tokenRenewer == null || tokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer."
- error(errorMessage)
- throw new ServiceException(errorMessage)
+ error("Can't get any Master Kerberos principal for use as renewer.")
+ } else {
+ fs.addDelegationTokens(tokenRenewer, creds)
}
- fs.addDelegationTokens(tokenRenewer, creds)
- val localResources = new RESOURCES
+ val localResources = new HashMap[String, LocalResource]
FileSystem.mkdirs(fs, appStagingDir, new FsPermission(STAGING_DIR_PERMISSION))
val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
/**
- * Distribute a file to the cluster.
+ * Copy the non-local file to HDFS (if not already there) and add it to the Application Master's
+ * local resources
*
- * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
- * to HDFS (if not already there) and added to the application's distributed cache.
- *
- * @param path URI of the file to distribute.
- * @param resType Type of resource being distributed.
- * @param destName Name of the file in the distributed cache.
+ * @param path the canonical path of the file.
+ * @param resType the type of resource being distributed.
* @param destName the name of the file in the distributed cache.
* @param targetDir Subdirectory where to place the file.
*/
- def distribute(
+ def upload(
path: String,
resType: LocalResourceType = LocalResourceType.FILE,
destName: Option[String] = None,
@@ -251,12 +267,13 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
}
}
+ // Upload keytab if exists
if (loginFromKeytab) {
- distribute(keytabOrigin, destName = Option(keytabForAM))
+ upload(keytabOrigin, destName = Option(keytabForAM))
}
// Add KYUUBI jar to the cache
- distribute(kyuubiJar)
+ upload(kyuubiJar)
// Add Spark to the cache
val jarsDir = new File(KyuubiSparkUtil.SPARK_JARS_DIR)
@@ -277,7 +294,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
jarsStream.close()
}
- distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE,
+ upload(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE,
destName = Some(SPARK_LIB_DIR))
jarsArchive.delete()
@@ -334,15 +351,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
try {
confStream.setLevel(0)
- // Upload $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
- // the executors will use the latest configurations instead of the default values. This is
- // required when user changes log4j.properties directly to set the log configurations. If
- // configuration file is provided through --files then executors will be taking configurations
- // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-
- // Also upload metrics.properties to distributed cache if exists in classpath.
- // If user specify this file using --files then executors will use the one
- // from --files instead.
+ // 1. log4j and metrics
for { prop <- Seq("log4j.properties", "metrics.properties")
url <- Option(KyuubiSparkUtil.getContextOrSparkClassLoader.getResource(prop))
if url.getProtocol == "file" } {
@@ -352,8 +361,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
confStream.closeEntry()
}
- // Save the Hadoop config files under a separate directory in the archive. This directory
- // is appended to the classpath so that the cluster-provided configuration takes precedence.
+ // 2. hadoop/hive and other xml configurations
confStream.putNextEntry(new ZipEntry(s"$HADOOP_CONF_DIR/"))
confStream.closeEntry()
hadoopConfFiles.foreach { case (name, file) =>
@@ -364,6 +372,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
}
}
+ // 3. spark conf
val props = new Properties()
conf.getAll.foreach { case (k, v) =>
props.setProperty(k, v)
@@ -386,7 +395,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
* Copy the given file to a remote file system (e.g. HDFS) if needed.
* The file is only copied if the source and destination file systems are different or the source
* scheme is "file". This is used for preparing resources for launching the ApplicationMaster
- * container. Exposed for testing.
+ * container.
*/
private[this] def copyFileToRemote(destDir: Path, srcPath: Path,
cache: Map[URI, Path], destName: Option[String] = None): Path = {
@@ -410,7 +419,14 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
object KyuubiYarnClient {
- private val kyuubiJar = System.getenv("KYUUBI_JAR")
+ /**
+ * Load kyuubi server jar, choose the specified kyuubi jar first, try the default compiled one
+ * if not found
+ */
+ private val kyuubiJar =
+ Option(System.getenv("KYUUBI_JAR"))
+ .getOrElse(KyuubiSparkUtil.getContextOrSparkClassLoader
+ .getResource(KYUUBI_JAR_NAME).getPath)
/**
* Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
@@ -439,7 +455,7 @@ object KyuubiYarnClient {
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.
*/
- def addPathToEnvironment(env: ENV, key: String, value: String): Unit = {
+ def addPathToEnvironment(env: EnvMap, key: String, value: String): Unit = {
val newValue =
if (env.contains(key)) {
env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
@@ -453,10 +469,10 @@ object KyuubiYarnClient {
* Add the given path to the classpath entry of the given environment map.
* If the classpath is already set, this appends the new path to the existing classpath.
*/
- def addClasspathEntry(path: String, env: ENV): Unit =
+ def addClasspathEntry(path: String, env: EnvMap): Unit =
addPathToEnvironment(env, Environment.CLASSPATH.name, path)
- def populateClasspath(conf: Configuration, env: ENV): Unit = {
+ def populateClasspath(conf: Configuration, env: EnvMap): Unit = {
addClasspathEntry(Environment.PWD.$$(), env)
addClasspathEntry(buildPath(Environment.PWD.$$(), SPARK_CONF_DIR), env)
addClasspathEntry(buildPath(Environment.PWD.$$(), new File(kyuubiJar).getName), env)
@@ -469,7 +485,7 @@ object KyuubiYarnClient {
* Populate the classpath entry in the given environment map with any application
* classpath specified through the Hadoop and Yarn configurations.
*/
- private[yarn] def populateHadoopClasspath(conf: Configuration, env: ENV): Unit = {
+ private[yarn] def populateHadoopClasspath(conf: Configuration, env: EnvMap): Unit = {
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
classPathElementsToAdd.foreach { c =>
addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala
index 5e3cd9b4e..386075eb8 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala
@@ -20,12 +20,10 @@ package yaooqinn.kyuubi
import scala.collection.mutable.HashMap
import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.yarn.api.records.LocalResource
package object yarn {
- type ENV = HashMap[String, String]
- type RESOURCES = HashMap[String, LocalResource]
+ type EnvMap = HashMap[String, String]
val KYUUBI_YARN_APP_NAME = "KYUUBI SERVER"
val KYUUBI_YARN_APP_TYPE = "KYUUBI"
diff --git a/kyuubi-server/src/test/resources/kyuubi-fake.jar b/kyuubi-server/src/test/resources/kyuubi-fake.jar
deleted file mode 100644
index e69de29bb..000000000
diff --git a/kyuubi-server/src/test/resources/log4j.xml b/kyuubi-server/src/test/resources/log4j.xml
index b53d8cc5a..7a3e6b218 100644
--- a/kyuubi-server/src/test/resources/log4j.xml
+++ b/kyuubi-server/src/test/resources/log4j.xml
@@ -23,7 +23,7 @@
-
+
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala
index 386e2ad22..080524dec 100644
--- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala
@@ -17,6 +17,7 @@
package yaooqinn.kyuubi.yarn
+
import java.net.URI
import scala.collection.mutable
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
-import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext}
+import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext, Resource}
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
@@ -34,7 +35,7 @@ import org.mockito.Mockito.{doNothing, when}
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
-import yaooqinn.kyuubi.service.ServiceException
+import yaooqinn.kyuubi.KYUUBI_JAR_NAME
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSugar {
@@ -90,7 +91,7 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
val cp = classpath(env)
cp should contain("{{PWD}}")
cp should contain(Environment.PWD.$$() + Path.SEPARATOR + SPARK_CONF_DIR)
- cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), "kyuubi-fake.jar"))
+ cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), KYUUBI_JAR_NAME))
cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), SPARK_LIB_DIR, "*"))
cp should contain(yarnDefCP.head)
cp should contain(mrDefCP.head)
@@ -124,7 +125,6 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
val uri3 = KyuubiSparkUtil.resolveURI("hdfs://1")
val path3 = KyuubiYarnClient.getQualifiedLocalPath(uri3, conf)
path3.toUri should be(uri3)
-
}
test("kyuubi yarn client init") {
@@ -140,30 +140,47 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
assert(ReflectUtils.getFieldValue(client2, "loginFromKeytab").asInstanceOf[Boolean])
}
- /**
- * test with mavne. not ide
- */
- test("submit application") {
- val conf = new SparkConf()
- val client = new KyuubiYarnClient(conf)
- val yarnClient = mock[YarnClient]
- ReflectUtils.setFieldValue(client, "yarnClient", yarnClient)
- doNothing().when(yarnClient).start()
- val app = mock[YarnClientApplication]
- when(yarnClient.createApplication()).thenReturn(app)
- intercept[NullPointerException](client.submit()) // app res = null
- val appRes = mock[GetNewApplicationResponse]
- when(app.getNewApplicationResponse).thenReturn(appRes)
- intercept[NullPointerException](client.submit()) // appid = null
- val appId = mock[ApplicationId]
- when(appRes.getApplicationId).thenReturn(appId)
- when(appId.toString).thenReturn("appId1")
- val jarName = "kyuubi-fake.jar"
- val kyuubiJar = this.getClass.getClassLoader.getResource(jarName).getPath
- ReflectUtils.setFieldValue(KyuubiSparkUtil, "SPARK_JARS_DIR", kyuubiJar.stripSuffix(jarName))
- val appContext = mock[ApplicationSubmissionContext]
- when(app.getApplicationSubmissionContext).thenReturn(appContext)
- intercept[ServiceException](client.submit())
+ test("submit with exceeded memory") {
+ withClient { (c, kc) =>
+ val app = mock[YarnClientApplication]
+ when(c.createApplication()).thenReturn(app)
+ val appRes = mock[GetNewApplicationResponse]
+ when(app.getNewApplicationResponse).thenReturn(appRes)
+ val resource = mock[Resource]
+ when(appRes.getMaximumResourceCapability).thenReturn(resource)
+ when(resource.getMemory).thenReturn(10)
+ val appId = mock[ApplicationId]
+ when(appRes.getApplicationId).thenReturn(appId)
+ when(appId.toString).thenReturn("appId1")
+ val appContext = mock[ApplicationSubmissionContext]
+ when(app.getApplicationSubmissionContext).thenReturn(appContext)
+ kc.submit()
+ ReflectUtils.getFieldValue(kc, "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(9)
+ ReflectUtils.getFieldValue(kc,
+ "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memoryOverhead") should be(1)
+ }
+ }
+
+ test("submit with suitable memory") {
+ withClient { (c, kc) =>
+ val app = mock[YarnClientApplication]
+ when(c.createApplication()).thenReturn(app)
+ val appRes = mock[GetNewApplicationResponse]
+ when(app.getNewApplicationResponse).thenReturn(appRes)
+ val resource = mock[Resource]
+ when(appRes.getMaximumResourceCapability).thenReturn(resource)
+ when(resource.getMemory).thenReturn(2048 *1024)
+ val appId = mock[ApplicationId]
+ when(appRes.getApplicationId).thenReturn(appId)
+ when(appId.toString).thenReturn("appId1")
+ val appContext = mock[ApplicationSubmissionContext]
+ when(app.getApplicationSubmissionContext).thenReturn(appContext)
+ kc.submit()
+ ReflectUtils.getFieldValue(kc,
+ "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(1024)
+ ReflectUtils.getFieldValue(kc,
+ "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memoryOverhead") should be(102)
+ }
}
def withConf(map: Map[String, String] = Map.empty)(testCode: Configuration => Any): Unit = {
@@ -172,6 +189,18 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
testCode(conf)
}
+ def withClient(f: (YarnClient, KyuubiYarnClient) => Any): Unit = {
+ val conf = new SparkConf()
+ val client = new KyuubiYarnClient(conf)
+ val yarnClient = mock[YarnClient]
+ ReflectUtils.setFieldValue(client, "yarnClient", yarnClient)
+ doNothing().when(yarnClient).start()
+ val kyuubiJar = this.getClass.getClassLoader.getResource(KYUUBI_JAR_NAME).getPath
+ ReflectUtils.setFieldValue(KyuubiSparkUtil,
+ "SPARK_JARS_DIR", kyuubiJar.stripSuffix(KYUUBI_JAR_NAME))
+ f(yarnClient, client)
+ }
+
def classpath(env: mutable.HashMap[String, String]): Array[String] =
env(Environment.CLASSPATH.name).split(":|;|")
}