[KYUUBI #4847][FOLLOWUP] Fix engine session never idle issue

# 🔍 Description
## Issue References 🔗

Address comments https://github.com/apache/kyuubi/issues/4847#issuecomment-2114284381

Now, for `checkEngineConnectionAlive`, it use the client to send `TGetInfoType` to engine and cause the user session never idle for timeout.

## Describe Your Solution 🔧

We shall reuse the alive probe client.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests
Pass the current UT.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6468 from turboFei/engine_alive_check.

Closes #4847

e2368b206 [Wang, Fei] reuse

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 83de6cf11e)
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Wang, Fei 2024-06-17 18:28:45 +08:00 committed by Cheng Pan
parent 5847edf37d
commit 5ceb641c19
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
2 changed files with 8 additions and 41 deletions

View File

@ -63,7 +63,8 @@ class KyuubiSyncThriftClient private (
private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle
@volatile private var _aliveProbeSessionHandle: TSessionHandle = _
@volatile private var remoteEngineBroken: Boolean = false
@volatile private var _remoteEngineBroken: Boolean = false
private[kyuubi] def remoteEngineBroken: Boolean = _remoteEngineBroken
@volatile private var clientClosedByAliveProbe: Boolean = false
private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_))
private var engineAliveThreadPool: ScheduledExecutorService = _
@ -111,7 +112,7 @@ class KyuubiSyncThriftClient private (
val task = new Runnable {
override def run(): Unit = {
if (!remoteEngineBroken && !engineConnectionClosed) {
if (!_remoteEngineBroken && !engineConnectionClosed) {
engineAliveProbeClient.foreach { client =>
val tGetInfoReq = new TGetInfoReq()
tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
@ -120,7 +121,7 @@ class KyuubiSyncThriftClient private (
try {
client.GetInfo(tGetInfoReq).getInfoValue.getStringValue
engineLastAlive = System.currentTimeMillis()
remoteEngineBroken = false
_remoteEngineBroken = false
} catch {
case e: Throwable =>
val engineIdStr = engineId.getOrElse("")
@ -129,7 +130,7 @@ class KyuubiSyncThriftClient private (
if (now - engineLastAlive > engineAliveTimeout) {
error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" +
s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms")
remoteEngineBroken = true
_remoteEngineBroken = true
closeClient()
}
}
@ -165,7 +166,7 @@ class KyuubiSyncThriftClient private (
val task = asyncRequestExecutor.submit(() => {
val resp = block
remoteEngineBroken = false
_remoteEngineBroken = false
resp
})

View File

@ -119,7 +119,6 @@ class KyuubiSessionImpl(
super.open()
runOperation(launchEngineOp)
engineLastAlive = System.currentTimeMillis()
}
def getEngineNode: Option[ServiceNodeInfo] = {
@ -315,41 +314,8 @@ class KyuubiSessionImpl(
}
}
@volatile private var engineLastAlive: Long = _
private val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
private val aliveProbeEnabled = sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
private val engineAliveMaxFailCount = sessionConf.get(KyuubiConf.ENGINE_ALIVE_MAX_FAILURES)
@volatile private var engineAliveFailCount = 0
def checkEngineConnectionAlive(): Boolean = {
try {
if (Option(client).exists(_.engineConnectionClosed)) return false
if (!aliveProbeEnabled) return true
getInfo(TGetInfoType.CLI_DBMS_VER)
engineLastAlive = System.currentTimeMillis()
engineAliveFailCount = 0
true
} catch {
case e: Throwable =>
val now = System.currentTimeMillis()
engineAliveFailCount = engineAliveFailCount + 1
if (now - engineLastAlive > engineAliveTimeout &&
engineAliveFailCount >= engineAliveMaxFailCount) {
error(s"The engineRef[${engine.getEngineRefId}] is marked as not alive "
+ s"due to a lack of recent successful alive probes. "
+ s"The time since last successful probe: "
+ s"${now - engineLastAlive} ms exceeds the timeout of $engineAliveTimeout ms. "
+ s"The engine has failed $engineAliveFailCount times, "
+ s"surpassing the maximum failure count of $engineAliveMaxFailCount.")
false
} else {
warn(
s"The engineRef[${engine.getEngineRefId}] alive probe fails, " +
s"${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms, " +
s"and has failed $engineAliveFailCount times.",
e)
true
}
}
if (Option(client).exists(_.engineConnectionClosed)) return false
!client.remoteEngineBroken
}
}