From 248259d84f199ba5e7032c6e65dec3cd56f928fc Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 26 Jul 2018 16:36:16 +0800 Subject: [PATCH] verify resource setting and add some comments --- build/kyuubi-build-info | 2 +- kyuubi-server/pom.xml | 16 ++- .../yarn/KyuubiDistributedCacheManager.scala | 2 +- .../kyuubi/yarn/KyuubiAppMaster.scala | 5 +- .../kyuubi/yarn/KyuubiYarnClient.scala | 102 ++++++++++-------- .../scala/yaooqinn/kyuubi/yarn/package.scala | 4 +- .../src/test/resources/kyuubi-fake.jar | 0 kyuubi-server/src/test/resources/log4j.xml | 2 +- .../kyuubi/yarn/KyuubiYarnClientSuite.scala | 85 ++++++++++----- 9 files changed, 139 insertions(+), 79 deletions(-) delete mode 100644 kyuubi-server/src/test/resources/kyuubi-fake.jar diff --git a/build/kyuubi-build-info b/build/kyuubi-build-info index 197271286..3917c701c 100755 --- a/build/kyuubi-build-info +++ b/build/kyuubi-build-info @@ -25,7 +25,7 @@ echo_build_properties() { echo kyuubi_version=$1 echo spark_version=$2 echo user="$USER" - echo jar=kyuubi-$1.jar + echo jar=kyuubi-server-$1.jar echo revision=$(git rev-parse HEAD) echo branch=$(git rev-parse --abbrev-ref HEAD) echo date=$(date -u +%Y-%m-%dT%H:%M:%SZ) diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index 9c900b00d..29cad4c2a 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -166,6 +166,20 @@ run + + fake-jar-test + generate-test-resources + + + + + + + + + run + + @@ -288,7 +302,7 @@ -Xmx3g -Xss4m -XX:ReservedCodeCacheSize=512m - ${project.build.testOutputDirectory}/kyuubi-fake.jar + ${project.build.testOutputDirectory}/kyuubi-server-${version}.jar ${project.build.directory}/tmp diff --git a/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala b/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala index 0fd0a47be..0aa4754b7 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/deploy/yarn/KyuubiDistributedCacheManager.scala @@ -54,6 +54,6 @@ object KyuubiDistributedCacheManager { link: String, statCache: Map[URI, FileStatus]): Unit = { cacheManager.addResource(fs, conf, destPath, - localResources, resourceType, link, statCache, true) + localResources, resourceType, link, statCache, appMasterOnly = true) } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala index 33006d2dc..2ae6930c7 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiAppMaster.scala @@ -39,6 +39,9 @@ import yaooqinn.kyuubi.ha.HighAvailabilityUtils import yaooqinn.kyuubi.server.KyuubiServer import yaooqinn.kyuubi.service.CompositeService +/** + * The ApplicationMaster which runs Kyuubi Server inside it. + */ class KyuubiAppMaster private(name: String) extends CompositeService(name) with Logging { @@ -206,4 +209,4 @@ object KyuubiAppMaster extends Logging { System.exit(-1) } } -} \ No newline at end of file +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala index 44350fa8c..94511df07 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClient.scala @@ -42,15 +42,15 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.YarnException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{KyuubiSparkUtil, SparkConf} import org.apache.spark.deploy.yarn.KyuubiDistributedCacheManager import yaooqinn.kyuubi.{Logging, _} -import yaooqinn.kyuubi.service.ServiceException /** - * A Yarn Client for submiting Kyuubi On Yarn, + * A Yarn Client for submitting Kyuubi towards Yarn * @param conf SparkConf */ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { @@ -62,8 +62,8 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { yarnClient.init(hadoopConf) yarnClient.start() - private[this] val memory = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM, "1024m").toInt - private[this] val memoryOverhead = + private[this] var memory = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM, "1024m").toInt + private[this] var memoryOverhead = conf.getSizeAsMb(KyuubiSparkUtil.DRIVER_MEM_OVERHEAD, (memory * 0.1).toInt + "m").toInt private[this] val cores = conf.getInt(KyuubiSparkUtil.DRIVER_CORES, 1) private[this] val principal = conf.get(KyuubiSparkUtil.PRINCIPAL, "") @@ -80,10 +80,21 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { private[this] var appId: ApplicationId = _ private[this] var appStagingDir: Path = _ + @throws[IOException] + @throws[YarnException] def submit(): Unit = { try { val app = yarnClient.createApplication() val appRes = app.getNewApplicationResponse + val maxMemory = appRes.getMaximumResourceCapability.getMemory + if (memory + memoryOverhead > maxMemory) { + warn(s"Requesting AM memory(${memory}m + ${memoryOverhead}m) exceeds cluster's limit") + memory = (maxMemory * 0.9).toInt + memoryOverhead = (maxMemory * 0.1).toInt + warn(s"Adjusting Kyuubi container to ${memory}B heap and ${memoryOverhead}m overhead.") + } else { + info(s"Will allocate Kyuubi container with ${memory}m heap and ${memoryOverhead}m overhead") + } appId = appRes.getApplicationId appStagingDir = new Path(stagingHome, buildPath(KYUUBI_STAGING, appId.toString)) val containerContext = createContainerLaunchContext() @@ -126,6 +137,12 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { } } + /** + * Describe container information and create the context for launching Application Master + * + * @return ContainerLaunchContext which contains all information for NodeManager to launch + * Application Master + */ private[this] def createContainerLaunchContext(): ContainerLaunchContext = { val launchEnv = setupLaunchEnv() val localResources = prepareLocalResources() @@ -188,16 +205,18 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { appContext.setResource(capability) appContext } + /** - * Set up the environment for launching our AM container runs Kyuubi Server. + * Set up the environment for launching the ApplicationMaster which wraps Kyuubi Server. */ - private[this] def setupLaunchEnv(): ENV = { + private[this] def setupLaunchEnv(): EnvMap = { info("Setting up the launch environment for our Kyuubi Server container") - val env = new ENV + val env = new EnvMap populateClasspath(hadoopConf, env) env("KYUUBI_YARN_STAGING_DIR") = appStagingDir.toString + // Point to Localized Spark jars env("KYUUBI_YARN_SPARK_JARS") = buildPath(Environment.PWD.$$(), SPARK_LIB_DIR, "*") - val amEnvPrefix = "spark.yarn.appMasterEnv." + val amEnvPrefix = "spark.kyuubi.yarn.appMasterEnv." conf.getAll .filter(_._1.startsWith(amEnvPrefix)) .map { case (k, v) => (k.stripPrefix(amEnvPrefix), v) } @@ -205,35 +224,32 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { env } - private[this] def prepareLocalResources(): RESOURCES = { + @throws[IOException] + private[this] def prepareLocalResources(): HashMap[String, LocalResource] = { info("Preparing resources for our AM container to start Kyuubi Server") val fs = appStagingDir.getFileSystem(hadoopConf) val tokenRenewer = Master.getMasterPrincipal(hadoopConf) debug("Delegation token renewer is: " + tokenRenewer) - if (tokenRenewer == null || tokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer." - error(errorMessage) - throw new ServiceException(errorMessage) + error("Can't get any Master Kerberos principal for use as renewer.") + } else { + fs.addDelegationTokens(tokenRenewer, creds) } - fs.addDelegationTokens(tokenRenewer, creds) - val localResources = new RESOURCES + val localResources = new HashMap[String, LocalResource] FileSystem.mkdirs(fs, appStagingDir, new FsPermission(STAGING_DIR_PERMISSION)) val symlinkCache: Map[URI, Path] = HashMap[URI, Path]() val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() /** - * Distribute a file to the cluster. + * Copy the non-local file to HDFS (if not already there) and add it to the Application Master's + * local resources * - * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied - * to HDFS (if not already there) and added to the application's distributed cache. - * - * @param path URI of the file to distribute. - * @param resType Type of resource being distributed. - * @param destName Name of the file in the distributed cache. + * @param path the canonical path of the file. + * @param resType the type of resource being distributed. + * @param destName the of the file in the distributed cache. * @param targetDir Subdirectory where to place the file. */ - def distribute( + def upload( path: String, resType: LocalResourceType = LocalResourceType.FILE, destName: Option[String] = None, @@ -251,12 +267,13 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { } } + // Upload keytab if exists if (loginFromKeytab) { - distribute(keytabOrigin, destName = Option(keytabForAM)) + upload(keytabOrigin, destName = Option(keytabForAM)) } // Add KYUUBI jar to the cache - distribute(kyuubiJar) + upload(kyuubiJar) // Add Spark to the cache val jarsDir = new File(KyuubiSparkUtil.SPARK_JARS_DIR) @@ -277,7 +294,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { jarsStream.close() } - distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, + upload(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, destName = Some(SPARK_LIB_DIR)) jarsArchive.delete() @@ -334,15 +351,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { try { confStream.setLevel(0) - // Upload $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that - // the executors will use the latest configurations instead of the default values. This is - // required when user changes log4j.properties directly to set the log configurations. If - // configuration file is provided through --files then executors will be taking configurations - // from --files instead of $SPARK_CONF_DIR/log4j.properties. - - // Also upload metrics.properties to distributed cache if exists in classpath. - // If user specify this file using --files then executors will use the one - // from --files instead. + // 1. log4j and metrics for { prop <- Seq("log4j.properties", "metrics.properties") url <- Option(KyuubiSparkUtil.getContextOrSparkClassLoader.getResource(prop)) if url.getProtocol == "file" } { @@ -352,8 +361,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { confStream.closeEntry() } - // Save the Hadoop config files under a separate directory in the archive. This directory - // is appended to the classpath so that the cluster-provided configuration takes precedence. + // 2. hadoop/hive and other xml configurations confStream.putNextEntry(new ZipEntry(s"$HADOOP_CONF_DIR/")) confStream.closeEntry() hadoopConfFiles.foreach { case (name, file) => @@ -364,6 +372,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { } } + // 3. spark conf val props = new Properties() conf.getAll.foreach { case (k, v) => props.setProperty(k, v) @@ -386,7 +395,7 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { * Copy the given file to a remote file system (e.g. HDFS) if needed. * The file is only copied if the source and destination file systems are different or the source * scheme is "file". This is used for preparing resources for launching the ApplicationMaster - * container. Exposed for testing. + * container. */ private[this] def copyFileToRemote(destDir: Path, srcPath: Path, cache: Map[URI, Path], destName: Option[String] = None): Path = { @@ -410,7 +419,14 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging { object KyuubiYarnClient { - private val kyuubiJar = System.getenv("KYUUBI_JAR") + /** + * Load kyuubi server jar, choose the specified kyuubi jar first, try the default compiled one + * if not found + */ + private val kyuubiJar = + Option(System.getenv("KYUUBI_JAR")) + .getOrElse(KyuubiSparkUtil.getContextOrSparkClassLoader + .getResource(KYUUBI_JAR_NAME).getPath) /** * Given a local URI, resolve it and return a qualified local path that corresponds to the URI. @@ -439,7 +455,7 @@ object KyuubiYarnClient { * Add a path variable to the given environment map. * If the map already contains this key, append the value to the existing value instead. */ - def addPathToEnvironment(env: ENV, key: String, value: String): Unit = { + def addPathToEnvironment(env: EnvMap, key: String, value: String): Unit = { val newValue = if (env.contains(key)) { env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value @@ -453,10 +469,10 @@ object KyuubiYarnClient { * Add the given path to the classpath entry of the given environment map. * If the classpath is already set, this appends the new path to the existing classpath. */ - def addClasspathEntry(path: String, env: ENV): Unit = + def addClasspathEntry(path: String, env: EnvMap): Unit = addPathToEnvironment(env, Environment.CLASSPATH.name, path) - def populateClasspath(conf: Configuration, env: ENV): Unit = { + def populateClasspath(conf: Configuration, env: EnvMap): Unit = { addClasspathEntry(Environment.PWD.$$(), env) addClasspathEntry(buildPath(Environment.PWD.$$(), SPARK_CONF_DIR), env) addClasspathEntry(buildPath(Environment.PWD.$$(), new File(kyuubiJar).getName), env) @@ -469,7 +485,7 @@ object KyuubiYarnClient { * Populate the classpath entry in the given environment map with any application * classpath specified through the Hadoop and Yarn configurations. */ - private[yarn] def populateHadoopClasspath(conf: Configuration, env: ENV): Unit = { + private[yarn] def populateHadoopClasspath(conf: Configuration, env: EnvMap): Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) classPathElementsToAdd.foreach { c => addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala index 5e3cd9b4e..386075eb8 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/yarn/package.scala @@ -20,12 +20,10 @@ package yaooqinn.kyuubi import scala.collection.mutable.HashMap import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.yarn.api.records.LocalResource package object yarn { - type ENV = HashMap[String, String] - type RESOURCES = HashMap[String, LocalResource] + type EnvMap = HashMap[String, String] val KYUUBI_YARN_APP_NAME = "KYUUBI SERVER" val KYUUBI_YARN_APP_TYPE = "KYUUBI" diff --git a/kyuubi-server/src/test/resources/kyuubi-fake.jar b/kyuubi-server/src/test/resources/kyuubi-fake.jar deleted file mode 100644 index e69de29bb..000000000 diff --git a/kyuubi-server/src/test/resources/log4j.xml b/kyuubi-server/src/test/resources/log4j.xml index b53d8cc5a..7a3e6b218 100644 --- a/kyuubi-server/src/test/resources/log4j.xml +++ b/kyuubi-server/src/test/resources/log4j.xml @@ -23,7 +23,7 @@ - + diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala index 386e2ad22..080524dec 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/yarn/KyuubiYarnClientSuite.scala @@ -17,6 +17,7 @@ package yaooqinn.kyuubi.yarn + import java.net.URI import scala.collection.mutable @@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse -import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext} +import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext, Resource} import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} @@ -34,7 +35,7 @@ import org.mockito.Mockito.{doNothing, when} import org.scalatest.Matchers import org.scalatest.mock.MockitoSugar -import yaooqinn.kyuubi.service.ServiceException +import yaooqinn.kyuubi.KYUUBI_JAR_NAME import yaooqinn.kyuubi.utils.ReflectUtils class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSugar { @@ -90,7 +91,7 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga val cp = classpath(env) cp should contain("{{PWD}}") cp should contain(Environment.PWD.$$() + Path.SEPARATOR + SPARK_CONF_DIR) - cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), "kyuubi-fake.jar")) + cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), KYUUBI_JAR_NAME)) cp should contain(KyuubiYarnClient.buildPath(Environment.PWD.$$(), SPARK_LIB_DIR, "*")) cp should contain(yarnDefCP.head) cp should contain(mrDefCP.head) @@ -124,7 +125,6 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga val uri3 = KyuubiSparkUtil.resolveURI("hdfs://1") val path3 = KyuubiYarnClient.getQualifiedLocalPath(uri3, conf) path3.toUri should be(uri3) - } test("kyuubi yarn client init") { @@ -140,30 +140,47 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga assert(ReflectUtils.getFieldValue(client2, "loginFromKeytab").asInstanceOf[Boolean]) } - /** - * test with mavne. not ide - */ - test("submit application") { - val conf = new SparkConf() - val client = new KyuubiYarnClient(conf) - val yarnClient = mock[YarnClient] - ReflectUtils.setFieldValue(client, "yarnClient", yarnClient) - doNothing().when(yarnClient).start() - val app = mock[YarnClientApplication] - when(yarnClient.createApplication()).thenReturn(app) - intercept[NullPointerException](client.submit()) // app res = null - val appRes = mock[GetNewApplicationResponse] - when(app.getNewApplicationResponse).thenReturn(appRes) - intercept[NullPointerException](client.submit()) // appid = null - val appId = mock[ApplicationId] - when(appRes.getApplicationId).thenReturn(appId) - when(appId.toString).thenReturn("appId1") - val jarName = "kyuubi-fake.jar" - val kyuubiJar = this.getClass.getClassLoader.getResource(jarName).getPath - ReflectUtils.setFieldValue(KyuubiSparkUtil, "SPARK_JARS_DIR", kyuubiJar.stripSuffix(jarName)) - val appContext = mock[ApplicationSubmissionContext] - when(app.getApplicationSubmissionContext).thenReturn(appContext) - intercept[ServiceException](client.submit()) + test("submit with exceeded memory") { + withClient { (c, kc) => + val app = mock[YarnClientApplication] + when(c.createApplication()).thenReturn(app) + val appRes = mock[GetNewApplicationResponse] + when(app.getNewApplicationResponse).thenReturn(appRes) + val resource = mock[Resource] + when(appRes.getMaximumResourceCapability).thenReturn(resource) + when(resource.getMemory).thenReturn(10) + val appId = mock[ApplicationId] + when(appRes.getApplicationId).thenReturn(appId) + when(appId.toString).thenReturn("appId1") + val appContext = mock[ApplicationSubmissionContext] + when(app.getApplicationSubmissionContext).thenReturn(appContext) + kc.submit() + ReflectUtils.getFieldValue(kc, "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(9) + ReflectUtils.getFieldValue(kc, + "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memoryOverhead") should be(1) + } + } + + test("submit with suitable memory") { + withClient { (c, kc) => + val app = mock[YarnClientApplication] + when(c.createApplication()).thenReturn(app) + val appRes = mock[GetNewApplicationResponse] + when(app.getNewApplicationResponse).thenReturn(appRes) + val resource = mock[Resource] + when(appRes.getMaximumResourceCapability).thenReturn(resource) + when(resource.getMemory).thenReturn(2048 *1024) + val appId = mock[ApplicationId] + when(appRes.getApplicationId).thenReturn(appId) + when(appId.toString).thenReturn("appId1") + val appContext = mock[ApplicationSubmissionContext] + when(app.getApplicationSubmissionContext).thenReturn(appContext) + kc.submit() + ReflectUtils.getFieldValue(kc, + "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(1024) + ReflectUtils.getFieldValue(kc, + "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memoryOverhead") should be(102) + } } def withConf(map: Map[String, String] = Map.empty)(testCode: Configuration => Any): Unit = { @@ -172,6 +189,18 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga testCode(conf) } + def withClient(f: (YarnClient, KyuubiYarnClient) => Any): Unit = { + val conf = new SparkConf() + val client = new KyuubiYarnClient(conf) + val yarnClient = mock[YarnClient] + ReflectUtils.setFieldValue(client, "yarnClient", yarnClient) + doNothing().when(yarnClient).start() + val kyuubiJar = this.getClass.getClassLoader.getResource(KYUUBI_JAR_NAME).getPath + ReflectUtils.setFieldValue(KyuubiSparkUtil, + "SPARK_JARS_DIR", kyuubiJar.stripSuffix(KYUUBI_JAR_NAME)) + f(yarnClient, client) + } + def classpath(env: mutable.HashMap[String, String]): Array[String] = env(Environment.CLASSPATH.name).split(":|;|") }