diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 0bb583a30..81ad368ba 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -18,12 +18,14 @@ package org.apache.kyuubi.engine.spark import java.time.Instant +import java.util.concurrent.CountDownLatch import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery} import org.apache.kyuubi.service.Serverable @@ -37,6 +39,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) override private[kyuubi] val backendService = new SparkSQLBackendService(spark) override protected def stopServer(): Unit = { + countDownLatch.countDown() spark.stop() } } @@ -47,6 +50,8 @@ object SparkSQLEngine extends Logging { private val user = Utils.currentUser + private[spark] val countDownLatch = new CountDownLatch(1) + def createSpark(): SparkSession = { val sparkConf = new SparkConf() sparkConf.setIfMissing("spark.master", "local") @@ -104,6 +109,8 @@ object SparkSQLEngine extends Logging { engine = startEngine(spark) exposeEngine(engine) info(KyuubiSparkUtil.diagnostics(spark)) + // blocking main thread + countDownLatch.await() } catch { case t: Throwable => error("Error start SparkSQLEngine", t) diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 5df94b109..26df13798 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -29,7 +29,7 @@ import org.apache.thrift.TException import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.{TSocket, TTransport} -import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils} +import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.spark.SparkProcessBuilder @@ -96,9 +96,13 @@ class KyuubiSessionImpl( info(s"Launching SQL engine: $builder") var sh = getServerHost val started = System.currentTimeMillis() + var exitValue: Option[Int] = None while (sh.isEmpty) { - if (process.waitFor(1, TimeUnit.SECONDS)) { - throw builder.getError + if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { + exitValue = Some(process.exitValue()) + if (exitValue.get != 0) { + throw builder.getError + } } if (started + timeout <= System.currentTimeMillis()) { process.destroyForcibly()