From ffd8852b608522ec3c5e0458b66a67787f778089 Mon Sep 17 00:00:00 2001 From: Xieming LI Date: Sun, 16 Jul 2023 23:00:16 +0800 Subject: [PATCH] [KYUUBI #5002] Fail the engine fast when no incoming connection in CONNECTION mode ### _Why are the changes needed?_ Please refer to #4997 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate 1. connect to KyuubiServer with beeline 2. Confirm the Application is ACCEPTed in ResourceManager, Restart KyuubiServer 3. Confirmed that Engine was terminated shortly ``` 23/06/28 10:44:59 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor 23/06/28 10:45:00 INFO spark.SparkSQLEngine: Current open session is 0 23/06/28 10:45:00 ERROR spark.SparkSQLEngine: Spark engine has been terminated because no incoming connection for more than 60000 ms, deregistering from engine discovery space. 23/06/28 10:45:00 WARN zookeeper.ZookeeperDiscoveryClient: This Kyuubi instance lniuhpi1616.nhnjp.ism:46588 is now de-registered from ZooKeeper. The server will be shut down after the last client session completes. 23/06/28 10:45:00 INFO spark.SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping. ``` - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5002 from risyomei/feature/failfast. Closes #5002 402d6c01f [Xieming LI] Changed runInNewThread based on comment 58f11e157 [Xieming LI] Changed runInNewThread to non-blocking c6bb02d6a [Xieming LI] Fixed Unit Test 168d996d0 [Xieming LI] Start countdown after engine is started 48ee819f2 [Xieming LI] Fixed a typo a8d305942 [Xieming LI] Using runInNewThread ported from Spark 21f0671df [Xieming LI] Updated document a7d5d1082 [Xieming LI] Changed the default value to turn off this feature 437be512d [Xieming LI] Trigger CI to test agagin 42a847e84 [Xieming LI] Added Configuration for timeout, changed to ThreadPoolExecutor 639bd5239 [Xieming LI] Fail the engine fast when no incoming connection in CONNECTION mode Authored-by: Xieming LI Signed-off-by: Cheng Pan --- docs/deployment/settings.md | 1 + .../kyuubi/engine/spark/SparkSQLEngine.scala | 28 +++++++++++++++++++ .../EtcdShareLevelSparkEngineSuite.scala | 5 ++-- .../ZookeeperShareLevelSparkEngineSuite.scala | 2 ++ .../engine/spark/session/SessionSuite.scala | 4 ++- .../org/apache/kyuubi/config/KyuubiConf.scala | 10 +++++++ .../kyuubi/util/NamedThreadFactory.scala | 2 +- .../org/apache/kyuubi/util/ThreadUtils.scala | 14 ++++++++++ .../KyuubiOperationPerConnectionSuite.scala | 1 + 9 files changed, 62 insertions(+), 5 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index ca29592e2..2d2c589ae 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -413,6 +413,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 | | kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 | | kyuubi.session.engine.spark.main.resource | <undefined> | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 | +| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 | | kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. | duration | 1.6.0 | | kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd HH:mm:ss.SSS | The time format of the progress bar | string | 1.6.0 | | kyuubi.session.engine.spark.progress.update.interval | PT1S | Update period of progress bar. | duration | 1.6.0 | diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index bdbc7c08f..b94367e9e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -37,6 +37,7 @@ import org.apache.kyuubi.Utils._ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY +import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine} import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister} import org.apache.kyuubi.engine.spark.session.SparkSessionImpl @@ -80,6 +81,12 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin assert(currentEngine.isDefined) currentEngine.get.stop() }) + + val maxInitTimeout = conf.get(ENGINE_SPARK_MAX_INITIAL_WAIT) + if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString && + maxInitTimeout > 0) { + startFastFailChecker(maxInitTimeout) + } } override def stop(): Unit = if (shutdown.compareAndSet(false, true)) { @@ -114,6 +121,27 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin stopEngineExec.get.execute(stopTask) } + private[kyuubi] def startFastFailChecker(maxTimeout: Long): Unit = { + val startedTime = System.currentTimeMillis() + Utils.tryLogNonFatalError { + ThreadUtils.runInNewThread("spark-engine-failfast-checker") { + if (!shutdown.get) { + while (backendService.sessionManager.getOpenSessionCount <= 0 && + System.currentTimeMillis() - startedTime < maxTimeout) { + info(s"Waiting for the initial connection") + Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis) + } + if (backendService.sessionManager.getOpenSessionCount <= 0) { + error(s"Spark engine has been terminated because no incoming connection" + + s" for more than $maxTimeout ms, de-registering from engine discovery space.") + assert(currentEngine.isDefined) + currentEngine.get.stop() + } + } + } + } + } + override protected def stopServer(): Unit = { countDownLatch.countDown() } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala index 46dc3b54c..727b232e3 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala @@ -17,9 +17,7 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel @@ -30,6 +28,7 @@ trait EtcdShareLevelSparkEngineSuite etcdConf ++ Map( ENGINE_SHARE_LEVEL.key -> shareLevel.toString, ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", + ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", ENGINE_CHECK_INTERVAL.key -> "PT5s") } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala index 4ef96e61a..f24abb36c 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel @@ -30,6 +31,7 @@ trait ZookeeperShareLevelSparkEngineSuite zookeeperConf ++ Map( ENGINE_SHARE_LEVEL.key -> shareLevel.toString, ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", + ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", ENGINE_CHECK_INTERVAL.key -> "PT5s") } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala index 5e0b6c28e..b89c560b3 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/session/SessionSuite.scala @@ -27,7 +27,9 @@ import org.apache.kyuubi.service.ServiceState._ class SessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper { override def withKyuubiConf: Map[String, String] = { - Map(ENGINE_SHARE_LEVEL.key -> "CONNECTION") + Map( + ENGINE_SHARE_LEVEL.key -> "CONNECTION", + ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0") } override protected def beforeEach(): Unit = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 669f72da0..b8a75d27f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1282,6 +1282,16 @@ object KyuubiConf { .timeConf .createWithDefault(0) + val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] = + buildConf("kyuubi.session.engine.spark.max.initial.wait") + .doc("Max wait time for the initial connection to Spark engine. The engine will" + + " self-terminate no new incoming connection is established within this time." + + " This setting only applies at the CONNECTION share level." + + " 0 or negative means not to self-terminate.") + .version("1.8.0") + .timeConf + .createWithDefault(Duration.ofSeconds(60).toMillis) + val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] = buildConf("kyuubi.session.engine.flink.main.resource") .doc("The package used to create Flink SQL engine remote job. If it is undefined," + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala index 13127b59b..3ce421e23 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NamedThreadFactory.scala @@ -32,5 +32,5 @@ class NamedThreadFactory(name: String, daemon: Boolean) extends ThreadFactory { } object NamedThreadFactory { - private val kyuubiUncaughtExceptionHandler = new KyuubiUncaughtExceptionHandler + private[util] val kyuubiUncaughtExceptionHandler = new KyuubiUncaughtExceptionHandler } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala index 8ce4bb2e5..76d3f416f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala @@ -95,4 +95,18 @@ object ThreadUtils extends Logging { } } } + + def runInNewThread( + threadName: String, + isDaemon: Boolean = true)(body: => Unit): Unit = { + + val thread = new Thread(threadName) { + override def run(): Unit = { + body + } + } + thread.setDaemon(isDaemon) + thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler) + thread.start() + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala index 0c180db72..97ab21998 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala @@ -48,6 +48,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe override protected val conf: KyuubiConf = { KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection") .set(SESSION_CONF_ADVISOR.key, classOf[TestSessionConfAdvisor].getName) + .set(KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT.key, "0") } test("KYUUBI #647 - async query causes engine crash") {