[KYUUBI #6172][TASK][EASY] Support to interrupt the thrift request immediately after marking the engine not alive

Support to interrupt the thrift request immediately after marking the engine not alive

# 🔍 Description
## Issue References 🔗

This pull request fixes #6172

## Describe Your Solution 🔧

12c5568c9b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala (L103-L110)

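Before this change, when the alive probe failed and the time since the last successful probe exceeded `engineAliveTimeout`, the thrift request was not interrupted immediately: the probe task only set `remoteEngineBroken` and waited for the next `engineAliveProbeInterval` tick to close the transports. This change closes the client as soon as the engine is marked not alive.
The unit test `KyuubiOperationPerConnectionSuite` asserts a 3s timeout.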
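
The timing difference can be illustrated with a small model of the probe loop. This is only a sketch under simplifying assumptions, not Kyuubi's code: `ProbeSketch`, `closeTimeMs` and their parameters are hypothetical names, probes are assumed to run on exact interval boundaries, and every probe after `lastSuccessMs` is assumed to fail.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified model of the alive-probe loop (not the real KyuubiSyncThriftClient).
object ProbeSketch {

  /**
   * Returns the simulated time (ms) at which the transports are closed.
   *
   * @param closeImmediately true models closing right after marking the engine not alive;
   *                         false models waiting for the next probe tick
   * @param intervalMs       stands in for engineAliveProbeInterval
   * @param timeoutMs        stands in for engineAliveTimeout
   * @param lastSuccessMs    time of the last successful probe; later probes all fail
   */
  def closeTimeMs(
      closeImmediately: Boolean,
      intervalMs: Long,
      timeoutMs: Long,
      lastSuccessMs: Long): Long = {
    @tailrec
    def loop(now: Long, broken: Boolean): Long = {
      if (broken) {
        // Engine was marked broken on the previous tick: this tick closes the client.
        now
      } else if (now - lastSuccessMs > timeoutMs) {
        // Probe failed and the timeout is exceeded: mark the engine not alive.
        if (closeImmediately) now else loop(now + intervalMs, broken = true)
      } else {
        // Probe succeeded, or failed within the timeout: try again next tick.
        loop(now + intervalMs, broken = false)
      }
    }
    loop(0L, broken = false)
  }
}
```

With a 1000 ms interval, a 1000 ms timeout and the last success at time 0, `ProbeSketch.closeTimeMs(true, 1000L, 1000L, 0L)` returns 2000 while `ProbeSketch.closeTimeMs(false, 1000L, 1000L, 0L)` returns 3000: in this model, deferring the close costs exactly one extra probe interval.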

12c5568c9b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala (L344-L346)

Exception log
```
03:25:15.125 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The engine[local-1714879506640] alive probe fails
org.apache.kyuubi.shaded.thrift.transport.TTransportException: Socket is closed by peer.
	...
03:25:16.126 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11886 WARN KyuubiSyncThriftClient: The engine[local-1714879506640] alive probe fails
org.apache.kyuubi.shaded.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)
	...
03:25:16.126 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:BD 23 DE B6 16 56 4E 2B 97 3C 09 74 89 F5 C9 26, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11886 ERROR KyuubiSyncThriftClient: Mark the engine[local-1714879506640] not alive with no recent alive probe success: 2001 ms exceeds timeout 1000 ms
```

Success log
```
16:57:46.859 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: The engine[local-1715101059872] alive probe fails
...
16:57:46.860 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11609 ERROR KyuubiSyncThriftClient: Mark the engine[local-1715101059872] not alive with no recent alive probe success: 1001 ms exceeds timeout 1000 ms
16:57:47.860 engine-alive-probe-TSessionHandle(sessionId:THandleIdentifier(guid:B4 9D 71 83 6D 15 4D 8D BE DA 65 75 27 5D 4E D8, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38)): Thread-11609 WARN KyuubiSyncThriftClient: Removing Clients for TSessionHandle(sessionId:THandleIdentifier(guid:9D AA D5 C2 9B E4 43 D7 BE 81 D0 99 EA 5B 9E 37, secret:C2 EE 5B 97 3E A0 41 FC AC 16 9B D7 08 ED 8F 38))
```

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6375 from beryllw/kyuubi_6172.

Closes #6172

991798b86 [wangjunbo] [KYUUBI #6172][TASK][EASY] Support to interrupt the thrift request immediately after marking the engine not alive

Authored-by: wangjunbo <wangjunbo@qiyi.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
wangjunbo 2024-05-09 08:46:16 -07:00 committed by Wang, Fei
parent 5b6ab1af88
commit 1fc2b3519a


@@ -87,6 +87,23 @@ class KyuubiSyncThriftClient private (
   private def startEngineAliveProbe(): Unit = {
     engineAliveThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
       "engine-alive-probe-" + _aliveProbeSessionHandle)
+
+    def closeClient(): Unit = {
+      warn(s"Removing Clients for ${_remoteSessionHandle}")
+      Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
+        Utils.tryLogNonFatalError {
+          if (tProtocol.getTransport.isOpen) {
+            tProtocol.getTransport.close()
+          }
+        }
+      }
+      clientClosedByAliveProbe = true
+      shutdownAsyncRequestExecutor()
+      Option(engineAliveThreadPool).foreach { pool =>
+        ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
+      }
+    }
+
     val task = new Runnable {
       override def run(): Unit = {
         if (!remoteEngineBroken && !engineConnectionClosed) {
@@ -108,23 +125,12 @@ class KyuubiSyncThriftClient private (
                   error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" +
                     s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms")
                   remoteEngineBroken = true
+                  closeClient()
                 }
             }
           }
         } else {
-          warn(s"Removing Clients for ${_remoteSessionHandle}")
-          Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
-            Utils.tryLogNonFatalError {
-              if (tProtocol.getTransport.isOpen) {
-                tProtocol.getTransport.close()
-              }
-            }
-          }
-          clientClosedByAliveProbe = true
-          shutdownAsyncRequestExecutor()
-          Option(engineAliveThreadPool).foreach { pool =>
-            ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
-          }
+          closeClient()
         }
       }
     }
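
Why closing the transport interrupts an in-flight request can be shown with plain `java.net` sockets. This is a sketch only: it assumes the Thrift transport sits on top of a blocking socket, and `CloseInterruptsRead` and `run` are hypothetical names that do not exist in Kyuubi. The one guaranteed fact it relies on is the documented behavior of `Socket.close()`, which makes any thread blocked in I/O on that socket throw a `SocketException`.

```scala
import java.net.{InetAddress, ServerSocket, Socket}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical demo: a read blocked on a silent peer fails as soon as the socket is closed.
object CloseInterruptsRead {

  /** Returns the exception raised in the reader thread, if any. */
  def run(): Option[Throwable] = {
    val server = new ServerSocket(0) // ephemeral local port
    try {
      val client = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
      val accepted = server.accept() // the peer never writes, so the client read blocks
      val failure = new AtomicReference[Throwable]()
      val done = new CountDownLatch(1)

      val reader = new Thread(new Runnable {
        override def run(): Unit = {
          try {
            client.getInputStream.read() // stands in for a pending thrift request
          } catch {
            case e: Throwable => failure.set(e)
          } finally {
            done.countDown()
          }
        }
      })
      reader.setDaemon(true)
      reader.start()

      Thread.sleep(200) // give the reader time to block in read()
      client.close()    // analogous to tProtocol.getTransport.close()
      done.await(5, TimeUnit.SECONDS)
      accepted.close()
      Option(failure.get())
    } finally {
      server.close()
    }
  }
}
```

Calling `CloseInterruptsRead.run()` is expected to yield a `java.net.SocketException` from the reader thread, which mirrors how a caller waiting on the engine is released once `closeClient()` closes the transports.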