diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 6e0082d9b..0bb583a30 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -18,52 +18,26 @@ package org.apache.kyuubi.engine.spark import java.time.Instant -import java.util.concurrent.TimeUnit import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery} import org.apache.kyuubi.service.Serverable -import org.apache.kyuubi.util.{SignalRegister, ThreadUtils} +import org.apache.kyuubi.util.SignalRegister private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) extends Serverable(name) { def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark) - private val timeoutChecker = - ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-timeout-checker") - override private[kyuubi] val backendService = new SparkSQLBackendService(spark) override protected def stopServer(): Unit = { spark.stop() - timeoutChecker.shutdown() - timeoutChecker.awaitTermination(10, TimeUnit.SECONDS) - } - - override def start(): Unit = { - val interval = conf.get(KyuubiConf.ENGINE_CHECK_INTERVAL) - val idleTimeout = conf.get(KyuubiConf.ENGINE_IDLE_TIMEOUT) - - val checkTask = new Runnable { - override def run(): Unit = { - val current = System.currentTimeMillis - val sessionManager = backendService.sessionManager.asInstanceOf[SparkSQLSessionManager] - if (sessionManager.getOpenSessionCount <= 0 && - (current - sessionManager.latestLogoutTime) > idleTimeout) { - info(s"Idled for more than $idleTimeout, terminating") - sys.exit(0) - } - } - } - timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) - super.start() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index 0d0ae06cf..7169b71e0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -43,7 +43,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n sessionToSpark.put(sessionHandle, spark) } - def removeSparkSession(sessionHandle: SessionHandle): Unit = { + def removeSparkSession(sessionHandle: SessionHandle): SparkSession = { sessionToSpark.remove(sessionHandle) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index ccc55e462..a77691079 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -17,10 +17,13 @@ package org.apache.kyuubi.engine.spark.session +import java.util.concurrent.TimeUnit + import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager import org.apache.kyuubi.session._ @@ -39,7 +42,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) val operationManager = new SparkSQLOperationManager() - @volatile private var _latestLogoutTime: Long = Long.MaxValue + @volatile private var _latestLogoutTime: Long = System.currentTimeMillis() def latestLogoutTime: Long = _latestLogoutTime override def openSession( @@ -87,4 +90,26 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) } override protected def isServer: Boolean = false + + override def start(): Unit = { + startTimeoutChecker() + super.start() + } + + private def startTimeoutChecker(): Unit = { + val interval = conf.get(KyuubiConf.ENGINE_CHECK_INTERVAL) + val idleTimeout = conf.get(KyuubiConf.ENGINE_IDLE_TIMEOUT) + val checkTask = new Runnable { + override def run(): Unit = { + while (getOpenSessionCount > 0 || + System.currentTimeMillis - latestLogoutTime < idleTimeout) { + TimeUnit.MILLISECONDS.sleep(interval) + } + info(s"Idled for more than $idleTimeout ms, terminating") + sys.exit(0) + } + } + submitBackgroundOperation(checkTask) + } + } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index 8f6dea6e6..23dc3dbe2 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -65,6 +65,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) { if (session == null) { throw KyuubiSQLException(s"Invalid $sessionHandle") } + info(s"$sessionHandle is closed, current opening sessions $getOpenSessionCount") session.close() }