From 512fbc50cf040cb89f5bc27c1958772f1017d77d Mon Sep 17 00:00:00 2001 From: Zen Date: Wed, 23 Dec 2020 20:45:09 +0800 Subject: [PATCH] Support Spark YARN cluster deployment mode fix #264 Squashed commit of the following: commit 3c875db5dd245b4f978afec92e1d02ac831c94bf Author: zen Date: Wed Dec 23 19:51:22 2020 +0800 remove unnecessary code commit c2487ee7ca4bb4271c7ae6a6263e32e040c4c159 Merge: 67bf250 d8e5c5a Author: zen Date: Wed Dec 23 10:58:16 2020 +0800 Merge branch 'master' into pr4-deploy-mode-with-cluster # Conflicts: # externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala commit 67bf25062d3cb38ccf3831b2e73561e16c4f8576 Author: zen Date: Wed Dec 23 10:51:46 2020 +0800 delete unit test commit 053834808f37c18a777a923d989a9b4d62d8419c Author: zen Date: Wed Dec 23 10:34:05 2020 +0800 Support Spark YARN cluster deployment mode commit 61b24d0e3246f319b8d2cf70932a5b71cbf4a28a Author: zen Date: Tue Dec 15 15:10:31 2020 +0800 fix null exception and add unit test commit 363cf93a77294485e7093716b6cb1cd58b4dba45 Author: zen Date: Tue Dec 15 14:21:19 2020 +0800 Support Spark YARN cluster deployment mode --- .../apache/kyuubi/engine/spark/SparkSQLEngine.scala | 7 +++++++ .../org/apache/kyuubi/session/KyuubiSessionImpl.scala | 10 +++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 0bb583a30..81ad368ba 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -18,12 +18,14 @@ package org.apache.kyuubi.engine.spark import java.time.Instant +import java.util.concurrent.CountDownLatch import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery} import org.apache.kyuubi.service.Serverable @@ -37,6 +39,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) override private[kyuubi] val backendService = new SparkSQLBackendService(spark) override protected def stopServer(): Unit = { + countDownLatch.countDown() spark.stop() } } @@ -47,6 +50,8 @@ object SparkSQLEngine extends Logging { private val user = Utils.currentUser + private[spark] val countDownLatch = new CountDownLatch(1) + def createSpark(): SparkSession = { val sparkConf = new SparkConf() sparkConf.setIfMissing("spark.master", "local") @@ -104,6 +109,8 @@ object SparkSQLEngine extends Logging { engine = startEngine(spark) exposeEngine(engine) info(KyuubiSparkUtil.diagnostics(spark)) + // blocking main thread + countDownLatch.await() } catch { case t: Throwable => error("Error start SparkSQLEngine", t) diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 5df94b109..26df13798 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -29,7 +29,7 @@ import org.apache.thrift.TException import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.{TSocket, TTransport} -import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils} +import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.spark.SparkProcessBuilder @@ -96,9 +96,13 @@ class KyuubiSessionImpl( info(s"Launching SQL engine: $builder") var sh = getServerHost val started = System.currentTimeMillis() + var exitValue: Option[Int] = None while (sh.isEmpty) { - if (process.waitFor(1, TimeUnit.SECONDS)) { - throw builder.getError + if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { + exitValue = Some(process.exitValue()) + if (exitValue.get != 0) { + throw builder.getError + } } if (started + timeout <= System.currentTimeMillis()) { process.destroyForcibly()