[KYUUBI #5002] Fail the engine fast when no incoming connection in CONNECTION mode

### _Why are the changes needed?_
Please refer to #4997

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate
1. connect to KyuubiServer with beeline
2. Confirm the Application is ACCEPTed in ResourceManager, Restart KyuubiServer
3. Confirmed that Engine was terminated shortly
```
23/06/28 10:44:59 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor
23/06/28 10:45:00 INFO spark.SparkSQLEngine: Current open session is 0
23/06/28 10:45:00 ERROR spark.SparkSQLEngine: Spark engine has been terminated because no incoming connection for more than 60000 ms, deregistering from engine discovery space.
23/06/28 10:45:00 WARN zookeeper.ZookeeperDiscoveryClient: This Kyuubi instance lniuhpi1616.nhnjp.ism:46588 is now de-registered from ZooKeeper. The server will be shut down after the last client session completes.
23/06/28 10:45:00 INFO spark.SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping.
```

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5002 from risyomei/feature/failfast.

Closes #5002

402d6c01f [Xieming LI] Changed runInNewThread based on comment
58f11e157 [Xieming LI] Changed runInNewThread to non-blocking
c6bb02d6a [Xieming LI] Fixed Unit Test
168d996d0 [Xieming LI] Start countdown after engine is started
48ee819f2 [Xieming LI] Fixed a typo
a8d305942 [Xieming LI] Using runInNewThread ported from Spark
21f0671df [Xieming LI] Updated document
a7d5d1082 [Xieming LI] Changed the default value to turn off this feature
437be512d [Xieming LI] Trigger CI to test agagin
42a847e84 [Xieming LI] Added Configuration for timeout, changed to ThreadPoolExecutor
639bd5239 [Xieming LI] Fail the engine fast when no incoming connection in CONNECTION mode

Authored-by: Xieming LI <risyomei@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Xieming LI 2023-07-16 23:00:16 +08:00 committed by Cheng Pan
parent f49182318b
commit ffd8852b60
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
9 changed files with 62 additions and 5 deletions

View File

@ -413,6 +413,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 |
| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 |
| kyuubi.session.engine.spark.main.resource | &lt;undefined&gt; | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 |
| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 |
| kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. | duration | 1.6.0 |
| kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd HH:mm:ss.SSS | The time format of the progress bar | string | 1.6.0 |
| kyuubi.session.engine.spark.progress.update.interval | PT1S | Update period of progress bar. | duration | 1.6.0 |

View File

@ -37,6 +37,7 @@ import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@ -80,6 +81,12 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
val maxInitTimeout = conf.get(ENGINE_SPARK_MAX_INITIAL_WAIT)
if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString &&
maxInitTimeout > 0) {
startFastFailChecker(maxInitTimeout)
}
}
override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@ -114,6 +121,27 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
stopEngineExec.get.execute(stopTask)
}
private[kyuubi] def startFastFailChecker(maxTimeout: Long): Unit = {
val startedTime = System.currentTimeMillis()
Utils.tryLogNonFatalError {
ThreadUtils.runInNewThread("spark-engine-failfast-checker") {
if (!shutdown.get) {
while (backendService.sessionManager.getOpenSessionCount <= 0 &&
System.currentTimeMillis() - startedTime < maxTimeout) {
info(s"Waiting for the initial connection")
Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis)
}
if (backendService.sessionManager.getOpenSessionCount <= 0) {
error(s"Spark engine has been terminated because no incoming connection" +
s" for more than $maxTimeout ms, de-registering from engine discovery space.")
assert(currentEngine.isDefined)
currentEngine.get.stop()
}
}
}
}
}
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}

View File

@ -17,9 +17,7 @@
package org.apache.kyuubi.engine.spark
import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
@ -30,6 +28,7 @@ trait EtcdShareLevelSparkEngineSuite
etcdConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark
import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
@ -30,6 +31,7 @@ trait ZookeeperShareLevelSparkEngineSuite
zookeeperConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
}
}

View File

@ -27,7 +27,9 @@ import org.apache.kyuubi.service.ServiceState._
class SessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
override def withKyuubiConf: Map[String, String] = {
Map(ENGINE_SHARE_LEVEL.key -> "CONNECTION")
Map(
ENGINE_SHARE_LEVEL.key -> "CONNECTION",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0")
}
override protected def beforeEach(): Unit = {

View File

@ -1282,6 +1282,16 @@ object KyuubiConf {
.timeConf
.createWithDefault(0)
val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.initial.wait")
.doc("Max wait time for the initial connection to Spark engine. The engine will" +
" self-terminate no new incoming connection is established within this time." +
" This setting only applies at the CONNECTION share level." +
" 0 or negative means not to self-terminate.")
.version("1.8.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)
val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.engine.flink.main.resource")
.doc("The package used to create Flink SQL engine remote job. If it is undefined," +

View File

@ -32,5 +32,5 @@ class NamedThreadFactory(name: String, daemon: Boolean) extends ThreadFactory {
}
object NamedThreadFactory {
private val kyuubiUncaughtExceptionHandler = new KyuubiUncaughtExceptionHandler
private[util] val kyuubiUncaughtExceptionHandler = new KyuubiUncaughtExceptionHandler
}

View File

@ -95,4 +95,18 @@ object ThreadUtils extends Logging {
}
}
}
def runInNewThread(
threadName: String,
isDaemon: Boolean = true)(body: => Unit): Unit = {
val thread = new Thread(threadName) {
override def run(): Unit = {
body
}
}
thread.setDaemon(isDaemon)
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
thread.start()
}
}

View File

@ -48,6 +48,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
override protected val conf: KyuubiConf = {
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
.set(SESSION_CONF_ADVISOR.key, classOf[TestSessionConfAdvisor].getName)
.set(KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT.key, "0")
}
test("KYUUBI #647 - async query causes engine crash") {