diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala index 565c8a694..ff3d51245 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.api.model.Pod import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationStateAndError, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL} +import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType object KubernetesApplicationAuditLogger extends Logging { final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() { @@ -31,12 +32,14 @@ object KubernetesApplicationAuditLogger extends Logging { } def audit( + eventType: KubernetesResourceEventType, kubernetesInfo: KubernetesInfo, pod: Pod, appStateSource: KubernetesApplicationStateSource, appStateContainer: String): Unit = { val sb = AUDIT_BUFFER.get() sb.setLength(0) + sb.append("eventType=").append(eventType).append("\t") sb.append(s"label=${pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)}").append("\t") sb.append(s"context=${kubernetesInfo.context.orNull}").append("\t") sb.append(s"namespace=${kubernetesInfo.namespace.orNull}").append("\t") @@ -48,7 +51,7 @@ object KubernetesApplicationAuditLogger extends Logging { sb.append(s"containers=$containerStatuses").append("\t") sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t") val (appState, appError) = - toApplicationStateAndError(pod, appStateSource, appStateContainer) + toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType) sb.append(s"appState=$appState").append("\t") sb.append(s"appError='${appError.getOrElse("")}'") info(sb.toString()) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 55f0fa661..c7ce750f2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -35,6 +35,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} +import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.KyuubiServer import org.apache.kyuubi.session.KyuubiSessionManager @@ -315,8 +316,10 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def onAdd(pod: Pod): Unit = { if (isSparkEnginePod(pod)) { - updateApplicationState(kubernetesInfo, pod) + val eventType = KubernetesResourceEventTypes.ADD + updateApplicationState(kubernetesInfo, pod, eventType) KubernetesApplicationAuditLogger.audit( + eventType, kubernetesInfo, pod, appStateSource, @@ -327,14 +330,16 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def onUpdate(oldPod: Pod, newPod: Pod): Unit = { if (isSparkEnginePod(newPod)) { + val eventType = KubernetesResourceEventTypes.UPDATE val kyuubiUniqueKey = newPod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) val firstUpdate = appInfoStore.get(kyuubiUniqueKey) == null - updateApplicationState(kubernetesInfo, newPod) - val appState = toApplicationState(newPod, appStateSource, appStateContainer) + updateApplicationState(kubernetesInfo, newPod, eventType) + val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType) if (isTerminated(appState)) { - markApplicationTerminated(newPod) + markApplicationTerminated(newPod, eventType) } KubernetesApplicationAuditLogger.audit( + eventType, kubernetesInfo, newPod, appStateSource, @@ -347,9 +352,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = { if (isSparkEnginePod(pod)) { - updateApplicationState(kubernetesInfo, pod) - markApplicationTerminated(pod) + val eventType = KubernetesResourceEventTypes.DELETE + updateApplicationState(kubernetesInfo, pod, eventType) + markApplicationTerminated(pod, eventType) KubernetesApplicationAuditLogger.audit( + eventType, kubernetesInfo, pod, appStateSource, @@ -388,9 +395,12 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { selectors.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && selectors.containsKey(SPARK_APP_ID_LABEL) } - private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = { + private def updateApplicationState( + kubernetesInfo: KubernetesInfo, + pod: Pod, + eventType: KubernetesResourceEventType): Unit = { val (appState, appError) = - toApplicationStateAndError(pod, appStateSource, appStateContainer) + toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType) debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState") val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) appInfoStore.synchronized { @@ -439,12 +449,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { }.getOrElse(warn(s"Spark UI port not found in service ${svc.getMetadata.getName}")) } - private def markApplicationTerminated(pod: Pod): Unit = synchronized { + private def markApplicationTerminated( + pod: Pod, + eventType: KubernetesResourceEventType): Unit = synchronized { val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) { cleanupTerminatedAppInfoTrigger.put( key, - toApplicationState(pod, appStateSource, appStateContainer)) + toApplicationState(pod, appStateSource, appStateContainer, eventType)) } } @@ -504,11 +516,31 @@ object KubernetesApplicationOperation extends Logging { def toApplicationState( pod: Pod, appStateSource: KubernetesApplicationStateSource, - appStateContainer: String): ApplicationState = { - toApplicationStateAndError(pod, appStateSource, appStateContainer)._1 + appStateContainer: String, + eventType: KubernetesResourceEventType): ApplicationState = { + toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)._1 } def toApplicationStateAndError( + pod: Pod, + appStateSource: KubernetesApplicationStateSource, + appStateContainer: String, + eventType: KubernetesResourceEventType): (ApplicationState, Option[String]) = { + eventType match { + case KubernetesResourceEventTypes.ADD | KubernetesResourceEventTypes.UPDATE => + getApplicationStateAndErrorFromPod(pod, appStateSource, appStateContainer) + case KubernetesResourceEventTypes.DELETE => + val (appState, appError) = + getApplicationStateAndErrorFromPod(pod, appStateSource, appStateContainer) + if (ApplicationState.isTerminated(appState)) { + (appState, appError) + } else { + (ApplicationState.FAILED, Some(s"Pod ${pod.getMetadata.getName} is deleted")) + } + } + } + + private def getApplicationStateAndErrorFromPod( pod: Pod, appStateSource: KubernetesApplicationStateSource, appStateContainer: String): (ApplicationState, Option[String]) = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesResourceEventTypes.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesResourceEventTypes.scala new file mode 100644 index 000000000..f1d4db5ab --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesResourceEventTypes.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +object KubernetesResourceEventTypes extends Enumeration { + type KubernetesResourceEventType = Value + + val ADD, UPDATE, DELETE = Value +}