[KYUUBI #5029] Close the alive probe session after engine session closed

### _Why are the changes needed?_

Now the server side session idle time is controlled by kyuubi admin, and the engine session idle time is controlled by users.

If the engine session idle time is short than that of the server session.

The engine session might be closed after engine session idle time, and now the alive probe session will still check the engine alive until the server session idle than the server session idle timeout.

![image](https://github.com/apache/kyuubi/assets/6757692/8d7dbbd3-6883-4e16-8bfd-01236ee26fa9)

In this pr, we will close the alive probe if the engine session transport is not open.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5029 from turboFei/close_alive_session.

Closes #5029

6ac3d9c1f [fwang12] close alive probe session after engine session closed

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-07-06 21:33:50 +08:00
parent 46f8e0ca94
commit ea2828070c

View File

@ -59,7 +59,7 @@ class KyuubiSyncThriftClient private (
@volatile private var _aliveProbeSessionHandle: TSessionHandle = _
@volatile private var remoteEngineBroken: Boolean = false
@volatile private var clientClosedOnEngineBroken: Boolean = false
@volatile private var clientClosedByAliveProbe: Boolean = false
private val engineAliveProbeClient = engineAliveProbeProtocol.map(new TCLIService.Client(_))
private var engineAliveThreadPool: ScheduledExecutorService = _
@volatile private var engineLastAlive: Long = _
@ -84,7 +84,7 @@ class KyuubiSyncThriftClient private (
"engine-alive-probe-" + _aliveProbeSessionHandle)
val task = new Runnable {
override def run(): Unit = {
if (!remoteEngineBroken) {
if (!remoteEngineBroken && protocol.getTransport.isOpen) {
engineAliveProbeClient.foreach { client =>
val tGetInfoReq = new TGetInfoReq()
tGetInfoReq.setSessionHandle(_aliveProbeSessionHandle)
@ -114,7 +114,7 @@ class KyuubiSyncThriftClient private (
}
}
}
clientClosedOnEngineBroken = true
clientClosedByAliveProbe = true
shutdownAsyncRequestExecutor()
Option(engineAliveThreadPool).foreach { pool =>
ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
@ -206,7 +206,7 @@ class KyuubiSyncThriftClient private (
}
def closeSession(): Unit = {
if (clientClosedOnEngineBroken) return
if (clientClosedByAliveProbe) return
try {
if (_remoteSessionHandle != null) {
val req = new TCloseSessionReq(_remoteSessionHandle)