[CELEBORN-1701][FOLLOWUP] Support stage rerun for shuffle data lost

### What changes were proposed in this pull request?
Fix an error that may cause the application master to retry the stage rerun indefinitely.

### Why are the changes needed?
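Correct the parameters passed to `handleFetchExceptions`: it now receives both the application shuffle id (`handle.shuffleId`) and the reader's `shuffleId`, instead of a single id for which one caller passed `shuffleId` and the others passed `handle.shuffleId`.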

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #3033 from FMX/b1071-1.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
mingji 2024-12-26 17:58:41 +08:00 committed by SteNicholas
parent 7f030d424d
commit 52fa151aa4
No known key found for this signature in database
GPG Key ID: 1FC79E01BA3B84D5

View File

@ -111,7 +111,7 @@ class CelebornShuffleReader[K, C](
fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
} catch {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
handleFetchExceptions(shuffleId, 0, ce)
handleFetchExceptions(handle.shuffleId, shuffleId, 0, ce)
case e: Throwable => throw e
}
@ -254,7 +254,7 @@ class CelebornShuffleReader[K, C](
if (exceptionRef.get() != null) {
exceptionRef.get() match {
case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
handleFetchExceptions(handle.shuffleId, partitionId, ce)
handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, ce)
case e => throw e
}
}
@ -289,7 +289,7 @@ class CelebornShuffleReader[K, C](
iter
} catch {
case e @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
handleFetchExceptions(handle.shuffleId, partitionId, e)
handleFetchExceptions(handle.shuffleId, shuffleId, partitionId, e)
}
}
@ -369,17 +369,21 @@ class CelebornShuffleReader[K, C](
}
}
private def handleFetchExceptions(shuffleId: Int, partitionId: Int, ce: Throwable) = {
private def handleFetchExceptions(
appShuffleId: Int,
shuffleId: Int,
partitionId: Int,
ce: Throwable) = {
if (throwsFetchFailure &&
shuffleClient.reportShuffleFetchFailure(handle.shuffleId, shuffleId)) {
shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId)) {
logWarning(s"Handle fetch exceptions for ${shuffleId}-${partitionId}", ce)
throw new FetchFailedException(
null,
handle.shuffleId,
appShuffleId,
-1,
-1,
partitionId,
SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + shuffleId,
SparkUtils.FETCH_FAILURE_ERROR_MSG + appShuffleId + "/" + shuffleId,
ce)
} else
throw ce