[KYUUBI #6785] Shutdown the executor service in KubernetesApplicationOperation and prevent NPE

# 🔍 Description
## Issue References 🔗

As title.

Fix NPE, because the cleanupTerminatedAppInfoTrigger will be set to `null`.
d3520ddbce/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala (L269)

Also shutdown the ExecutorService when KubernetesApplicationOperation stoped.
## Describe Your Solution 🔧

Shutdown the thread executor service and check the null.
## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6785 from turboFei/npe_k8s.

Closes #6785

6afd052e6 [Wang, Fei] comments
f0c3e3134 [Wang, Fei] prevent npe
9dffe0125 [Wang, Fei] shutdown

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-03-23 13:19:22 -07:00
parent a54ee39ab3
commit 338206e8a7

View File

@ -151,10 +151,12 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
expireCleanUpTriggerCacheExecutor,
() => {
try {
cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach {
case (key, _) =>
// do get to trigger cache eviction
cleanupTerminatedAppInfoTrigger.getIfPresent(key)
Option(cleanupTerminatedAppInfoTrigger).foreach { trigger =>
trigger.asMap().asScala.foreach {
case (key, _) =>
// do get to trigger cache eviction
trigger.getIfPresent(key)
}
}
} catch {
case NonFatal(e) => error("Failed to evict clean up terminated app cache", e)
@ -273,6 +275,16 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
Utils.tryLogNonFatalError(client.close())
}
kubernetesClients.clear()
if (expireCleanUpTriggerCacheExecutor != null) {
ThreadUtils.shutdown(expireCleanUpTriggerCacheExecutor)
expireCleanUpTriggerCacheExecutor = null
}
if (cleanupCanceledAppPodExecutor != null) {
ThreadUtils.shutdown(cleanupCanceledAppPodExecutor)
cleanupCanceledAppPodExecutor = null
}
}
private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)