[KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine …
…initialization times out and yarn application status is accepted ### _Why are the changes needed?_ ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2110 from 942011334/KYUUBI-2104. Closes #2104 6bb4f37c [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine initialization times out and yarn application status is accepted 38118aaf [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine initialization times out and yarn application status is accepted 2db9c458 [jiadongdong] Merge branch 'KYUUBI-2104' of https://github.com/942011334/incubator-kyuubi into KYUUBI-2104 549fab4d [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine initialization times out and yarn application status is accepted 65c6c5cf [jiadongdong] [KYUUBI #2104] Kill yarn job using yarn client API when kyuubi engine initialization times out and yarn application status is accepted 28665f3d [jiadongdong] [Kyuubi#2104] Kill yarn job using yarn client API when kyuubi engine initialization times out and yarn application status is accepted Authored-by: jiadongdong <jiadongdong@bilibili.com> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
537a0f82ca
commit
ffdd665f79
@ -385,6 +385,12 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scalatestplus</groupId>
|
||||
<artifactId>mockito-3-4_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
|
||||
@ -18,15 +18,15 @@
|
||||
package org.apache.kyuubi.engine.spark
|
||||
|
||||
import java.io.{File, FilenameFilter, IOException}
|
||||
import java.lang.ProcessBuilder.Redirect
|
||||
import java.net.URI
|
||||
import java.nio.file.{Files, Paths}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.util.matching.Regex
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient
|
||||
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
@ -35,6 +35,7 @@ import org.apache.kyuubi.engine.ProcBuilder
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf
|
||||
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
|
||||
import org.apache.kyuubi.operation.log.OperationLog
|
||||
import org.apache.kyuubi.util.KyuubiHadoopUtils
|
||||
|
||||
class SparkProcessBuilder(
|
||||
override val proxyUser: String,
|
||||
@ -44,6 +45,10 @@ class SparkProcessBuilder(
|
||||
|
||||
import SparkProcessBuilder._
|
||||
|
||||
val yarnClient = getYarnClient
|
||||
|
||||
def getYarnClient: YarnClient = YarnClient.createYarnClient
|
||||
|
||||
override protected val executable: String = {
|
||||
val sparkHomeOpt = env.get("SPARK_HOME").orElse {
|
||||
val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
|
||||
@ -190,20 +195,21 @@ class SparkProcessBuilder(
|
||||
override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String =
|
||||
YARN_APP_NAME_REGEX.findFirstIn(line) match {
|
||||
case Some(appId) =>
|
||||
env.get(KyuubiConf.KYUUBI_HOME) match {
|
||||
case Some(kyuubiHome) =>
|
||||
val pb = new ProcessBuilder("/bin/sh", s"$kyuubiHome/bin/stop-application.sh", appId)
|
||||
pb.environment()
|
||||
.putAll(childProcEnv.asJava)
|
||||
pb.redirectError(Redirect.appendTo(engineLog))
|
||||
pb.redirectOutput(Redirect.appendTo(engineLog))
|
||||
val process = pb.start()
|
||||
process.waitFor() match {
|
||||
case id if id != 0 => s"Failed to kill Application $appId, please kill it manually. "
|
||||
case _ => s"Killed Application $appId successfully. "
|
||||
}
|
||||
case None =>
|
||||
s"KYUUBI_HOME is not set! Failed to kill Application $appId, please kill it manually."
|
||||
try {
|
||||
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
|
||||
yarnClient.init(hadoopConf)
|
||||
yarnClient.start()
|
||||
val applicationId = ApplicationId.fromString(appId)
|
||||
yarnClient.killApplication(applicationId)
|
||||
s"Killed Application $appId successfully."
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
s"Failed to kill Application $appId, please kill it manually." +
|
||||
s" Caused by ${e.getMessage}."
|
||||
} finally {
|
||||
if (yarnClient != null) {
|
||||
yarnClient.stop()
|
||||
}
|
||||
}
|
||||
case None => ""
|
||||
}
|
||||
|
||||
@ -22,7 +22,9 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.{Executors, TimeUnit}
|
||||
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient
|
||||
import org.scalatest.time.SpanSugar._
|
||||
import org.scalatestplus.mockito.MockitoSugar
|
||||
|
||||
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
@ -31,7 +33,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf
|
||||
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
|
||||
import org.apache.kyuubi.service.ServiceUtils
|
||||
|
||||
class SparkProcessBuilderSuite extends KerberizedTestHelper {
|
||||
class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
|
||||
private def conf = KyuubiConf().set("kyuubi.on", "off")
|
||||
|
||||
test("spark process builder") {
|
||||
@ -240,20 +242,26 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
|
||||
test("kill application") {
|
||||
val pb1 = new FakeSparkProcessBuilder(conf) {
|
||||
override protected def env: Map[String, String] = Map()
|
||||
override def getYarnClient: YarnClient = mock[YarnClient]
|
||||
}
|
||||
val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
|
||||
"Application report for application_1593587619692_20149 (state: ACCEPTED)")
|
||||
assert(exit1.contains("KYUUBI_HOME is not set!"))
|
||||
assert(exit1.contains("Killed Application application_1593587619692_20149 successfully."))
|
||||
|
||||
val pb2 = new FakeSparkProcessBuilder(conf) {
|
||||
override protected def env: Map[String, String] = Map("KYUUBI_HOME" -> "")
|
||||
override protected def env: Map[String, String] = Map()
|
||||
override def getYarnClient: YarnClient = null
|
||||
}
|
||||
val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
|
||||
"Application report for application_1593587619692_20149 (state: ACCEPTED)")
|
||||
assert(exit2.contains("application_1593587619692_20149")
|
||||
&& !exit2.contains("KYUUBI_HOME is not set!"))
|
||||
assert(exit2.contains("Failed to kill Application application_1593587619692_20149")
|
||||
&& exit2.contains("Caused by"))
|
||||
|
||||
val exit3 = pb2.killApplication("unknow")
|
||||
val pb3 = new FakeSparkProcessBuilder(conf) {
|
||||
override protected def env: Map[String, String] = Map()
|
||||
override def getYarnClient: YarnClient = mock[YarnClient]
|
||||
}
|
||||
val exit3 = pb3.killApplication("unknow")
|
||||
assert(exit3.equals(""))
|
||||
}
|
||||
|
||||
|
||||
7
pom.xml
7
pom.xml
@ -134,6 +134,7 @@
|
||||
<prometheus.version>0.14.1</prometheus.version>
|
||||
<scalacheck.version>3.2.9.0</scalacheck.version>
|
||||
<scalatest.version>3.2.9</scalatest.version>
|
||||
<scalatestplus.version>3.2.9.0</scalatestplus.version>
|
||||
<scopt.version>4.0.1</scopt.version>
|
||||
<slf4j.version>1.7.35</slf4j.version>
|
||||
<thrift.version>0.9.3</thrift.version>
|
||||
@ -892,6 +893,12 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scalatestplus</groupId>
|
||||
<artifactId>mockito-3-4_${scala.binary.version}</artifactId>
|
||||
<version>${scalatestplus.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minikdc</artifactId>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user