Support Spark YARN cluster deployment mode fix #264
Squashed commit of the following:
commit 3c875db5dd245b4f978afec92e1d02ac831c94bf
Author: zen <xinjingziranchan@gmail.com>
Date: Wed Dec 23 19:51:22 2020 +0800
remove unnecessary code
commit c2487ee7ca4bb4271c7ae6a6263e32e040c4c159
Merge: 67bf250 d8e5c5a
Author: zen <xinjingziranchan@gmail.com>
Date: Wed Dec 23 10:58:16 2020 +0800
Merge branch 'master' into pr4-deploy-mode-with-cluster
# Conflicts:
# externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
commit 67bf25062d3cb38ccf3831b2e73561e16c4f8576
Author: zen <xinjingziranchan@gmail.com>
Date: Wed Dec 23 10:51:46 2020 +0800
delete unit test
commit 053834808f37c18a777a923d989a9b4d62d8419c
Author: zen <xinjingziranchan@gmail.com>
Date: Wed Dec 23 10:34:05 2020 +0800
Support Spark YARN cluster deployment mode
commit 61b24d0e3246f319b8d2cf70932a5b71cbf4a28a
Author: zen <xinjingziranchan@gmail.com>
Date: Tue Dec 15 15:10:31 2020 +0800
fix null exception and add unit test
commit 363cf93a77294485e7093716b6cb1cd58b4dba45
Author: zen <xinjingziranchan@gmail.com>
Date: Tue Dec 15 14:21:19 2020 +0800
Support Spark YARN cluster deployment mode
This commit is contained in:
parent
d8e5c5aa4a
commit
512fbc50cf
@ -18,12 +18,14 @@
|
||||
package org.apache.kyuubi.engine.spark
|
||||
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
|
||||
import org.apache.kyuubi.service.Serverable
|
||||
@ -37,6 +39,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
|
||||
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
|
||||
|
||||
override protected def stopServer(): Unit = {
|
||||
countDownLatch.countDown()
|
||||
spark.stop()
|
||||
}
|
||||
}
|
||||
@ -47,6 +50,8 @@ object SparkSQLEngine extends Logging {
|
||||
|
||||
private val user = Utils.currentUser
|
||||
|
||||
private[spark] val countDownLatch = new CountDownLatch(1)
|
||||
|
||||
def createSpark(): SparkSession = {
|
||||
val sparkConf = new SparkConf()
|
||||
sparkConf.setIfMissing("spark.master", "local")
|
||||
@ -104,6 +109,8 @@ object SparkSQLEngine extends Logging {
|
||||
engine = startEngine(spark)
|
||||
exposeEngine(engine)
|
||||
info(KyuubiSparkUtil.diagnostics(spark))
|
||||
// blocking main thread
|
||||
countDownLatch.await()
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
error("Error start SparkSQLEngine", t)
|
||||
|
||||
@ -29,7 +29,7 @@ import org.apache.thrift.TException
|
||||
import org.apache.thrift.protocol.TBinaryProtocol
|
||||
import org.apache.thrift.transport.{TSocket, TTransport}
|
||||
|
||||
import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
|
||||
@ -96,9 +96,13 @@ class KyuubiSessionImpl(
|
||||
info(s"Launching SQL engine: $builder")
|
||||
var sh = getServerHost
|
||||
val started = System.currentTimeMillis()
|
||||
var exitValue: Option[Int] = None
|
||||
while (sh.isEmpty) {
|
||||
if (process.waitFor(1, TimeUnit.SECONDS)) {
|
||||
throw builder.getError
|
||||
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
|
||||
exitValue = Some(process.exitValue())
|
||||
if (exitValue.get != 0) {
|
||||
throw builder.getError
|
||||
}
|
||||
}
|
||||
if (started + timeout <= System.currentTimeMillis()) {
|
||||
process.destroyForcibly()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user