add state monitor

This commit is contained in:
Kent Yao 2018-07-27 16:36:51 +08:00
parent 9121a4f61b
commit 53eb12f60f
7 changed files with 204 additions and 45 deletions

View File

@ -126,6 +126,12 @@
<filtering>true</filtering>
</resource>
</resources>
<testResources>
<testResource>
<directory>${project.build.directory}/extra-resources</directory>
<filtering>true</filtering>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@ -149,30 +155,13 @@
<goal>run</goal>
</goals>
</execution>
<execution>
<id>build-info-test</id>
<phase>generate-test-resources</phase>
<configuration>
<target>
<exec executable="bash">
<arg value="${project.basedir}/../build/kyuubi-build-info"/>
<arg value="${build.testOutputDirectory}"/>
<arg value="${project.version}"/>
<arg value="${spark.version}"/>
</exec>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
<execution>
<id>fake-jar-test</id>
<phase>generate-test-resources</phase>
<configuration>
<target>
<exec executable="touch">
<arg value="${project.build.testOutputDirectory}/kyuubi-server-${version}.jar"/>
<arg value="${project.build.testOutputDirectory}/kyuubi-server-${project.version}.jar"/>
</exec>
</target>
</configuration>
@ -302,10 +291,10 @@
<argLine>-Xmx3g -Xss4m -XX:ReservedCodeCacheSize=512m</argLine>
<stderr/>
<environmentVariables>
<KYUUBI_JAR>${project.build.testOutputDirectory}/kyuubi-server-${version}.jar</KYUUBI_JAR>
<KYUUBI_JAR>${project.build.testOutputDirectory}/kyuubi-server-${project.version}.jar</KYUUBI_JAR>
</environmentVariables>
<systemProperties>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<java.io.tmpdir>${project.build.testOutputDirectory}/tmp</java.io.tmpdir>
</systemProperties>
</configuration>
<executions>

View File

@ -345,6 +345,16 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
/////////////////////////////////////////////////////////////////////////////////////////////////
// Containerization //
/////////////////////////////////////////////////////////////////////////////////////////////////
val YARN_CONTAINER_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.yarn.container.timeout")
.doc("Timeout for client to wait Kyuubi successfully initialising itself")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(60L))
/**
* Return all the configuration definitions that have been defined in [[KyuubiConf]]. Each
* definition contains key, defaultValue.

View File

@ -23,10 +23,12 @@ import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.PrivilegedExceptionAction
import java.util.{Locale, Properties, UUID}
import java.util.concurrent.Callable
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.util.control.NonFatal
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
@ -42,9 +44,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.hadoop.yarn.exceptions.{ApplicationNotFoundException, YarnException}
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf}
import org.apache.spark.deploy.yarn.KyuubiDistributedCacheManager
import yaooqinn.kyuubi.{Logging, _}
@ -103,6 +105,9 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
val appContext = createApplicationSubmissionContext(app, containerContext)
info(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
while(stateChecker.call()) {
Thread.sleep(1000)
}
} catch {
case e: Throwable =>
if (appId != null) cleanupStagingDir()
@ -123,6 +128,17 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
} catch {
case ioe: IOException =>
warn("Failed to cleanup staging dir " + appStagingDir, ioe)
} finally {
killApplication
}
}
def killApplication: Unit = {
try {
yarnClient.killApplication(appId)
} catch {
case NonFatal(e) =>
error(s"Failed to kill application $appId", e)
}
}
@ -452,6 +468,58 @@ private[yarn] class KyuubiYarnClient(conf: SparkConf) extends Logging {
})
new Path(resolvedDestDir, qualifiedDestPath.getName)
}
private[this] val stateChecker: Callable[Boolean] = new Callable[Boolean] {
private val timeout = conf.getTimeAsMs(KyuubiConf.YARN_CONTAINER_TIMEOUT.key)
import YarnApplicationState._
override def call(): Boolean = {
try {
val report = yarnClient.getApplicationReport(appId)
info(formatReportDetails(report))
report.getYarnApplicationState match {
case RUNNING => false
case ACCEPTED | NEW | NEW_SAVING | SUBMITTED =>
if (System.currentTimeMillis - report.getStartTime < timeout) {
true
} else {
error(s"Application $appId failed to start after ${timeout}ms")
cleanupStagingDir()
false
}
case _ => false
}
} catch {
case _: ApplicationNotFoundException =>
error(s"Application $appId not found.")
cleanupStagingDir()
false
case NonFatal(e) =>
error(s"Failed to contact YARN for application $appId.", e)
false
}
}
def formatReportDetails(report: ApplicationReport): String = {
val details = Seq[(String, String)](
("Client token", Option(report.getClientToAMToken).map(_.toString).getOrElse("")),
("Diagnostics", report.getDiagnostics),
("Kyuubi server host", report.getHost),
("Bind port", report.getRpcPort.toString),
("Queue", report.getQueue),
("Start time", report.getStartTime.toString),
("Final status", Option(report.getFinalApplicationStatus).map(_.toString).getOrElse("")),
("Tracking URL", report.getTrackingUrl),
("User", report.getUser)
)
details.map { case (k, v) =>
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
s"\n\t $k: $newValue"
}.mkString("")
}
}
}
object KyuubiYarnClient {
@ -546,9 +614,12 @@ object KyuubiYarnClient {
private[yarn] def getDefaultMRApplicationClasspath: Seq[String] =
StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq
/**
* Entrance for Kyuubi On Yarn
*/
def startKyuubiAppMaster(): Unit = {
val conf = new SparkConf()
KyuubiConf.getAllDefaults.foreach(kv => conf.setIfMissing(kv._1, kv._2))
new KyuubiYarnClient(conf).submit()
}
}

View File

@ -17,6 +17,7 @@
package yaooqinn.kyuubi.ha
import com.google.common.io.Files
import org.apache.curator.test.TestingServer
import org.apache.spark.{KyuubiConf, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
@ -36,13 +37,13 @@ class HighAvailabilityUtilsSuite extends SparkFunSuite with BeforeAndAfterEach {
var server: KyuubiServer = _
override def beforeAll(): Unit = {
zkServer = new TestingServer(2181, true)
zkServer = new TestingServer(2181, Files.createTempDir(), true)
connectString = zkServer.getConnectString
conf.set(HA_ZOOKEEPER_QUORUM.key, connectString)
conf.set(HA_ZOOKEEPER_CONNECTION_BASESLEEPTIME.key, "100ms")
conf.set(HA_ZOOKEEPER_ZNODE_CREATION_TIMEOUT.key, "1s")
conf.set(HA_ZOOKEEPER_SESSION_TIMEOUT.key, "15s")
conf.set(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, "1")
conf.set(HA_ZOOKEEPER_CONNECTION_MAX_RETRIES.key, "0")
}
override def afterAll(): Unit = {

View File

@ -36,6 +36,7 @@ class packageSuite extends SparkFunSuite {
assert(props.getProperty("revision") === REVISION)
assert(props.getProperty("user") === BUILD_USER)
assert(props.getProperty("url") === REPO_URL)
assert(props.getProperty("date") === BUILD_DATE)
}
}

View File

@ -38,6 +38,4 @@ class AppMasterArgumentsSuite extends SparkFunSuite {
val e2 = intercept[IllegalArgumentException](AppMasterArguments(arg5))
assert(e2.getMessage.contains(arg5.toList.takeRight(2).toString()))
}
}

View File

@ -18,6 +18,7 @@
package yaooqinn.kyuubi.yarn
import java.io.IOException
import java.net.URI
import scala.collection.mutable
@ -27,10 +28,11 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationSubmissionContext, Resource}
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.mockito.Mockito.{doNothing, when}
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
@ -129,6 +131,8 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
test("kyuubi yarn client init") {
val conf = new SparkConf()
KyuubiConf.getAllDefaults.foreach(kv => conf.setIfMissing(kv._1, kv._2))
val client = new KyuubiYarnClient(conf)
assert(ReflectUtils.getFieldValue(client,
"yaooqinn$kyuubi$yarn$KyuubiYarnClient$$hadoopConf").isInstanceOf[YarnConfiguration])
@ -141,19 +145,14 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
}
test("submit with exceeded memory") {
withClient { (c, kc) =>
val app = mock[YarnClientApplication]
when(c.createApplication()).thenReturn(app)
val appRes = mock[GetNewApplicationResponse]
when(app.getNewApplicationResponse).thenReturn(appRes)
withAppBase { (c, kc, appRes) =>
val resource = mock[Resource]
when(appRes.getMaximumResourceCapability).thenReturn(resource)
when(resource.getMemory).thenReturn(10)
val appId = mock[ApplicationId]
when(appRes.getApplicationId).thenReturn(appId)
when(appId.toString).thenReturn("appId1")
val appContext = mock[ApplicationSubmissionContext]
when(app.getApplicationSubmissionContext).thenReturn(appContext)
when(c.getApplicationReport(appId)).thenThrow(classOf[IOException])
kc.submit()
ReflectUtils.getFieldValue(kc, "yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(9)
ReflectUtils.getFieldValue(kc,
@ -162,19 +161,14 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
}
test("submit with suitable memory") {
withClient { (c, kc) =>
val app = mock[YarnClientApplication]
when(c.createApplication()).thenReturn(app)
val appRes = mock[GetNewApplicationResponse]
when(app.getNewApplicationResponse).thenReturn(appRes)
withAppBase { (c, kc, appRes) =>
val resource = mock[Resource]
when(appRes.getMaximumResourceCapability).thenReturn(resource)
when(resource.getMemory).thenReturn(2048 *1024)
val appId = mock[ApplicationId]
when(appRes.getApplicationId).thenReturn(appId)
when(appId.toString).thenReturn("appId1")
val appContext = mock[ApplicationSubmissionContext]
when(app.getApplicationSubmissionContext).thenReturn(appContext)
when(c.getApplicationReport(appId)).thenThrow(classOf[ApplicationNotFoundException])
kc.submit()
ReflectUtils.getFieldValue(kc,
"yaooqinn$kyuubi$yarn$KyuubiYarnClient$$memory") should be(1024)
@ -183,17 +177,76 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
}
}
test("submit accepted") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.ACCEPTED)
val currentTime = System.currentTimeMillis()
when(report.getStartTime).thenReturn(currentTime).thenReturn(currentTime - 100 *1000L)
kc.submit()
}
}
test("submit running") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.RUNNING)
kc.submit()
}
}
test("submit new") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.NEW)
val currentTime = System.currentTimeMillis()
when(report.getStartTime).thenReturn(currentTime).thenReturn(currentTime - 100 *1000L)
kc.submit()
}
}
test("submit submitted") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.SUBMITTED)
val currentTime = System.currentTimeMillis()
when(report.getStartTime).thenReturn(currentTime).thenReturn(currentTime - 100 *1000L)
kc.submit()
}
}
test("submit killed") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.KILLED)
kc.submit()
}
}
test("submit finished") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
kc.submit()
}
}
test("submit new saving") {
withAppReport { (kc, report) =>
when(report.getYarnApplicationState).thenReturn(YarnApplicationState.NEW_SAVING)
val currentTime = System.currentTimeMillis()
when(report.getStartTime).thenReturn(currentTime).thenReturn(currentTime - 100 *1000L)
kc.submit()
}
}
def withConf(map: Map[String, String] = Map.empty)(testCode: Configuration => Any): Unit = {
val conf = new Configuration()
map.foreach { case (k, v) => conf.set(k, v) }
testCode(conf)
}
def withClient(f: (YarnClient, KyuubiYarnClient) => Any): Unit = {
def withClientBase(f: (YarnClient, KyuubiYarnClient) => Any): Unit = {
val conf = new SparkConf()
KyuubiConf.getAllDefaults.foreach(kv => conf.setIfMissing(kv._1, kv._2))
val client = new KyuubiYarnClient(conf)
val yarnClient = mock[YarnClient]
ReflectUtils.setFieldValue(client, "yarnClient", yarnClient)
ReflectUtils.setFieldValue(client,
"yaooqinn$kyuubi$yarn$KyuubiYarnClient$$yarnClient", yarnClient)
doNothing().when(yarnClient).start()
val kyuubiJar = this.getClass.getClassLoader.getResource(KYUUBI_JAR_NAME).getPath
ReflectUtils.setFieldValue(KyuubiSparkUtil,
@ -201,6 +254,42 @@ class KyuubiYarnClientSuite extends SparkFunSuite with Matchers with MockitoSuga
f(yarnClient, client)
}
def withAppBase(f: (YarnClient, KyuubiYarnClient, GetNewApplicationResponse) => Any): Unit = {
withClientBase { (c, kc) =>
val app = mock[YarnClientApplication]
when(c.createApplication()).thenReturn(app)
val appRes = mock[GetNewApplicationResponse]
when(app.getNewApplicationResponse).thenReturn(appRes)
val appContext = mock[ApplicationSubmissionContext]
when(app.getApplicationSubmissionContext).thenReturn(appContext)
f(c, kc, appRes)
}
}
def withAppReport(f: (KyuubiYarnClient, ApplicationReport) => Any): Unit = {
withAppBase { (c, kc, appRes) =>
val resource = mock[Resource]
when(appRes.getMaximumResourceCapability).thenReturn(resource)
when(resource.getMemory).thenReturn(2048 *1024)
val appId = mock[ApplicationId]
when(appRes.getApplicationId).thenReturn(appId)
when(appId.toString).thenReturn("appId1")
val report = mock[ApplicationReport]
when(c.getApplicationReport(appId)).thenReturn(report)
val token = mock[Token]
when(report.getClientToAMToken).thenReturn(token)
when(token.toString).thenReturn("")
when(report.getHost).thenReturn("")
when(report.getRpcPort).thenReturn(1)
when(report.getQueue).thenReturn("default")
when(report.getStartTime).thenReturn(0)
when(report.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
when(report.getTrackingUrl).thenReturn("")
when(report.getUser).thenReturn(KyuubiSparkUtil.getCurrentUserName)
f(kc, report)
}
}
def classpath(env: mutable.HashMap[String, String]): Array[String] =
env(Environment.CLASSPATH.name).split(":|;|<CPS>")
}