diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index f72fcadd0..237eb3ca6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -114,6 +114,7 @@ class KyuubiSessionImpl( super.open() runOperation(launchEngineOp) + engineLastAlive = System.currentTimeMillis() } private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = @@ -283,4 +284,41 @@ class KyuubiSessionImpl( case _ => super.executeStatement(statement, confOverlay, runAsync, queryTimeout) } } + + @volatile private var engineLastAlive: Long = _ + val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT) + val aliveProbeEnabled = sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED) + var engineAliveMaxFailCount = 3 + var engineAliveFailCount = 0 + + def checkEngineAlive(): Boolean = { + try { + if (!aliveProbeEnabled) return true + getInfo(TGetInfoType.CLI_DBMS_VER) + engineLastAlive = System.currentTimeMillis() + engineAliveFailCount = 0 + true + } catch { + case e: Throwable => + val now = System.currentTimeMillis() + engineAliveFailCount = engineAliveFailCount + 1 + if (now - engineLastAlive > engineAliveTimeout && + engineAliveFailCount >= engineAliveMaxFailCount) { + error(s"The engineRef[${engine.getEngineRefId}] is marked as not alive " + + s"due to a lack of recent successful alive probes. " + + s"The time since last successful probe: " + + s"${now - engineLastAlive} ms exceeds the timeout of $engineAliveTimeout ms. " + + s"The engine has failed $engineAliveFailCount times, " + + s"surpassing the maximum failure count of $engineAliveMaxFailCount.") + false + } else { + warn( + s"The engineRef[${engine.getEngineRefId}] alive probe fails, " + + s"${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms, " + + s"and has failed $engineAliveFailCount times.", + e) + true + } + } + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 0ef3f1ac1..d5504ed19 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.session +import java.util.concurrent.TimeUnit + import scala.collection.JavaConverters._ import com.codahale.metrics.MetricRegistry @@ -36,7 +38,7 @@ import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef} import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.sql.parser.server.KyuubiParser -import org.apache.kyuubi.util.SignUtils +import org.apache.kyuubi.util.{SignUtils, ThreadUtils} class KyuubiSessionManager private (name: String) extends SessionManager(name) { @@ -61,6 +63,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { private var batchLimiter: Option[SessionLimiter] = None lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair() + private val engineAliveChecker = + ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker") + override def initialize(conf: KyuubiConf): Unit = { this.conf = conf addService(applicationManager) @@ -265,6 +270,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0) } super.start() + startEngineAliveChecker() } def getBatchSessionsToRecover(kyuubiInstance: String): Seq[KyuubiBatchSessionImpl] = { @@ -339,4 +345,24 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { Seq(userLimit, ipAddressLimit, userIpAddressLimit).find(_ > 0).map(_ => SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, userUnlimitedList.toSet)) } + + private def startEngineAliveChecker(): Unit = { + val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL) + val checkTask: Runnable = () => { + allSessions.foreach { session => + if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) { + try { + closeSession(session.handle) + logger.info(s"The session ${session.handle} has been closed " + + s"due to engine unresponsiveness (checked by the engine alive checker).") + } catch { + case e: KyuubiSQLException => + warn(s"Error closing session ${session.handle}", e) + } + } + } + } + engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS) + } + } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala index 07a711de6..b197a489c 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper} import org.apache.kyuubi.client.api.v1.dto -import org.apache.kyuubi.client.api.v1.dto._ +import org.apache.kyuubi.client.api.v1.dto.{SessionData, _} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY import org.apache.kyuubi.engine.ShareLevel @@ -301,4 +301,67 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(sessionEvent.contains("The last 10 line(s) of log are:")) } } + + test("fix kyuubi session leak caused by engine stop") { + // clean up all sessions + var response = webTarget.path("api/v1/sessions").request().get() + val sessionDataList = response.readEntity(new GenericType[List[SessionData]]() {}) + sessionDataList.foreach(sessionData => { + response = webTarget.path(s"api/v1/sessions/${sessionData.getIdentifier}") + .request().delete() + assert(200 == response.getStatus) + }) + + // open a session + val requestObj = new SessionOpenRequest(Map( + KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true", + KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000", + KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava) + response = webTarget.path("api/v1/sessions") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) + val sessionHandle = response.readEntity(classOf[SessionHandle]).getIdentifier + val pathPrefix = s"api/v1/sessions/$sessionHandle" + + response = webTarget.path("api/v1/sessions/count").request().get() + val openedSessionCount = response.readEntity(classOf[SessionOpenCount]) + assert(openedSessionCount.getOpenSessionCount == 1) + + var statementReq = new StatementRequest( + "spark.sql(\"show tables\")", + true, + 3000, + Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA")) + response = webTarget + .path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE)) + assert(200 == response.getStatus) + var operationHandle = response.readEntity(classOf[OperationHandle]) + assert(operationHandle !== null) + assert(openedSessionCount.getOpenSessionCount == 1) + + statementReq = new StatementRequest( + "spark.close()", + true, + 3000, + Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA")) + response = webTarget + .path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE)) + assert(200 == response.getStatus) + operationHandle = response.readEntity(classOf[OperationHandle]) + assert(operationHandle !== null) + + // Because the engine has stopped (due to spark.close), the Spark session is closed. + // Therefore, the Kyuubi session count should be 0. + eventually(timeout(30.seconds), interval(1000.milliseconds)) { + var response = webTarget.path("api/v1/sessions/count").request().get() + val openedSessionCount = response.readEntity(classOf[SessionOpenCount]) + assert(openedSessionCount.getOpenSessionCount == 0) + + response = webTarget.path("api/v1/sessions").request().get() + val sessionDataList = response.readEntity(new GenericType[List[SessionData]]() {}) + assert(sessionDataList.isEmpty) + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala index ed116d077..a1f0fc5ee 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala @@ -18,10 +18,13 @@ package org.apache.kyuubi.server.rest.client import java.util +import java.util.Collections import scala.collection.JavaConverters._ +import scala.concurrent.duration.DurationInt import org.apache.hive.service.rpc.thrift.TGetInfoType +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.apache.kyuubi.RestClientTestHelper import org.apache.kyuubi.client.{KyuubiRestClient, SessionRestApi} @@ -163,6 +166,37 @@ class SessionRestApiSuite extends RestClientTestHelper { } } + test("fix kyuubi session leak caused by engine stop") { + withSessionRestApi { sessionRestApi => + // close all sessions + var sessions = sessionRestApi.listSessions().asScala + sessions.foreach(session => sessionRestApi.closeSession(session.getIdentifier)) + + // open new session + val sessionOpenRequest = new SessionOpenRequest(Map( + KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true", + KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000", + KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava) + val sessionHandle = sessionRestApi.openSession(sessionOpenRequest) + + // get open session count + val sessionCount = sessionRestApi.getOpenSessionCount + assert(sessionCount == 1) + + val statementReq = new StatementRequest( + "spark.stop()", + true, + 3000, + Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA")) + sessionRestApi.executeStatement(sessionHandle.getIdentifier.toString, statementReq) + + eventually(Timeout(30.seconds), interval(1.seconds)) { + assert(sessionRestApi.getOpenSessionCount == 0) + assert(sessionRestApi.listSessions().asScala.isEmpty) + } + } + } + def withSessionRestApi[T](f: SessionRestApi => T): T = { val basicKyuubiRestClient: KyuubiRestClient = KyuubiRestClient.builder(baseUri.toString)