[CELEBORN-2040] Avoid throwing FetchFailedException when getting GetReducerFileGroupResponse via broadcast fails
### What changes were proposed in this pull request?

In our production environment, when obtaining GetReducerFileGroupResponse via broadcast ([CELEBORN-1921]), failures may occur for reasons such as executor preemption or local disk errors while writing the broadcast data. These scenarios throw a CelebornIOException, which is eventually converted into a FetchFailedException.

However, these errors are not caused by the loss of shuffle-related metadata, so a FetchFailedException should not be thrown to trigger a stage retry. Instead, the task should simply fail and be retried at the task level.
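
To make the intent concrete, below is a minimal sketch of the control flow this change aims for. It is not the actual patch: apart from `CelebornIOException`, every name is a hypothetical placeholder stubbed out purely for illustration.

```scala
import org.apache.celeborn.common.exception.CelebornIOException

object FileGroupLoadSketch {
  // Placeholder for the call that may deserialize the broadcast internally and
  // throws CelebornIOException when the broadcast cannot be read back.
  private def updateFileGroup(shuffleId: Int, partitionId: Int): Unit = ()

  // Placeholder predicate: did the failure come from reading the broadcast?
  private def isBroadcastReadFailure(e: CelebornIOException): Boolean =
    e.getMessage != null && e.getMessage.contains("broadcast")

  // Placeholder for whatever wraps the error into a FetchFailedException.
  private def reportFetchFailure(shuffleId: Int, partitionId: Int, e: Throwable): Nothing =
    throw new RuntimeException(s"fetch failure for $shuffleId-$partitionId", e)

  def loadFileGroups(shuffleId: Int, partitionId: Int): Unit =
    try {
      updateFileGroup(shuffleId, partitionId)
    } catch {
      case e: CelebornIOException if isBroadcastReadFailure(e) =>
        // Executor preemption or a bad local disk while reading the broadcast:
        // shuffle metadata is not lost, so let only this task fail and be
        // retried at the task level.
        throw e
      case e: CelebornIOException =>
        // Shuffle metadata is genuinely unavailable: report a fetch failure so
        // Spark recomputes the producing map stage.
        reportFetchFailure(shuffleId, partitionId, e)
    }
}
```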

### Why are the changes needed?

To reduce false-positive fetch failures.

Case 1: While deserializing the GetReducerFileGroupResponse broadcast, an ExecutorLostFailure occurred because the container was preempted, which led to reporting a fetch failure.
```
25/06/16 08:39:21 INFO Executor task launch worker for task 30724 SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 0
25/06/16 08:39:21 INFO Executor task launch worker for task 30724 TorrentBroadcast: Started reading broadcast variable 7 with 3 pieces (estimated total size 12.0 MiB)
......

25/06/16 08:39:21 ERROR Executor task launch worker for task 30724 SparkUtils: Failed to deserialize GetReducerFileGroupResponse for shuffle: 0
java.io.IOException: org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1387)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
	at org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
	at org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
	at org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
	at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
	at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.scheduler.Task.run(Task.scala:130)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:105)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
	at org.apache.spark.storage.BlockManagerMaster.getLocationsAndStatus(BlockManagerMaster.scala:93)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1179)
	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1341)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:180)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:169)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:253)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
	... 40 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@739bc42 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@66c2c5b0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
	at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
	at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:264)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:552)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:556)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:104)
	... 54 more

25/06/16 08:39:21 ERROR Executor task launch worker for task 30723 ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 0.
org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 0
	at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
......

25/06/16 08:39:21 WARN Executor task launch worker for task 30724 CelebornShuffleReader: Handle fetch exceptions for 0-0
org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 4643! Failed to get GetReducerFileGroupResponse broadcast for shuffle: 0
	at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
......
Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 0
	at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
	at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
	... 27 more
```
Case 2: While deserializing the GetReducerFileGroupResponse broadcast, a failure to create a local directory led to reporting a fetch failure.
```
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 1
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 TorrentBroadcast: Started reading broadcast variable 5 with 1 pieces (estimated total size 4.0 MiB)
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 TorrentBroadcast: Reading broadcast variable 5 took 0 ms
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 MemoryStore: Block broadcast_5 stored as values in memory (estimated size 980.4 KiB, free 6.3 GiB)
25/05/27 07:27:03 WARN Executor task launch worker for task 20399 BlockManager: Putting block broadcast_5 failed due to exception java.io.IOException: Failed to create local dir in /data12/hadoop/yarn/nm-local-dir/usercache/......
25/05/27 07:27:03 WARN Executor task launch worker for task 20399 BlockManager: Block broadcast_5 was not removed normally.
25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 Utils: Exception encountered
java.io.IOException: Failed to create local dir in /data12/hadoop/yarn/nm-local-dir/usercache/......
	at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:93)
	at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:114)
	at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2050)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1574)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1611)
	at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:1467)
	at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1936)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:262)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
	at org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
	at org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
	at org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
	at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877)
	at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:60)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.scheduler.Task.run(Task.scala:130)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 1.
org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 1
......

25/05/27 07:27:03 WARN Executor task launch worker for task 20399 CelebornShuffleReader: Handle fetch exceptions for 1-0
org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 1 partition 4001! Failed to get GetReducerFileGroupResponse broadcast for shuffle: 1
	at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
	at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
......
Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 1
	at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
......

```

### Does this PR introduce _any_ user-facing change?
When `ShuffleClient.deserializeReducerFileGroupResponse(shuffleId, response.broadcast())` returns null, a fetch failure will no longer be reported; instead, the task will simply fail.
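
As a rough caller-side illustration of that contract (a sketch only: `deserializeResponse`, `ReducerFileGroups`, and `fileGroupsOrFail` are hypothetical stand-ins for the call quoted above, and a message-only `CelebornIOException` constructor is assumed):

```scala
import org.apache.celeborn.common.exception.CelebornIOException

object BroadcastDeserializationSketch {
  final case class ReducerFileGroups() // partition locations elided

  // Placeholder: returns null when the broadcast cannot be read back.
  private def deserializeResponse(shuffleId: Int): ReducerFileGroups = null

  def fileGroupsOrFail(shuffleId: Int): ReducerFileGroups = {
    val fileGroups = deserializeResponse(shuffleId)
    if (fileGroups == null) {
      // A plain CelebornIOException propagates, so only this task fails and is
      // retried; no FetchFailedException is reported and no stage retry occurs.
      throw new CelebornIOException(
        s"Failed to get GetReducerFileGroupResponse broadcast for shuffle: $shuffleId")
    }
    fileGroups
  }
}
```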

### How was this patch tested?
Long-running production validation.

Closes #3341 from vastian180/CELEBORN-2040.

Lead-authored-by: caohaotian <caohaotian@meituan.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>