diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index c53dda0e3..42e8e13bc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -74,9 +74,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ + private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) - kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) + kubernetesClients.computeIfAbsent( + kubernetesInfo, + kInfo => { + val kubernetesClient = buildKubernetesClient(kInfo) + cleanTerminatedAppPodsOnKubernetesClientInitialize(kInfo, kubernetesClient) + kubernetesClient + }) + } + + private def cleanTerminatedAppPodsOnKubernetesClientInitialize( + kubernetesInfo: KubernetesInfo, + kubernetesClient: KubernetesClient): Unit = { + if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) { + kubernetesClientInitializeCleanupTerminatedPodExecutor.submit(new Runnable { + override def run(): Unit = { + val existingPods = + kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems + info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods with label " + + s"$LABEL_KYUUBI_UNIQUE_KEY") + val eventType = KubernetesResourceEventTypes.UPDATE + existingPods.asScala.filter(isSparkEnginePod).foreach { pod => + val appState = toApplicationState(pod, appStateSource, appStateContainer, eventType) + if (isTerminated(appState)) { + val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + info(s"[$kubernetesInfo] Found existing pod ${pod.getMetadata.getName} with " + + s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking it as terminated") + if (appInfoStore.get(kyuubiUniqueKey) == null) { + updateApplicationState(kubernetesInfo, pod, eventType) + } + markApplicationTerminated(kubernetesInfo, pod, eventType) + } + } + } + }) + } } private var metadataManager: Option[MetadataManager] = _ @@ -167,6 +203,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { TimeUnit.MILLISECONDS) cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "cleanup-canceled-app-pod-thread") + kubernetesClientInitializeCleanupTerminatedPodExecutor = + ThreadUtils.newDaemonCachedThreadPool( + "kubernetes-client-initialize-cleanup-terminated-pod-thread") initializeKubernetesClient(kyuubiConf) } @@ -313,6 +352,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { ThreadUtils.shutdown(cleanupCanceledAppPodExecutor) cleanupCanceledAppPodExecutor = null } + + if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) { + ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor) + kubernetesClientInitializeCleanupTerminatedPodExecutor = null + } } private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)