diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index efd1511e4..22048d582 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -66,6 +66,23 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -87,5 +104,8 @@
+
+
+
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index dfe4e03a1..8316f1706 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -51,6 +51,7 @@ import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder, ServerSaslContextBuilder}
import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, PbSerDeUtils, SignalUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
+import org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.quota.QuotaManager
@@ -706,13 +707,13 @@ private[celeborn] class Master(
val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId)
if (shuffleIds == null || !shuffleIds.contains(shuffleId)) {
- logWarning(
- s"Shuffle $shuffleKey expired on $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
expiredShuffleKeys.add(shuffleKey)
}
}
- logDebug(
- s"Shuffle ${expiredShuffleKeys.asScala.mkString("[", " ,", "]")} expired on ${targetWorker.toUniqueId}.")
+ ShuffleAuditLogger.batchAudit(
+ expiredShuffleKeys.asScala.mkString(","),
+ "EXPIRE",
+ Seq(s"worker=${targetWorker.toUniqueId}"))
val workerEventInfo = statusSystem.workerEventInfos.get(targetWorker)
if (workerEventInfo == null) {
@@ -735,6 +736,11 @@ private[celeborn] class Master(
try {
logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ")
statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId);
+ ShuffleAuditLogger.batchAudit(
+ lostShuffles.asScala.map { shuffleId =>
+ Utils.makeShuffleKey(appId, shuffleId)
+ }.mkString(","),
+ "REVIVE")
if (context != null) {
context.reply(ReviseLostShufflesResponse(true, ""))
}
@@ -984,6 +990,14 @@ private[celeborn] class Master(
logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
}
+ ShuffleAuditLogger.audit(
+ shuffleKey,
+ "OFFER_SLOTS",
+ Seq(
+ s"numReducers=$numReducers",
+ s"workerNum=${slots.size()}",
+ s"extraSlots=$offerSlotsExtraSize"))
+
if (authEnabled) {
pushApplicationMetaToWorkers(requestSlots, slots)
}
@@ -1035,6 +1049,7 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
statusSystem.handleUnRegisterShuffle(shuffleKey, requestId)
logInfo(s"Unregister shuffle $shuffleKey")
+ ShuffleAuditLogger.audit(shuffleKey, "UNREGISTER")
context.reply(UnregisterShuffleResponse(StatusCode.SUCCESS))
}
@@ -1047,6 +1062,7 @@ private[celeborn] class Master(
shuffleIds.map(shuffleId => Utils.makeShuffleKey(applicationId, shuffleId)).asJava
statusSystem.handleBatchUnRegisterShuffles(shuffleKeys, requestId)
logInfo(s"BatchUnregister shuffle $shuffleKeys")
+ ShuffleAuditLogger.batchAudit(shuffleKeys.asScala.mkString(","), "BATCH_UNREGISTER")
context.reply(BatchUnregisterShuffleResponse(StatusCode.SUCCESS, shuffleIds.asJava))
}
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
new file mode 100644
index 000000000..ba5b0261a
--- /dev/null
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/audit/ShuffleAuditLogger.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master.audit
+
+import org.apache.celeborn.common.internal.Logging
+
+object ShuffleAuditLogger extends Logging {
+ final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
+ override protected def initialValue: StringBuilder = new StringBuilder()
+ }
+
+ def audit(shuffleKey: String, op: String, labels: Seq[String] = Seq.empty): Unit = {
+ val sb = AUDIT_BUFFER.get()
+ sb.setLength(0)
+ sb.append(s"shuffleKey=$shuffleKey").append("\t")
+ sb.append(s"op=$op")
+ if (labels.nonEmpty) sb.append(labels.mkString("\t", "\t", ""))
+ logInfo(sb.toString())
+ }
+
+ def batchAudit(shuffleKeys: String, op: String, labels: Seq[String] = Seq.empty): Unit = {
+ val sb = AUDIT_BUFFER.get()
+ sb.setLength(0)
+ sb.append(s"shuffleKeys=$shuffleKeys").append("\t")
+ sb.append(s"op=$op")
+ if (labels.nonEmpty) sb.append(labels.mkString("\t", "\t", ""))
+ logInfo(sb.toString())
+ }
+}