[KYUUBI #5064] Audit kubernetes application state change in separate log files

### _Why are the changes needed?_

To provide more insights for kyuubi admin.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5064 from turboFei/k8s_log.

Closes #5064

0c072d08e [fwang12] k8s audit
d5f2b44a5 [fwang12] save
c07306283 [fwang12] audit pod
a98234aba [fwang12] log more info

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-07-19 15:43:54 +08:00
parent ac21e271fe
commit f66b0e98dc
3 changed files with 62 additions and 2 deletions

View File

@ -24,6 +24,10 @@
<Property name="restAuditLogPath">rest-audit.log</Property>
<Property name="restAuditLogFilePattern">rest-audit-%d{yyyy-MM-dd}-%i.log</Property>
</Properties>
<Properties>
<Property name="k8sAuditLogPath">k8s-audit.log</Property>
<Property name="k8sAuditLogFilePattern">k8s-audit-%d{yyyy-MM-dd}-%i.log</Property>
</Properties>
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %p %c: %m%n"/>
@ -39,6 +43,14 @@
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
<RollingFile name="k8sAudit" fileName="${sys:k8sAuditLogPath}"
filePattern="${sys:k8sAuditLogFilePattern}">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %p %c{1}: %m%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="51200KB" />
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
</Appenders>
<Loggers>
<Root level="INFO">
@ -58,5 +70,8 @@
<Logger name="org.apache.kyuubi.server.http.authentication.AuthenticationAuditLogger" additivity="false">
<AppenderRef ref="restAudit" />
</Logger>
<Logger name="org.apache.kyuubi.engine.KubernetesApplicationAuditLogger" additivity="false">
<AppenderRef ref="k8sAudit" />
</Logger>
</Loggers>
</Configuration>

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import io.fabric8.kubernetes.api.model.Pod
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
object KubernetesApplicationAuditLogger extends Logging {
final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
override protected def initialValue: StringBuilder = new StringBuilder()
}
def audit(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = {
val sb = AUDIT_BUFFER.get()
sb.setLength(0)
sb.append(s"label=${pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)}").append("\t")
sb.append(s"context=${kubernetesInfo.context.orNull}").append("\t")
sb.append(s"namespace=${kubernetesInfo.namespace.orNull}").append("\t")
sb.append(s"pod=${pod.getMetadata.getName}").append("\t")
sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t")
sb.append(s"appState=${toApplicationState(pod.getStatus.getPhase)}")
info(sb.toString())
}
}

View File

@ -83,7 +83,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
info(s"[$kubernetesInfo] Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
val enginePodInformer = client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
.inform(new SparkEnginePodEventHandler)
.inform(new SparkEnginePodEventHandler(kubernetesInfo))
info(s"[$kubernetesInfo] Start Kubernetes Client Informer.")
enginePodInformers.put(kubernetesInfo, enginePodInformer)
client
@ -201,11 +201,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
private class SparkEnginePodEventHandler extends ResourceEventHandler[Pod] {
private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)
extends ResourceEventHandler[Pod] {
override def onAdd(pod: Pod): Unit = {
if (isSparkEnginePod(pod)) {
updateApplicationState(pod)
KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod)
}
}
@ -216,6 +218,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
if (isTerminated(appState)) {
markApplicationTerminated(newPod)
}
KubernetesApplicationAuditLogger.audit(kubernetesInfo, newPod)
}
}
@ -223,6 +226,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
if (isSparkEnginePod(pod)) {
updateApplicationState(pod)
markApplicationTerminated(pod)
KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod)
}
}
}