[KYUUBI #1473] Exit gracefully when engine idle
### _Why are the changes needed?_ https://github.com/apache/incubator-kyuubi/issues/1473 It is not graceful to exit when engine is idle, which may cause AM to try again. The current workaround is to configure `spark.yarn.maxAppAttempts`. ``` yarn.ApplicationMaster: Final app status: FAILED, exitCode: 16, (reason: Shutdown hook called before final status was reported.) ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1474 from cxzl25/KYUUBI-1473. Closes #1473 dc07b387 [sychen] assert engine 6df54ebe [sychen] Exit gracefully when engine idle Authored-by: sychen <sychen@trip.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
parent
9d8476a3f2
commit
4bf17a25ed
@ -30,7 +30,7 @@ import org.apache.kyuubi.{KyuubiException, Logging}
|
||||
import org.apache.kyuubi.Utils._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
|
||||
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
|
||||
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, EventLoggingService}
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.RetryPolicies
|
||||
@ -54,7 +54,10 @@ case class SparkSQLEngine(
|
||||
super.start()
|
||||
// Start engine self-terminating checker after all services are ready and it can be reached by
|
||||
// all servers in engine spaces.
|
||||
backendService.sessionManager.startTerminatingChecker()
|
||||
backendService.sessionManager.startTerminatingChecker(() => {
|
||||
assert(currentEngine.isDefined)
|
||||
currentEngine.get.stop()
|
||||
})
|
||||
}
|
||||
|
||||
override protected def stopServer(): Unit = {
|
||||
|
||||
@ -265,7 +265,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
|
||||
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
private[kyuubi] def startTerminatingChecker(): Unit = if (!isServer) {
|
||||
private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if (!isServer) {
|
||||
// initialize `_latestLogoutTime` at start
|
||||
_latestLogoutTime = System.currentTimeMillis()
|
||||
val interval = conf.get(ENGINE_CHECK_INTERVAL)
|
||||
@ -275,7 +275,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
|
||||
if (!shutdown &&
|
||||
System.currentTimeMillis() - latestLogoutTime > idleTimeout && getOpenSessionCount <= 0) {
|
||||
info(s"Idled for more than $idleTimeout ms, terminating")
|
||||
sys.exit(0)
|
||||
stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user