From 779b4bde31cd524a806f0cd314a63d04cdfd0640 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Sun, 22 Jun 2025 22:35:14 -0700 Subject: [PATCH] [KYUUBI #7101] Load the existing pods when initializing kubernetes client to cleanup terminated app pods ### Why are the changes needed? To prevent the terminated app pods leak if the events missed during kyuubi server restart. ### How was this patch tested? Manual test. ``` :2025-06-17 17:50:37.275 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423211008-grectg-stm-17da59fe-caf4-41e4-a12f-6c1ed9a293f9-driver with label: kyuubi-unique-tag=17da59fe-caf4-41e4-a12f-6c1ed9a293f9 in app state FINISHED, marking it as terminated 2025-06-17 17:50:37.278 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423212011-gpdtsi-stm-6a23000f-10be-4a42-ae62-4fa2da8fac07-driver with label: kyuubi-unique-tag=6a23000f-10be-4a42-ae62-4fa2da8fac07 in app state FINISHED, marking it as terminated ``` The pods are cleaned up eventually. image ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7101 from turboFei/pod_cleanup. Closes #7101 7f76cf57c [Wang, Fei] async 11c9db25d [Wang, Fei] cleanup Authored-by: Wang, Fei Signed-off-by: Wang, Fei (cherry picked from commit 302b5fa1e60708cf8783893d166a22c7e6130309) Signed-off-by: Wang, Fei --- .../KubernetesApplicationOperation.scala | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index c53dda0e3..42e8e13bc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -74,9 +74,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ + private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) - kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) + kubernetesClients.computeIfAbsent( + kubernetesInfo, + kInfo => { + val kubernetesClient = buildKubernetesClient(kInfo) + cleanTerminatedAppPodsOnKubernetesClientInitialize(kInfo, kubernetesClient) + kubernetesClient + }) + } + + private def cleanTerminatedAppPodsOnKubernetesClientInitialize( + kubernetesInfo: KubernetesInfo, + kubernetesClient: KubernetesClient): Unit = { + if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) { + kubernetesClientInitializeCleanupTerminatedPodExecutor.submit(new Runnable { + override def run(): Unit = { + val existingPods = + kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems + info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods with label " + + s"$LABEL_KYUUBI_UNIQUE_KEY") + val eventType = KubernetesResourceEventTypes.UPDATE + existingPods.asScala.filter(isSparkEnginePod).foreach { pod => + val appState = toApplicationState(pod, appStateSource, appStateContainer, eventType) + if (isTerminated(appState)) { + val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + info(s"[$kubernetesInfo] Found existing pod ${pod.getMetadata.getName} with " + + s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking it as terminated") + if (appInfoStore.get(kyuubiUniqueKey) == null) { + updateApplicationState(kubernetesInfo, pod, eventType) + } + markApplicationTerminated(kubernetesInfo, pod, eventType) + } + } + } + }) + } } private var metadataManager: Option[MetadataManager] = _ @@ -167,6 +203,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { TimeUnit.MILLISECONDS) cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "cleanup-canceled-app-pod-thread") + kubernetesClientInitializeCleanupTerminatedPodExecutor = + ThreadUtils.newDaemonCachedThreadPool( + "kubernetes-client-initialize-cleanup-terminated-pod-thread") initializeKubernetesClient(kyuubiConf) } @@ -313,6 +352,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { ThreadUtils.shutdown(cleanupCanceledAppPodExecutor) cleanupCanceledAppPodExecutor = null } + + if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) { + ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor) + kubernetesClientInitializeCleanupTerminatedPodExecutor = null + } } private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)