[KYUUBI #7101] Load the existing pods when initializing kubernetes client to cleanup terminated app pods

### Why are the changes needed?

To prevent terminated app pods from leaking when their events are missed during a Kyuubi server restart.

### How was this patch tested?

Manual test.

```
:2025-06-17 17:50:37.275 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423211008-grectg-stm-17da59fe-caf4-41e4-a12f-6c1ed9a293f9-driver with label: kyuubi-unique-tag=17da59fe-caf4-41e4-a12f-6c1ed9a293f9 in app state FINISHED, marking it as terminated
2025-06-17 17:50:37.278 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423212011-gpdtsi-stm-6a23000f-10be-4a42-ae62-4fa2da8fac07-driver with label: kyuubi-unique-tag=6a23000f-10be-4a42-ae62-4fa2da8fac07 in app state FINISHED, marking it as terminated
```
The pods are cleaned up eventually.
<img width="664" alt="image" src="https://github.com/user-attachments/assets/8cf58f61-065f-4fb0-9718-2e3c00e8d2e0" />

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7101 from turboFei/pod_cleanup.

Closes #7101

7f76cf57c [Wang, Fei] async
11c9db25d [Wang, Fei] cleanup

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-06-22 22:35:14 -07:00
parent 364f062852
commit 302b5fa1e6

View File

@ -75,9 +75,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
// Thread pool created via
// ThreadUtils.newDaemonCachedThreadPool("cleanup-canceled-app-pod-thread").
// Starts out as null; it is shut down with ThreadUtils.shutdown and reset to null afterwards.
private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
// Thread pool created via ThreadUtils.newDaemonCachedThreadPool(
// "kubernetes-client-initialize-cleanup-terminated-pod-thread"). It runs the scan of existing
// pods that is submitted when a Kubernetes client is first built. Starts out as null; it is
// shut down with ThreadUtils.shutdown and reset to null afterwards, so users must null-check it.
private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _
/**
 * Returns the Kubernetes client cached for `kubernetesInfo`, building it on first use.
 *
 * When a client is built for the first time, a scan of the pods that already exist is
 * triggered for it so that application pods which have already terminated get cleaned up.
 *
 * @param kubernetesInfo identifies the Kubernetes context/namespace the client is for;
 *                       validated with `checkKubernetesInfo` before any lookup
 * @return the cached or newly built client
 */
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
  checkKubernetesInfo(kubernetesInfo)
  // This must be the only computeIfAbsent call for the key: the mapping function runs just
  // once per absent key, and it is the place where the terminated-pod cleanup is triggered.
  kubernetesClients.computeIfAbsent(
    kubernetesInfo,
    kInfo => {
      val kubernetesClient = buildKubernetesClient(kInfo)
      cleanTerminatedAppPodsOnKubernetesClientInitialize(kInfo, kubernetesClient)
      kubernetesClient
    })
}
/**
 * Asynchronously scans the pods that already exist for the given client and marks the
 * applications of terminated Spark engine pods as terminated.
 *
 * The scan lists all pods carrying the `LABEL_KYUUBI_UNIQUE_KEY` label, keeps those accepted
 * by `isSparkEnginePod`, and for every pod whose application state is terminated it records
 * the state (if the app is not yet in `appInfoStore`) and calls `markApplicationTerminated`.
 *
 * The work is best-effort: it is skipped when the cleanup executor is not available, and
 * failures are logged instead of being propagated, since the task result is never inspected.
 *
 * @param kubernetesInfo   identifies the Kubernetes context/namespace, used for logging and
 *                         for the state updates
 * @param kubernetesClient client used to list the existing pods
 */
private def cleanTerminatedAppPodsOnKubernetesClientInitialize(
    kubernetesInfo: KubernetesInfo,
    kubernetesClient: KubernetesClient): Unit = {
  import scala.util.control.NonFatal

  // Read the field once: it is reset to null on shutdown, so a second read could see null.
  val executor = kubernetesClientInitializeCleanupTerminatedPodExecutor
  if (executor != null) {
    executor.submit(new Runnable {
      override def run(): Unit = {
        // The Future returned by submit() is discarded, so anything thrown here would be
        // silently swallowed; catch and log it instead.
        try {
          val existingPods =
            kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems
          info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods with label " +
            s"$LABEL_KYUUBI_UNIQUE_KEY")
          val eventType = KubernetesResourceEventTypes.UPDATE
          existingPods.asScala.filter(isSparkEnginePod).foreach { pod =>
            // Isolate failures per pod so one bad pod does not stop the remaining cleanup.
            try {
              val appState = toApplicationState(pod, appStateSource, appStateContainer, eventType)
              if (isTerminated(appState)) {
                val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
                info(s"[$kubernetesInfo] Found existing pod ${pod.getMetadata.getName} with " +
                  s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking it as terminated")
                // Only record the state when the app is unknown, to avoid overwriting
                // information that is already tracked.
                if (appInfoStore.get(kyuubiUniqueKey) == null) {
                  updateApplicationState(kubernetesInfo, pod, eventType)
                }
                markApplicationTerminated(kubernetesInfo, pod, eventType)
              }
            } catch {
              case NonFatal(e) =>
                warn(
                  s"[$kubernetesInfo] Failed to clean up existing pod " +
                    s"${pod.getMetadata.getName}",
                  e)
            }
          }
        } catch {
          case NonFatal(e) =>
            warn(
              s"[$kubernetesInfo] Failed to load existing pods with label " +
                s"$LABEL_KYUUBI_UNIQUE_KEY",
              e)
        }
      }
    })
  }
}
private var metadataManager: Option[MetadataManager] = _
@ -168,6 +204,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
TimeUnit.MILLISECONDS)
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-canceled-app-pod-thread")
kubernetesClientInitializeCleanupTerminatedPodExecutor =
ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-client-initialize-cleanup-terminated-pod-thread")
initializeKubernetesClient(kyuubiConf)
}
@ -321,6 +360,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
ThreadUtils.shutdown(cleanupCanceledAppPodExecutor)
cleanupCanceledAppPodExecutor = null
}
if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) {
ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor)
kubernetesClientInitializeCleanupTerminatedPodExecutor = null
}
}
private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)