diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index c02851715..6cace4452 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -59,7 +59,7 @@ class KyuubiSyncThriftClient private ( @volatile private var _aliveProbeSessionHandle: TSessionHandle = _ @volatile private var remoteEngineBroken: Boolean = false - @volatile private var clientClosedOnEngineBroken: Boolean = false + @volatile private var clientClosedByAliveProbe: Boolean = false private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_)) private var engineAliveThreadPool: ScheduledExecutorService = _ @volatile private var engineLastAlive: Long = _ @@ -84,7 +84,7 @@ class KyuubiSyncThriftClient private ( "engine-alive-probe-" + _aliveProbeSessionHandle) val task = new Runnable { override def run(): Unit = { - if (!remoteEngineBroken) { + if (!remoteEngineBroken && protocol.getTransport.isOpen) { engineAliveProbeClient.foreach { client => val tGetInfoReq = new TGetInfoReq() tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle) @@ -114,7 +114,7 @@ class KyuubiSyncThriftClient private ( } } } - clientClosedOnEngineBroken = true + clientClosedByAliveProbe = true shutdownAsyncRequestExecutor() Option(engineAliveThreadPool).foreach { pool => ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS)) @@ -206,7 +206,7 @@ class KyuubiSyncThriftClient private ( } def closeSession(): Unit = { - if (clientClosedOnEngineBroken) return + if (clientClosedByAliveProbe) return try { if (_remoteSessionHandle != null) { val req = new TCloseSessionReq(_remoteSessionHandle)