From 5ceb641c1958eb02516d9e49cc5a7777d087de2f Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Mon, 17 Jun 2024 18:28:45 +0800 Subject: [PATCH] [KYUUBI #4847][FOLLOWUP] Fix engine session never idle issue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 Address comments https://github.com/apache/kyuubi/issues/4847#issuecomment-2114284381 Currently, `checkEngineConnectionAlive` uses the session client to send a `TGetInfoType` request to the engine, which causes the user session to never become idle and therefore never time out. ## Describe Your Solution 🔧 We shall reuse the alive probe client. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests Pass the current UT. --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6468 from turboFei/engine_alive_check. 
Closes #4847 e2368b206 [Wang, Fei] reuse Authored-by: Wang, Fei Signed-off-by: Cheng Pan (cherry picked from commit 83de6cf11e23b65adc680427b5efca6931497993) Signed-off-by: Cheng Pan --- .../client/KyuubiSyncThriftClient.scala | 11 +++--- .../kyuubi/session/KyuubiSessionImpl.scala | 38 +------------------ 2 files changed, 8 insertions(+), 41 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 7133131d7..d34458c64 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -63,7 +63,8 @@ class KyuubiSyncThriftClient private ( private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle @volatile private var _aliveProbeSessionHandle: TSessionHandle = _ - @volatile private var remoteEngineBroken: Boolean = false + @volatile private var _remoteEngineBroken: Boolean = false + private[kyuubi] def remoteEngineBroken: Boolean = _remoteEngineBroken @volatile private var clientClosedByAliveProbe: Boolean = false private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_)) private var engineAliveThreadPool: ScheduledExecutorService = _ @@ -111,7 +112,7 @@ class KyuubiSyncThriftClient private ( val task = new Runnable { override def run(): Unit = { - if (!remoteEngineBroken && !engineConnectionClosed) { + if (!_remoteEngineBroken && !engineConnectionClosed) { engineAliveProbeClient.foreach { client => val tGetInfoReq = new TGetInfoReq() tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle) @@ -120,7 +121,7 @@ class KyuubiSyncThriftClient private ( try { client.GetInfo(tGetInfoReq).getInfoValue.getStringValue engineLastAlive = System.currentTimeMillis() - remoteEngineBroken = false + _remoteEngineBroken = false } catch { case e: Throwable => val engineIdStr 
= engineId.getOrElse("") @@ -129,7 +130,7 @@ class KyuubiSyncThriftClient private ( if (now - engineLastAlive > engineAliveTimeout) { error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" + s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms") - remoteEngineBroken = true + _remoteEngineBroken = true closeClient() } } @@ -165,7 +166,7 @@ class KyuubiSyncThriftClient private ( val task = asyncRequestExecutor.submit(() => { val resp = block - remoteEngineBroken = false + _remoteEngineBroken = false resp }) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index c0634455f..22cd4dc8a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -119,7 +119,6 @@ class KyuubiSessionImpl( super.open() runOperation(launchEngineOp) - engineLastAlive = System.currentTimeMillis() } def getEngineNode: Option[ServiceNodeInfo] = { @@ -315,41 +314,8 @@ class KyuubiSessionImpl( } } - @volatile private var engineLastAlive: Long = _ - private val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT) - private val aliveProbeEnabled = sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED) - private val engineAliveMaxFailCount = sessionConf.get(KyuubiConf.ENGINE_ALIVE_MAX_FAILURES) - @volatile private var engineAliveFailCount = 0 - def checkEngineConnectionAlive(): Boolean = { - try { - if (Option(client).exists(_.engineConnectionClosed)) return false - if (!aliveProbeEnabled) return true - getInfo(TGetInfoType.CLI_DBMS_VER) - engineLastAlive = System.currentTimeMillis() - engineAliveFailCount = 0 - true - } catch { - case e: Throwable => - val now = System.currentTimeMillis() - engineAliveFailCount = engineAliveFailCount + 1 - if (now - engineLastAlive > engineAliveTimeout && - 
engineAliveFailCount >= engineAliveMaxFailCount) { - error(s"The engineRef[${engine.getEngineRefId}] is marked as not alive " - + s"due to a lack of recent successful alive probes. " - + s"The time since last successful probe: " - + s"${now - engineLastAlive} ms exceeds the timeout of $engineAliveTimeout ms. " - + s"The engine has failed $engineAliveFailCount times, " - + s"surpassing the maximum failure count of $engineAliveMaxFailCount.") - false - } else { - warn( - s"The engineRef[${engine.getEngineRefId}] alive probe fails, " + - s"${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms, " + - s"and has failed $engineAliveFailCount times.", - e) - true - } - } + if (Option(client).exists(_.engineConnectionClosed)) return false + !client.remoteEngineBroken } }