### What changes were proposed in this pull request?
Reuse FileChannel/FSDataInputStream in PartitionDataReader to avoid opening too many files.
### Why are the changes needed?
In previous implementations, different PartitionDataReader instances reused the same FileChannel/FSDataInputStream, but #3349 changed this logic, so we need to maintain logical consistency.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#3445 from Alibaba-HZY/mirror-fix.
Authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
When fileWriter is closed, it should return HARD_SPLIT StatusCode
### Why are the changes needed?
When fileWriter is closed, it should return HARD_SPLIT StatusCode
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes#3448 from xy2953396112/CELEBORN-2127.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the condition for using OSS storage
### Why are the changes needed?
When OSS is enabled, the local disk should be used first if it is available.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes#3463 from xy2953396112/CELEBORN-2139.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Avoid multiple accesses to HDFS when writing the index file.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3460 from taowenjun/CELEBORN-2138.
Authored-by: taowenjun <1483633867@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove unused `MAPGROUP` `PartitionType`.
### Why are the changes needed?
`PartitionType` `MAPGROUP` is unused at present, which could be removed.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3459 from SteNicholas/CELEBORN-2137.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix NettyRpcEnv, Master, and Worker `Source` cleanup to avoid thread leaks
### Why are the changes needed?
1. NettyRpcEnv should clean up rpcSource to prevent a resource leak.
2. Master should clean up resourceConsumptionSource, masterSource, threadPoolSource, jVMSource, jVMCPUSource, and systemMiscSource.
3. Worker should clean up workerSource.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
local
Closes#3418 from xy2953396112/CELEBORN-2104.
Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
When creating a DiskFile, retrieve the storage type based on the mount point
### Why are the changes needed?
If the worker disk is an SSD, it should be set to the SSD type, and the storage type can be retrieved based on the mount point.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes#3456 from xy2953396112/CELEBORN-2134.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Close hadoopFs FileSystem when worker is closed.
### Why are the changes needed?
When the worker is closed, close the hadoopFs FileSystem to avoid resource leakage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3449 from xy2953396112/CELEBORN-2128.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
The log only outputs the file ID, and the exception only has the file path, with no file length in either.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
```
25/08/25 18:06:37,083 ERROR [pool-1-thread-1] PartitionFilesSorter: Sorting file application-1-/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite path /var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite length 2453444321 timeout after 1ms
```
Closes#3446 from cxzl25/CELEBORN-2125.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Introduce `IsHighWorkload` metric to monitor worker overload status.
### Why are the changes needed?
There is no metric to monitor worker overload status at present.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
[Grafana test](https://xy2953396112.grafana.net/public-dashboards/22ab1750ef874a1bb39b5879b81a24cf).
Closes#3435 from xy2953396112/CELEBORN-2118.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add log for commit file size.
### Why are the changes needed?
Logging the size of successfully committed files enables offline analysis of file write sizes, assessment of the proportion of small files, and optimizations based on file size data.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3444 from xy2953396112/CELEBORN-2123.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metrics.
### Why are the changes needed?
Introduce `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metrics to record the status of paused push data.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test. [Grafana](https://xy2953396112.grafana.net/public-dashboards/21af8e2844234c438e74c741211f0032)
Closes#3426 from xy2953396112/CELEBORN-2112.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
CommitFile/Reserved location shows detailed primary location UniqueId info
### Why are the changes needed?
CommitFile/Reserved should display detailed partitionLocation uniqueId logs to facilitate troubleshooting.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes#3420 from xy2953396112/controller_log.
Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Introduce `SorterCacheHitRate` metric to monitor the hit rate of the index cache for the sorter.
### Why are the changes needed?
Monitor the hit rate of `PartitionFilesSorter#indexCache`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The verified Grafana dashboard: https://xy2953396112.grafana.net/public-dashboards/5d1177ee0f784b53ad817fde919141b7
Closes#3416 from xy2953396112/CELEBORN_2102.
Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Remove redundant `PartitionType`.
### Why are the changes needed?
`PartitionType` is included in `PartitionDataWriterContext`, therefore it is not necessary to use `PartitionType` as method parameter.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3422 from xy2953396112/remove_useless_partition_type.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
`DfsTierWriter` should close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` when closing resources, to avoid a resource leak when the file writer is destroyed.
### Why are the changes needed?
`DfsTierWriter` does not close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` in `closeResource`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3433 from SteNicholas/CELEBORN-2119.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This PR adds support for zstd decompression in CppClient.
### Why are the changes needed?
To support reading from Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes#3411 from Jraaay/feat/cpp_client_zstd_decompression.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR adds support for lz4 decompression in CppClient.
### Why are the changes needed?
To support reading from Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes#3402 from Jraaay/feat/cpp_client_lz4_decompression.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Allocate a byte array in advance and use it as a transfer buffer when copying is needed during flush.
### Why are the changes needed?
HdfsFlushTask, OssFlushTask, and S3FlushTask must copy the CompositeByteBuf parameter to a byte array when flushing, and then use their respective clients to write the byte array to storage.
When the flush throughput is very high, allocating a fresh array for every copy causes very serious GC problems and affects the performance of the worker.
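A minimal sketch of the idea, assuming a single flusher thread and using `java.nio.ByteBuffer` in place of Netty's CompositeByteBuf (all names here are illustrative, not Celeborn's actual classes):

```java
import java.nio.ByteBuffer;

/** Minimal sketch (hypothetical names): reuse one transfer buffer per
 *  flusher thread instead of allocating a fresh byte[] on every flush. */
class FlushBufferSketch {
    // Pre-allocated once; sized to the largest expected flush batch.
    private final byte[] transferBuffer;

    FlushBufferSketch(int maxFlushBytes) {
        this.transferBuffer = new byte[maxFlushBytes];
    }

    /** Copy the readable bytes of `data` into the reused buffer and return
     *  how many bytes are valid, avoiding a per-flush allocation.
     *  Caller must ensure the data fits; real code would chunk larger inputs. */
    int copyForUpload(ByteBuffer data) {
        int length = data.remaining();
        data.get(transferBuffer, 0, length);
        return length;
    }
}
```

The buffer lives as long as the flusher, so the per-flush garbage drops to zero at the cost of a fixed up-front allocation per thread.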
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
cluster test
Closes#3394 from TheodoreLx/copy-on-flush.
Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the condition of `StoragePolicy` that worker uses memory storage
### Why are the changes needed?
The condition of `StoragePolicy` that worker uses memory storage is `order.contains(StorageInfo.Type.MEMORY.name())`, which is wrong because `Option#contains` compares the wrapped value against the element rather than testing membership:
```
final def contains[A1 >: A](elem: A1): Boolean = !isEmpty && this.get == elem
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3408 from SteNicholas/CELEBORN-1844.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add missing break in resumeByPinnedMemory
### Why are the changes needed?
Avoid executing `resumePush` twice when resuming by pinned memory from the `PUSH_AND_REPLICATE_PAUSED` state.
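The bug class here is plain switch fall-through; a minimal illustrative sketch (the class and state names are hypothetical, mirroring the description above):

```java
/** Sketch of the bug: without `break`, a switch case falls through into
 *  the next case and the resume action runs twice. Illustrative names. */
class ResumeSketch {
    int resumeCount = 0;

    void resumePush() { resumeCount++; }

    void resumeByPinnedMemory(String state) {
        switch (state) {
            case "PUSH_AND_REPLICATE_PAUSED":
                resumePush();
                break; // the fix: without this, execution falls into the next case
            case "PUSH_PAUSED":
                resumePush();
                break;
            default:
                break;
        }
    }
}
```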
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA and cluster tests.
Closes#3407 from Flyangz/bugfix/fix-resumeByPinnedMemory-switch.
Authored-by: liuyang62 <liuyang62@staff.sina.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Increment COMMIT_FILES_FAIL_COUNT when TierWriter::close times out
### Why are the changes needed?
1. COMMIT_FILES_FAIL_COUNT is 0 even when we meet SHUFFLE_DATA_LOST caused by a commit files failure
Spark executor log:
```
25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for 0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1) return SHUFFLE_DATA_LOST for 0.
```
Spark driver log:
```
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle stageEnd for 0, lost file!
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler:
For shuffle application_1750652300305_10219240_1-0 partition data lost:
Lost partition 307-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
Lost partition 1289-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
```
Worker log:
```
java.io.IOException: Wait pending actions timeout.
at org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes#3403 from turboFei/commit_failed.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
`S3FlushTask` and `OssFlushTask` should close `ByteArrayInputStream` to avoid resource leak.
### Why are the changes needed?
`S3FlushTask` and `OssFlushTask` don't close `ByteArrayInputStream` at present, which may cause resource leak.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3395 from SteNicholas/CELEBORN-2086.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Refactoring similar code about configuration usage.
### Why are the changes needed?
Improve scalability for possible new remote storage in the future.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes#3353 from Kalvin2077/draft.
Authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Support `MapPartitionData` on DFS.
### Why are the changes needed?
`MapPartitionData` is only supported on local storage, not on DFS. It's recommended to support `MapPartitionData` on DFS so that MapPartition mode aligns with the capabilities of ReducePartition mode.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`WordCountTestWithHDFS`.
Closes#3349 from SteNicholas/CELEBORN-2047.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
`id-epoch` alone cannot identify the specific application. We can add the shuffle key to the log.
```
25/07/25 04:23:06,439 ERROR [celeborn-push-timeout-checker-0] PushDataHandler: PushMergedData replicate failed for partitionLocation: PartitionLocation[
id-epoch:1623-0
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes#3390 from cxzl25/CELEBORN-2081.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Fix OpenStreamTime metrics for PbOpenStreamList request
### Why are the changes needed?
For `PbOpenStreamList` requests, the `OpenStreamTime` metric is not calculated.
cf3c05d668/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala (L140-L160)
And the `workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) ` is meaningless inside `handleReduceOpenStreamInternal`. cf3c05d668/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala (L335-L341)
`handleReduceOpenStreamInternal` is called in two places:
1. inside `handleOpenStreamInternal`, where the `OpenStreamTime` metric is handled correctly
2. for `PbOpenStreamList`, where the `OpenStreamTime` metric is not handled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually testing.
Closes#3376 from turboFei/buf_size.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Added metrics for the amount of data written to different storage types, including Local, HDFS, OSS, and S3.
### Why are the changes needed?
Currently there is no metric for the volume of data written to each storage type, so the size and speed of writes cannot be monitored.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test.
Closes#3362 from TheodoreLx/add-flush-count-metric.
Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Clean up deprecated Guava API usage.
### Why are the changes needed?
There are deprecated Guava API usages, including:
1. Made modifications to Throwables.propagate with reference to https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate
- For cases where it is known to be a checked exception, including `IOException`, `GeneralSecurityException`, `SaslException`, and `RocksDBException`, none of which are subclasses of RuntimeException or Error, directly replaced Throwables.propagate(e) with `throw new RuntimeException(e);`.
- For cases where it cannot be determined whether it is a checked exception or an unchecked exception or Error, use
```
throwIfUnchecked(e);
throw new RuntimeException(e);
```
to replace `Throwables.propagate(e)`.
0c33dd12b1/guava/src/com/google/common/base/Throwables.java (L199-L235)
```
/**
 * ...
 * @deprecated To preserve behavior, use {@code throw e} or {@code throw new RuntimeException(e)}
 *     directly, or use a combination of {@link #throwIfUnchecked} and {@code throw new
 *     RuntimeException(e)}. But consider whether users would be better off if your API threw a
 *     different type of exception. For background on the deprecation, read <a
 *     href="https://goo.gl/Ivn2kc">Why we deprecated {@code Throwables.propagate}</a>.
 */
@CanIgnoreReturnValue
@J2ktIncompatible
@GwtIncompatible
@Deprecated
public static RuntimeException propagate(Throwable throwable) {
  throwIfUnchecked(throwable);
  throw new RuntimeException(throwable);
}
```
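The two replacement patterns described above can be sketched without any Guava dependency (illustrative helper names; `rethrowOrWrap` mirrors the `throwIfUnchecked` + wrap combination):

```java
import java.io.IOException;

/** Sketch of the Throwables.propagate migration (illustrative helpers). */
class PropagateMigration {
    // Case 1: the exception is known to be checked (e.g. IOException),
    // so Throwables.propagate(e) is replaced with a plain wrap.
    static RuntimeException wrapChecked(IOException e) {
        return new RuntimeException(e);
    }

    // Case 2: the exception's type is unknown. Rethrow unchecked
    // throwables as-is (throwIfUnchecked behavior), wrap the rest.
    static RuntimeException rethrowOrWrap(Throwable e) {
        if (e instanceof RuntimeException) throw (RuntimeException) e;
        if (e instanceof Error) throw (Error) e;
        throw new RuntimeException(e);
    }
}
```

Call sites use `throw rethrowOrWrap(e);` so the compiler sees an unconditional throw, just as with `throw Throwables.propagate(e);`.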
Backport https://github.com/apache/spark/pull/48248
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3367 from SteNicholas/CELEBORN-2067.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Try to use memory storage first if it is available, to increase performance when a cluster is configured to use MEMORY and HDFS.
### Why are the changes needed?
To keep the old behavior as in release 0.5, always try to use memory storage first because the slots allocator won't allocate slots on memory storage.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes#3352 from FMX/b1844-2.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Inspired by [FLINK-38038](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-38038?filter=allissues), I used [Tongyi Lingma](https://lingma.aliyun.com/) and qwen3-thinking LLM to identify and fix some typo issues in the Celeborn codebase. For example:
- backLog → backlog
- won`t → won't
- can to be read → can be read
- mapDataPartition → mapPartitionData
- UserDefinePasswordAuthenticationProviderImpl → UserDefinedPasswordAuthenticationProviderImpl
### Why are the changes needed?
Remove typos to improve source code readability for users and ease development for developers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Code and documentation cleanup does not require additional testing.
Closes#3356 from codenohup/fix-typo.
Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Support write map partition to DFS.
### Why are the changes needed?
`DiskFileInfo` is wrong when Flink writes, because it did not distinguish the types of FileMeta.
Writing the indexBuffer will also fail when Flink writes in the DFS case, because `indexBuffer.array` throws `UnsupportedOperationException` (the buffer is allocated from direct memory).
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#3350 from Alibaba-HZY/fix-flink-writer-hdfs.
Lead-authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Co-authored-by: Nicholas Jiang <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Design doc - https://docs.google.com/document/d/1YqK0kua-5rMufJw57kEIrHHGbLnAF9iXM5GdDweMzzg/edit?tab=t.0#heading=h.n5ldma432qnd
- End to End integrity checks provide additional confidence that Celeborn is producing complete as well as correct data
- The checks are hidden behind a client side config that is false by default. Provides users optionality to enable these if required on a per app basis
- Only compatible with Spark at the moment
- No support for Flink (can be considered in future)
- No support for Columnar Shuffle (can be considered in future)
Writer
- Whenever a mapper completes, it reports crc32 and bytes written on a per partition basis to the driver
Driver
- Driver aggregates the mapper reports - and computes aggregated CRC32 and bytes written on per partitionID basis
Reader
- Each CelebornInputStream will report (int shuffleId, int partitionId, int startMapIndex, int endMapIndex, int crc32, long bytes) to driver when it finished reading all data on the stream
- On every report
- Driver will aggregate the CRC32 and bytesRead for the partitionID
- Driver will aggregate the map ranges to determine when all sub-partitions for the partitionID have been read
- It will then compare the aggregated CRC32 and bytes read with the expected CRC32 and bytes written for the partition
- There is special handling for the skew handling without MapRangeSplit scenario as well
- In this case, we report the number of sub-partitions and the index of the sub-partition instead of startMapIndex and endMapIndex
There is separate handling for skew handling with and without map range split
As a follow-up, I will do another PR that hardens the checks and adds bookkeeping to verify that every CelebornInputStream makes the required checks
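The writer-side report described above could look roughly like the following sketch, using `java.util.zip.CRC32` (class and method names are hypothetical; driver-side aggregation across mappers is omitted):

```java
import java.util.zip.CRC32;

/** Sketch of the per-partition checksum a mapper could report
 *  (illustrative names): accumulate a CRC32 and a byte count over
 *  every batch pushed for one partition. */
class PartitionChecksum {
    private final CRC32 crc = new CRC32();
    private long bytesWritten = 0;

    /** Called for each pushed batch belonging to this partition. */
    void onPush(byte[] batch, int offset, int length) {
        crc.update(batch, offset, length);
        bytesWritten += length;
    }

    // Reported to the driver when the mapper completes.
    long crcValue() { return crc.getValue(); }
    long bytes()    { return bytesWritten; }
}
```

On the read side, a CelebornInputStream would keep an analogous accumulator and report it to the driver for comparison once the stream is fully consumed.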
### Why are the changes needed?
https://issues.apache.org/jira/browse/CELEBORN-894
Note: I am putting up this PR even though some tests are failing, since I want to get some early feedback on the code changes.
### Does this PR introduce _any_ user-facing change?
Not sure how to answer this. A new client side config is available to enable the checks if required
### How was this patch tested?
Unit tests + Integration tests
Closes#3261 from gauravkm/gaurav/e2e_checks_v3.
Lead-authored-by: Gaurav Mittal <gaurav@stripe.com>
Co-authored-by: Gaurav Mittal <gauravkm@gmail.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Specify `extractionDir` of `AsyncProfilerLoader` with `celeborn.worker.jvmProfiler.localDir`.
### Why are the changes needed?
`AsyncProfilerLoader` uses the `user.home` directory to store the extracted libraries by default. When the `user.home` directory is not initialized, it will cause `AsyncProfilerLoader#load` to fail. `extractionDir` of `AsyncProfilerLoader` could be specified with `celeborn.worker.jvmProfiler.localDir` to avoid the loading failure.
Backport https://github.com/apache/spark/pull/51229.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#3345 from SteNicholas/CELEBORN-2046.
Lead-authored-by: SteNicholas <programgeek@163.com>
Co-authored-by: 子懿 <ziyi.jxf@antgroup.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Proactively cleanup from ChunkStreamManager when stream is closed.
### Why are the changes needed?
Stream gets closed only when shuffle expires at master, which can take a while.
In the meantime, workers incur nontrivial memory utilization.
### Does this PR introduce _any_ user-facing change?
No. Reduces memory usage in workers.
### How was this patch tested?
Existing unit tests
Closes#3343 from mridulm/proactively-cleanup-streams.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Return SoftSplit status when there is no hard split for pushMergeData
### Why are the changes needed?
HardSplit support was introduced in [CELEBORN-1721](https://issues.apache.org/jira/browse/CELEBORN-1721), and it works well when the client and server are of the same version. However, using a 0.5.x client with a 0.6.x server can significantly impact shuffle write performance. This is because the 0.6.x server returns hardsplit status whenever there is any soft or hard split, leading the 0.5.x client to perform hardsplit on every partition. To maintain compatibility during upgrades, it's essential to preserve the original behavior for 0.5.x clients.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA & 1T tpcds with 0.5.4 version client, according to the test results, after applying this PR, revive decreased significantly, and performance improved.
#### test use 0.6.0-rc2 server, 0.5.4 client

#### test use 0.7.0 server + this pr, 0.5.4 client

Closes#3342 from RexXiong/CELEBORN-1721-FOLLOWUP.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
If the create file order list doesn't have the corresponding storage types, use 0 as the index.
### Why are the changes needed?
This is needed in two scenarios:
1. For existing clients, the change partition will select MEMORY as the default storage tier, which will cause the revived partition to utilize memory storage even if the application is not configured to do so. This is the cause of [CELEBORN-2043], which was fixed by [CELEBORN-2021](https://github.com/apache/celeborn/pull/3302).
2. Evicted files need to exclude their own storage type from the create file order list, which means the storage type may not exist in the create file order list.
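The fallback described above amounts to one guarded lookup; a minimal sketch with illustrative types (not Celeborn's actual classes):

```java
import java.util.List;

/** Sketch of the fix: if the requested storage type is missing from the
 *  create-file order list, fall back to index 0 instead of propagating
 *  the -1 that indexOf returns. Illustrative names. */
class StorageOrderFallback {
    static int orderIndex(List<String> createFileOrder, String storageType) {
        int index = createFileOrder.indexOf(storageType);
        return index >= 0 ? index : 0; // fall back to the first tier
    }
}
```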
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes#3344 from FMX/b2043.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Added a commit files request fail count metric.
### Why are the changes needed?
To monitor and tune the configurations around the commit files workflow.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Local setup
<img width="739" alt="Screenshot 2025-06-04 at 10 51 06 AM" src="https://github.com/user-attachments/assets/d6256028-d8b7-4a81-90b1-3dcbf61adeba" />
Closes#3307 from s0nskar/commit_metric.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Adding register with master fail count metric for worker
### Why are the changes needed?
This helps monitor cases where workers are unable to register with the master, e.g. wrong endpoints are passed or the master becomes unavailable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Local setup
<img width="724" alt="Screenshot 2025-06-04 at 10 44 56 AM" src="https://github.com/user-attachments/assets/1f84557b-5df8-422f-b602-bb5316a72a0e" />
Closes#3308 from s0nskar/worker_register_metric.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
updateProduceBytes should be called even if updateProduceBytes throws exception
### Why are the changes needed?
To make UserProduceSpeed metrics more accurate
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes#3322 from leixm/CELEBORN-2033.
Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Some minor performance optimizations in the internal implementation
### Why are the changes needed?
During our use of Flink with Celeborn, we identified several minor optimizations that can be made:
1. On the client side, the Flink-Celeborn client parses the `pushDataTimeout` configuration too frequently, which is unnecessary and CPU-intensive.
2. On the worker side, Celeborn needs to filter readers that are available for reading. However, using Java Stream's collection operations is costly in terms of performance.
3. Also on the worker side, Celeborn currently checks whether a reader can continue reading by comparing the current read offset with the total file size. This check involves retrieving the total file size, which is an expensive operation. Since this value is constant, it should be cached in memory instead of being fetched multiple times.
4. In the Flink’s hybrid shuffle integration, the `EndOfSegment` event should not be bundled with data buffers. If it is, there is a risk of data corruption or misinterpretation.
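Point 3 above amounts to caching an immutable value; a minimal sketch with illustrative names (not Celeborn's actual reader class):

```java
/** Sketch of point 3: cache the immutable total file size once instead of
 *  re-reading it from the filesystem on every read-progress check. */
class CachedFileSizeReader {
    private final long totalSize; // fetched once; the committed file no longer grows
    private long readOffset = 0;

    CachedFileSizeReader(long totalSize) {
        this.totalSize = totalSize;
    }

    /** Cheap comparison against the cached size; no filesystem call per check. */
    boolean hasRemaining() {
        return readOffset < totalSize;
    }

    void advance(long bytes) { readOffset += bytes; }
}
```

The caching is safe only because the file size is constant once the partition is committed, which is the invariant the description relies on.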
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes#3318 from codenohup/CELEBORN-2029.
Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add a new parameter to allow CelebornShuffleReader to decompress data on demand
### Why are the changes needed?
In the current scenario of Gluten Fallback, a stage can have both Native and Java shuffles simultaneously. In [CELEBORN-1625](https://issues.apache.org/jira/browse/CELEBORN-1625), we introduced a new parameter to conditionally skip compression. Alongside this, we should add a corresponding parameter to ensure that the Celeborn shuffle reader does not decompress data when compression is skipped at pushOrMergeData.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GA
Closes#3317 from RexXiong/CELEBORN-2027.
Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add a retry mechanism when completing S3 multipart upload, to ensure that completeMultipartUpload is retried when facing a retryable exception like SlowDown
### Why are the changes needed?
While running a "simple" Spark job creating 10 TiB of shuffle data (repartition from 100k partitions to 20), relying on SOFT `celeborn.client.shuffle.partitionSplit.mode`, the job was constantly failing when all files should be committed.
Despite increasing `celeborn.storage.s3.mpu.maxRetries` up to `200`, the job was still failing due to SlowDown exceptions.
After adding some debug logs on the retry policy from the AWS S3 SDK, I saw that the policy is never called for the completeMultipartUpload action, while it is called on other actions. See https://issues.apache.org/jira/browse/CELEBORN-2003
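A minimal sketch of such a retry wrapper (illustrative; the real fix would match retryable S3 error codes like SlowDown and back off between attempts):

```java
import java.util.concurrent.Callable;

/** Sketch of a retry wrapper for completeMultipartUpload-style calls,
 *  since the SDK's own retry policy is not applied to that action.
 *  Error-code matching and backoff are omitted for brevity. */
class MpuCompleteRetry {
    static <T> T withRetry(Callable<T> action, int maxRetries) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // Real fix: only retry retryable errors (e.g. code "SlowDown"),
                // log "upload failed to complete, will retry (attempt/max)",
                // and sleep with backoff before the next attempt.
            }
        }
        if (last instanceof RuntimeException) throw (RuntimeException) last;
        throw new RuntimeException(last);
    }
}
```

Wrapping only the complete call keeps the SDK's built-in retry behavior for all other S3 actions untouched.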
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Created a cluster on a kubernetes server relying on S3 storage.
Launch a 10TiB shuffle from 100000 partitions to 200 partitions with SOFT `celeborn.client.shuffle.partitionSplit.mode`
The job succeed and well display some warn logs indicating that the `completeMultipartUpload` is retried due to SlowDown:
```
bucket ******* key poc/spark-2c86663c948243d19c127e90f704a3d5/0/35-39-0 uploadId Pbaq.pp1qyLvtGbfZrMwA8RgLJ4QYanAMhmv0DvKUk0m6.GlCKdC3ICGngn7Q7iIa0Dw1h3wEn78EoogMlYgFD6.tDqiatOTbFprsNkk0qzLu9KY8YCC48pqaINcvgi8c1gQKKhsf1zZ.5Et5j40wQ-- upload failed to complete, will retry (1/10)
com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: null; Status Code: 0; Error Code: SlowDown; Request ID: RAV5MXX3B9Z3ZHTG; S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S; Proxy: null), S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$CompleteMultipartUploadHandler.doEndElement(XmlResponsesSaxParser.java:1906)
```
Closes#3293 from ashangit/nfraison/CELEBORN-2003.
Authored-by: nicolas.fraison@datadoghq.com <nicolas.fraison@datadoghq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Fix a NPE when reading HDFS files.
2. Change partition manager will generate correct storage info.
3. Add assertions for tier writers.
### Why are the changes needed?
Regression for release 0.6.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster tests.
Closes#3302 from FMX/b2021.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Minor logging improvement around commit files to log shuffleKey.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
Some logs will change.
### How was this patch tested?
NA
Closes#3270 from s0nskar/CELEBORN-1775.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Improve Aliyun OSS support including `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.
### Why are the changes needed?
There are many methods where OSS support is lacking in `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3268 from SteNicholas/CELEBORN-1916.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
OpenStreamTime should use requestId to record cost time instead of shuffleKey
### Why are the changes needed?
OpenStreamTime is wrong because there will be multiple OpenStream requests for the same shuffleKey in the same time period.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes#3258 from leixm/CELEBORN-1999.
Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Introduce disruptor dependency to support asynchronous logging of log4j2.
### Why are the changes needed?
We add `-Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector` in `CELEBORN_MASTER_JAVA_OPTS` and `CELEBORN_WORKER_JAVA_OPTS` for production environments. `AsyncLoggerContextSelector` depends on the disruptor dependency. Therefore, it's recommended to introduce the disruptor dependency to support log4j2 asynchronous loggers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test.
Closes#3246 from SteNicholas/CELEBORN-1994.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>