[#258] delete the engine's timeoutChecker scheduler and merge it to sessionManager's scheduler

fix #258
Squashed commit of the following:

commit 5b0a4d1f77b32cef00f55b80b8562752c1f462d7
Author: zen <xinjingziranchan@gmail.com>
Date:   Wed Dec 16 10:15:16 2020 +0800

    fix closing session log

commit c0fe24d06638f9be18d91a4ad05943a023967c62
Author: zen <xinjingziranchan@gmail.com>
Date:   Sat Dec 12 22:08:25 2020 +0800

    move this check task to SparkSQLSessionManager.

commit 1968913702872f0ba433679596596f114f2e16d1
Author: zen <xinjingziranchan@gmail.com>
Date:   Thu Dec 10 15:55:32 2020 +0800

    add log when close session

commit 26a944787c63666813a0d64e67a36b671640d493
Author: zen <xinjingziranchan@gmail.com>
Date:   Thu Dec 10 11:32:45 2020 +0800

    delete the engine's timeoutChecker scheduler and merge it to  sessionManager's scheduler.
This commit is contained in:
Zen 2020-12-21 11:01:31 +08:00 committed by Kent Yao
parent b11a9d4172
commit e8bc44c2ad
No known key found for this signature in database
GPG Key ID: A4F0BE81C89B595B
4 changed files with 29 additions and 29 deletions

View File

@ -18,52 +18,26 @@
package org.apache.kyuubi.engine.spark
import java.time.Instant
import java.util.concurrent.TimeUnit
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
import org.apache.kyuubi.util.SignalRegister
private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
extends Serverable(name) {
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
private val timeoutChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
override protected def stopServer(): Unit = {
spark.stop()
timeoutChecker.shutdown()
timeoutChecker.awaitTermination(10, TimeUnit.SECONDS)
}
override def start(): Unit = {
val interval = conf.get(KyuubiConf.ENGINE_CHECK_INTERVAL)
val idleTimeout = conf.get(KyuubiConf.ENGINE_IDLE_TIMEOUT)
val checkTask = new Runnable {
override def run(): Unit = {
val current = System.currentTimeMillis
val sessionManager = backendService.sessionManager.asInstanceOf[SparkSQLSessionManager]
if (sessionManager.getOpenSessionCount <= 0 &&
(current - sessionManager.latestLogoutTime) > idleTimeout) {
info(s"Idled for more than $idleTimeout, terminating")
sys.exit(0)
}
}
}
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
super.start()
}
}

View File

@ -43,7 +43,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
sessionToSpark.put(sessionHandle, spark)
}
def removeSparkSession(sessionHandle: SessionHandle): Unit = {
def removeSparkSession(sessionHandle: SessionHandle): SparkSession = {
sessionToSpark.remove(sessionHandle)
}

View File

@ -17,10 +17,13 @@
package org.apache.kyuubi.engine.spark.session
import java.util.concurrent.TimeUnit
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
@ -39,7 +42,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val operationManager = new SparkSQLOperationManager()
@volatile private var _latestLogoutTime: Long = Long.MaxValue
@volatile private var _latestLogoutTime: Long = System.currentTimeMillis()
def latestLogoutTime: Long = _latestLogoutTime
override def openSession(
@ -87,4 +90,26 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
}
override protected def isServer: Boolean = false
override def start(): Unit = {
startTimeoutChecker()
super.start()
}
private def startTimeoutChecker(): Unit = {
val interval = conf.get(KyuubiConf.ENGINE_CHECK_INTERVAL)
val idleTimeout = conf.get(KyuubiConf.ENGINE_IDLE_TIMEOUT)
val checkTask = new Runnable {
override def run(): Unit = {
while (getOpenSessionCount > 0 ||
System.currentTimeMillis - latestLogoutTime < idleTimeout) {
TimeUnit.MILLISECONDS.sleep(interval)
}
info(s"Idled for more than $idleTimeout ms, terminating")
sys.exit(0)
}
}
submitBackgroundOperation(checkTask)
}
}

View File

@ -65,6 +65,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
if (session == null) {
throw KyuubiSQLException(s"Invalid $sessionHandle")
}
info(s"$sessionHandle is closed, current opening sessions $getOpenSessionCount")
session.close()
}