### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
An NPE will be thrown when the `TransportMessage` payload is null, and there is currently no null check.
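The guard amounts to checking the payload before touching it. A minimal sketch, with hypothetical names (`payloadLength` is illustrative, not the actual Celeborn method):

```java
// Hypothetical sketch of the null guard this PR adds; the class and method
// names are illustrative stand-ins, not the real TransportMessage API.
public class TransportMessageGuard {
    // Returns the payload length, or 0 when the payload is absent,
    // instead of throwing a NullPointerException.
    public static int payloadLength(byte[] payload) {
        if (payload == null) {
            return 0; // previously dereferenced unconditionally, causing an NPE
        }
        return payload.length;
    }
}
```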
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Closes #3330 from Jraaay/main.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Specify `extractionDir` of `AsyncProfilerLoader` with `celeborn.worker.jvmProfiler.localDir`.
### Why are the changes needed?
`AsyncProfilerLoader` stores the extracted libraries under the `user.home` directory by default. When the `user.home` directory is not initialized, `AsyncProfilerLoader#load` fails. Specifying `extractionDir` of `AsyncProfilerLoader` via `celeborn.worker.jvmProfiler.localDir` avoids this loading failure.
Backport https://github.com/apache/spark/pull/51229.
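The selection logic can be sketched as follows; `chooseExtractionDir` is an illustrative helper, not Celeborn or ap-loader code, showing only the configured-dir-else-`user.home` fallback:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch of the directory-selection logic: prefer an explicitly configured
// local dir, falling back to user.home only when none is set. The config key
// name comes from the PR; the helper itself is hypothetical.
public class ExtractionDirChooser {
    public static Path chooseExtractionDir(String configuredLocalDir, String userHome) {
        if (configuredLocalDir != null && !configuredLocalDir.isEmpty()) {
            return Paths.get(configuredLocalDir); // celeborn.worker.jvmProfiler.localDir
        }
        return Paths.get(userHome); // default used by AsyncProfilerLoader
    }
}
```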
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #3345 from SteNicholas/CELEBORN-2046.
Lead-authored-by: SteNicholas <programgeek@163.com>
Co-authored-by: 子懿 <ziyi.jxf@antgroup.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Proactively cleanup from ChunkStreamManager when stream is closed.
### Why are the changes needed?
The stream gets closed only when the shuffle expires at the master, which can take a while.
In the meantime, workers incur nontrivial memory utilization.
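A minimal model of the proactive cleanup, assuming a map from stream id to buffered chunks; class and method names here are hypothetical, not the real `ChunkStreamManager` internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of the change: free a stream's state as soon as the
// stream closes, instead of holding it until the shuffle expires at the
// master. Not actual Celeborn code.
public class ChunkStreamRegistry {
    private final Map<Long, byte[][]> chunksByStream = new ConcurrentHashMap<>();

    public void register(long streamId, byte[][] chunks) {
        chunksByStream.put(streamId, chunks);
    }

    // Called on stream close: proactively drop the buffered chunks so worker
    // memory is reclaimed immediately.
    public void onStreamClosed(long streamId) {
        chunksByStream.remove(streamId);
    }

    public int activeStreams() {
        return chunksByStream.size();
    }
}
```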
### Does this PR introduce _any_ user-facing change?
No. Reduces memory usage in workers.
### How was this patch tested?
Existing unit tests
Closes #3343 from mridulm/proactively-cleanup-streams.
Authored-by: Mridul Muralidharan <mridul@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul@gmail.com>
### What changes were proposed in this pull request?
Return SoftSplit status when there is no hard split for pushMergeData
### Why are the changes needed?
HardSplit support was introduced in [CELEBORN-1721](https://issues.apache.org/jira/browse/CELEBORN-1721), and it works well when the client and server are of the same version. However, using a 0.5.x client with a 0.6.x server can significantly impact shuffle write performance. This is because the 0.6.x server returns HardSplit status whenever there is any soft or hard split, leading the 0.5.x client to perform a hard split on every partition. To maintain compatibility during upgrades, it's essential to preserve the original behavior for 0.5.x clients.
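The corrected response selection can be sketched as follows; the enum and method names are illustrative, not the actual protocol types:

```java
// Sketch of the fixed status choice for pushMergeData: report HardSplit only
// when a hard split actually occurred. Before this change the server answered
// HardSplit for any split, which made 0.5.x clients revive every partition.
public class SplitStatusChooser {
    public enum Status { SOFT_SPLIT, HARD_SPLIT }

    public static Status statusFor(boolean hasHardSplit) {
        // A pure soft split is now reported as SOFT_SPLIT, preserving the
        // pre-0.6.x behavior that legacy clients expect.
        return hasHardSplit ? Status.HARD_SPLIT : Status.SOFT_SPLIT;
    }
}
```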
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed GA and a 1 TB TPC-DS run with a 0.5.4 client. According to the test results, after applying this PR, revives decreased significantly and performance improved.
#### Test with 0.6.0-rc2 server, 0.5.4 client

#### Test with 0.7.0 server + this PR, 0.5.4 client

Closes #3342 from RexXiong/CELEBORN-1721-FOLLOWUP.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
If the create file order list doesn't have the corresponding storage types, use 0 as the index.
### Why are the changes needed?
This is needed in two scenarios:
1. For existing clients, the changed partition will select MEMORY as the default storage tier, which causes the revived partition to use memory storage even if the application is not configured to do so. This is the cause of [CELEBORN-2043], and it was fixed by [CELEBORN-2021](https://github.com/apache/celeborn/pull/3302).
2. Evicted files need to exclude their own storage type from the create file order list, which means that the storage type may not exist in the list.
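The index fallback described above can be sketched as a hypothetical helper (not the actual Celeborn code): `indexOf` returns -1 for a missing storage type, and the fix replaces that with index 0.

```java
import java.util.List;

// Illustrative helper: when the create-file order list does not contain the
// requested storage type, fall back to index 0 instead of propagating -1.
public class StorageOrderIndex {
    public static int indexFor(List<String> createFileOrder, String storageType) {
        int idx = createFileOrder.indexOf(storageType);
        return idx >= 0 ? idx : 0; // missing type: start from the first tier
    }
}
```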
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #3344 from FMX/b2043.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
In our production environment, when obtaining GetReducerFileGroupResponse via broadcast [CELEBORN-1921], failures may occur for reasons such as executor preemption or local disk errors while the task writes broadcast data. These scenarios throw a CelebornIOException, which is eventually converted to a FetchFailedException.
However, these errors are not caused by shuffle-related metadata loss, so a FetchFailedException should not be thrown to trigger a stage retry. Instead, the task should simply fail and be retried at the task level.
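A sketch of the intended behavior, using a plain `IOException` to stand in for `CelebornIOException` and simplified names; this illustrates the decision only, not the real client code:

```java
import java.io.IOException;

// Illustrative sketch: when the broadcast cannot be deserialized, throw a
// plain task-level exception rather than one that Spark converts into a
// FetchFailedException (which would rerun the whole stage).
public class FileGroupLoader {
    public static String loadFileGroup(byte[] broadcastBytes) throws IOException {
        String response = deserialize(broadcastBytes);
        if (response == null) {
            // Not a shuffle-metadata loss: fail the task, do not rerun the stage.
            throw new IOException("Failed to get GetReducerFileGroupResponse broadcast");
        }
        return response;
    }

    // Stand-in for broadcast deserialization, which returns null on failure.
    private static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes);
    }
}
```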
### Why are the changes needed?
To reduce false positive fetch failures.
case 1: During deserialization of the GetReducerFileGroupResponse broadcast, an ExecutorLostFailure happened because the container was preempted, leading to a reported fetch failure.
```
25/06/16 08:39:21 INFO Executor task launch worker for task 30724 SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 0
25/06/16 08:39:21 INFO Executor task launch worker for task 30724 TorrentBroadcast: Started reading broadcast variable 7 with 3 pieces (estimated total size 12.0 MiB)
......
25/06/16 08:39:21 ERROR Executor task launch worker for task 30724 SparkUtils: Failed to deserialize GetReducerFileGroupResponse for shuffle: 0
java.io.IOException: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1387)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
at org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
at org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
at org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
at org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.scheduler.Task.run(Task.scala:130)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:105)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:89)
at org.apache.spark.storage.BlockManagerMaster.getLocationsAndStatus(BlockManagerMaster.scala:93)
at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1179)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:1341)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:180)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:169)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:253)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
... 40 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@739bc42 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@66c2c5b0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:264)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:552)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:556)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:104)
... 54 more
25/06/16 08:39:21 ERROR Executor task launch worker for task 30723 ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 0.
org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 0
at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
......
25/06/16 08:39:21 WARN Executor task launch worker for task 30724 CelebornShuffleReader: Handle fetch exceptions for 0-0
org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 4643! Failed to get GetReducerFileGroupResponse broadcast for shuffle: 0
at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
......
Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 0
at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
at org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
... 27 more
```
case 2: During deserialization of the GetReducerFileGroupResponse broadcast, a failure to create the local directory leads to reporting a fetch failure.
```
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 SparkUtils: Deserializing GetReducerFileGroupResponse broadcast for shuffle: 1
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 TorrentBroadcast: Started reading broadcast variable 5 with 1 pieces (estimated total size 4.0 MiB)
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 TorrentBroadcast: Reading broadcast variable 5 took 0 ms
25/05/27 07:27:03 INFO Executor task launch worker for task 20399 MemoryStore: Block broadcast_5 stored as values in memory (estimated size 980.4 KiB, free 6.3 GiB)
25/05/27 07:27:03 WARN Executor task launch worker for task 20399 BlockManager: Putting block broadcast_5 failed due to exception java.io.IOException: Failed to create local dir in /data12/hadoop/yarn/nm-local-dir/usercache/......
25/05/27 07:27:03 WARN Executor task launch worker for task 20399 BlockManager: Block broadcast_5 was not removed normally.
25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 Utils: Exception encountered
java.io.IOException: Failed to create local dir in /data12/hadoop/yarn/nm-local-dir/usercache/......
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:93)
at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:114)
at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2050)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1574)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1611)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:1467)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1936)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:262)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1380)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.shuffle.celeborn.SparkUtils.lambda$deserializeGetReducerFileGroupResponse$4(SparkUtils.java:600)
at org.apache.celeborn.common.util.KeyLock.withLock(KeyLock.scala:65)
at org.apache.spark.shuffle.celeborn.SparkUtils.deserializeGetReducerFileGroupResponse(SparkUtils.java:585)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:485)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader$$anon$5.apply(CelebornShuffleReader.scala:480)
at org.apache.celeborn.client.ShuffleClient.deserializeReducerFileGroupResponse(ShuffleClient.java:321)
at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1876)
at org.apache.celeborn.client.ShuffleClientImpl.lambda$updateFileGroup$9(ShuffleClientImpl.java:1935)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877)
at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1931)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:225)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:60)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.scheduler.Task.run(Task.scala:130)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:477)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1428)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:480)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
25/05/27 07:27:03 ERROR Executor task launch worker for task 20399 ShuffleClientImpl: Exception raised while call GetReducerFileGroup for 1.
org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 1
......
25/05/27 07:27:03 WARN Executor task launch worker for task 20399 CelebornShuffleReader: Handle fetch exceptions for 1-0
org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 1 partition 4001! Failed to get GetReducerFileGroupResponse broadcast for shuffle: 1
at org.apache.celeborn.client.ShuffleClientImpl.updateFileGroup(ShuffleClientImpl.java:1943)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.read(CelebornShuffleReader.scala:119)
......
Caused by: org.apache.celeborn.common.exception.CelebornIOException: Failed to get GetReducerFileGroupResponse broadcast for shuffle: 1
at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroupInternal(ShuffleClientImpl.java:1878)
......
```
### Does this PR introduce _any_ user-facing change?
When `ShuffleClient.deserializeReducerFileGroupResponse(shuffleId, response.broadcast())` returns null, a fetch failure will not be reported; instead, the task will simply fail.
### How was this patch tested?
Long-running production validation.
Closes #3341 from vastian180/CELEBORN-2040.
Lead-authored-by: caohaotian <caohaotian@meituan.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support testing of the dynamic configuration management CLI.
### Why are the changes needed?
The tests of the dynamic configuration management CLI are disabled since dynamic configuration is not enabled in unit tests; unit tests should support testing the dynamic configuration management CLI.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`TestCelebornCliCommands`.
Closes #3340 from SteNicholas/CELEBORN-1056.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Fixes the FetchFailure handling logic in the `shouldReportShuffleFetchFailure` method to properly handle cases where the TaskSetManager cannot be found for a given task ID.
### Why are the changes needed?
The current implementation incorrectly reports FetchFailure when TaskSetManager is not found, which leads to false positive failures in normal fault tolerance scenarios. This happens because:
1. Executor Lost scenarios: When executors are lost due to resource preemption or failures, the associated TaskSetManager gets cleaned up, making it unavailable for lookup
2. Stage cancellation: Cancelled or completed stages may have their TaskSetManager removed
These are all normal scenarios in Spark's fault tolerance mechanism and should not be treated as shuffle failures. The current behavior can cause unnecessary job failures and confusion in debugging actual shuffle issues.
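The corrected decision can be modeled as follows, with a plain map standing in for the scheduler's task-id-to-TaskSetManager registry; this is a sketch of the rule, not the actual Spark/Celeborn code:

```java
import java.util.Map;

// Illustrative decision: when no TaskSetManager exists for the task (executor
// lost, stage cancelled or completed), do not classify the error as a shuffle
// fetch failure.
public class FetchFailureDecision {
    public static boolean shouldReportShuffleFetchFailure(
            Map<Long, Object> taskIdToTaskSetManager, long taskId) {
        Object tsm = taskIdToTaskSetManager.get(taskId);
        if (tsm == null) {
            // Normal fault-tolerance path: the old code effectively reported
            // a fetch failure here, producing false positives.
            return false;
        }
        return true; // real fetch failures for live task sets are still reported
    }
}
```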
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT and long-running production validation.
Closes #3339 from gaoyajun02/CELEBORN-2042.
Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Batch a few log lines on the Celeborn client side that log too much data in Spark driver logs for large applications.
- Change partition logs the updated partitions for each worker individually and prints too many lines if many workers are involved in the shuffle.
```
25/06/02 08:04:29 INFO LifecycleManager: Reserve buffer success for shuffleId 6
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2477 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1285 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2660 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2194 epoch from 0 to 1), (partition 1760 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1429 epoch from 0 to 1), (partition 2300 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 517 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2627 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2901 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2903 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2067 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 569 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2633 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2813 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1817 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 148 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 2098 epoch from 0 to 1)].
25/06/02 08:04:29 INFO ChangePartitionManager: [Update partition] success for shuffle 6, succeed partitions: [(partition 1554 epoch from 0 to 1)].
```
- Clear shuffle logs each shuffle id individually.
```
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 0.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 1.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 2.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 3.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 4.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 5.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 6.
25/06/02 08:01:51 INFO LifecycleManager: Clear shuffle 7.
```
### Why are the changes needed?
Both of the above logs get printed a lot on the Celeborn client and can be merged into a single line.
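The batching idea amounts to collecting the items and emitting one aggregated line instead of one line per partition or shuffle. The message formats below are illustrative, not the exact Celeborn log text:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of batched log formatting: one line for all succeeded partitions of
// a shuffle, and one line for all cleared shuffle ids.
public class LogBatcher {
    public static String batchedUpdateLine(int shuffleId, List<String> succeedPartitions) {
        return String.format(
            "[Update partition] success for shuffle %d, succeed partitions: [%s].",
            shuffleId, String.join(", ", succeedPartitions));
    }

    public static String batchedClearLine(List<Integer> shuffleIds) {
        return "Clear shuffles "
            + shuffleIds.stream().map(String::valueOf).collect(Collectors.joining(", "))
            + ".";
    }
}
```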
### Does this PR introduce _any_ user-facing change?
Client logs will change slightly.
### How was this patch tested?
NA
Closes #3313 from s0nskar/improve_logs.
Lead-authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support upsert and delete of dynamic configuration management.
### Why are the changes needed?
There is only a listing interface for dynamic configuration management. It should also support upserting and deleting dynamic configurations.
### Does this PR introduce _any_ user-facing change?
- Rest API:
- `/api/v1/conf/dynamic/upsert` to upsert dynamic configurations.
- `/api/v1/conf/dynamic/delete` to delete dynamic configurations.
- CLI:
- `--upsert-dynamic-conf` to upsert dynamic configurations.
- `--delete-dynamic-conf` to delete dynamic configurations.
### How was this patch tested?
- `ConfigServiceSuiteJ`
- `ApiV1BaseResourceSuite`
- `TestCelebornCliCommands`
Closes #3323 from SteNicholas/CELEBORN-1056.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Update the config version to 0.5.5 for `celeborn.worker.flusher.local.gatherAPI.enabled`.
### Why are the changes needed?
Followup https://github.com/apache/celeborn/pull/3335
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA
Closes #3338 from turboFei/CELEBORN-1931_follow.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
- Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
- Be sure to keep the PR description updated to reflect all changes.
- Please write your PR title to summarize what this PR proposes.
- If possible, provide a concise example to reproduce the issue for a faster review.
-->
### What changes were proposed in this pull request?
Added a commit files request fail count metric.
### Why are the changes needed?
To monitor and tune the configurations around the commit files workflow.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Local setup
<img width="739" alt="Screenshot 2025-06-04 at 10 51 06 AM" src="https://github.com/user-attachments/assets/d6256028-d8b7-4a81-90b1-3dcbf61adeba" />
Closes #3307 from s0nskar/commit_metric.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA & grafana
Closes #3333 from RexXiong/CELEBORN-1817-FOLLOWUP.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
For release mode, check the `JAVA_HOME` variable and set the JDK version to 8 by default:
```
./build/make-distribution.sh --sbt-enabled --release
```
### Why are the changes needed?
1. Address comments: https://github.com/apache/celeborn/pull/3282#discussion_r2149146734
2. Setting JDK 8 by default for release mode is necessary.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #3331 from turboFei/JAVA_HOME.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support celeborn optimize skew partitions patch for Spark v3.5.6 and v4.0.0.
### Why are the changes needed?
There is no patch of celeborn optimize skew partitions for Spark v4.0.0. Meanwhile, Spark v3.5.6 could not apply `Celeborn-Optimize-Skew-Partitions-spark3_5.patch` because of https://github.com/apache/spark/pull/50946.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
$ git checkout v3.5.6
Previous HEAD position was fa33ea000a0 Preparing Spark release v4.0.0-rc7
HEAD is now at 303c18c7466 Preparing Spark release v3.5.6-rc1
$ git apply --check /celeborn/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_5_6.patch
$ git checkout v4.0.0
Previous HEAD position was 303c18c7466 Preparing Spark release v3.5.6-rc1
HEAD is now at fa33ea000a0 Preparing Spark release v4.0.0-rc7
$ git apply --check /celeborn/assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark4_0.patch
```
Closes #3329 from SteNicholas/CELEBORN-1319.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Improve check quota message.
### Why are the changes needed?
Make check quota message clearer.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3328 from leixm/follow_CELEBORN-1577.
Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Adding register with master fail count metric for worker
### Why are the changes needed?
This will help add monitoring around cases where workers are unable to register with the master, e.g. when wrong endpoints are passed or the master becomes unavailable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Local setup
<img width="724" alt="Screenshot 2025-06-04 at 10 44 56 AM" src="https://github.com/user-attachments/assets/1f84557b-5df8-422f-b602-bb5316a72a0e" />
Closes #3308 from s0nskar/worker_register_metric.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
`updateProduceBytes` should be called even if an exception is thrown.
### Why are the changes needed?
To make UserProduceSpeed metrics more accurate
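The fix pattern amounts to moving the accounting into a `finally` block so an exception in the write path cannot skip it. A sketch with an illustrative wrapper class (only `updateProduceBytes` comes from the PR description; the rest is hypothetical):

```java
// Illustrative sketch: the bytes are accounted in a finally block, so the
// speed metric stays accurate even when the write itself throws.
public class ProduceSpeedTracker {
    private long producedBytes = 0;

    public void write(byte[] data, Runnable mayThrow) {
        try {
            mayThrow.run(); // the actual write, which may throw
        } finally {
            updateProduceBytes(data.length); // always runs, even on exception
        }
    }

    private void updateProduceBytes(long bytes) {
        producedBytes += bytes;
    }

    public long producedBytes() {
        return producedBytes;
    }
}
```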
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3322 from leixm/CELEBORN-2033.
Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Rename `throwsFetchFailure` to `stageRerunEnabled`.
### Why are the changes needed?
Make the code cleaner.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
existing UTs.
Closes #3324 from leixm/CELEBORN-2035.
Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Support to show Celeborn CLI version for sub command.
### Why are the changes needed?
`celeborn-cli [master|worker] -V` does not show anything.
```
(base) ➜ apache-celeborn-0.6.0-bin-ebay ./sbin/celeborn-cli -V
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Celeborn CLI - Celeborn 0.6.0
(base) ➜ apache-celeborn-0.6.0-bin-ebay ./sbin/celeborn-cli -V
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Celeborn CLI - Celeborn 0.6.0
(base) ➜ apache-celeborn-0.6.0-bin-ebay ./sbin/celeborn-cli master -V
(base) ➜ apache-celeborn-0.6.0-bin-ebay ./sbin/celeborn-cli worker -V
(base) ➜ apache-celeborn-0.6.0-bin-ebay ./sbin/celeborn-cli master -h
Usage: celeborn-cli master [-hV] [--apps=appId] [--auth-header=authHeader]
...
(base) ➜ apache-celeborn-0.6.0-bin-ebay ./sbin/celeborn-cli worker -h
Usage: celeborn-cli worker [-hV] [--apps=appId] [--auth-header=authHeader]
...
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
```
(base) ➜ celeborn git:(cli_version) ./dist/sbin/celeborn-cli -V
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Celeborn CLI - Celeborn 0.7.0-SNAPSHOT
(base) ➜ celeborn git:(cli_version) ./dist/sbin/celeborn-cli master -V
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Celeborn CLI - Celeborn 0.7.0-SNAPSHOT
(base) ➜ celeborn git:(cli_version) ./dist/sbin/celeborn-cli worker -V
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Celeborn CLI - Celeborn 0.7.0-SNAPSHOT
(base) ➜ celeborn git:(cli_version)
```
Closes#3321 from turboFei/cli_version.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Follow up PR for https://github.com/apache/celeborn/pull/3286 – Handling IOException wrapped inside CelebornException.
### Why are the changes needed?
`org.apache.celeborn.common.util.ThreadUtils#awaitResult` wraps non-timeout exceptions into `CelebornException`, so the underlying `IOException` is not caught and retries do not kick in.
Example:
```
org.apache.celeborn.common.exception.CelebornRuntimeException: setupLifecycleManagerRef failed!
at org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1834)
at org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89)
at org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:241)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:556)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:559)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.celeborn.common.exception.CelebornException: Exception thrown in awaitResult:
at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:320)
at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:78)
at org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:111)
at org.apache.celeborn.common.rpc.RpcEnv.$anonfun$setupEndpointRef$1(RpcEnv.scala:133)
at org.apache.celeborn.common.util.Utils$.withRetryOnTimeoutOrIOException(Utils.scala:1306)
at org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:133)
at org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1828)
... 12 more
Caused by: java.net.SocketException: Connection reset
at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426)
at org.apache.celeborn.shaded.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:255)
at org.apache.celeborn.shaded.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at org.apache.celeborn.shaded.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
```
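The handling can be sketched as walking the cause chain of the wrapper exception to recognize a retryable `IOException` buried inside it (stand-in classes below, not Celeborn's actual ones):

```java
import java.io.IOException;
import java.net.SocketException;

public class Main {
    // Stand-in for CelebornException as thrown by ThreadUtils#awaitResult,
    // which wraps the real cause (see the stack trace above).
    static class WrappedException extends RuntimeException {
        WrappedException(Throwable cause) {
            super("Exception thrown in awaitResult:", cause);
        }
    }

    // Sketch of the follow-up fix: walk the cause chain so a wrapped
    // IOException is still recognized as retryable.
    static boolean isRetryable(Throwable t) {
        while (t != null) {
            if (t instanceof IOException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new WrappedException(new SocketException("Connection reset"));
        System.out.println(isRetryable(wrapped)); // true: the buried IOException is found
        System.out.println(isRetryable(new RuntimeException("other"))); // false
    }
}
```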
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
NA
Closes#3315 from s0nskar/ioexception.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Bump Spark from 3.5.5 to 3.5.6.
### Why are the changes needed?
Spark 3.5.6 has been announced to release: [Spark 3.5.6 released](https://spark.apache.org/news/spark-3-5-6-released.html). The profile spark-3.5 could bump Spark from 3.5.5 to 3.5.6.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3319 from SteNicholas/CELEBORN-2030.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Some minor performance optimizations in the internal implementation
### Why are the changes needed?
During our use of Flink with Celeborn, we identified several minor optimizations that can be made:
1. On the client side, the Flink-Celeborn client parses the `pushDataTimeout` configuration too frequently, which is unnecessary and CPU-intensive.
2. On the worker side, Celeborn needs to filter the readers that are available for reading. However, using Java Stream collection operations for this is costly in terms of performance.
3. Also on the worker side, Celeborn currently checks whether a reader can continue reading by comparing the current read offset with the total file size. This check involves retrieving the total file size, which is an expensive operation. Since this value is constant, it should be cached in memory instead of being fetched multiple times.
4. In the Flink’s hybrid shuffle integration, the `EndOfSegment` event should not be bundled with data buffers. If it is, there is a risk of data corruption or misinterpretation.
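Items 1 and 3 boil down to memoizing a value that never changes instead of recomputing it per use; a hypothetical sketch:

```java
public class Main {
    static int parseCount = 0; // counts how often the expensive parse actually runs

    // Stand-in for the expensive per-use parsing of pushDataTimeout.
    static long parseTimeoutMs(String value) {
        parseCount++;
        return Long.parseLong(value.replace("ms", ""));
    }

    static Long cachedTimeoutMs = null;

    // Parse once and reuse; the same pattern applies to caching the constant
    // total file size on the worker side (item 3).
    static long timeoutMs(String value) {
        if (cachedTimeoutMs == null) {
            cachedTimeoutMs = parseTimeoutMs(value);
        }
        return cachedTimeoutMs;
    }

    public static void main(String[] args) {
        timeoutMs("120000ms");
        timeoutMs("120000ms");
        System.out.println(parseCount); // 1: parsed only on first use
    }
}
```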
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes#3318 from codenohup/CELEBORN-2029.
Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add a new parameter that allows `CelebornShuffleReader` to decompress data on demand.
### Why are the changes needed?
In the current scenario of Gluten Fallback, a stage can have both Native and Java shuffles simultaneously. In [CELEBORN-1625](https://issues.apache.org/jira/browse/CELEBORN-1625), we introduced a new parameter to conditionally skip compression. Alongside this, we should add a corresponding parameter to ensure that the Celeborn shuffle reader does not decompress data when compression is skipped at pushOrMergeData.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#3317 from RexXiong/CELEBORN-2027.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Record the last reported shuffle fetch failure task id.
### Why are the changes needed?
The reported shuffle fetch failure task id might be cleaned up quickly after being recorded.
To prevent flaky tests, it is better to record the last reported task id for testing.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA for 3 times.
Closes#3301 from turboFei/app_id_debug.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
Speculative tasks will be interrupted if another attempt succeeds. In this case, interrupting the speculative execution can lead to client retries, and ignoring the `InterruptedException` might prevent the task from being killed promptly.
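The intended behavior can be sketched as follows (illustrative code, not the actual client): restore the interrupt flag and stop retrying instead of swallowing the `InterruptedException`, so a killed speculative task exits promptly.

```java
public class Main {
    // Retry loop that gives up when the task is interrupted, instead of
    // ignoring the InterruptedException and retrying.
    static int pushWithRetries(int maxRetries) {
        int attempts = 0;
        while (attempts < maxRetries) {
            attempts++;
            try {
                Thread.sleep(1); // stand-in for a blocking push / retry backoff
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;                              // stop retrying so the kill takes effect
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();       // simulate the task being killed
        System.out.println(pushWithRetries(10));  // 1: bails out on the first attempt
        System.out.println(Thread.interrupted()); // true: flag survived (and is now cleared)
    }
}
```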
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
PASS GA
Closes#3314 from RexXiong/CELEBORN-1673-FOLLOWUP.
Authored-by: lvshuang.xjs <lvshuang.xjs@taobao.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
This PR is part of [CIP17: Interruption Aware Slot Selection](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-17%3A+Interruption+Aware+Slot+Selection).
It introduces a REST API for external services to notify the master about interruptions/schedules.
### Why are the changes needed?
To notify the master of upcoming interruption notices in the worker fleet. The master can then use these to proactively deprioritize workers that might be in scope for interruption sooner.
### Does this PR introduce _any_ user-facing change?
A new REST API.
### How was this patch tested?
added unit tests.
Closes#3285 from akpatnam25/CELEBORN-2014.
Authored-by: Aravind Patnam <akpatnam25@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add a retry mechanism when completing S3 multipart uploads to ensure that `completeMultipartUpload` is retried when facing a retryable exception like SlowDown.
### Why are the changes needed?
While running a "simple" Spark job creating 10TiB of shuffle data (repartitioning from 100k partitions to 20) with SOFT `celeborn.client.shuffle.partitionSplit.mode`, the job was constantly failing at the point where all files should be committed.
Despite increasing `celeborn.storage.s3.mpu.maxRetries` up to `200`, the job was still failing due to SlowDown exceptions.
After adding some debug logs on the retry policy from the AWS S3 SDK, I saw that the policy is never invoked for the `completeMultipartUpload` action, while it is invoked for other actions. See https://issues.apache.org/jira/browse/CELEBORN-2003.
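The fix can be sketched as an explicit retry loop around the completion call, since the SDK's own retry policy is not applied there (the classes below are stand-ins, not the real AWS SDK):

```java
public class Main {
    // Stand-in for AmazonS3Exception with error code "SlowDown".
    static class SlowDownException extends RuntimeException {
        SlowDownException() { super("Please reduce your request rate."); }
    }

    static int calls = 0;

    // Fake completeMultipartUpload that throttles the first two calls.
    static void completeMultipartUpload() {
        calls++;
        if (calls <= 2) throw new SlowDownException();
    }

    // Sketch of the retry wrapper the PR adds around completeMultipartUpload.
    static int completeWithRetry(int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                completeMultipartUpload();
                return attempt; // succeeded on this attempt
            } catch (SlowDownException e) {
                System.out.printf("upload failed to complete, will retry (%d/%d)%n",
                        attempt, maxRetries);
            }
        }
        throw new RuntimeException("exhausted retries");
    }

    public static void main(String[] args) {
        System.out.println(completeWithRetry(10)); // 3: succeeds on the third attempt
    }
}
```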
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Created a cluster on a Kubernetes server relying on S3 storage.
Launched a 10TiB shuffle from 100000 partitions to 200 partitions with SOFT `celeborn.client.shuffle.partitionSplit.mode`.
The job succeeded and displayed some warn logs indicating that `completeMultipartUpload` is retried due to SlowDown:
```
bucket ******* key poc/spark-2c86663c948243d19c127e90f704a3d5/0/35-39-0 uploadId Pbaq.pp1qyLvtGbfZrMwA8RgLJ4QYanAMhmv0DvKUk0m6.GlCKdC3ICGngn7Q7iIa0Dw1h3wEn78EoogMlYgFD6.tDqiatOTbFprsNkk0qzLu9KY8YCC48pqaINcvgi8c1gQKKhsf1zZ.5Et5j40wQ-- upload failed to complete, will retry (1/10)
com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: null; Status Code: 0; Error Code: SlowDown; Request ID: RAV5MXX3B9Z3ZHTG; S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S; Proxy: null), S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$CompleteMultipartUploadHandler.doEndElement(XmlResponsesSaxParser.java:1906)
```
Closes#3293 from ashangit/nfraison/CELEBORN-2003.
Authored-by: nicolas.fraison@datadoghq.com <nicolas.fraison@datadoghq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
- Add `namespace` to the object metadata for all namespace-scoped resources.
- Update related Helm unit tests.
- Bump chart `appVersion` to `0.5.4` since the Docker image with this tag has already been released.
### Why are the changes needed?
- Improve the Helm charts.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Run Helm unit tests by `helm unittest charts/celeborn --file "tests/**/*_test.yaml" --strict --debug`.
Closes#3295 from ChenYi015/helm/namespace.
Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
Spark 4 uses Scala 2.13, but it cannot connect to a Celeborn master compiled with Scala 2.12.
For example, when the first node of `celeborn.master.endpoints` configured by the client is not the leader, an `RpcFailure` will be returned, which fails to deserialize.
Related Scala issues: https://github.com/scala/bug/issues/11207 and https://users.scala-lang.org/t/serialversionuid-change-between-scala-2-12-6-and-2-12-7/3478
```java
java.io.InvalidClassException: org.apache.celeborn.common.rpc.netty.RpcFailure; local class incompatible: stream classdesc serialVersionUID = 2793139166962436434, local class serialVersionUID = -1724324816907181707
at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:598) ~[?:?]
```
```bash
scala -cp /tmp/celeborn-client-spark-3-shaded_2.12-0.5.4.jar
```
```scala
:paste -raw
package org.apache.celeborn {
  class Y {
    def printId = {
      val clazz = classOf[org.apache.celeborn.common.rpc.netty.RpcFailure]
      val uid = java.io.ObjectStreamClass.lookup(clazz).getSerialVersionUID
      println(s"Scala version: ${scala.util.Properties.versionNumberString}")
      println(s"serialVersionUID: $uid")
    }
  }
}

new org.apache.celeborn.Y().printId
```
2.11
```
Scala version: 2.11.12
serialVersionUID: 2793139166962436434
```
2.12
```
Scala version: 2.12.19
serialVersionUID: 2793139166962436434
```
2.13
```
Scala version: 2.13.16
serialVersionUID: -1724324816907181707
```
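The local test output below shows Scala 2.13 now producing the 2.12 value, which is what pinning an explicit serialVersionUID achieves. A plain-Java illustration of the mechanism (the Scala-side equivalent would be an `@SerialVersionUID` annotation on the message classes):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class Main {
    // Without an explicit serialVersionUID, the JVM derives one from the
    // class shape, which can differ across compiler versions. Declaring it
    // explicitly makes the serialized form stable.
    static class PinnedMessage implements Serializable {
        private static final long serialVersionUID = 2793139166962436434L;
        String error;
    }

    public static void main(String[] args) {
        long uid = ObjectStreamClass.lookup(PinnedMessage.class).getSerialVersionUID();
        System.out.println(uid); // always 2793139166962436434, regardless of compiler
    }
}
```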
### Does this PR introduce _any_ user-facing change?
If a master compiled with Scala 2.13 was used before, this change may be incompatible with it.
### How was this patch tested?
local test
```
Scala version: 2.13.16
serialVersionUID: 2793139166962436434
```
Closes#3309 from cxzl25/CELEBORN-2025.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Skip building the Tez client when releasing 0.6.0.
### Why are the changes needed?
The Tez client has not been fully verified; it will need some time before it is ready.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
NO.
Closes#3312 from FMX/b2026.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add `--auth-header` option to usage of CLI commands.
Follow-up to #3300.
### Why are the changes needed?
#3300 lacks the `--auth-header` option in the usage output of CLI commands.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
./build/make-distribution.sh
./dist/sbin/celeborn-cli master -h
./dist/sbin/celeborn-cli worker -h
```
Closes#3311 from SteNicholas/CELEBORN-2020.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
```java
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/celeborn/shaded/org/apache/commons/io/output/ByteArrayOutputStream
at org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO.<init>(CelebornShuffleDataIO.java:41)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2834)
at scala.collection.immutable.List.flatMap(List.scala:294)
at scala.collection.immutable.List.flatMap(List.scala:79)
at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2826)
at org.apache.spark.shuffle.ShuffleDataIOUtils$.loadShuffleDataIO(ShuffleDataIOUtils.scala:35)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:633)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:3055)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
local test
Closes#3304 from cxzl25/CELEBORN-2022.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Fix a NPE when reading HDFS files.
2. Change partition manager will generate correct storage info.
3. Add assertions for tier writers.
### Why are the changes needed?
Regression for release 0.6.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster tests.
Closes#3302 from FMX/b2021.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the remaining master sub commands that does not transfer auth header.
### Why are the changes needed?
Previously, this mistake was not detected by GA
because the authentication configs were not transferred when setting up the mini Celeborn cluster.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
Closes#3303 from turboFei/auth_header_followup.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: 子懿 <ziyi.jxf@antgroup.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
```java
Exception in thread "main" java.lang.NoSuchMethodError: 'boolean org.apache.spark.util.Utils.isLocalMaster(org.apache.spark.SparkConf)'
at org.apache.spark.shuffle.celeborn.SparkShuffleManager.executorCores(SparkShuffleManager.java:464)
at org.apache.spark.shuffle.celeborn.SparkShuffleManager.<init>(SparkShuffleManager.java:117)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
at org.apache.spark.util.Utils$.instantiateSerializerOrShuffleManager(Utils.scala:2584)
at org.apache.spark.shuffle.ShuffleManager$.create(ShuffleManager.scala:108)
at org.apache.spark.SparkEnv.initializeShuffleManager(SparkEnv.scala:226)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:589)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:3055
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
local test
Closes#3305 from cxzl25/CELEBORN-2023.
Lead-authored-by: sychen <sychen@ctrip.com>
Co-authored-by: cxzl25 <3898450+cxzl25@users.noreply.github.com>
Signed-off-by: 子懿 <ziyi.jxf@antgroup.com>
### What changes were proposed in this pull request?
Support min number of workers to assign slots on for a shuffle.
### Why are the changes needed?
PR https://github.com/apache/celeborn/pull/3039 updated the default value of `celeborn.master.slot.assign.extraSlots` to avoid skew in shuffle stages with a small number of reducers. However, it also affects stages with a large number of reducers, which is not ideal.
We are introducing a new config `celeborn.master.slot.assign.minWorkers`, which ensures that shuffle stages with a small number of reducers do not cause load imbalance on a few nodes.
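A hypothetical sketch of the selection rule (illustrative names; the real logic lives in the master's slots allocator): spread small shuffles over at least `minWorkers` workers, without forcing extra slots onto large shuffles.

```java
public class Main {
    // Pick how many workers to spread a shuffle's slots over: at least
    // minWorkers (so few-reducer stages don't pile onto a couple of nodes),
    // but never more workers than are available.
    static int workersToUse(int totalSlots, int availableWorkers, int minWorkers) {
        int wanted = Math.min(totalSlots, availableWorkers); // one slot per worker suffices
        return Math.min(Math.max(wanted, minWorkers), availableWorkers);
    }

    public static void main(String[] args) {
        System.out.println(workersToUse(5, 100, 20));     // 20: small shuffle still spread out
        System.out.println(workersToUse(10000, 100, 20)); // 100: large shuffle uses all workers
    }
}
```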
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
NA
Closes#3297 from s0nskar/min_workers.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support http authentication for Celeborn CLI.
### Why are the changes needed?
Current CLI does not work if the authentication is enabled for master or worker.
### Does this PR introduce _any_ user-facing change?
Yes, a new option.
### How was this patch tested?
UT.
Closes#3300 from turboFei/cli_auth.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Introduce `ShuffleMetricGroup` for `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics.
Follow-up to #3272.
### Why are the changes needed?
The `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, and `numRecordsOutPerSecond` metrics should carry the shuffle id as a scope variable, which the introduced `ShuffleMetricGroup` supports. Meanwhile, #3272 would print many identical logs like the following, which should be improved:
```
2025-05-28 10:48:54,433 WARN [flink-akka.actor.default-dispatcher-18] org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported.[11.66.62.202, taskmanager, antc4flink3980005426-taskmanager-3-70, antc4flink3980005426, [vertex-2]HashJoin(joinType=[LeftOuterJoin], where=[(f0 = f00)], select=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60], build=[right]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60]), 2, Shuffle, Remote, 1]
```
### Does this PR introduce _any_ user-facing change?
Introduce `celeborn.client.flink.metrics.scope.shuffle` config option to define the scope format string that is applied to all metrics scoped to a shuffle:
- Variables:
- Shuffle: `<task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>, <shuffle_id>`.
- Metrics:
Scope | Metrics | Description | Type
-- | -- | -- | --
Shuffle | numBytesIn | The total number of bytes this shuffle has read. | Counter |
Shuffle | numBytesOut| The total number of bytes this shuffle has written. | Counter |
Shuffle | numRecordsOut | The total number of records this shuffle has written. | Counter |
Shuffle | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter |
Shuffle | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter |
Shuffle | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter |
### How was this patch tested?
Manual test.
Closes#3296 from SteNicholas/CELEBORN-2005.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
### What changes were proposed in this pull request?
Revert role name change in [CELEBORN-1627](https://github.com/apache/celeborn/pull/2777)
### Why are the changes needed?
Fix the issue where the case of name affects the metrics dashboard
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual
Closes#3299 from RexXiong/CELEBORN-1627-FOLLOWUP.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support dependencies of `spark-4.0` profile.
Follow-up to #3282.
### Why are the changes needed?
#3282 lacks dependency support for the `spark-4.0` profile.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Dependencies check: maven-jdk17 (spark-4.0).
Closes#3298 from SteNicholas/CELEBORN-1413.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Bump spark 4.0 version to 4.0.0.
### Why are the changes needed?
Spark 4.0.0 is ready.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes#3282 from turboFei/spark_4.0.
Lead-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Exclude worker in lifecycle manager if the commit files request on workers fails with `COMMIT_FILE_EXCEPTION` or after multiple retries.
### Why are the changes needed?
If a worker is under high load and unable to process requests because of high CPU, we should exclude it so that it does not affect the next retry of the shuffle stage.
Internally, we are seeing commit file futures on workers under high load timing out, and the next retry of the stage again picking the same servers and failing. Similarly, we are seeing continuous RpcTimeouts for workers, yet those workers are again getting selected for the next retry.
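A sketch of the exclusion bookkeeping described above, with illustrative names and statuses (only `COMMIT_FILE_EXCEPTION` comes from the PR text; the retry-limit handling is an assumption):

```java
import java.util.HashSet;
import java.util.Set;

public class Main {
    enum CommitStatus { SUCCESS, COMMIT_FILE_EXCEPTION, RPC_TIMEOUT }

    static final Set<String> excludedWorkers = new HashSet<>();

    // Exclude a worker after COMMIT_FILE_EXCEPTION, or after RPC timeouts
    // have been retried up to the limit, so the next stage retry avoids it.
    static void onCommitResult(String worker, CommitStatus status,
                               int timeoutRetries, int maxRetries) {
        if (status == CommitStatus.COMMIT_FILE_EXCEPTION
                || (status == CommitStatus.RPC_TIMEOUT && timeoutRetries >= maxRetries)) {
            excludedWorkers.add(worker);
        }
    }

    static boolean isCandidate(String worker) {
        return !excludedWorkers.contains(worker);
    }

    public static void main(String[] args) {
        onCommitResult("worker-a", CommitStatus.COMMIT_FILE_EXCEPTION, 0, 3);
        onCommitResult("worker-b", CommitStatus.RPC_TIMEOUT, 1, 3); // under the retry limit
        System.out.println(isCandidate("worker-a")); // false: excluded immediately
        System.out.println(isCandidate("worker-b")); // true: not excluded yet
    }
}
```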
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
NA
Closes#3276 from s0nskar/worker_exlude_on_commit_exception.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
- Add support for configuring volumeClaimTemplates by adding new values `master.volumeClaimTemplates` and `worker.volumeClaimTemplates`.
### Why are the changes needed?
Use volume claim template to support various storage backend.
### Does this PR introduce _any_ user-facing change?
Yes. New Helm values `master.volumeClaimTemplates` and `worker.volumeClaimTemplates` are added.
### How was this patch tested?
```bash
helm unittest charts/celeborn --file "tests/**/*_test.yaml" --strict --debug
```
Closes#3277 from ChenYi015/helm/volume-claim-templates.
Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
- Support retry on `IOException` failures for RpcRequest, in addition to `RpcTimeoutException`.
- Move duplicated code to `Utils`.
### Why are the changes needed?
Currently, if a request fails with a `SocketException` or `IOException`, it does not get retried, which leads to stage failures. Celeborn should retry on such connection failures.
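A simplified, self-contained sketch of the retry shape (the exception class is a mock; the real helper, `Utils.withRetryOnTimeoutOrIOException`, may differ in signature):

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class Main {
    // Stand-in for Celeborn's RpcTimeoutException.
    static class RpcTimeoutException extends RuntimeException {
        RpcTimeoutException(String msg) { super(msg); }
    }

    // Retry on both timeouts and connection-level IO failures; anything
    // else fails fast.
    static <T> T withRetry(Callable<T> op, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                return op.call();
            } catch (RpcTimeoutException | IOException e) {
                last = new RuntimeException("retries exhausted", e); // retryable
            } catch (Exception e) {
                throw new RuntimeException(e); // non-retryable: fail fast
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        final int[] failures = {2};
        String result = withRetry(() -> {
            if (failures[0]-- > 0) throw new IOException("Connection reset");
            return "connected";
        }, 5);
        System.out.println(result); // connected, after two retried IO failures
    }
}
```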
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
NA
Closes#3286 from s0nskar/setup_lifecycle_exception.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
The config `celeborn.master.slot.assign.loadAware.fetchTimeWeight` has a default value of 1, while the slots allocation document states 0.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes#3287 from cxzl25/minor_doc_slot.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Add license for http5.
637c42338e/dev/deps/dependencies-server (L36-L38)
### Why are the changes needed?
Fix license.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual
Closes#3281 from turboFei/license.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>