Commit Graph

594 Commits

Author SHA1 Message Date
daowu.hzy
3efc60cc9f [CELEBORN-2047] Reuse FileChannel/FSDataInputStream in PartitionDataReader
Some checks failed
Celeborn SBT CI / spark3 (8, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.15, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.17, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.18, 2.12, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.12.18, 2.12, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.5, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.2) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.3) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.4) (push) Has been cancelled
Celeborn SBT CI / spark3 (8, 2.13.8, 2.13, org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO, 3.5) (push) Has been cancelled
Celeborn SBT CI / spark4 (17, 2.13.16, 2.13, org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO, 4.0) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.16, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.16, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.17, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.17, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.18, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.18, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.19, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.19, 8) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.20, 11) (push) Has been cancelled
Celeborn SBT CI / flink1 (1.20, 8) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.0, 11) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.0, 17) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.1, 11) (push) Has been cancelled
Celeborn SBT CI / flink2 (2.1, 17) (push) Has been cancelled
Celeborn SBT CI / mr (11) (push) Has been cancelled
Celeborn SBT CI / mr (8) (push) Has been cancelled
Celeborn SBT CI / openapi-codegen-check (11) (push) Has been cancelled
Style check / License (push) Has been cancelled
Lint Check for Web / lint (push) Has been cancelled
### What changes were proposed in this pull request?

Reuse FileChannel/FSDataInputStream in PartitionDataReader to avoid  open too many files.

### Why are the changes needed?

In previous implementations, different PartitionDataReader reused the same FileChannel/FSDataInputStream,but in #3349
changed this logic, so we need to maintain logical consistency

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manual test.

Closes #3445 from Alibaba-HZY/mirror-fix.

Authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 16:14:56 +08:00
xxx
449cb5d588 [CELEBORN-2127] When fileWriter is closed, it should return HARD_SPLIT StatusCode
…T StatusCode

### What changes were proposed in this pull request?

When fileWriter is closed, it should return HARD_SPLIT StatusCode

### Why are the changes needed?

When fileWriter is closed, it should return HARD_SPLIT StatusCode

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3448 from xy2953396112/CELEBORN-2127.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 16:04:32 +08:00
xxx
750aeefbc6 [CELEBORN-2139] Fix the condition for using OSS storage
### What changes were proposed in this pull request?

Fix the condition for using OSS storage

### Why are the changes needed?

When OSS is enabled, the local disk should be used first if it is available.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3463 from xy2953396112/CELEBORN-2139.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 15:40:42 +08:00
taowenjun
6c102441c3 [CELEBORN-2138] Avoiding multiple accesses to HDFS When writting index file
### What changes were proposed in this pull request?

Avoiding multiple accesses to HDFS When writting index file.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3460 from taowenjun/CELEBORN-2138.

Authored-by: taowenjun <1483633867@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 10:43:13 +08:00
SteNicholas
7f08eb8f1d [CELEBORN-2137] Remove unused MAPGROUP PartitionType
### What changes were proposed in this pull request?

Remove unused `MAPGROUP` `PartitionType`.

### Why are the changes needed?

`PartitionType` `MAPGROUP` is unused at present, which could be removed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3459 from SteNicholas/CELEBORN-2137.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 09:58:07 +08:00
dz
2817f7fb9e [CELEBORN-2104] Clean up sources of NettyRpcEnv, Master and Worker to avoid thread leaks
### What changes were proposed in this pull request?

Fix NettyRpcEnv,Master,Worker `Source` to avoid thread leak

### Why are the changes needed?

1. NettyRpcEnv should clean rpcSource to prevent resource leak.
2. Master clean resourceConsumptionSource, masterSource, threadPoolSource, jVMSource, jVMCPUSource, systemMiscSource
3. Worker clean clean workerSource.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

local

Closes #3418 from xy2953396112/CELEBORN-2104.

Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-29 19:04:19 +08:00
xxx
db70473de2 [CELEBORN-2134] When creating a DiskFile, retrieve the storage type b…
…ased on the mount point

### What changes were proposed in this pull request?

 When creating a DiskFile, retrieve the storage type based on the mount point

### Why are the changes needed?

If the worker disk is an SSD, it should be set to the SSD type, and the storage type can be retrieved based on the mount point.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3456 from xy2953396112/CELEBORN-2134.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-28 17:21:51 +08:00
xxx
d4e13b6ba2 [CELEBORN-2128] Close hadoopFs FileSystem when worker is closed
### What changes were proposed in this pull request?

Close hadoopFs FileSystem when worker is closed.

### Why are the changes needed?

When the worker is closed, close the hadoopFs FileSystem to avoid resource leakage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3449 from xy2953396112/CELEBORN-2128.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-27 14:12:55 +08:00
sychen
679df6c0f5 [CELEBORN-2125] Imporve PartitionFilesSorter sort timeout log
### What changes were proposed in this pull request?

### Why are the changes needed?
log only outputs fileid, exception only has file path, and no file length.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

```
25/08/25 18:06:37,083 ERROR [pool-1-thread-1] PartitionFilesSorter: Sorting file application-1-/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite path /var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite length 2453444321 timeout after 1ms
```

Closes #3446 from cxzl25/CELEBORN-2125.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-26 19:23:22 +08:00
xxx
a9490d6e24 [CELEBORN-2118] Introduce IsHighWorkload metric to monitor worker overload status
### What changes were proposed in this pull request?

Introduce `IsHighWorkload` metric to monitor worker overload status.

### Why are the changes needed?

There is no any metric to monitor worker overload status at present.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

[Grafana test](https://xy2953396112.grafana.net/public-dashboards/22ab1750ef874a1bb39b5879b81a24cf).

Closes #3435 from xy2953396112/CELEBORN-2118.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-25 20:46:17 +08:00
xxx
1d6299717f [CELEBORN-2123] Add log for commit file size
### What changes were proposed in this pull request?

Add log for commit file size.

### Why are the changes needed?

Statistics on the file size information of successfully committed files enables offline analysis of file write sizes, assessment of the proportion of small files, and implementation of optimizations based on file size data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3444 from xy2953396112/CELEBORN-2123.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-25 17:18:32 +08:00
xxx
661a096b77 [CELEBORN-2112] Introduce PausePushDataStatus and PausePushDataAndReplicateStatus metric to record status of pause push data
### What changes were proposed in this pull request?

Add `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric.

### Why are the changes needed?

Introduce `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric to record status of pause push data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test. [Grafana](https://xy2953396112.grafana.net/public-dashboards/21af8e2844234c438e74c741211f0032)

Closes #3426 from xy2953396112/CELEBORN-2112.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-21 11:17:44 +08:00
dz
8a37a7ca17 [CELEBORN-2106] CommitFile/Reserved location shows detail primary location UniqueId
…ation UniqueId info

### What changes were proposed in this pull request?

CommitFile/Reserved location shows detail primary location UniqueId info

### Why are the changes needed?

CommitFile/Reserved should display detailed partitionLocation uniqueId logs to facilitate troubleshooting.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3420 from xy2953396112/controller_log.

Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-21 10:45:30 +08:00
dz
11b41f97ad [CELEBORN-2102] Introduce SorterCacheHitRate metric to monitor the hit reate of index cache for sorter
### What changes were proposed in this pull request?

Introduce `SorterCacheHitRate` metric to monitor the hit reate of index cache for sorter.

### Why are the changes needed?

Monitor the hit rate of `PartitionFilesSorter#indexCache`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The verified grafana dashboard: https://xy2953396112.grafana.net/public-dashboards/5d1177ee0f784b53ad817fde919141b7

Closes #3416 from xy2953396112/CELEBORN_2102.

Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-20 10:47:38 +08:00
xxx
b537798e37 [CELEBORN-2108] Remove redundant PartitionType
### What changes were proposed in this pull request?

Remove redundant `PartitionType`.

### Why are the changes needed?

`PartitionType` is included in `PartitionDataWriterContext`, therefore it is not necessary to use `PartitionType` as method parameter.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3422 from xy2953396112/remove_useless_partition_type.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-19 16:38:19 +08:00
SteNicholas
adfc563828 [CELEBORN-2119] DfsTierWriter should close s3MultipartUploadHandler and ossMultipartUploadHandler for close resource
### What changes were proposed in this pull request?

`DfsTierWriter` should close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` for close resource to avoid resource leak for destroy file writer.

### Why are the changes needed?

`DfsTierWriter` does not close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` in `closeResource`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3433 from SteNicholas/CELEBORN-2119.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-19 14:57:16 +08:00
Jray
eb5e8a46f8 [CELEBORN-2091] Support Zstd Decompression in CppClient
### What changes were proposed in this pull request?
This PR adds support for zstd decompression in CppClient.

### Why are the changes needed?
To support reading from Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3411 from Jraaay/feat/cpp_client_zstd_decompression.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-11 15:55:32 +08:00
Jray
cfb490c938 [CELEBORN-2090] Support Lz4 Decompression in CppClient
### What changes were proposed in this pull request?
This PR adds support for lz4 decompression in CppClient.

### Why are the changes needed?
To support reading from Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3402 from Jraaay/feat/cpp_client_lz4_decompression.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-08 18:19:48 +08:00
TheodoreLx
1ead784fa1 [CELEBORN-2085] Use a fixed buffer for flush copying to reduce GC
### What changes were proposed in this pull request?

Apply for a byte array in advance and use it as a transfer when copying is needed during flush

### Why are the changes needed?

For HdfsFlushTask, OssFlushTask, and S3FlushTask, you need to copy the CompositeByteBuf in the parameter to a byte array when flushing, and then use the respective clients to write the byte array to the storage.
When the flush throughput rate is very high, this copying will cause very serious GC problems and affect the performance of the worker

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

cluster test

Closes #3394 from TheodoreLx/copy-on-flush.

Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-08-08 13:57:21 +08:00
SteNicholas
5a459250b0
[CELEBORN-1844][FOLLOWUP] Fix the condition of StoragePolicy that worker uses memory storage
### What changes were proposed in this pull request?

Fix the condition of `StoragePolicy` that worker uses memory storage

### Why are the changes needed?

The condition of `StoragePolicy` that worker uses memory storage is `order.contains(StorageInfo.Type.MEMORY.name())`, which condition is wrong because `Option#contains` is as follows:

```
final def contains[A1 >: A](elem: A1): Boolean = !isEmpty && this.get == elem
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3408 from SteNicholas/CELEBORN-1844.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-06 10:36:07 +08:00
liuyang62
fdcc108689 [CELEBORN-1792][FOLLOWUP] Add missing break in resumeByPinnedMemory
### What changes were proposed in this pull request?
Add missing break in resumeByPinnedMemory

### Why are the changes needed?
Avoid execute `resumePush` twice when resume by pinned memory from `PUSH_AND_REPLICATE_PAUSED` state.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA and cluster tests.

Closes #3407 from Flyangz/bugfix/fix-resumeByPinnedMemory-switch.

Authored-by: liuyang62 <liuyang62@staff.sina.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-05 20:32:55 +08:00
Wang, Fei
604485779c [CELEBORN-2092] Inc COMMIT_FILES_FAIL_COUNT when TimerWriter::close timeout
### What changes were proposed in this pull request?
Inc COMMIT_FILES_FAIL_COUNT when TimerWriter::close timeout

### Why are the changes needed?

1. the COMMIT_FILES_FAIL_COUNT is 0 even we meet SHUFFLE_DATA_LOST caused by commit files failure

Spark executor log:
```

25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for 0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1) return SHUFFLE_DATA_LOST for 0.
```

Spark driver log:
```
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle stageEnd for 0, lost file!

25/07/30 10:10:38 ERROR ReducePartitionCommitHandler:
For shuffle application_1750652300305_10219240_1-0 partition data lost:
Lost partition 307-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
Lost partition 1289-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
```

Worker log:
```
java.io.IOException: Wait pending actions timeout.
	at org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #3403 from turboFei/commit_failed.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-07-31 21:12:21 -07:00
SteNicholas
392f6186df [CELEBORN-2086] S3FlushTask and OssFlushTask should close ByteArrayInputStream to avoid resource leak
### What changes were proposed in this pull request?

`S3FlushTask` and `OssFlushTask` should close `ByteArrayInputStream` to avoid resource leak.

### Why are the changes needed?

`S3FlushTask` and `OssFlushTask` don't close `ByteArrayInputStream` at present, which may cause resource leak.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3395 from SteNicholas/CELEBORN-2086.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-07-29 17:19:18 +08:00
Kalvin2077
c6e68fddfa [CELEBORN-2053] Refactor remote storage configration usage
### What changes were proposed in this pull request?

Refactoring similar code about configuration usage.

### Why are the changes needed?

Improve scalability for possible new remote storage in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #3353 from Kalvin2077/draft.

Authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-07-28 16:56:32 +08:00
SteNicholas
ae40222351 [CELEBORN-2047] Support MapPartitionData on DFS
### What changes were proposed in this pull request?

Support `MapPartitionData` on DFS.

### Why are the changes needed?

`MapPartitionData` only supports on local, which does not support on DFS. It's recommended to support `MapPartitionData` on DFS for MapPartition mode to align the ability of ReducePartition mode.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`WordCountTestWithHDFS`.

Closes #3349 from SteNicholas/CELEBORN-2047.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-07-26 22:11:32 +08:00
sychen
abd6233a50 [CELEBORN-2081] PushDataHandler onFailure log shuffle key
### What changes were proposed in this pull request?

### Why are the changes needed?

`id-epoch` cannot locate which application is specific. We can add shuffle key to the log.

```
25/07/25 04:23:06,439 ERROR [celeborn-push-timeout-checker-0] PushDataHandler: PushMergedData replicate failed for partitionLocation: PartitionLocation[
  id-epoch:1623-0
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

Closes #3390 from cxzl25/CELEBORN-2081.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-25 20:53:46 +08:00
Wang, Fei
8a0d0d5fd4 [CELEBORN-2075] Fix OpenStreamTime metrics for PbOpenStreamList request
### What changes were proposed in this pull request?
Fix OpenStreamTime metrics for PbOpenStreamList request

### Why are the changes needed?

For `PbOpenStreamList` request, the `OpenStreamTime` metrics is not calculated.

cf3c05d668/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala (L140-L160)

And the   `workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) ` is meaningless inside `handleReduceOpenStreamInternal`. cf3c05d668/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala (L335-L341)

The `handleReduceOpenStreamInternal` is called in
1. inside `handleOpenStreamInternal`, the `OpenStreamTime` metrics is handled correctly
2. for `PbOpenStreamList, the `OpenStreamTime` metrics is not handled.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually testing.

Closes #3376 from turboFei/buf_size.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-22 17:52:38 +08:00
TheodoreLx
d09b424756 [CELEBORN-2061] Introduce metrics to count the amount of data flushed into different storage types
Added metrics for the amount of data written to different storage types, including Local, HDFS, OSS, and S3

Currently, there is a lack of data volume written to each storage, and it is impossible to monitor the size and speed of writing.

no

Cluster Test

Closes #3362 from TheodoreLx/add-flush-count-metric.

Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-21 19:38:35 +08:00
Wang, Fei
979f2e2148 [CELEBORN-2073] Fix PartitionFileSizeBytes metrics
### What changes were proposed in this pull request?

This PR fix two issues:

1. followup https://github.com/apache/celeborn/pull/3047, the metrics positions for `PartitionFileSizeBytes` on grafana dashboard are wrong.
2. follow up https://github.com/apache/celeborn/pull/3085, PartitionFileSizeBytes does not work.

### Why are the changes needed?

1. The metrics positions are not correct, they should be placed under `Worker` row. But now, they are at the end.
<img width="1727" height="247" alt="image" src="https://github.com/user-attachments/assets/87a7eb1d-e296-4730-8986-efbf48aa35e6" />

2. the metrics does not work after 951b626a98 (diff-93aed69b393af59cefdfa6f5293f4dfb9cba96a9be23f3eec0bbe7d61f6d65be)
<img width="2072" height="282" alt="image" src="https://github.com/user-attachments/assets/28d0b404-914a-49e5-ac71-f399b3c3d44a" />

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

1. the metrics position looks good now.
<img width="1703" height="534" alt="image" src="https://github.com/user-attachments/assets/f5b78d37-9d84-4241-9285-e9a2ba0b12b2" />

2. UT

Closes #3374 from turboFei/fix_metrics_pos.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-21 14:25:06 +08:00
SteNicholas
6a0e19c076 [CELEBORN-2067] Clean up deprecated Guava API usage
### What changes were proposed in this pull request?

Clean up deprecated Guava API usage.

### Why are the changes needed?

There are deprecated Guava API usage, including:

1. Made modifications to Throwables.propagate with reference to https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate

- For cases where it is known to be a checked exception, including `IOException`, `GeneralSecurityException`, `SaslException`, and `RocksDBException`, none of which are subclasses of RuntimeException or Error, directly replaced Throwables.propagate(e) with `throw new RuntimeException(e);`.

- For cases where it cannot be determined whether it is a checked exception or an unchecked exception or Error, use

```
throwIfUnchecked(e);
throw new RuntimeException(e);
```

to replace `Throwables.propagate(e)`.

0c33dd12b1/guava/src/com/google/common/base/Throwables.java (L199-L235)

```
  /**
   * ...
   * deprecated To preserve behavior, use {code throw e} or {code throw new RuntimeException(e)}
   *     directly, or use a combination of {link #throwIfUnchecked} and {code throw new
   *     RuntimeException(e)}. But consider whether users would be better off if your API threw a
   *     different type of exception. For background on the deprecation, read <a
   *     href="https://goo.gl/Ivn2kc">Why we deprecated {code Throwables.propagate}</a>.
   */
  CanIgnoreReturnValue
  J2ktIncompatible
  GwtIncompatible
  Deprecated
  public static RuntimeException propagate(Throwable throwable) {
    throwIfUnchecked(throwable);
    throw new RuntimeException(throwable);
  }
```

Backport https://github.com/apache/spark/pull/48248

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3367 from SteNicholas/CELEBORN-2067.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-07-18 17:39:50 +08:00
mingji
532cedbfd2 [CELEBORN-1844][FOLLOWUP] alway try to use memory storage if available
### What changes were proposed in this pull request?
Try to use memory storage first if it is available.
To increase performance, if a cluster is set to use MEMORY and HDFS.

### Why are the changes needed?
To keep the old behavior as in release 0.5, always try to use memory storage first because the slots allocator won't allocate slots on memory storage.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #3352 from FMX/b1844-2.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-07-10 15:55:29 +08:00
codenohup
0fa600ade1 [CELEBORN-2055] Fix some typos
### What changes were proposed in this pull request?
Inspired by [FLINK-38038](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-38038?filter=allissues]), I used [Tongyi Lingma](https://lingma.aliyun.com/) and qwen3-thinking LLM to identify and fix some typo issues in the Celeborn codebase. For example:
- backLog → backlog
- won`t → won't
- can to be read → can be read
- mapDataPartition → mapPartitionData
- UserDefinePasswordAuthenticationProviderImpl → UserDefinedPasswordAuthenticationProviderImpl

### Why are the changes needed?
Remove typos to improve source code readability for users and ease development for developers.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Code and documentation cleanup does not require additional testing.

Closes #3356 from codenohup/fix-typo.

Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-10 12:01:02 +08:00
daowu.hzy
41b5154030 [CELEBORN-2051] Support write MapPartition to DFS
### What changes were proposed in this pull request?

Support write map partition to DFS.

### Why are the changes needed?

`DiskFileInfo` is wrong when flink write,it did not distinguish the types of FileMeta.
indexBuffer will also fail when flink write in the dfs case,because indexBuffer.array will throw `UnsupportedOperationException`(it is allocated by direct memory).

### Does this PR introduce any user-facing change?

No.

###  How was this patch tested?

Manual test.

Closes #3350 from Alibaba-HZY/fix-flink-writer-hdfs.

Lead-authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Co-authored-by: Nicholas Jiang <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-07-03 11:41:10 +08:00
Gaurav Mittal
cde33d953b [CELEBORN-894] End to End Integrity Checks
### What changes were proposed in this pull request?
Design doc - https://docs.google.com/document/d/1YqK0kua-5rMufJw57kEIrHHGbLnAF9iXM5GdDweMzzg/edit?tab=t.0#heading=h.n5ldma432qnd

- End to End integrity checks provide additional confidence that Celeborn is producing complete as well as correct data
- The checks are hidden behind a client side config that is false by default. Provides users optionality to enable these if required on a per app basis
- Only compatible with Spark at the moment
- No support for Flink (can be considered in future)
- No support for Columnar Shuffle (can be considered in future)

Writer
- Whenever a mapper completes, it reports crc32 and bytes written on a per partition basis to the driver

Driver
- Driver aggregates the mapper reports - and computes aggregated CRC32 and bytes written on per partitionID basis

Reader
- Each CelebornInputStream will report (int shuffleId, int partitionId, int startMapIndex, int endMapIndex, int crc32, long bytes) to driver when it finished reading all data on the stream
- On every report
  - Driver will aggregate the CRC32 and bytesRead for the partitionID
  - Driver will aggregate mapRange to determine when all sub-paritions have been read for partitionID have been read
  - It will then compare the aggregated CRC32 and bytes read with the expected CRC32 and bytes written for the partition
  - There is special handling for skewhandlingwithoutMapRangeSplit scenario as well
  - In this case, we report the number of sub-partitions and index of the sub-partition instead of startMapIndex and endMapIndex

There is separate handling for skew handling with and without map range split

As a follow up, I will do another PR that will harden up the checks and perform additional checks to add book keeping that every CelebornInputStream makes the required checks

### Why are the changes needed?
https://issues.apache.org/jira/browse/CELEBORN-894

Note: I am putting up this PR even though some tests are failing, since I want to get some early feedback on the code changes.

### Does this PR introduce _any_ user-facing change?
Not sure how to answer this. A new client side config is available to enable the checks if required

### How was this patch tested?
Unit tests + Integration tests

Closes #3261 from gauravkm/gaurav/e2e_checks_v3.

Lead-authored-by: Gaurav Mittal <gaurav@stripe.com>
Co-authored-by: Gaurav Mittal <gauravkm@gmail.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-06-28 09:19:57 +08:00
SteNicholas
3ee3a26220 [CELEBORN-2046] Specify extractionDir of AsyncProfilerLoader with celeborn.worker.jvmProfiler.localDir
### What changes were proposed in this pull request?

Specify `extractionDir` of `AsyncProfilerLoader` with `celeborn.worker.jvmProfiler.localDir`.

### Why are the changes needed?

`AsyncProfilerLoader` uses `user.home` directory to store the extracted libraries by default . When `user.home` directory is not initialized, it will cause `AsyncProfilerLoader#load` to fail. `extractionDir` of `AsyncProfilerLoader` could be specified with `celeborn.worker.jvmProfiler.localDir` to avoid failure of loading.

Backport https://github.com/apache/spark/pull/51229.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test.

Closes #3345 from SteNicholas/CELEBORN-2046.

Lead-authored-by: SteNicholas <programgeek@163.com>
Co-authored-by: 子懿 <ziyi.jxf@antgroup.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-25 10:15:38 -07:00
Mridul Muralidharan
8ae9737601 [CELEBORN-2044] Proactively cleanup stream state from ChunkStreamManager when the stream ends
### What changes were proposed in this pull request?
Proactively cleanup from ChunkStreamManager when stream is closed.

### Why are the changes needed?

Stream gets closed only when shuffle expires at master, which can take a while.
In meantime, workers incur have nontrivial memory utilization.

### Does this PR introduce _any_ user-facing change?

No. Reduces memory usage in workers.

### How was this patch tested?

Existing unit tests

Closes #3343 from mridulm/proactively-cleanup-streams.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2025-06-24 12:51:00 -05:00
Shuang
582726fff8 [CELEBORN-1721][FOLLOWUP] Return softsplit if there is no hardsplit for pushMergeData
### What changes were proposed in this pull request?
Return SoftSplit status when there is no hard split for pushMergeData

### Why are the changes needed?
HardSplit support was introduced in [CELEBORN-1721](https://issues.apache.org/jira/browse/CELEBORN-1721), and it works well when the client and server are of the same version. However, using a 0.5.x client with a 0.6.x server can significantly impact shuffle write performance. This is because the 0.6.x server returns hardsplit status whenever there is any soft or hard split, leading the 0.5.x client to perform hardsplit on every partition. To maintain compatibility during upgrades, it's essential to preserve the original behavior for 0.5.x clients.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA & 1T tpcds with 0.5.4 version client, according to the test results, after applying this PR, revive decreased significantly, and performance improved.

#### test use 0.6.0-rc2 server, 0.5.4 client
![image](https://github.com/user-attachments/assets/f9d640d7-1dc4-438b-8320-428a7d23dc93)

#### test use 0.7.0 server + this pr,  0.5.4 client
![image](https://github.com/user-attachments/assets/d198055a-698c-48ec-9246-9170d2ac64cc)

Closes #3342 from RexXiong/CELEBORN-1721-FOLLOWUP.

Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-23 17:32:10 -07:00
mingji
676beca616 [CELEBORN-2043] Fix IndexOutOfBoundsException exception in getEvictedFileWriter
### What changes were proposed in this pull request?
If the create file order list doesn't have the corresponding storage types, use 0 as the index.

### Why are the changes needed?
This is needed in two scenarios:
1. For existing clients, the change partition will select MEMORY as the default storage tier, which will cause the revived partition to utilize memory storage even if the application is not configured to do so. This is the cause of [CELEBORN-2043], and fixed by [CELEBORN-2021](https://github.com/apache/celeborn/pull/3302).
2. Evicted files will need to exclude itself storage type in the create file order list. which means that the storage type does not exist in the create file order list.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA and cluster.

Closes #3344 from FMX/b2043.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-23 00:49:26 -07:00
Sanskar Modi
2a2c6e4687 [CELEBORN-2024] Publish commit files fail count metrics
<!--
Thanks for sending a pull request!  Here are some tips for you:
  - Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
  - Be sure to keep the PR description updated to reflect all changes.
  - Please write your PR title to summarize what this PR proposes.
  - If possible, provide a concise example to reproduce the issue for a faster review.
-->

### What changes were proposed in this pull request?
Added a commit files request fail count metric.

### Why are the changes needed?
To monitor and tune the configurations around the commit files workflow.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Local setup

<img width="739" alt="Screenshot 2025-06-04 at 10 51 06 AM" src="https://github.com/user-attachments/assets/d6256028-d8b7-4a81-90b1-3dcbf61adeba" />

Closes #3307 from s0nskar/commit_metric.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-17 11:52:45 -07:00
Sanskar Modi
80bdb46801 [CELEBORN-1892] Adding register with master fail count metric for worker
### What changes were proposed in this pull request?

Adding register with master fail count metric for worker

### Why are the changes needed?

This will help put monitoring around if workers are not able to register with master like wrong endpoints are passed or master becomes unavailable.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
Local setup

<img width="724" alt="Screenshot 2025-06-04 at 10 44 56 AM" src="https://github.com/user-attachments/assets/1f84557b-5df8-422f-b602-bb5316a72a0e" />

Closes #3308 from s0nskar/worker_register_metric.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-11 11:04:59 -07:00
Xianming Lei
bbd3bb4814 [CELEBORN-2033] updateProduceBytes should be called even if updateProduceBytes throws exception
### What changes were proposed in this pull request?
updateProduceBytes should be called even if updateProduceBytes throws exception

### Why are the changes needed?
To make UserProduceSpeed ​​metrics more accurate

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #3322 from leixm/CELEBORN-2033.

Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-11 10:54:24 -07:00
codenohup
feba7baec6 [CELEBORN-2029][FLINK] Some minor optimizations in the Flink integration
### What changes were proposed in this pull request?
Some minor performance optimizations in the internal implementation

### Why are the changes needed?
During our use of Flink with Celeborn, we identified several minor optimizations that can be made:

1. In the client side, the Flink-Celeborn client parses the `pushDataTimeout` configuration too frequently, which is unnecessary and cpu-intensive.

2. On the worker side, Celeborn needs to filter readers that are able for reading. However, using Java Stream's collection operations is costly in terms of performance.

3. Also on the worker side, Celeborn currently checks whether a reader can continue reading by comparing the current read offset with the total file size. This check involves retrieving the total file size, which is an expensive operation. Since this value is constant, it should be cached in memory instead of being fetched multiple times.

4. In the Flink’s hybrid shuffle integration, the `EndOfSegment` event should not be bundled with data buffers. If it is, there is a risk of data corruption or misinterpretation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual test.

Closes #3318 from codenohup/CELEBORN-2029.

Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-06-09 16:37:40 +08:00
Shuang
da84baed4d [CELEBORN-2027] Allow CelebornShuffleReader to decompress data on demand
### What changes were proposed in this pull request?
At a new parameter, allow CelebornShuffleReader to decompress data on demand

### Why are the changes needed?
In the current scenario of Gluten Fallback, a stage can have both Native and Java shuffles simultaneously. In [CELEBORN-1625](https://issues.apache.org/jira/browse/CELEBORN-1625), we introduced a new parameter to conditionally skip compression. Alongside this, we should add a corresponding parameter to ensure that the Celeborn shuffle reader does not decompress data when compression is skipped at pushOrMergeData.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA

Closes #3317 from RexXiong/CELEBORN-2027.

Authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-06-09 15:10:46 +08:00
nicolas.fraison@datadoghq.com
061cdc3820 [CELEBORN-2003] Add retry mechanism when completing S3 multipart upload
### What changes were proposed in this pull request?

Add a retry mechanism when completing S3 multipart upload to ensure that completeMultipartUpload is retry when facing retryable exception like SlowDown one

### Why are the changes needed?

While running a “simple” spark jobs creating 10TiB of shuffle data (repartition from 100k partition to 20) the job was constantly failing when all files should be committed. relying on SOFT `celeborn.client.shuffle.partitionSplit.mode`

Despite an increase of `celeborn.storage.s3.mpu.maxRetries` up to `200`. Job was still failing due to SlowDown exception
Adding some debug logs on the retry policy from AWS S3 SDK I've seen that the policy is never called when doing completeMultipartUpload action while it is well called on other actions. See https://issues.apache.org/jira/browse/CELEBORN-2003

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Created a cluster on a kubernetes server relying on S3 storage.
Launch a 10TiB shuffle from 100000 partitions to 200 partitions with SOFT `celeborn.client.shuffle.partitionSplit.mode`
The job succeed and well display some warn logs indicating that the `completeMultipartUpload` is retried due to SlowDown:
```
bucket ******* key poc/spark-2c86663c948243d19c127e90f704a3d5/0/35-39-0 uploadId Pbaq.pp1qyLvtGbfZrMwA8RgLJ4QYanAMhmv0DvKUk0m6.GlCKdC3ICGngn7Q7iIa0Dw1h3wEn78EoogMlYgFD6.tDqiatOTbFprsNkk0qzLu9KY8YCC48pqaINcvgi8c1gQKKhsf1zZ.5Et5j40wQ-- upload failed to complete, will retry (1/10)
com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: null; Status Code: 0; Error Code: SlowDown; Request ID: RAV5MXX3B9Z3ZHTG; S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S; Proxy: null), S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S 	at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$CompleteMultipartUploadHandler.doEndElement(XmlResponsesSaxParser.java:1906)
```

Closes #3293 from ashangit/nfraison/CELEBORN-2003.

Authored-by: nicolas.fraison@datadoghq.com <nicolas.fraison@datadoghq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-06-06 10:15:26 +08:00
mingji
7bde738e5e [CELEBORN-2021] Fix issues on regression HDFS and OSS before release 0.6
### What changes were proposed in this pull request?
1. Fix a NPE when reading HDFS files.
2. Change partition manager will generate correct storage info.
3. Add assertions for tier writers.

### Why are the changes needed?
Regression for release 0.6.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
Cluster tests.

Closes #3302 from FMX/b2021.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-06-04 15:37:50 +08:00
Sanskar Modi
082f0dd8c5 [CELEBORN-1775][FOLLOWUP] Improve logging around commit files
### What changes were proposed in this pull request?
Minor logging improvement around commit files to log shuffleKey.

### Why are the changes needed?
Ditto.

### Does this PR introduce _any_ user-facing change?
Some logs will change.

### How was this patch tested?
NA

Closes #3270 from s0nskar/CELEBORN-1775.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-05-21 16:37:38 -07:00
SteNicholas
46d9d63e1f [CELEBORN-1916][FOLLOWUP] Improve Aliyun OSS support
### What changes were proposed in this pull request?

Improve Aliyun OSS support including `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.

### Why are the changes needed?

There are many methods where OSS support is lacking in `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3268 from SteNicholas/CELEBORN-1916.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-05-21 11:44:50 +08:00
zhengtao
e8ae23bc7a [CELEBORN-1960] Fix PauseSpentTime only append the interval check time
### What changes were proposed in this pull request?
Fix the pauseTime metrics count error.

### Why are the changes needed?
Assume that 0 is NONE PAUSED status, 1 is PAUSE PUSH, 2 is PAUSE PUSH AND REPLICATE.
Every check interval will record the status.
Here is the status changements:
0 -> 0 -> 1 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 0 -> 0 -> 0

The previous code only count the interval time and every interval time will update the pauseStartTime.
ps(pauseStartTime), pe(pauseEndTime)
0 -> 0 -> 1(ps)-> 1(ps ) -> 1(ps ) -> 1(ps) -> 2(rs) -> 2(rs) -> 2(re-rs) -> 1(ps) -> 1(ps) ->` _1(ps) -> 0(pe)_` -> 0 -> 0

It should be
0 -> 0 -> 1(ps)-> 1 -> 1-> 1 -> 2(rs) -> 2 -> 2 -> 1(re) -> 1-> 1 -> 0(pe) -> 0 -> 0

0 -> 0 -> 1(ps)-> 1-> 1 -> 0(pe) -> 0 -> 0

0 -> 0 -> 2(ps, rs)-> 2-> 2 -> 0(pe, re) -> 0

0 -> 0 -> 1(ps)-> 1-> 2(rs) -> 2 -> 0(pe, re)

0 -> 0 -> 2(ps, rs)-> 2-> 1(re) -> 1 -> 0(pe)

The pauseRpelicaTime should include pausePushTime.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #3207 from zaynt4606/clb1960.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-05-15 23:15:28 +08:00
Xianming Lei
d03efcbdb3 [CELEBORN-1999] OpenStreamTime should use requestId to record cost time
### What changes were proposed in this pull request?
OpenStreamTime should use requestId to record cost time instead of shuffleKey

### Why are the changes needed?
OpenStreamTime is wrong because there will be multiple OpenStream requests for the same shuffleKey in the same time period.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #3258 from leixm/CELEBORN-1999.

Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-05-15 01:52:14 -07:00
SteNicholas
8e66ac833a [CELEBORN-1994] Introduce disruptor dependency to support asynchronous logging of log4j2
### What changes were proposed in this pull request?

Introduce disruptor dependency to support asynchronous logging of log4j2.

### Why are the changes needed?

We add `-Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector` in `CELEBORN_MASTER_JAVA_OPTS` and `CELEBORN_WOKRER_JAVA_OPTS` for production environment. `AsyncLoggerContextSelector` depends on disruptor dependency. Therefore, it's recommend to introduce disruptor dependency to support log4j2 asynchronous loggers.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Cluster test.

Closes #3246 from SteNicholas/CELEBORN-1994.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-05-13 19:45:51 +08:00