diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 392de720a..c4d3c93ba 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.engine import java.util.Locale -import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit} import scala.collection.JavaConverters._ import scala.util.control.NonFatal @@ -35,6 +35,9 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} +import org.apache.kyuubi.operation.OperationState +import org.apache.kyuubi.server.KyuubiServer +import org.apache.kyuubi.session.KyuubiSessionManager import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { @@ -69,11 +72,16 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _ + private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) } + private def metadataManager = KyuubiServer.kyuubiServer.backendService + .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager + // Visible for testing private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): Unit = { val context = kubernetesInfo.context @@ -131,27 +139,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { case COMPLETED => !ApplicationState.isFailed(notification.getValue) } if (shouldDelete) { - val podName = removed.name - try { - val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo) - val deleted = if (podName == null) { - !kubernetesClient.pods() - .withLabel(LABEL_KYUUBI_UNIQUE_KEY, appLabel) - .delete().isEmpty - } else { - !kubernetesClient.pods().withName(podName).delete().isEmpty - } - if (deleted) { - info(s"[$kubernetesInfo] Operation of delete pod $podName with" + - s" ${toLabel(appLabel)} is completed.") - } else { - warn(s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(appLabel)}.") - } - } catch { - case NonFatal(e) => error( - s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(appLabel)}", - e) - } + deletePod(kubernetesInfo, removed.name, appLabel) } info(s"Remove terminated application $removed with ${toLabel(appLabel)}") } @@ -175,6 +163,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { cleanupDriverPodCheckInterval, cleanupDriverPodCheckInterval, TimeUnit.MILLISECONDS) + cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( + "cleanup-canceled-app-pod-thread") } override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = { @@ -296,11 +286,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { pod, appStateSource, appStateContainer) + checkPodAppCanceled(kubernetesInfo, pod) } } override def onUpdate(oldPod: Pod, newPod: Pod): Unit = { if (isSparkEnginePod(newPod)) { + val kyuubiUniqueKey = newPod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + val firstUpdate = appInfoStore.get(kyuubiUniqueKey) == null updateApplicationState(kubernetesInfo, newPod) val appState = toApplicationState(newPod, appStateSource, appStateContainer) if (isTerminated(appState)) { @@ -311,6 +304,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { newPod, appStateSource, appStateContainer) + if (firstUpdate) { + checkPodAppCanceled(kubernetesInfo, newPod) + } } } @@ -416,6 +412,49 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { toApplicationState(pod, appStateSource, appStateContainer)) } } + + private def deletePod( + kubernetesInfo: KubernetesInfo, + podName: String, + podLabelUniqueKey: String): Unit = { + try { + val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo) + val deleted = if (podName == null) { + !kubernetesClient.pods() + .withLabel(LABEL_KYUUBI_UNIQUE_KEY, podLabelUniqueKey) + .delete().isEmpty + } else { + !kubernetesClient.pods().withName(podName).delete().isEmpty + } + if (deleted) { + info(s"[$kubernetesInfo] Operation of delete pod $podName with" + + s" ${toLabel(podLabelUniqueKey)} is completed.") + } else { + warn(s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(podLabelUniqueKey)}.") + } + } catch { + case NonFatal(e) => error( + s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(podLabelUniqueKey)}", + e) + } + } + + private def checkPodAppCanceled(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = { + if (kyuubiConf.isRESTEnabled) { + cleanupCanceledAppPodExecutor.submit(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + val batch = metadataManager.flatMap(_.getBatchSessionMetadata(kyuubiUniqueKey)) + if (batch.map(_.state).map(OperationState.withName) + .exists(_ == OperationState.CANCELED)) { + warn(s"[$kubernetesInfo] Batch[$kyuubiUniqueKey] is canceled, " + + s"try to delete the pod ${pod.getMetadata.getName}") + deletePod(kubernetesInfo, pod.getMetadata.getName, kyuubiUniqueKey) + } + } + }) + } + } } object KubernetesApplicationOperation extends Logging { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 8b2cfef85..939806d17 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -377,7 +377,21 @@ class BatchJobSubmission( // we can not change state safely killMessage = (false, s"batch $batchId is already terminal so can not kill it.") } else if (!isTerminalState(state)) { - // failed to kill, the kill message is enough + _applicationInfo = currentApplicationInfo() + _applicationInfo.map(_.state) match { + case Some(ApplicationState.FINISHED) => + setState(OperationState.FINISHED) + updateBatchMetadata() + case Some(ApplicationState.FAILED) => + setState(OperationState.ERROR) + updateBatchMetadata() + case Some(ApplicationState.UNKNOWN) | + Some(ApplicationState.NOT_FOUND) | + Some(ApplicationState.KILLED) => + setState(OperationState.CANCELED) + updateBatchMetadata() + case _ => // failed to kill, the kill message is enough + } } } }