[KYUUBI #6353] Catch exception for closing flink internal session

# 🔍 Description
## Issue References 🔗

This pull request fixes #6353

## Describe Your Solution 🔧

Catch exception for closing flink internal session.

## Types of changes 🔖

- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6354 from wForget/KYUUBI-6353.

Closes #6353

32fc9afd9 [wforget] [KYUUBI #6353] Catch exception for closing flink internal session

Authored-by: wforget <643348094@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
wforget 2024-06-06 11:41:33 +08:00 committed by Cheng Pan
parent ef203e05cb
commit a586cb4452
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D

View File

@ -80,10 +80,16 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
try {
super.getSessionOption(sessionHandle).foreach { s =>
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle)
super.getSessionOption(sessionHandle).foreach { s =>
val internalSessionHandle = s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle
try {
sessionManager.closeSession(internalSessionHandle)
} catch {
case t: Throwable =>
warn(s"Error closing flink internal session $internalSessionHandle", t)
}
}
try {
super.closeSession(sessionHandle)
} catch {
case t: Throwable =>