[KYUUBI #4847][FOLLOWUP] Exclude the alive probe sessions in terminating checker

# 🔍 Description
## Issue References 🔗

This pull request fixes #
follow up of #4847

Address comments: https://github.com/apache/kyuubi/issues/4847#issuecomment-2072945805
## Describe Your Solution 🔧

In this pr, when checking the engine terminating, it will ignore the alive probe sessions.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6355 from turboFei/engine_idle.

Closes #4847

a8e26e71d [Wang, Fei] comments
418d0b41c [Wang, Fei] val

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2024-05-09 08:48:21 -07:00
parent 1fc2b3519a
commit 88b24601d0
12 changed files with 36 additions and 20 deletions

View File

@ -128,7 +128,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
if (!shutdown.get) {
info(s"Spark engine is de-registering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
while (backendService.sessionManager.getOpenSessionCount > 0) {
while (backendService.sessionManager.getActiveUserSessionCount > 0) {
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
info(s"Spark engine has no open session now, terminating.")
@ -145,12 +145,12 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
Utils.tryLogNonFatalError {
ThreadUtils.runInNewThread("spark-engine-failfast-checker") {
if (!shutdown.get) {
while (backendService.sessionManager.getOpenSessionCount <= 0 &&
while (backendService.sessionManager.getActiveUserSessionCount <= 0 &&
System.currentTimeMillis() - startedTime < maxTimeout) {
info(s"Waiting for the initial connection")
Thread.sleep(Duration(10, TimeUnit.SECONDS).toMillis)
}
if (backendService.sessionManager.getOpenSessionCount <= 0) {
if (backendService.sessionManager.getActiveUserSessionCount <= 0) {
error(s"Spark engine has been terminated because no incoming connection" +
s" for more than $maxTimeout ms, de-registering from engine discovery space.")
assert(currentEngine.isDefined)
@ -180,7 +180,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
}
if (backendService.sessionManager.getOpenSessionCount <= 0) {
if (backendService.sessionManager.getActiveUserSessionCount <= 0) {
info(s"Spark engine has been running for more than $maxLifetime ms" +
s" and no open session now, terminating.")
stop()

View File

@ -37,6 +37,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
"kyuubi.session.engine.launch.handle.guid"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET =

View File

@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_CLIENT_IP_KEY
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -259,4 +260,7 @@ abstract class AbstractSession(
override def open(): Unit = {
OperationLog.createOperationLogRootDirectory(this)
}
val isForAliveProbe: Boolean =
conf.get(KyuubiReservedKeys.KYUUBI_SESSION_ALIVE_PROBE).exists(_.equalsIgnoreCase("true"))
}

View File

@ -93,4 +93,6 @@ trait Session {
fetchLog: Boolean): TFetchResultsResp
def closeExpiredOperations(): Unit
def isForAliveProbe: Boolean
}

View File

@ -92,7 +92,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
protected def logSessionCountInfo(session: Session, action: String): Unit = {
info(s"${session.user}'s ${session.getClass.getSimpleName} with" +
s" ${session.handle}${session.name.map("/" + _).getOrElse("")} is $action," +
s" current opening sessions $getOpenSessionCount")
s" current opening sessions $getActiveUserSessionCount")
}
def openSession(
@ -122,11 +122,13 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
}
def closeSession(sessionHandle: SessionHandle): Unit = {
_latestLogoutTime = System.currentTimeMillis()
val session = handleToSession.remove(sessionHandle)
if (session == null) {
throw KyuubiSQLException(s"Invalid $sessionHandle")
}
if (!session.isForAliveProbe) {
_latestLogoutTime = System.currentTimeMillis()
}
logSessionCountInfo(session, "closed")
try {
session.close()
@ -159,7 +161,10 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
handleToSession.put(sessionHandle, session)
}
def getOpenSessionCount: Int = handleToSession.size()
/**
* Get the count of active user sessions, which excludes alive probe sessions.
*/
def getActiveUserSessionCount: Int = handleToSession.values().asScala.count(!_.isForAliveProbe)
def allSessions(): Iterable[Session] = handleToSession.values().asScala
@ -303,7 +308,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
val checkTask = new Runnable {
override def run(): Unit = {
info(s"Checking sessions timeout, current count: $getOpenSessionCount")
info(s"Checking sessions timeout, current count: $getActiveUserSessionCount")
val current = System.currentTimeMillis
if (!shutdown) {
for (session <- handleToSession.values().asScala) {
@ -341,7 +346,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
val checkTask = new Runnable {
override def run(): Unit = {
if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout &&
getOpenSessionCount <= 0) {
getActiveUserSessionCount <= 0) {
info(s"Idled for more than $idleTimeout ms, terminating")
stop()
}

View File

@ -549,13 +549,13 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
.getSession(SessionHandle(handle))
.asInstanceOf[AbstractSession]
var lastAccessTime = session.lastAccessTime
assert(sessionManager.getOpenSessionCount === 1)
assert(sessionManager.getActiveUserSessionCount === 1)
assert(session.lastIdleTime > 0)
val cancelOpReq = new TCancelOperationReq(resp.getOperationHandle)
val cancelOpResp = client.CancelOperation(cancelOpReq)
assert(cancelOpResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(sessionManager.getOpenSessionCount === 1)
assert(sessionManager.getActiveUserSessionCount === 1)
assert(session.lastIdleTime === 0)
lastAccessTime = session.lastAccessTime
@ -569,7 +569,7 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
assert(session.lastAccessTime > lastAccessTime)
}
info("session is terminated")
assert(sessionManager.getOpenSessionCount === 0)
assert(sessionManager.getActiveUserSessionCount === 0)
}
}

View File

@ -66,8 +66,9 @@ abstract class ServiceDiscovery(
// stop the server genteelly
def stopGracefully(isLost: Boolean = false): Unit = {
while (fe.be.sessionManager.getOpenSessionCount > 0) {
info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
val activeSessionCount = fe.be.sessionManager.getActiveUserSessionCount
while (activeSessionCount > 0) {
info(s"$activeSessionCount connection(s) are active, delay shutdown")
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
isServerLost.set(isLost)

View File

@ -187,7 +187,7 @@ class KyuubiSyncThriftClient private (
val req = new TOpenSessionReq(protocol)
req.setUsername(user)
req.setPassword(password)
req.setConfiguration(configs.asJava)
req.setConfiguration((configs ++ Map(KYUUBI_SESSION_ALIVE_PROBE -> "false")).asJava)
val resp = withLockAcquired(OpenSession(req))
ThriftUtils.verifyTStatus(resp.getStatus)
_remoteSessionHandle = resp.getSessionHandle
@ -207,7 +207,8 @@ class KyuubiSyncThriftClient private (
req.setConfiguration((configs ++ Map(
KyuubiConf.SESSION_NAME.key -> sessionName,
KYUUBI_SESSION_HANDLE_KEY -> UUID.randomUUID().toString,
KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "")).asJava)
KyuubiConf.ENGINE_SESSION_INITIALIZE_SQL.key -> "",
KYUUBI_SESSION_ALIVE_PROBE -> "true")).asJava)
val resp = aliveProbeClient.OpenSession(req)
ThriftUtils.verifyTStatus(resp.getStatus)
_aliveProbeSessionHandle = resp.getSessionHandle

View File

@ -109,7 +109,7 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
@GET
@Path("count")
def sessionCount(): SessionOpenCount = {
new SessionOpenCount(sessionManager.getOpenSessionCount)
new SessionOpenCount(sessionManager.getActiveUserSessionCount)
}
@ApiResponse(

View File

@ -70,4 +70,6 @@ abstract class KyuubiSession(
ms.decCount(MetricRegistry.name(CONN_OPEN, user, sessionType.toString))
ms.decCount(MetricRegistry.name(CONN_OPEN, sessionType.toString))
}
override val isForAliveProbe: Boolean = false
}

View File

@ -287,7 +287,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
override def start(): Unit = synchronized {
MetricsSystem.tracing { ms =>
ms.registerGauge(CONN_OPEN, getOpenSessionCount, 0)
ms.registerGauge(CONN_OPEN, getActiveUserSessionCount, 0)
ms.registerGauge(EXEC_POOL_ALIVE, getExecPoolSize, 0)
ms.registerGauge(EXEC_POOL_ACTIVE, getActiveCount, 0)
ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)

View File

@ -525,7 +525,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val kyuubiInstance = fe.connectionUrl
assert(sessionManager.getOpenSessionCount === 0)
assert(sessionManager.getActiveUserSessionCount === 0)
val batchId1 = UUID.randomUUID().toString
val batchId2 = UUID.randomUUID().toString
@ -585,7 +585,7 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
restFe.recoverBatchSessions()
assert(sessionManager.getOpenSessionCount === 2)
assert(sessionManager.getActiveUserSessionCount === 2)
val sessionHandle1 = SessionHandle.fromUUID(batchId1)
val sessionHandle2 = SessionHandle.fromUUID(batchId2)