[CELEBORN-1775][FOLLOWUP] Improve logging around commit files
### What changes were proposed in this pull request? Minor logging improvement around commit files to log shuffleKey. ### Why are the changes needed? Ditto. ### Does this PR introduce _any_ user-facing change? Some logs will change. ### How was this patch tested? NA Closes #3270 from s0nskar/CELEBORN-1775. Authored-by: Sanskar Modi <sanskarmodi97@gmail.com> Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
parent
45b94bf052
commit
082f0dd8c5
@ -330,6 +330,7 @@ abstract class CommitHandler(
|
||||
while (iter.hasNext) {
|
||||
val status = iter.next()
|
||||
val worker = status.workerInfo
|
||||
val shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId)
|
||||
if (status.future.isCompleted) {
|
||||
status.future.value.get match {
|
||||
case scala.util.Success(res) =>
|
||||
@ -337,10 +338,10 @@ abstract class CommitHandler(
|
||||
case StatusCode.SUCCESS | StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED | StatusCode.REQUEST_FAILED | StatusCode.WORKER_EXCLUDED | StatusCode.COMMIT_FILE_EXCEPTION =>
|
||||
if (res.status == StatusCode.SUCCESS) {
|
||||
logDebug(s"Request commitFiles return ${res.status} for " +
|
||||
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from worker ${worker.readableAddress()}")
|
||||
s"$shuffleKey from worker ${worker.readableAddress()}")
|
||||
} else {
|
||||
logWarning(s"Request commitFiles return ${res.status} for " +
|
||||
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} from worker ${worker.readableAddress()}")
|
||||
s"$shuffleKey from worker ${worker.readableAddress()}")
|
||||
if (res.status != StatusCode.WORKER_EXCLUDED) {
|
||||
commitFilesFailedWorkers.put(worker, (res.status, System.currentTimeMillis()))
|
||||
}
|
||||
@ -350,12 +351,12 @@ abstract class CommitHandler(
|
||||
case StatusCode.COMMIT_FILES_MOCK_FAILURE =>
|
||||
if (status.retriedTimes < maxRetries) {
|
||||
logError(s"Request commitFiles return ${res.status} for " +
|
||||
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for ${status.retriedTimes}/$maxRetries, will retry")
|
||||
s"$shuffleKey for ${status.retriedTimes}/$maxRetries, will retry")
|
||||
retryCommitFiles(status, currentTime)
|
||||
} else {
|
||||
logError(
|
||||
s"Request commitFiles return ${StatusCode.COMMIT_FILES_MOCK_FAILURE} for " +
|
||||
s"${Utils.makeShuffleKey(appUniqueId, shuffleId)} for ${status.retriedTimes}/$maxRetries, will not retry")
|
||||
s"$shuffleKey for ${status.retriedTimes}/$maxRetries, will not retry")
|
||||
val res = createFailResponse(status)
|
||||
processResponse(res, worker)
|
||||
iter.remove()
|
||||
@ -367,13 +368,13 @@ abstract class CommitHandler(
|
||||
case scala.util.Failure(e) =>
|
||||
if (status.retriedTimes < maxRetries) {
|
||||
logError(
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed" +
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed" +
|
||||
s" (attempt ${status.retriedTimes}/$maxRetries), will retry.",
|
||||
e)
|
||||
retryCommitFiles(status, currentTime)
|
||||
} else {
|
||||
logError(
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed" +
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed" +
|
||||
s" (attempt ${status.retriedTimes}/$maxRetries), will not retry.",
|
||||
e)
|
||||
val res = createFailResponse(status)
|
||||
@ -384,12 +385,12 @@ abstract class CommitHandler(
|
||||
} else if (currentTime - status.startTime > timeout) {
|
||||
if (status.retriedTimes < maxRetries) {
|
||||
logError(
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed because of Timeout" +
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed because of Timeout" +
|
||||
s" (attempt ${status.retriedTimes}/$maxRetries), will retry.")
|
||||
retryCommitFiles(status, currentTime)
|
||||
} else {
|
||||
logError(
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleId failed because of Timeout" +
|
||||
s"Ask worker(${worker.readableAddress()}) CommitFiles for $shuffleKey failed because of Timeout" +
|
||||
s" (attempt ${status.retriedTimes}/$maxRetries), will not retry.")
|
||||
}
|
||||
}
|
||||
|
||||
@ -620,19 +620,20 @@ private[deploy] class Controller(
|
||||
new BiFunction[Void, Throwable, Unit] {
|
||||
override def apply(v: Void, t: Throwable): Unit = {
|
||||
if (null != t) {
|
||||
val errMsg = s"Exception while handling commitFiles for shuffleId: $shuffleKey"
|
||||
t match {
|
||||
case _: CancellationException =>
|
||||
logWarning("While handling commitFiles, canceled.")
|
||||
logWarning(s"$errMsg, operation was cancelled.")
|
||||
case ee: ExecutionException =>
|
||||
logError("While handling commitFiles, ExecutionException raised.", ee)
|
||||
logError(s"$errMsg, ExecutionException was raised.", ee)
|
||||
case ie: InterruptedException =>
|
||||
logWarning("While handling commitFiles, interrupted.")
|
||||
logWarning(s"$errMsg, operation was interrupted.")
|
||||
Thread.currentThread().interrupt()
|
||||
throw ie
|
||||
case _: TimeoutException =>
|
||||
logWarning(s"While handling commitFiles, timeout after $shuffleCommitTimeout ms.")
|
||||
logWarning(s"$errMsg, operation timed out after $shuffleCommitTimeout ms.")
|
||||
case throwable: Throwable =>
|
||||
logError("While handling commitFiles, exception occurs.", throwable)
|
||||
logError(s"$errMsg, an unexpected exception occurred.", throwable)
|
||||
}
|
||||
commitInfo.synchronized {
|
||||
commitInfo.response = CommitFilesResponse(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user