From 88b24601d07991dedb46a416cd507537effb7b7a Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 9 May 2024 08:48:21 -0700 Subject: [PATCH] [KYUUBI #4847][FOLLOWUP] Exclude the alive probe sessions in terminating checker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— This pull request fixes # follow up of #4847 Address comments: https://github.com/apache/kyuubi/issues/4847#issuecomment-2072945805 ## Describe Your Solution ๐Ÿ”ง In this pr, when checking the engine terminating, it will ignore the alive probe sessions. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6355 from turboFei/engine_idle. Closes #4847 a8e26e71d [Wang, Fei] comments 418d0b41c [Wang, Fei] val Authored-by: Wang, Fei Signed-off-by: Wang, Fei --- .../kyuubi/engine/spark/SparkSQLEngine.scala | 8 ++++---- .../apache/kyuubi/config/KyuubiReservedKeys.scala | 1 + .../apache/kyuubi/session/AbstractSession.scala | 4 ++++ .../scala/org/apache/kyuubi/session/Session.scala | 2 ++ .../apache/kyuubi/session/SessionManager.scala | 15 ++++++++++----- .../kyuubi/service/TFrontendServiceSuite.scala | 6 +++--- .../kyuubi/ha/client/ServiceDiscovery.scala | 5 +++-- .../kyuubi/client/KyuubiSyncThriftClient.scala | 5 +++-- .../kyuubi/server/api/v1/SessionsResource.scala | 2 +- .../org/apache/kyuubi/session/KyuubiSession.scala | 2 ++ .../kyuubi/session/KyuubiSessionManager.scala | 2 +- .../server/api/v1/BatchesResourceSuite.scala | 4 ++-- 12 files changed, 36 insertions(+), 20 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index d1331cd02..c9a5f21dd 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -128,7 +128,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin if (!shutdown.get) { info(s"Spark engine is de-registering from engine discovery space.") frontendServices.flatMap(_.discoveryService).foreach(_.stop()) - while (backendService.sessionManager.getOpenSessionCount > 0) { + while (backendService.sessionManager.getActiveUserSessionCount > 0) { Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } info(s"Spark engine has no open session now, terminating.") @@ -145,12 +145,12 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin Utils.tryLogNonFatalError { ThreadUtils.runInNewThread("spark-engine-failfast-checker") { if (!shutdown.get) { - while (backendService.sessionManager.getOpenSessionCount <= 0 && + while (backendService.sessionManager.getActiveUserSessionCount <= 0 && System.currentTimeMillis() - startedTime < maxTimeout) { info(s"Waiting for the initial connection") Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis) } - if (backendService.sessionManager.getOpenSessionCount <= 0) { + if (backendService.sessionManager.getActiveUserSessionCount <= 0) { error(s"Spark engine has been terminated because no incoming connection" + s" for more than $maxTimeout ms, de-registering from engine discovery space.") assert(currentEngine.isDefined) @@ -180,7 +180,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin frontendServices.flatMap(_.discoveryService).foreach(_.stop()) } - if (backendService.sessionManager.getOpenSessionCount <= 0) { + if (backendService.sessionManager.getActiveUserSessionCount <= 0) { info(s"Spark engine has been running for more than $maxLifetime ms" + s" and no open session now, terminating.") stop() diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala index 592425a4b..9f22dd1f8 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala @@ -37,6 +37,7 @@ object KyuubiReservedKeys { final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time" final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials" final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle" + final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe" final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID = "kyuubi.session.engine.launch.handle.guid" final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET = diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index a00a12c1f..2dfbe510f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY import org.apache.kyuubi.operation.{Operation, OperationHandle} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation @@ -259,4 +260,7 @@ abstract class AbstractSession( override def open(): Unit = { OperationLog.createOperationLogRootDirectory(this) } + + val isForAliveProbe: Boolean = + conf.get(KyuubiReservedKeys.KYUUBI_SESSION_ALIVE_PROBE).exists(_.equalsIgnoreCase("true")) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala index c618c0480..dd9f69fb9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala @@ -93,4 +93,6 @@ trait Session { fetchLog: Boolean): TFetchResultsResp def closeExpiredOperations(): Unit + + def isForAliveProbe: Boolean } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index 3a4dab54c..7751b7298 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -92,7 +92,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) { protected def logSessionCountInfo(session: Session, action: String): Unit = { info(s"${session.user}'s ${session.getClass.getSimpleName} with" + s" ${session.handle}${session.name.map("/" + _).getOrElse("")} is $action," + - s" current opening sessions $getOpenSessionCount") + s" current opening sessions $getActiveUserSessionCount") } def openSession( @@ -122,11 +122,13 @@ abstract class SessionManager(name: String) extends CompositeService(name) { } def closeSession(sessionHandle: SessionHandle): Unit = { - _latestLogoutTime = System.currentTimeMillis() val session = handleToSession.remove(sessionHandle) if (session == null) { throw KyuubiSQLException(s"Invalid $sessionHandle") } + if (!session.isForAliveProbe) { + _latestLogoutTime = System.currentTimeMillis() + } logSessionCountInfo(session, "closed") try { session.close() @@ -159,7 +161,10 @@ abstract class SessionManager(name: String) extends CompositeService(name) { handleToSession.put(sessionHandle, session) } - def getOpenSessionCount: Int = handleToSession.size() + /** + * Get the count of active user sessions, which excludes alive probe sessions. + */ + def getActiveUserSessionCount: Int = handleToSession.values().asScala.count(!_.isForAliveProbe) def allSessions(): Iterable[Session] = handleToSession.values().asScala @@ -303,7 +308,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) { val checkTask = new Runnable { override def run(): Unit = { - info(s"Checking sessions timeout, current count: $getOpenSessionCount") + info(s"Checking sessions timeout, current count: $getActiveUserSessionCount") val current = System.currentTimeMillis if (!shutdown) { for (session <- handleToSession.values().asScala) { @@ -341,7 +346,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) { val checkTask = new Runnable { override def run(): Unit = { if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout && - getOpenSessionCount <= 0) { + getActiveUserSessionCount <= 0) { info(s"Idled for more than $idleTimeout ms, terminating") stop() } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala index 246fc59ad..0e3e98606 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala @@ -549,13 +549,13 @@ class TFrontendServiceSuite extends KyuubiFunSuite { .getSession(SessionHandle(handle)) .asInstanceOf[AbstractSession] var lastAccessTime = session.lastAccessTime - assert(sessionManager.getOpenSessionCount === 1) + assert(sessionManager.getActiveUserSessionCount === 1) assert(session.lastIdleTime > 0) val cancelOpReq = new TCancelOperationReq(resp.getOperationHandle) val cancelOpResp = client.CancelOperation(cancelOpReq) assert(cancelOpResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - assert(sessionManager.getOpenSessionCount === 1) + assert(sessionManager.getActiveUserSessionCount === 1) assert(session.lastIdleTime === 0) lastAccessTime = session.lastAccessTime @@ -569,7 +569,7 @@ class TFrontendServiceSuite extends KyuubiFunSuite { assert(session.lastAccessTime > lastAccessTime) } info("session is terminated") - assert(sessionManager.getOpenSessionCount === 0) + assert(sessionManager.getActiveUserSessionCount === 0) } } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index c7eee1503..2968c4f96 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -66,8 +66,9 @@ abstract class ServiceDiscovery( // stop the server genteelly def stopGracefully(isLost: Boolean = false): Unit = { - while (fe.be.sessionManager.getOpenSessionCount > 0) { - info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown") + val activeSessionCount = fe.be.sessionManager.getActiveUserSessionCount + while (activeSessionCount > 0) { + info(s"$activeSessionCount connection(s) are active, delay shutdown") Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } isServerLost.set(isLost) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index c5ef66f5e..9628613a6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -187,7 +187,7 @@ class KyuubiSyncThriftClient private ( val req = new TOpenSessionReq(protocol) req.setUsername(user) req.setPassword(password) - req.setConfiguration(configs.asJava) + req.setConfiguration((configs ++ Map(KYUUBI_SESSION_ALIVE_PROBE -> "false")).asJava) val resp = withLockAcquired(OpenSession(req)) ThriftUtils.verifyTStatus(resp.getStatus) _remoteSessionHandle = resp.getSessionHandle @@ -207,7 +207,8 @@ class KyuubiSyncThriftClient private ( req.setConfiguration((configs ++ Map( KyuubiConf.SESSION_NAME.key -> sessionName, KYUUBI_SESSION_HANDLE_KEY -> UUID.randomUUID().toString, - KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "")).asJava) + KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "", + KYUUBI_SESSION_ALIVE_PROBE -> "true")).asJava) val resp = aliveProbeClient.OpenSession(req) ThriftUtils.verifyTStatus(resp.getStatus) _aliveProbeSessionHandle = resp.getSessionHandle diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala index 928bb207a..0954f8828 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala @@ -109,7 +109,7 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging { @GET @Path("count") def sessionCount(): SessionOpenCount = { - new SessionOpenCount(sessionManager.getOpenSessionCount) + new SessionOpenCount(sessionManager.getActiveUserSessionCount) } @ApiResponse( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala index 19f403987..e9ce2adc9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala @@ -70,4 +70,6 @@ abstract class KyuubiSession( ms.decCount(MetricRegistry.name(CONN_OPEN, user, sessionType.toString)) ms.decCount(MetricRegistry.name(CONN_OPEN, sessionType.toString)) } + + override val isForAliveProbe: Boolean = false } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 2297cc02c..9edc8218e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -287,7 +287,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { override def start(): Unit = synchronized { MetricsSystem.tracing { ms => - ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0) + ms.registerGauge(CONN_OPEN, getActiveUserSessionCount, 0) ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0) ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0) ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index fd4cca329..f3287170a 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -525,7 +525,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] val kyuubiInstance = fe.connectionUrl - assert(sessionManager.getOpenSessionCount === 0) + assert(sessionManager.getActiveUserSessionCount === 0) val batchId1 = UUID.randomUUID().toString val batchId2 = UUID.randomUUID().toString @@ -585,7 +585,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite val restFe = fe.asInstanceOf[KyuubiRestFrontendService] restFe.recoverBatchSessions() - assert(sessionManager.getOpenSessionCount === 2) + assert(sessionManager.getActiveUserSessionCount === 2) val sessionHandle1 = SessionHandle.fromUUID(batchId1) val sessionHandle2 = SessionHandle.fromUUID(batchId2)