[KYUUBI #6668] Fix abnormal Kyuubi batch state

# 🔍 Description
## Issue References 🔗

This pull request fixes #6668

## Describe Your Solution 🔧

1. When killing the batch fails, check the current application info.
2. If the application state is UNKNOWN (still within the submit timeout) or NOT_FOUND, mark the batch state as CANCELED.
3. If the k8s pod is added after the batch has been marked as CANCELED, delete the pod.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6670 from turboFei/session_close_operation.

Closes #6668

068eaf216 [Wang, Fei] def
248c3e383 [Wang, Fei] check for onUpdate
695bb805d [Wang, Fei] clean up on add
9304f4605 [Wang, Fei] method
e2a15f8bc [Wang, Fei] batch

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2024-09-14 12:27:34 -07:00
parent 550f1fce89
commit c78e23250a
2 changed files with 76 additions and 23 deletions

View File

@ -18,7 +18,7 @@
package org.apache.kyuubi.engine
import java.util.Locale
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@ -35,6 +35,9 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@ -69,11 +72,16 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _
private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
/**
 * Returns the Kubernetes client cached for `kubernetesInfo`, building and caching one via
 * `buildKubernetesClient` when no entry exists yet.
 *
 * `checkKubernetesInfo` is always invoked first, so anything it throws propagates to the
 * caller before the cache is consulted.
 *
 * @param kubernetesInfo key identifying the target Kubernetes environment
 * @return the cached or newly built client for that key
 */
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
  checkKubernetesInfo(kubernetesInfo)
  kubernetesClients.computeIfAbsent(
    kubernetesInfo,
    info => buildKubernetesClient(info))
}
// Resolved on every call through the global KyuubiServer instance rather than cached.
// NOTE(review): the cast assumes the server's session manager is a KyuubiSessionManager and
// that KyuubiServer.kyuubiServer is already set when this is called - confirm for tests.
private def metadataManager = {
  val sessionManager = KyuubiServer.kyuubiServer.backendService.sessionManager
  sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
}
// Visible for testing
private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): Unit = {
val context = kubernetesInfo.context
@ -131,27 +139,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
case COMPLETED => !ApplicationState.isFailed(notification.getValue)
}
if (shouldDelete) {
val podName = removed.name
try {
val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
val deleted = if (podName == null) {
!kubernetesClient.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY, appLabel)
.delete().isEmpty
} else {
!kubernetesClient.pods().withName(podName).delete().isEmpty
}
if (deleted) {
info(s"[$kubernetesInfo] Operation of delete pod $podName with" +
s" ${toLabel(appLabel)} is completed.")
} else {
warn(s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(appLabel)}.")
}
} catch {
case NonFatal(e) => error(
s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(appLabel)}",
e)
}
deletePod(kubernetesInfo, removed.name, appLabel)
}
info(s"Remove terminated application $removed with ${toLabel(appLabel)}")
}
@ -175,6 +163,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
cleanupDriverPodCheckInterval,
cleanupDriverPodCheckInterval,
TimeUnit.MILLISECONDS)
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-canceled-app-pod-thread")
}
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
@ -296,11 +286,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
pod,
appStateSource,
appStateContainer)
checkPodAppCanceled(kubernetesInfo, pod)
}
}
override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
if (isSparkEnginePod(newPod)) {
val kyuubiUniqueKey = newPod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
val firstUpdate = appInfoStore.get(kyuubiUniqueKey) == null
updateApplicationState(kubernetesInfo, newPod)
val appState = toApplicationState(newPod, appStateSource, appStateContainer)
if (isTerminated(appState)) {
@ -311,6 +304,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
newPod,
appStateSource,
appStateContainer)
if (firstUpdate) {
checkPodAppCanceled(kubernetesInfo, newPod)
}
}
}
@ -416,6 +412,49 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
toApplicationState(pod, appStateSource, appStateContainer))
}
}
/**
 * Best-effort deletion of a pod; never throws for non-fatal errors.
 *
 * When `podName` is non-null the pod is deleted by name, otherwise every pod carrying the
 * `LABEL_KYUUBI_UNIQUE_KEY` label with value `podLabelUniqueKey` is deleted. A non-empty
 * delete result is logged at info level, an empty one at warn level, and any non-fatal
 * exception is logged at error level and swallowed.
 *
 * @param kubernetesInfo identifies which Kubernetes client to use
 * @param podName name of the pod to delete, or null to delete by label
 * @param podLabelUniqueKey value of the Kyuubi unique-key label, used for the label-based
 *                          delete and in the log messages
 */
private def deletePod(
    kubernetesInfo: KubernetesInfo,
    podName: String,
    podLabelUniqueKey: String): Unit = {
  try {
    val client = getOrCreateKubernetesClient(kubernetesInfo)
    // Pick the delete target first, then inspect the single result below.
    val deleteResult = Option(podName) match {
      case Some(name) =>
        client.pods().withName(name).delete()
      case None =>
        client.pods()
          .withLabel(LABEL_KYUUBI_UNIQUE_KEY, podLabelUniqueKey)
          .delete()
    }
    if (deleteResult.isEmpty) {
      warn(s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(podLabelUniqueKey)}.")
    } else {
      info(s"[$kubernetesInfo] Operation of delete pod $podName with" +
        s" ${toLabel(podLabelUniqueKey)} is completed.")
    }
  } catch {
    case NonFatal(e) =>
      error(
        s"[$kubernetesInfo] Failed to delete pod $podName with ${toLabel(podLabelUniqueKey)}",
        e)
  }
}
/**
 * Asynchronously deletes `pod` if the batch it belongs to is already recorded as CANCELED.
 *
 * Does nothing unless `kyuubiConf.isRESTEnabled` is true. Otherwise a task is submitted to
 * `cleanupCanceledAppPodExecutor` that reads the pod's `LABEL_KYUUBI_UNIQUE_KEY` label, looks
 * up the batch session metadata under that key, and calls `deletePod` when the stored state
 * is `OperationState.CANCELED`. The task body runs inside `Utils.tryLogNonFatalError`.
 *
 * @param kubernetesInfo identifies the Kubernetes environment the pod lives in
 * @param pod the pod that was just observed
 */
private def checkPodAppCanceled(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = {
  if (kyuubiConf.isRESTEnabled) {
    val cleanupTask = new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        val uniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
        val batchCanceled = metadataManager
          .flatMap(_.getBatchSessionMetadata(uniqueKey))
          .exists(meta => OperationState.withName(meta.state) == OperationState.CANCELED)
        if (batchCanceled) {
          val podName = pod.getMetadata.getName
          warn(s"[$kubernetesInfo] Batch[$uniqueKey] is canceled, " +
            s"try to delete the pod $podName")
          deletePod(kubernetesInfo, podName, uniqueKey)
        }
      }
    }
    cleanupCanceledAppPodExecutor.submit(cleanupTask)
  }
}
}
object KubernetesApplicationOperation extends Logging {

View File

@ -377,7 +377,21 @@ class BatchJobSubmission(
// we can not change state safely
killMessage = (false, s"batch $batchId is already terminal so can not kill it.")
} else if (!isTerminalState(state)) {
// failed to kill, the kill message is enough
_applicationInfo = currentApplicationInfo()
_applicationInfo.map(_.state) match {
case Some(ApplicationState.FINISHED) =>
setState(OperationState.FINISHED)
updateBatchMetadata()
case Some(ApplicationState.FAILED) =>
setState(OperationState.ERROR)
updateBatchMetadata()
case Some(ApplicationState.UNKNOWN) |
Some(ApplicationState.NOT_FOUND) |
Some(ApplicationState.KILLED) =>
setState(OperationState.CANCELED)
updateBatchMetadata()
case _ => // failed to kill, the kill message is enough
}
}
}
}