diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 7133131d7..d34458c64 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -63,7 +63,8 @@ class KyuubiSyncThriftClient private ( private[kyuubi] def remoteSessionHandle: TSessionHandle = _remoteSessionHandle @volatile private var _aliveProbeSessionHandle: TSessionHandle = _ - @volatile private var remoteEngineBroken: Boolean = false + @volatile private var _remoteEngineBroken: Boolean = false + private[kyuubi] def remoteEngineBroken: Boolean = _remoteEngineBroken @volatile private var clientClosedByAliveProbe: Boolean = false private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_)) private var engineAliveThreadPool: ScheduledExecutorService = _ @@ -111,7 +112,7 @@ class KyuubiSyncThriftClient private ( val task = new Runnable { override def run(): Unit = { - if (!remoteEngineBroken && !engineConnectionClosed) { + if (!_remoteEngineBroken && !engineConnectionClosed) { engineAliveProbeClient.foreach { client => val tGetInfoReq = new TGetInfoReq() tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle) @@ -120,7 +121,7 @@ class KyuubiSyncThriftClient private ( try { client.GetInfo(tGetInfoReq).getInfoValue.getStringValue engineLastAlive = System.currentTimeMillis() - remoteEngineBroken = false + _remoteEngineBroken = false } catch { case e: Throwable => val engineIdStr = engineId.getOrElse("") @@ -129,7 +130,7 @@ class KyuubiSyncThriftClient private ( if (now - engineLastAlive > engineAliveTimeout) { error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" + s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms") - remoteEngineBroken = true + _remoteEngineBroken = true closeClient() } } @@ -165,7 +166,7 @@ class KyuubiSyncThriftClient private ( val task = asyncRequestExecutor.submit(() => { val resp = block - remoteEngineBroken = false + _remoteEngineBroken = false resp }) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index c0634455f..22cd4dc8a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -119,7 +119,6 @@ class KyuubiSessionImpl( super.open() runOperation(launchEngineOp) - engineLastAlive = System.currentTimeMillis() } def getEngineNode: Option[ServiceNodeInfo] = { @@ -315,41 +314,8 @@ class KyuubiSessionImpl( } } - @volatile private var engineLastAlive: Long = _ - private val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT) - private val aliveProbeEnabled = sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED) - private val engineAliveMaxFailCount = sessionConf.get(KyuubiConf.ENGINE_ALIVE_MAX_FAILURES) - @volatile private var engineAliveFailCount = 0 - def checkEngineConnectionAlive(): Boolean = { - try { - if (Option(client).exists(_.engineConnectionClosed)) return false - if (!aliveProbeEnabled) return true - getInfo(TGetInfoType.CLI_DBMS_VER) - engineLastAlive = System.currentTimeMillis() - engineAliveFailCount = 0 - true - } catch { - case e: Throwable => - val now = System.currentTimeMillis() - engineAliveFailCount = engineAliveFailCount + 1 - if (now - engineLastAlive > engineAliveTimeout && - engineAliveFailCount >= engineAliveMaxFailCount) { - error(s"The engineRef[${engine.getEngineRefId}] is marked as not alive " - + s"due to a lack of recent successful alive probes. " - + s"The time since last successful probe: " - + s"${now - engineLastAlive} ms exceeds the timeout of $engineAliveTimeout ms. " - + s"The engine has failed $engineAliveFailCount times, " - + s"surpassing the maximum failure count of $engineAliveMaxFailCount.") - false - } else { - warn( - s"The engineRef[${engine.getEngineRefId}] alive probe fails, " + - s"${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms, " + - s"and has failed $engineAliveFailCount times.", - e) - true - } - } + if (Option(client).exists(_.engineConnectionClosed)) return false + !client.remoteEngineBroken } }