From f66b0e98dced778465e2d013a32dbc5c8ae8fea8 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Wed, 19 Jul 2023 15:43:54 +0800 Subject: [PATCH] [KYUUBI #5064] Audit kubernetes application state change in separate log files ### _Why are the changes needed?_ To provide more insights for kyuubi admin. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5064 from turboFei/k8s_log. Closes #5064 0c072d08e [fwang12] k8s audit d5f2b44a5 [fwang12] save c07306283 [fwang12] audit pod a98234aba [fwang12] log more info Authored-by: fwang12 Signed-off-by: fwang12 --- conf/log4j2.xml.template | 15 +++++++ .../KubernetesApplicationAuditLogger.scala | 41 +++++++++++++++++++ .../KubernetesApplicationOperation.scala | 8 +++- 3 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template index 37fc8acf0..86f9459a1 100644 --- a/conf/log4j2.xml.template +++ b/conf/log4j2.xml.template @@ -24,6 +24,10 @@ rest-audit.log rest-audit-%d{yyyy-MM-dd}-%i.log + + k8s-audit.log + k8s-audit-%d{yyyy-MM-dd}-%i.log + @@ -39,6 +43,14 @@ + + + + + + + @@ -58,5 +70,8 @@ + + + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala new file mode 100644 index 000000000..731b9d7b5 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
+
+/**
+ * Audit logger for Kubernetes pods: each [[audit]] call writes one line through `info`.
+ */
+object KubernetesApplicationAuditLogger extends Logging {
+
+  // Every thread keeps its own builder, which audit() empties and reuses on each call.
+  final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
+    override protected def initialValue: StringBuilder = new StringBuilder()
+  }
+
+  /**
+   * Logs a single tab-separated line of `key=value` fields describing `pod`:
+   * label, context, namespace, pod, appId and appState, in that order.
+   *
+   * @param kubernetesInfo source of the `context` and `namespace` fields; `orNull` is
+   *                       applied to both before they are formatted
+   * @param pod pod whose metadata (name, labels) and status phase are recorded
+   */
+  def audit(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = {
+    val line = AUDIT_BUFFER.get()
+    // Drop whatever the previous call on this thread left in the builder.
+    line.setLength(0)
+
+    val metadata = pod.getMetadata
+    val labels = metadata.getLabels
+    line
+      .append(s"label=${labels.get(LABEL_KYUUBI_UNIQUE_KEY)}").append("\t")
+      .append(s"context=${kubernetesInfo.context.orNull}").append("\t")
+      .append(s"namespace=${kubernetesInfo.namespace.orNull}").append("\t")
+      .append(s"pod=${metadata.getName}").append("\t")
+      .append(s"appId=${labels.get(SPARK_APP_ID_LABEL)}").append("\t")
+      .append(s"appState=${toApplicationState(pod.getStatus.getPhase)}")
+
+    info(line.toString())
+  }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 5f21342e9..967003394 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -83,7 +83,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { info(s"[$kubernetesInfo] Initialized Kubernetes Client connect to: ${client.getMasterUrl}") val enginePodInformer = client.pods() .withLabel(LABEL_KYUUBI_UNIQUE_KEY) - .inform(new SparkEnginePodEventHandler) + .inform(new SparkEnginePodEventHandler(kubernetesInfo)) info(s"[$kubernetesInfo] Start Kubernetes Client Informer.") enginePodInformers.put(kubernetesInfo, enginePodInformer) client @@ -201,11 +201,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } } - private class SparkEnginePodEventHandler extends ResourceEventHandler[Pod] { + private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo) + extends ResourceEventHandler[Pod] { override def onAdd(pod: Pod): Unit = { if (isSparkEnginePod(pod)) { updateApplicationState(pod) + KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod) } } @@ -216,6 +218,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { if (isTerminated(appState)) { markApplicationTerminated(newPod) } + KubernetesApplicationAuditLogger.audit(kubernetesInfo, newPod) } } @@ -223,6 +226,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { if (isSparkEnginePod(pod)) { updateApplicationState(pod) markApplicationTerminated(pod) + KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod) } } }