Support Spark YARN cluster deployment mode fix #264

Squashed commit of the following:

commit 3c875db5dd245b4f978afec92e1d02ac831c94bf
Author: zen <xinjingziranchan@gmail.com>
Date:   Wed Dec 23 19:51:22 2020 +0800

    remove unnecessary code

commit c2487ee7ca4bb4271c7ae6a6263e32e040c4c159
Merge: 67bf250 d8e5c5a
Author: zen <xinjingziranchan@gmail.com>
Date:   Wed Dec 23 10:58:16 2020 +0800

    Merge branch 'master' into pr4-deploy-mode-with-cluster

    # Conflicts:
    #	externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

commit 67bf25062d3cb38ccf3831b2e73561e16c4f8576
Author: zen <xinjingziranchan@gmail.com>
Date:   Wed Dec 23 10:51:46 2020 +0800

    delete unit test

commit 053834808f37c18a777a923d989a9b4d62d8419c
Author: zen <xinjingziranchan@gmail.com>
Date:   Wed Dec 23 10:34:05 2020 +0800

    Support Spark YARN cluster deployment mode

commit 61b24d0e3246f319b8d2cf70932a5b71cbf4a28a
Author: zen <xinjingziranchan@gmail.com>
Date:   Tue Dec 15 15:10:31 2020 +0800

    fix null exception and add unit test

commit 363cf93a77294485e7093716b6cb1cd58b4dba45
Author: zen <xinjingziranchan@gmail.com>
Date:   Tue Dec 15 14:21:19 2020 +0800

    Support Spark YARN cluster deployment mode
This commit is contained in:
Zen 2020-12-23 20:45:09 +08:00 committed by Kent Yao
parent d8e5c5aa4a
commit 512fbc50cf
No known key found for this signature in database
GPG Key ID: A4F0BE81C89B595B
2 changed files with 14 additions and 3 deletions

View File

@ -18,12 +18,14 @@
package org.apache.kyuubi.engine.spark
import java.time.Instant
import java.util.concurrent.CountDownLatch
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.service.Serverable
@ -37,6 +39,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
override protected def stopServer(): Unit = {
countDownLatch.countDown()
spark.stop()
}
}
@ -47,6 +50,8 @@ object SparkSQLEngine extends Logging {
private val user = Utils.currentUser
private[spark] val countDownLatch = new CountDownLatch(1)
def createSpark(): SparkSession = {
val sparkConf = new SparkConf()
sparkConf.setIfMissing("spark.master", "local")
@ -104,6 +109,8 @@ object SparkSQLEngine extends Logging {
engine = startEngine(spark)
exposeEngine(engine)
info(KyuubiSparkUtil.diagnostics(spark))
// blocking main thread
countDownLatch.await()
} catch {
case t: Throwable =>
error("Error start SparkSQLEngine", t)

View File

@ -29,7 +29,7 @@ import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TSocket, TTransport}
import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
@ -96,9 +96,13 @@ class KyuubiSessionImpl(
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (process.waitFor(1, TimeUnit.SECONDS)) {
throw builder.getError
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()