[CELEBORN-2002][MASTER] Audit shuffle lifecycle in separate log file

### What changes were proposed in this pull request?
Audit shuffle lifecycle in separate log file
- OFFER_SLOTS
- EXPIRE
- REVIVE
- UNREGISTER
- BATCH_UNREGISTER

### Why are the changes needed?
Remove the redundant logs of expired shuffles in the master-worker heartbeat; see https://github.com/apache/celeborn/pull/3244

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
```
(base) ➜  celeborn git:(shuffle_audit) grep ShuffleAuditLogger tests/spark-it/target/unit-tests.log
25/05/19 20:05:27,031 INFO [celeborn-dispatcher-41] ShuffleAuditLogger: shuffleKey=local-1747710326897-0        op=OFFER_SLOTS  numReducers=4   workerNum=5     extraSlots=1
25/05/19 20:05:27,719 INFO [celeborn-dispatcher-44] ShuffleAuditLogger: shuffleKey=local-1747710326897-1        op=OFFER_SLOTS  numReducers=2   workerNum=5     extraSlots=3
25/05/19 20:05:28,094 INFO [celeborn-dispatcher-47] ShuffleAuditLogger: shuffleKey=local-1747710326897-2        op=OFFER_SLOTS  numReducers=2   workerNum=5     extraSlots=3
25/05/19 20:05:28,467 INFO [celeborn-dispatcher-52] ShuffleAuditLogger: shuffleKey=local-1747710326897-3        op=OFFER_SLOTS  numReducers=8   workerNum=5     extraSlots=0
25/05/19 20:05:28,769 INFO [celeborn-dispatcher-53] ShuffleAuditLogger: shuffleKey=local-1747710326897-4        op=OFFER_SLOTS  numReducers=8   workerNum=5     extraSlots=0
25/05/19 20:05:29,720 INFO [celeborn-dispatcher-56] ShuffleAuditLogger: shuffleKey=local-1747710326897-5        op=OFFER_SLOTS  numReducers=200 workerNum=5     extraSlots=0
25/05/19 20:05:30,349 INFO [celeborn-dispatcher-59] ShuffleAuditLogger: shuffleKey=local-1747710326897-6        op=OFFER_SLOTS  numReducers=4   workerNum=5     extraSlots=1
25/05/19 20:05:40,534 INFO [celeborn-dispatcher-11] ShuffleAuditLogger: shuffleKey=local-1747710340484-0        op=OFFER_SLOTS  numReducers=4   workerNum=5     extraSlots=1
25/05/19 20:05:41,101 INFO [celeborn-dispatcher-14] ShuffleAuditLogger: shuffleKey=local-1747710340484-1        op=OFFER_SLOTS  numReducers=2   workerNum=5     extraSlots=3
25/05/19 20:05:41,480 INFO [celeborn-dispatcher-17] ShuffleAuditLogger: shuffleKey=local-1747710340484-2        op=OFFER_SLOTS  numReducers=2   workerNum=5     extraSlots=3
25/05/19 20:05:41,848 INFO [celeborn-dispatcher-26] ShuffleAuditLogger: shuffleKey=local-1747710340484-3        op=OFFER_SLOTS  numReducers=8   workerNum=5     extraSlots=0
25/05/19 20:05:42,136 INFO [celeborn-dispatcher-18] ShuffleAuditLogger: shuffleKey=local-1747710340484-4        op=OFFER_SLOTS  numReducers=8   workerNum=5     extraSlots=0
25/05/19 20:05:43,058 INFO [celeborn-dispatcher-21] ShuffleAuditLogger: shuffleKey=local-1747710340484-5        op=OFFER_SLOTS  numReducers=200 workerNum=5     extraSlots=0
25/05/19 20:05:43,542 INFO [celeborn-dispatcher-31] ShuffleAuditLogger: shuffleKey=local-1747710340484-6        op=OFFER_SLOTS  numReducers=4   workerNum=5     extraSlots=1
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-29] ShuffleAuditLogger: shuffleKeys=local-1747710326897-0,local-1747710326897-1,local-1747710326897-2,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5 op=EXPIRE       worker=127.0.0.1:59932:59934:59948:59941
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-27] ShuffleAuditLogger: shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6       op=EXPIRE       worker=127.0.0.1:59930:59938:59944:59940
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-32] ShuffleAuditLogger: shuffleKeys=local-1747710326897-1,local-1747710326897-2,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6 op=EXPIRE       worker=127.0.0.1:59931:59936:59945:59939
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-33] ShuffleAuditLogger: shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6       op=EXPIRE       worker=127.0.0.1:59933:59935:59946:59943
25/05/19 20:05:44,436 INFO [celeborn-dispatcher-28] ShuffleAuditLogger: shuffleKeys=local-1747710326897-0,local-1747710326897-3,local-1747710326897-4,local-1747710326897-5,local-1747710326897-6       op=EXPIRE       worker=127.0.0.1:59929:59937:59947:59942

```

Closes #3265 from turboFei/shuffle_audit.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-05-20 05:45:34 -07:00
parent fd715b41af
commit 90ece9665c
3 changed files with 84 additions and 4 deletions

View File

@ -66,6 +66,23 @@
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<!--
Dedicated appender for shuffle audit records, written to
${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log.
The active file rolls over once it reaches 200 MB. On rollover, files under the audit
directory matching "shuffle-audit.log*" become eligible for deletion when their
accumulated size exceeds 1 GB or their accumulated count exceeds 10.
-->
<RollingRandomAccessFile name="shuffleAuditFile" fileName="${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log"
filePattern="${env:CELEBORN_LOG_DIR}/audit/shuffle-audit.log.%d-%i">
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
<Policies>
<SizeBasedTriggeringPolicy size="200 MB"/>
</Policies>
<DefaultRolloverStrategy max="7">
<Delete basePath="${env:CELEBORN_LOG_DIR}/audit" maxDepth="1">
<IfFileName glob="shuffle-audit.log*">
<IfAny>
<IfAccumulatedFileSize exceeds="1 GB"/>
<IfAccumulatedFileCount exceeds="10"/>
</IfAny>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
@ -87,5 +104,8 @@
<Logger name="org.apache.celeborn.server.common.http.RestAuditLogger" level="INFO" additivity="false">
<Appender-ref ref="restAuditFile" level="INFO"/>
</Logger>
<!--
Routes ShuffleAuditLogger output (INFO and above) to the shuffleAuditFile appender.
additivity="false" stops these records from also being passed to parent loggers' appenders.
-->
<Logger name="org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger" level="INFO" additivity="false">
<Appender-ref ref="shuffleAuditFile" level="INFO"/>
</Logger>
</Loggers>
</Configuration>

View File

@ -51,6 +51,7 @@ import org.apache.celeborn.common.rpc._
import org.apache.celeborn.common.rpc.{RpcSecurityContextBuilder, ServerSaslContextBuilder}
import org.apache.celeborn.common.util.{CelebornHadoopUtils, JavaUtils, PbSerDeUtils, SignalUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.master.audit.ShuffleAuditLogger
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.quota.QuotaManager
@ -706,13 +707,13 @@ private[celeborn] class Master(
val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
val shuffleIds = statusSystem.registeredAppAndShuffles.get(appId)
if (shuffleIds == null || !shuffleIds.contains(shuffleId)) {
logWarning(
s"Shuffle $shuffleKey expired on $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
expiredShuffleKeys.add(shuffleKey)
}
}
logDebug(
s"Shuffle ${expiredShuffleKeys.asScala.mkString("[", " ,", "]")} expired on ${targetWorker.toUniqueId}.")
ShuffleAuditLogger.batchAudit(
expiredShuffleKeys.asScala.mkString(","),
"EXPIRE",
Seq(s"worker=${targetWorker.toUniqueId}"))
val workerEventInfo = statusSystem.workerEventInfos.get(targetWorker)
if (workerEventInfo == null) {
@ -735,6 +736,11 @@ private[celeborn] class Master(
try {
logInfo(s"Handle lost shuffles for ${appId} ${lostShuffles} ")
statusSystem.handleReviseLostShuffles(appId, lostShuffles, requestId);
ShuffleAuditLogger.batchAudit(
lostShuffles.asScala.map { shuffleId =>
Utils.makeShuffleKey(appId, shuffleId)
}.mkString(","),
"REVIVE")
if (context != null) {
context.reply(ReviseLostShufflesResponse(true, ""))
}
@ -984,6 +990,14 @@ private[celeborn] class Master(
logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
}
ShuffleAuditLogger.audit(
shuffleKey,
"OFFER_SLOTS",
Seq(
s"numReducers=$numReducers",
s"workerNum=${slots.size()}",
s"extraSlots=$offerSlotsExtraSize"))
if (authEnabled) {
pushApplicationMetaToWorkers(requestSlots, slots)
}
@ -1035,6 +1049,7 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
statusSystem.handleUnRegisterShuffle(shuffleKey, requestId)
logInfo(s"Unregister shuffle $shuffleKey")
ShuffleAuditLogger.audit(shuffleKey, "UNREGISTER")
context.reply(UnregisterShuffleResponse(StatusCode.SUCCESS))
}
@ -1047,6 +1062,7 @@ private[celeborn] class Master(
shuffleIds.map(shuffleId => Utils.makeShuffleKey(applicationId, shuffleId)).asJava
statusSystem.handleBatchUnRegisterShuffles(shuffleKeys, requestId)
logInfo(s"BatchUnregister shuffle $shuffleKeys")
ShuffleAuditLogger.batchAudit(shuffleKeys.asScala.mkString(","), "BATCH_UNREGISTER")
context.reply(BatchUnregisterShuffleResponse(StatusCode.SUCCESS, shuffleIds.asJava))
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master.audit
import org.apache.celeborn.common.internal.Logging
/**
 * Audit logger for shuffle operations.
 *
 * Every record is emitted through `logInfo` as a single line of tab-separated fields:
 * the shuffle key(s), then `op=<operation>`, then any caller-supplied labels in order.
 */
object ShuffleAuditLogger extends Logging {

  // Per-thread scratch buffer. It is reset before each record so that formatting a record
  // reuses the same builder instead of allocating a new one on every call.
  final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
    override protected def initialValue: StringBuilder = new StringBuilder()
  }

  /**
   * Logs an audit record for a single shuffle.
   *
   * @param shuffleKey key of the shuffle, written as `shuffleKey=<value>`
   * @param op name of the operation, written as `op=<value>`
   * @param labels extra pre-formatted fields appended after `op`, each preceded by a tab
   */
  def audit(shuffleKey: String, op: String, labels: Seq[String] = Seq.empty): Unit =
    writeRecord("shuffleKey", shuffleKey, op, labels)

  /**
   * Logs one audit record covering several shuffles.
   *
   * @param shuffleKeys already-joined shuffle keys, written verbatim as `shuffleKeys=<value>`
   * @param op name of the operation, written as `op=<value>`
   * @param labels extra pre-formatted fields appended after `op`, each preceded by a tab
   */
  def batchAudit(shuffleKeys: String, op: String, labels: Seq[String] = Seq.empty): Unit =
    writeRecord("shuffleKeys", shuffleKeys, op, labels)

  /**
   * Formats `<keyField>=<keys>\top=<op>[\t<label>...]` into the thread-local buffer and logs it.
   * Pieces are appended directly to the buffer so no intermediate strings are built.
   */
  private def writeRecord(
      keyField: String,
      keys: String,
      op: String,
      labels: Seq[String]): Unit = {
    val sb = AUDIT_BUFFER.get()
    // Drop whatever the previous record on this thread left behind.
    sb.setLength(0)
    sb.append(keyField).append("=").append(keys)
    sb.append("\t").append("op=").append(op)
    labels.foreach(label => sb.append("\t").append(label))
    logInfo(sb.toString())
  }
}