[KYUUBI #4847] Close the session immediately when engine corrupt

### _Why are the changes needed?_

to close https://github.com/apache/kyuubi/issues/4847

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4848 from huangzhir/kyuubisession_leak.

Closes #4847

37e58ce66 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
170c044f5 [huangzhir] Add some logging and modify some code style.
0c393cd9d [huangzhir] fix sytle
d0183298e [huangzhir] "Use the ENGINE_ALIVE_PROBE_ENABLED configuration to enable the Kyuubi session engine alive probe. The default value of ENGINE_ALIVE_PROBE_ENABLED is false. Use the ENGINE_ALIVE_TIMEOUT configuration to determine the duration for checking the engine's alive status. The engineAliveMaxFailCount configuration controls the maximum number of failures allowed during engine alive checks."
b716dd8f6 [huangzhir] fix kyuubi session leak caused by engine stop

Lead-authored-by: huangzhir <306824224@qq.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
huangzhir 2023-06-11 13:02:56 +08:00 committed by Cheng Pan
parent 5f98539c82
commit 3f74e8bf84
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
4 changed files with 163 additions and 2 deletions

View File

@ -114,6 +114,7 @@ class KyuubiSessionImpl(
super.open()
runOperation(launchEngineOp)
engineLastAlive = System.currentTimeMillis()
}
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit =
@ -283,4 +284,41 @@ class KyuubiSessionImpl(
case _ => super.executeStatement(statement, confOverlay, runAsync, queryTimeout)
}
}
@volatile private var engineLastAlive: Long = _
val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
val aliveProbeEnabled = sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
var engineAliveMaxFailCount = 3
var engineAliveFailCount = 0
def checkEngineAlive(): Boolean = {
try {
if (!aliveProbeEnabled) return true
getInfo(TGetInfoType.CLI_DBMS_VER)
engineLastAlive = System.currentTimeMillis()
engineAliveFailCount = 0
true
} catch {
case e: Throwable =>
val now = System.currentTimeMillis()
engineAliveFailCount = engineAliveFailCount + 1
if (now - engineLastAlive > engineAliveTimeout &&
engineAliveFailCount >= engineAliveMaxFailCount) {
error(s"The engineRef[${engine.getEngineRefId}] is marked as not alive "
+ s"due to a lack of recent successful alive probes. "
+ s"The time since last successful probe: "
+ s"${now - engineLastAlive} ms exceeds the timeout of $engineAliveTimeout ms. "
+ s"The engine has failed $engineAliveFailCount times, "
+ s"surpassing the maximum failure count of $engineAliveMaxFailCount.")
false
} else {
warn(
s"The engineRef[${engine.getEngineRefId}] alive probe fails, " +
s"${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms, " +
s"and has failed $engineAliveFailCount times.",
e)
true
}
}
}
}

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.session
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry
@ -36,7 +38,7 @@ import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor
import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.SignUtils
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
class KyuubiSessionManager private (name: String) extends SessionManager(name) {
@ -61,6 +63,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
private var batchLimiter: Option[SessionLimiter] = None
lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
private val engineAliveChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
addService(applicationManager)
@ -265,6 +270,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)
}
super.start()
startEngineAliveChecker()
}
def getBatchSessionsToRecover(kyuubiInstance: String): Seq[KyuubiBatchSessionImpl] = {
@ -339,4 +345,24 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
Seq(userLimit, ipAddressLimit, userIpAddressLimit).find(_ > 0).map(_ =>
SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, userUnlimitedList.toSet))
}
private def startEngineAliveChecker(): Unit = {
val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
val checkTask: Runnable = () => {
allSessions.foreach { session =>
if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
try {
closeSession(session.handle)
logger.info(s"The session ${session.handle} has been closed " +
s"due to engine unresponsiveness (checked by the engine alive checker).")
} catch {
case e: KyuubiSQLException =>
warn(s"Error closing session ${session.handle}", e)
}
}
}
}
engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
}
}

View File

@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.client.api.v1.dto
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.client.api.v1.dto.{SessionData, _}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.ShareLevel
@ -301,4 +301,67 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(sessionEvent.contains("The last 10 line(s) of log are:"))
}
}
test("fix kyuubi session leak caused by engine stop") {
// clean up all sessions
var response = webTarget.path("api/v1/sessions").request().get()
val sessionDataList = response.readEntity(new GenericType[List[SessionData]]() {})
sessionDataList.foreach(sessionData => {
response = webTarget.path(s"api/v1/sessions/${sessionData.getIdentifier}")
.request().delete()
assert(200 == response.getStatus)
})
// open a session
val requestObj = new SessionOpenRequest(Map(
KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000",
KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava)
response = webTarget.path("api/v1/sessions")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
val sessionHandle = response.readEntity(classOf[SessionHandle]).getIdentifier
val pathPrefix = s"api/v1/sessions/$sessionHandle"
response = webTarget.path("api/v1/sessions/count").request().get()
val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
assert(openedSessionCount.getOpenSessionCount == 1)
var statementReq = new StatementRequest(
"spark.sql(\"show tables\")",
true,
3000,
Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
response = webTarget
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
var operationHandle = response.readEntity(classOf[OperationHandle])
assert(operationHandle !== null)
assert(openedSessionCount.getOpenSessionCount == 1)
statementReq = new StatementRequest(
"spark.close()",
true,
3000,
Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
response = webTarget
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
operationHandle = response.readEntity(classOf[OperationHandle])
assert(operationHandle !== null)
// Because the engine has stopped (due to spark.close), the Spark session is closed.
// Therefore, the Kyuubi session count should be 0.
eventually(timeout(30.seconds), interval(1000.milliseconds)) {
var response = webTarget.path("api/v1/sessions/count").request().get()
val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
assert(openedSessionCount.getOpenSessionCount == 0)
response = webTarget.path("api/v1/sessions").request().get()
val sessionDataList = response.readEntity(new GenericType[List[SessionData]]() {})
assert(sessionDataList.isEmpty)
}
}
}

View File

@ -18,10 +18,13 @@
package org.apache.kyuubi.server.rest.client
import java.util
import java.util.Collections
import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import org.apache.hive.service.rpc.thrift.TGetInfoType
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.kyuubi.RestClientTestHelper
import org.apache.kyuubi.client.{KyuubiRestClient, SessionRestApi}
@ -163,6 +166,37 @@ class SessionRestApiSuite extends RestClientTestHelper {
}
}
test("fix kyuubi session leak caused by engine stop") {
withSessionRestApi { sessionRestApi =>
// close all sessions
var sessions = sessionRestApi.listSessions().asScala
sessions.foreach(session => sessionRestApi.closeSession(session.getIdentifier))
// open new session
val sessionOpenRequest = new SessionOpenRequest(Map(
KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000",
KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava)
val sessionHandle = sessionRestApi.openSession(sessionOpenRequest)
// get open session count
val sessionCount = sessionRestApi.getOpenSessionCount
assert(sessionCount == 1)
val statementReq = new StatementRequest(
"spark.stop()",
true,
3000,
Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
sessionRestApi.executeStatement(sessionHandle.getIdentifier.toString, statementReq)
eventually(Timeout(30.seconds), interval(1.seconds)) {
assert(sessionRestApi.getOpenSessionCount == 0)
assert(sessionRestApi.listSessions().asScala.isEmpty)
}
}
}
def withSessionRestApi[T](f: SessionRestApi => T): T = {
val basicKyuubiRestClient: KyuubiRestClient =
KyuubiRestClient.builder(baseUri.toString)