[CELEBORN-1775][FOLLOWUP] Improve logging around commit files

### What changes were proposed in this pull request?
Minor logging improvement around commit files to log shuffleKey.

### Why are the changes needed?
Ditto.

### Does this PR introduce _any_ user-facing change?
Some logs will change.

### How was this patch tested?
NA

Closes #3270 from s0nskar/CELEBORN-1775.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Sanskar Modi 2025-05-21 16:37:38 -07:00 committed by Wang, Fei
parent 45b94bf052
commit 082f0dd8c5
2 changed files with 15 additions and 13 deletions

View File

@ -330,6 +330,7 @@ abstract class CommitHandler(
while (iter.hasNext) {
val status = iter.next()
val worker = status.workerInfo
val shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId)
if (status.future.isCompleted) {
status.future.value.get match {
case scala.util.Success(res) =>
@ -337,10 +338,10 @@ abstract class CommitHandler(
case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
if (res.status == StatusCode.SUCCESS) {
logDebug(s"Request commitFiles return ${res.status} for " +
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from worker ${worker.readableAddress()}")
s"$shuffleKey from worker ${worker.readableAddress()}")
} else {
logWarning(s"Request commitFiles return ${res.status} for " +
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from worker ${worker.readableAddress()}")
s"$shuffleKey from worker ${worker.readableAddress()}")
if (res.status != StatusCode.WORKER_EXCLUDED) {
commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
}
@ -350,12 +351,12 @@ abstract class CommitHandler(
case StatusCode.COMMIT_FILES_MOCK_FAILURE =>
if (status.retriedTimes < maxRetries) {
logError(s"Request commitFiles return ${res.status} for " +
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for ${status.retriedTimes}/$maxRetries, will retry")
s"$shuffleKey for ${status.retriedTimes}/$maxRetries, will retry")
retryCommitFiles(status, currentTime)
} else {
logError(
s"Request commitFiles return ${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for ${status.retriedTimes}/$maxRetries, will not retry")
s"$shuffleKey for ${status.retriedTimes}/$maxRetries, will not retry")
val res = createFailResponse(status)
processResponse(res, worker)
iter.remove()
@ -367,13 +368,13 @@ abstract class CommitHandler(
case scala.util.Failure(e) =>
if (status.retriedTimes < maxRetries) {
logError(
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed" +
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will retry.",
e)
retryCommitFiles(status, currentTime)
} else {
logError(
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed" +
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not retry.",
e)
val res = createFailResponse(status)
@ -384,12 +385,12 @@ abstract class CommitHandler(
} else if (currentTime - status.startTime > timeout) {
if (status.retriedTimes < maxRetries) {
logError(
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed because of Timeout" +
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will retry.")
retryCommitFiles(status, currentTime)
} else {
logError(
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed because of Timeout" +
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed because of Timeout" +
s" (attempt ${status.retriedTimes}/$maxRetries), will not retry.")
}
}

View File

@ -620,19 +620,20 @@ private[deploy] class Controller(
new BiFunction[Void, Throwable, Unit] {
override def apply(v: Void, t: Throwable): Unit = {
if (null != t) {
val errMsg = s"Exception while handling commitFiles for shuffleId: $shuffleKey"
t match {
case _: CancellationException =>
logWarning("While handling commitFiles, canceled.")
logWarning(s"$errMsg, operation was cancelled.")
case ee: ExecutionException =>
logError("While handling commitFiles, ExecutionException raised.", ee)
logError(s"$errMsg, ExecutionException was raised.", ee)
case ie: InterruptedException =>
logWarning("While handling commitFiles, interrupted.")
logWarning(s"$errMsg, operation was interrupted.")
Thread.currentThread().interrupt()
throw ie
case _: TimeoutException =>
logWarning(s"While handling commitFiles, timeout after $shuffleCommitTimeout ms.")
logWarning(s"$errMsg, operation timed out after $shuffleCommitTimeout ms.")
case throwable: Throwable =>
logError("While handling commitFiles, exception occurs.", throwable)
logError(s"$errMsg, an unexpected exception occurred.", throwable)
}
commitInfo.synchronized {
commitInfo.response = CommitFilesResponse(