diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index e5c0a95af..127e578c4 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -385,6 +385,12 @@
test
+
+ org.scalatestplus
+ mockito-3-4_${scala.binary.version}
+ test
+
+
mysql
mysql-connector-java
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index ba601a7a7..99599979b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -18,15 +18,16 @@
package org.apache.kyuubi.engine.spark
import java.io.{File, FilenameFilter, IOException}
-import java.lang.ProcessBuilder.Redirect
import java.net.URI
import java.nio.file.{Files, Paths}
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
import scala.util.matching.Regex
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
@@ -35,6 +36,7 @@ import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.KyuubiHadoopUtils
class SparkProcessBuilder(
  override val proxyUser: String,
@@ -44,6 +46,9 @@ class SparkProcessBuilder(
  import SparkProcessBuilder._
+  // Overridable factory (mocked in tests). A fresh client is requested per kill attempt
+  // because a stopped YarnClient service cannot be re-initialized and reused.
+  def getYarnClient: YarnClient = YarnClient.createYarnClient
+
  override protected val executable: String = {
    val sparkHomeOpt = env.get("SPARK_HOME").orElse {
      val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
@@ -190,20 +195,24 @@ class SparkProcessBuilder(
  override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String =
    YARN_APP_NAME_REGEX.findFirstIn(line) match {
      case Some(appId) =>
-        env.get(KyuubiConf.KYUUBI_HOME) match {
-          case Some(kyuubiHome) =>
-            val pb = new ProcessBuilder("/bin/sh", s"$kyuubiHome/bin/stop-application.sh", appId)
-            pb.environment()
-              .putAll(childProcEnv.asJava)
-            pb.redirectError(Redirect.appendTo(engineLog))
-            pb.redirectOutput(Redirect.appendTo(engineLog))
-            val process = pb.start()
-            process.waitFor() match {
-              case id if id != 0 => s"Failed to kill Application $appId, please kill it manually. "
-              case _ => s"Killed Application $appId successfully. "
-            }
-          case None =>
-            s"KYUUBI_HOME is not set! Failed to kill Application $appId, please kill it manually."
+        // Use a fresh YarnClient per invocation; a stopped client cannot be restarted,
+        // so caching one in a field would make every kill after the first fail.
+        val client = getYarnClient
+        try {
+          client.init(KyuubiHadoopUtils.newHadoopConf(conf))
+          client.start()
+          client.killApplication(ApplicationId.fromString(appId))
+          s"Killed Application $appId successfully."
+        } catch {
+          // NonFatal: never swallow OutOfMemoryError / InterruptedException etc.
+          case NonFatal(e) =>
+            s"Failed to kill Application $appId, please kill it manually." +
+              s" Caused by ${e.getMessage}."
+        } finally {
+          // getYarnClient may be overridden to return null (exercised in tests).
+          if (client != null) {
+            client.stop()
+          }
        }
      case None => ""
    }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index a9b3fae6d..45ff4b3bd 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -22,7 +22,9 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import java.time.Duration
import java.util.concurrent.{Executors, TimeUnit}
+import org.apache.hadoop.yarn.client.api.YarnClient
import org.scalatest.time.SpanSugar._
+import org.scalatestplus.mockito.MockitoSugar
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@@ -31,7 +33,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
import org.apache.kyuubi.service.ServiceUtils
-class SparkProcessBuilderSuite extends KerberizedTestHelper {
+class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
private def conf = KyuubiConf().set("kyuubi.on", "off")
test("spark process builder") {
@@ -240,20 +242,26 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
test("kill application") {
val pb1 = new FakeSparkProcessBuilder(conf) {
override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = mock[YarnClient]
}
val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
"Application report for application_1593587619692_20149 (state: ACCEPTED)")
- assert(exit1.contains("KYUUBI_HOME is not set!"))
+ assert(exit1.contains("Killed Application application_1593587619692_20149 successfully."))
val pb2 = new FakeSparkProcessBuilder(conf) {
- override protected def env: Map[String, String] = Map("KYUUBI_HOME" -> "")
+ override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = null
}
val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
"Application report for application_1593587619692_20149 (state: ACCEPTED)")
- assert(exit2.contains("application_1593587619692_20149")
- && !exit2.contains("KYUUBI_HOME is not set!"))
+ assert(exit2.contains("Failed to kill Application application_1593587619692_20149")
+ && exit2.contains("Caused by"))
- val exit3 = pb2.killApplication("unknow")
+ val pb3 = new FakeSparkProcessBuilder(conf) {
+ override protected def env: Map[String, String] = Map()
+ override def getYarnClient: YarnClient = mock[YarnClient]
+ }
+ val exit3 = pb3.killApplication("unknow")
assert(exit3.equals(""))
}
diff --git a/pom.xml b/pom.xml
index 923e1a4f7..417eb3798 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
0.14.1
3.2.9.0
3.2.9
+ 3.2.9.0
4.0.1
1.7.35
0.9.3
@@ -892,6 +893,12 @@
test
+
+ org.scalatestplus
+ mockito-3-4_${scala.binary.version}
+ ${scalatestplus.version}
+
+
org.apache.hadoop
hadoop-minikdc