[#258] delete the engine's timeoutChecker scheduler and merge it to sessionManager's scheduler
fix #258 Squashed commit of the following: commit 5b0a4d1f77b32cef00f55b80b8562752c1f462d7 Author: zen <xinjingziranchan@gmail.com> Date: Wed Dec 16 10:15:16 2020 +0800 fix closing session log commit c0fe24d06638f9be18d91a4ad05943a023967c62 Author: zen <xinjingziranchan@gmail.com> Date: Sat Dec 12 22:08:25 2020 +0800 move this check task to SparkSQLSessionManager. commit 1968913702872f0ba433679596596f114f2e16d1 Author: zen <xinjingziranchan@gmail.com> Date: Thu Dec 10 15:55:32 2020 +0800 add log when close session commit 26a944787c63666813a0d64e67a36b671640d493 Author: zen <xinjingziranchan@gmail.com> Date: Thu Dec 10 11:32:45 2020 +0800 delete the engine's timeoutChecker scheduler and merge it to sessionManager's scheduler.
This commit is contained in:
parent
b11a9d4172
commit
e8bc44c2ad
@ -18,52 +18,26 @@
|
||||
package org.apache.kyuubi.engine.spark
|
||||
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
|
||||
import org.apache.kyuubi.service.Serverable
|
||||
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
|
||||
import org.apache.kyuubi.util.SignalRegister
|
||||
|
||||
private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
|
||||
extends Serverable(name) {
|
||||
|
||||
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
|
||||
|
||||
private val timeoutChecker =
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker")
|
||||
|
||||
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
|
||||
|
||||
override protected def stopServer(): Unit = {
|
||||
spark.stop()
|
||||
timeoutChecker.shutdown()
|
||||
timeoutChecker.awaitTermination(10, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
override def start(): Unit = {
|
||||
val interval = conf.get(KyuubiConf.ENGINE_CHECK_INTERVAL)
|
||||
val idleTimeout = conf.get(KyuubiConf.ENGINE_IDLE_TIMEOUT)
|
||||
|
||||
val checkTask = new Runnable {
|
||||
override def run(): Unit = {
|
||||
val current = System.currentTimeMillis
|
||||
val sessionManager = backendService.sessionManager.asInstanceOf[SparkSQLSessionManager]
|
||||
if (sessionManager.getOpenSessionCount <= 0 &&
|
||||
(current - sessionManager.latestLogoutTime) > idleTimeout) {
|
||||
info(s"Idled for more than $idleTimeout, terminating")
|
||||
sys.exit(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
|
||||
super.start()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -43,7 +43,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
|
||||
sessionToSpark.put(sessionHandle, spark)
|
||||
}
|
||||
|
||||
def removeSparkSession(sessionHandle: SessionHandle): Unit = {
|
||||
def removeSparkSession(sessionHandle: SessionHandle): SparkSession = {
|
||||
sessionToSpark.remove(sessionHandle)
|
||||
}
|
||||
|
||||
|
||||
@ -17,10 +17,13 @@
|
||||
|
||||
package org.apache.kyuubi.engine.spark.session
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
|
||||
import org.apache.kyuubi.session._
|
||||
|
||||
@ -39,7 +42,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
|
||||
|
||||
val operationManager = new SparkSQLOperationManager()
|
||||
|
||||
@volatile private var _latestLogoutTime: Long = Long.MaxValue
|
||||
@volatile private var _latestLogoutTime: Long = System.currentTimeMillis()
|
||||
def latestLogoutTime: Long = _latestLogoutTime
|
||||
|
||||
override def openSession(
|
||||
@ -87,4 +90,26 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
|
||||
}
|
||||
|
||||
override protected def isServer: Boolean = false
|
||||
|
||||
override def start(): Unit = {
|
||||
startTimeoutChecker()
|
||||
super.start()
|
||||
}
|
||||
|
||||
private def startTimeoutChecker(): Unit = {
|
||||
val interval = conf.get(KyuubiConf.ENGINE_CHECK_INTERVAL)
|
||||
val idleTimeout = conf.get(KyuubiConf.ENGINE_IDLE_TIMEOUT)
|
||||
val checkTask = new Runnable {
|
||||
override def run(): Unit = {
|
||||
while (getOpenSessionCount > 0 ||
|
||||
System.currentTimeMillis - latestLogoutTime < idleTimeout) {
|
||||
TimeUnit.MILLISECONDS.sleep(interval)
|
||||
}
|
||||
info(s"Idled for more than $idleTimeout ms, terminating")
|
||||
sys.exit(0)
|
||||
}
|
||||
}
|
||||
submitBackgroundOperation(checkTask)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -65,6 +65,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
|
||||
if (session == null) {
|
||||
throw KyuubiSQLException(s"Invalid $sessionHandle")
|
||||
}
|
||||
info(s"$sessionHandle is closed, current opening sessions $getOpenSessionCount")
|
||||
session.close()
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user