### What changes were proposed in this pull request?
Support registering application info with a user identifier and extra info.
### Why are the changes needed?
To provide more insight into application information.
### Does this PR introduce _any_ user-facing change?
A new RESTful API.
### How was this patch tested?
UT.
Closes #3428 from turboFei/app_info_uid.
Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Support Flink 2.1.
### Why are the changes needed?
Flink 2.1 has already been released; see [Release notes - Flink 2.1](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.1).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3404 from SteNicholas/CELEBORN-2093.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Support `MapPartitionData` on DFS.
### Why are the changes needed?
`MapPartitionData` currently only supports local storage, not DFS. It's recommended to support `MapPartitionData` on DFS so that MapPartition mode aligns with the capability of ReducePartition mode.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`WordCountTestWithHDFS`.
Closes #3349 from SteNicholas/CELEBORN-2047.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Design doc - https://docs.google.com/document/d/1YqK0kua-5rMufJw57kEIrHHGbLnAF9iXM5GdDweMzzg/edit?tab=t.0#heading=h.n5ldma432qnd
- End-to-end integrity checks provide additional confidence that Celeborn is producing complete as well as correct data
- The checks are gated behind a client-side config that is false by default, giving users the option to enable them per application if required
- Only compatible with Spark at the moment
- No support for Flink (can be considered in future)
- No support for Columnar Shuffle (can be considered in future)
Writer
- Whenever a mapper completes, it reports CRC32 and bytes written on a per-partition basis to the driver
Driver
- Driver aggregates the mapper reports and computes the aggregated CRC32 and bytes written on a per-partitionID basis
Reader
- Each CelebornInputStream will report (int shuffleId, int partitionId, int startMapIndex, int endMapIndex, int crc32, long bytes) to the driver when it finishes reading all data on the stream
- On every report
- Driver will aggregate the CRC32 and bytesRead for the partitionID
- Driver will aggregate the map ranges to determine when all sub-partitions of the partitionID have been read
- It will then compare the aggregated CRC32 and bytes read with the expected CRC32 and bytes written for the partition
- There is special handling for the skew-handling-without-map-range-split scenario as well
- In this case, we report the number of sub-partitions and the index of the sub-partition instead of startMapIndex and endMapIndex
Skew handling with and without map range split is handled separately
As a follow-up, I will open another PR that hardens the checks and adds bookkeeping to verify that every CelebornInputStream performs the required checks
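The writer/driver/reader flow above can be sketched minimally as follows. All class and method names here are hypothetical, and the commutative XOR used to merge per-mapper CRC32 values is only an illustrative stand-in for however the PR actually combines checksums:

```java
import java.util.HashMap;
import java.util.Map;

// Driver-side bookkeeping sketch: aggregate per-mapper write reports and
// per-stream read reports, then compare them once a partition is fully read.
public class PartitionIntegrityAggregator {
  static class Agg {
    long crc;   // commutative combination of reported CRC32 values
    long bytes; // total bytes written/read for the partition
  }

  private final Map<Integer, Agg> written = new HashMap<>();
  private final Map<Integer, Agg> read = new HashMap<>();

  // Called once per completed mapper, per partition (writer side).
  public void reportWrite(int partitionId, long crc32, long bytesWritten) {
    Agg a = written.computeIfAbsent(partitionId, k -> new Agg());
    a.crc ^= crc32; // XOR is commutative, so mapper report order does not matter
    a.bytes += bytesWritten;
  }

  // Called when a CelebornInputStream finishes reading its (sub-)partition.
  public void reportRead(int partitionId, long crc32, long bytesRead) {
    Agg a = read.computeIfAbsent(partitionId, k -> new Agg());
    a.crc ^= crc32;
    a.bytes += bytesRead;
  }

  // Compared once all sub-partitions of partitionId have been read.
  public boolean verify(int partitionId) {
    Agg w = written.get(partitionId);
    Agg r = read.get(partitionId);
    return w != null && r != null && w.crc == r.crc && w.bytes == r.bytes;
  }
}
```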
### Why are the changes needed?
https://issues.apache.org/jira/browse/CELEBORN-894
Note: I am putting up this PR even though some tests are failing, since I want to get some early feedback on the code changes.
### Does this PR introduce _any_ user-facing change?
A new client-side config is available to enable the checks if required.
### How was this patch tested?
Unit tests + Integration tests
Closes #3261 from gauravkm/gaurav/e2e_checks_v3.
Lead-authored-by: Gaurav Mittal <gaurav@stripe.com>
Co-authored-by: Gaurav Mittal <gauravkm@gmail.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Fixes the FetchFailure handling logic in the `shouldReportShuffleFetchFailure` method to properly handle cases where the TaskSetManager cannot be found for a given task ID.
### Why are the changes needed?
The current implementation incorrectly reports FetchFailure when TaskSetManager is not found, which leads to false positive failures in normal fault tolerance scenarios. This happens because:
1. Executor Lost scenarios: When executors are lost due to resource preemption or failures, the associated TaskSetManager gets cleaned up, making it unavailable for lookup
2. Stage cancellation: Cancelled or completed stages may have their TaskSetManager removed
These are all normal scenarios in Spark's fault tolerance mechanism and should not be treated as shuffle failures. The current behavior can cause unnecessary job failures and confusion in debugging actual shuffle issues.
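The rule described above can be sketched as follows, with a plain map standing in for Spark's internal task-to-TaskSetManager bookkeeping (names are illustrative, not the actual Spark or Celeborn API):

```java
import java.util.Map;

public class FetchFailureReporting {
  // A fetch failure is only worth reporting when the scheduler still knows
  // the task's TaskSetManager. A missing entry means the executor was lost
  // or the stage was cancelled/completed: normal fault tolerance, not a
  // shuffle failure.
  public static boolean shouldReportFetchFailure(
      long taskId, Map<Long, ?> taskIdToTaskSetManager) {
    return taskIdToTaskSetManager.containsKey(taskId);
  }
}
```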
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT, Long-running Production Validation
Closes #3339 from gaoyajun02/CELEBORN-2042.
Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Rename `throwsFetchFailure` to `stageRerunEnabled`.
### Why are the changes needed?
Make the code cleaner.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Existing UTs.
Closes #3324 from leixm/CELEBORN-2035.
Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Record the last reported shuffle fetch failure task id.
### Why are the changes needed?
The reported shuffle fetch failure task id might be cleaned up quickly after being recorded.
To prevent flaky tests, it is better to record the last reported task id for testing.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA, run 3 times.
Closes #3301 from turboFei/app_id_debug.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Bump Spark 4.0 version to 4.0.0.
### Why are the changes needed?
Spark 4.0.0 is ready.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #3282 from turboFei/spark_4.0.
Lead-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This is joint work with YutingWang98.
Currently, we have to wait for Spark shuffle object GC to clean the disk space occupied by Celeborn shuffles.
As a result, if a shuffle fails to fetch and is retried, the disk space occupied by the failed attempt cannot really be cleaned. We hit this issue internally when dealing with hundreds of TB of shuffle data in a single Spark application; any hiccup in the servers can double or even triple the disk usage.
This PR implements a mechanism to delete files from failed-to-fetch shuffles.
The main idea is simple: it triggers cleanup in LifecycleManager when it applies for a new Celeborn shuffle id for a retried shuffle write stage.
The tricky part is avoiding deleting shuffle files that are still referenced by multiple downstream stages: the PR introduces RunningStageManager to track the dependencies between stages.
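The reference-tracking idea can be sketched as below; this is a simplified, hypothetical stand-in for the PR's RunningStageManager, not its actual implementation:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Track which running stages still consume each shuffle; a retried write
// stage may only delete the failed attempt's files once no running stage
// still reads that shuffle.
public class RunningStageTracker {
  // shuffleId -> ids of running stages that consume it
  private final Map<Integer, Set<Integer>> consumers = new HashMap<>();

  public void stageStarted(int stageId, int consumedShuffleId) {
    consumers.computeIfAbsent(consumedShuffleId, k -> new HashSet<>()).add(stageId);
  }

  public void stageFinished(int stageId, int consumedShuffleId) {
    Set<Integer> s = consumers.get(consumedShuffleId);
    if (s != null) {
      s.remove(stageId);
    }
  }

  // Invoked when a retried write stage asks for a new Celeborn shuffle id.
  public boolean safeToDeleteFailedAttempt(int consumedShuffleId) {
    Set<Integer> s = consumers.get(consumedShuffleId);
    return s == null || s.isEmpty();
  }
}
```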
### Why are the changes needed?
Saving disk space.
### Does this PR introduce _any_ user-facing change?
Yes, a new config.
### How was this patch tested?
We manually deleted some files.

From the above screenshot, we can see that originally we have shuffles 0 and 1; after shuffle 1 hit a chunk fetch failure, it triggered a retry of shuffle 0 (as shuffle 2), and at this moment shuffle 0 had already been deleted from the workers.

In the logs, we can see that in the middle of the application, the unregister shuffle request was sent for shuffle 0.
Closes #3109 from CodingCat/delete_fi.
Lead-authored-by: CodingCat <zhunansjtu@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Ignore the getReducerFileGroup timeout before shuffle stage end.
### Why are the changes needed?
1. If the getReducerFileGroup timeout is caused by a LifecycleManager commitFiles timeout (stage not ended),
2. many tasks may fail without reporting fetch failure,
3. which eventually causes the Spark application to fail.
The shuffle client should ignore the getReducerFileGroup timeout before LifecycleManager commitFiles completes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
Closes #3263 from turboFei/is_stage_end.
Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Aggregate push-failed batches for the same map ID and attempt ID.
### Why are the changes needed?
To reduce memory usage.
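A minimal sketch of the aggregation idea, keying a bitmap of batch ids by (mapId, attemptId); the class name and key encoding are illustrative, not Celeborn's actual data structures:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Instead of keeping one record per failed push batch, group batch ids
// under their (mapId, attemptId) key: one map entry plus one bitmap per
// task attempt, which bounds memory usage.
public class FailedBatchAggregator {
  // key = mapId in the high 32 bits, attemptId in the low 32 bits
  private final Map<Long, BitSet> failed = new HashMap<>();

  private static long key(int mapId, int attemptId) {
    return (((long) mapId) << 32) | (attemptId & 0xFFFFFFFFL);
  }

  public void addFailedBatch(int mapId, int attemptId, int batchId) {
    failed.computeIfAbsent(key(mapId, attemptId), k -> new BitSet()).set(batchId);
  }

  public boolean isFailed(int mapId, int attemptId, int batchId) {
    BitSet bs = failed.get(key(mapId, attemptId));
    return bs != null && bs.get(batchId);
  }
}
```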
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster run.
Closes #3253 from FMX/b1995.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
For a non-barrier shuffle read stage, `LifecycleManager#handleGetShuffleIdForApp` always returns the appShuffleId, whether the fetch status is true or not.
### Why are the changes needed?
As described in [jira](https://issues.apache.org/jira/browse/CELEBORN-1855), if LifecycleManager only returns an appShuffleId whose fetch status is success, the task will fail directly with "there is no finished map stage associated with", but the previously reported fetch failure event may not be fatal. So just give it a chance.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #3090 from buska88/celeborn-1855.
Authored-by: lijianfu03 <lijianfu@meituan.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
The client should send a heartbeat back to the worker when processing the worker's heartbeat, to avoid read idleness on a worker that has heartbeat enabled.
Follow-up of #1457.
### Why are the changes needed?
In Flink batch jobs, the following exception is caused by closed connection:
```
2025-04-27 23:30:28
java.io.IOException: Client /:9093 is lost, notify related stream 805472050177
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.celeborn.common.network.client.ReconnectHandler.scheduleReconnect(ReconnectHandler.java:93)
at org.apache.celeborn.common.network.client.ReconnectHandler.channelInactive(ReconnectHandler.java:63)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
```
Troubleshooting via debug-level logs shows that the closed connection is caused by read idleness on the worker, which has heartbeat enabled:
```
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE: MessageWithHeader [headerLength: 17, bodyLength: 26]
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ 38B
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ COMPLETE
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:32:31,823 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:32:31,824 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] CLOSE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] INACTIVE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] UNREGISTERED
```
The worker's read idleness, even with heartbeat enabled, results from the heartbeat being one-way from worker to client, which only keeps the client's channel active. The client should handle the heartbeat and send one back to the worker to keep the worker's channel active.
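The essence of the fix can be sketched as follows; the message type and channel interface are heavily simplified stand-ins for Celeborn's transport layer:

```java
// When the client sees a worker heartbeat it replies with one, so the
// worker's read-idle timer also observes traffic and does not close the
// connection (previously the heartbeat was one-way, worker -> client).
public class HeartbeatEcho {
  /** Stand-in for a transport channel back to the worker. */
  interface Channel {
    void send(String msg);
  }

  // Returns true when the message was a heartbeat and was echoed.
  public static boolean onMessage(String msg, Channel toWorker) {
    if ("HEARTBEAT".equals(msg)) {
      toWorker.send("HEARTBEAT"); // keep the worker's channel read-active
      return true;
    }
    return false;
  }
}
```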
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`HeartbeatTest`
Closes #3239 from SteNicholas/CELEBORN-1912.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
The change partition manager will respect the value of `celeborn.storage.availableTypes` when relocating partition locations.
### Why are the changes needed?
In the current implementation, partition locations created by the change partition manager will use all available storage tiers, which is unexpected.
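The intended behavior can be sketched as an intersection of the requested tiers with the configured available types; the enum below is an illustrative subset, not Celeborn's actual StorageInfo types:

```java
import java.util.EnumSet;
import java.util.Set;

public class StorageTypeFilter {
  // Illustrative subset of storage tiers; the real enum lives elsewhere in
  // Celeborn and may differ.
  enum StorageType { MEMORY, SSD, HDD, HDFS, S3 }

  // Tiers considered for a relocated partition are the intersection of the
  // requested tiers with celeborn.storage.availableTypes, instead of all
  // available tiers.
  public static Set<StorageType> candidates(
      Set<StorageType> requested, Set<StorageType> available) {
    EnumSet<StorageType> out = EnumSet.copyOf(requested);
    out.retainAll(available);
    return out;
  }
}
```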
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #3229 from FMX/b1979.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove `celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle partition split by default for MapPartition.
### Why are the changes needed?
The default value of `celeborn.client.shuffle.mapPartition.split.enabled` is false, which can cause the file writer to fill the disk for PushData as follows:
```
2025-04-15 20:20:56,759 [push-server-6-4] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-6] ERROR storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-1704-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-0] ERROR storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
```
It's recommended to remove `celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle partition split by default.
### Does this PR introduce _any_ user-facing change?
`celeborn.client.shuffle.mapPartition.split.enabled` is removed to enable shuffle partition split by default for MapPartition.
### How was this patch tested?
No.
Closes #3217 from SteNicholas/CELEBORN-1969.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support Flink 2.0. The major changes of Flink 2.0 include:
- https://github.com/apache/flink/pull/25406: Bump target Java version to 11 and drop support for Java 8.
- https://github.com/apache/flink/pull/25551: Replace `InputGateDeploymentDescriptor#getConsumedSubpartitionIndexRange` with `InputGateDeploymentDescriptor#getConsumedSubpartitionRange(index)`.
- https://github.com/apache/flink/pull/25314: Replace `NettyShuffleEnvironmentOptions#NETWORK_EXCLUSIVE_BUFFERS_REQUEST_TIMEOUT_MILLISECONDS` with `NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_REQUEST_TIMEOUT`.
- https://github.com/apache/flink/pull/25731: Introduce `InputGate#resumeGateConsumption`.
### Why are the changes needed?
Flink 2.0 has been released; see [Release notes - Flink 2.0](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.0).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3179 from SteNicholas/CELEBORN-1925.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
### What changes were proposed in this pull request?
For a Spark Celeborn application, if the GetReducerFileGroupResponse is larger than the threshold, the Spark driver broadcasts the GetReducerFileGroupResponse to the executors. This prevents the driver from becoming the bottleneck in sending out multiple copies of the response (one per executor).
### Why are the changes needed?
To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
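A minimal sketch of the broadcast decision, with a plain map standing in for Spark's broadcast machinery and a hypothetical constructor threshold in place of the actual Celeborn config:

```java
import java.util.HashMap;
import java.util.Map;

// Driver-side reply path sketch: large responses are stored once and only a
// small handle is sent per executor; small responses go inline as before.
public class FileGroupBroadcaster {
  private final Map<Integer, byte[]> broadcastStore = new HashMap<>(); // stand-in for Spark broadcast
  private final long thresholdBytes;
  private int nextId = 0;

  public FileGroupBroadcaster(long thresholdBytes) {
    this.thresholdBytes = thresholdBytes;
  }

  // Returns the payload actually sent to each executor.
  public byte[] reply(byte[] serializedResponse) {
    if (serializedResponse.length <= thresholdBytes) {
      return serializedResponse; // small enough: send inline
    }
    int id = nextId++;
    broadcastStore.put(id, serializedResponse);
    // Executors would resolve this tiny handle through Spark's broadcast
    // machinery instead of each receiving one full copy from the driver.
    return ("BROADCAST:" + id).getBytes();
  }
}
```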
### Does this PR introduce _any_ user-facing change?
No, the feature is not enabled by default.
### How was this patch tested?
UT.
Cluster testing with `spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true`.
The broadcast response size should always be about 1 KB.


The application succeeded.

Closes #3158 from turboFei/broadcast_rgf.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
LifecycleManager responds to RegisterShuffle with the max-epoch PartitionLocation.
### Why are the changes needed?
Newly spun-up executors in a Spark job will still get the PartitionLocations with the min epoch of the lost Celeborn worker.
These executors will fail to connect to the lost worker and then call into revive to get the latest PartitionLocation for a given partitionId in `ChangePartitionManager.getLatestPartition()`.
Returning the max epoch can reduce such revive requests.
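The selection logic can be sketched as below, with a simplified stand-in for Celeborn's PartitionLocation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Pick the highest-epoch location per partition id when answering
// RegisterShuffle, so fresh executors never start from a stale epoch.
public class MaxEpochSelector {
  static class Location {
    final int partitionId;
    final int epoch;
    final String worker;
    Location(int partitionId, int epoch, String worker) {
      this.partitionId = partitionId;
      this.epoch = epoch;
      this.worker = worker;
    }
  }

  public static Map<Integer, Location> latestPerPartition(List<Location> all) {
    Map<Integer, Location> latest = new HashMap<>();
    for (Location l : all) {
      Location cur = latest.get(l.partitionId);
      if (cur == null || l.epoch > cur.epoch) {
        latest.put(l.partitionId, l);
      }
    }
    return latest;
  }
}
```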
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT.
Closes #3135 from zaynt4606/clb1822.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR partially reverts the change of https://github.com/apache/celeborn/pull/2813; namely, it restores the original `celeborn-client-spark-3` name.
### Why are the changes needed?
The renaming is not necessary and might cause confusion; for example, I wrongly interpreted `spark-3-4` as Spark 3.4. It also increases the backport effort for branch-0.5.
### Does this PR introduce _any_ user-facing change?
No, it's dev only; before and after this change, end users always use the shaded client:
```
celeborn-client-spark-2-shaded_2.11-0.6.0-SNAPSHOT.jar
celeborn-client-spark-3-shaded_2.12-0.6.0-SNAPSHOT.jar
celeborn-client-spark-4-shaded_2.13-0.6.0-SNAPSHOT.jar
```
### How was this patch tested?
Pass GA.
Closes #3133 from pan3793/CELEBORN-1413-followup.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add logic to avoid sorting shuffle files in Reduce mode when optimizing skew partitions.
### Why are the changes needed?
The current logic needs to sort shuffle files when reading Reduce mode skew partition shuffle files; we found some shuffle sorting timeouts and performance issues.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test and UTs.
Closes #2373 from wangshengjie123/optimize-skew-partition.
Lead-authored-by: wangshengjie <wangshengjie3@xiaomi.com>
Co-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.tb@gmail.com>
Co-authored-by: wangshengjie3 <soldier.sj.wang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Disable the Spark UI when running Celeborn spark-it.
### Why are the changes needed?
Enabling the Spark UI in integration tests does not make sense and incurs some overhead.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
Closes #3087 from zwangsheng/CELEBORN-1851.
Authored-by: binjie yang <yangbinjie@dewu.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Backport https://github.com/apache/celeborn/pull/3070 to main branch.
## What changes were proposed in this pull request?
Do not trigger fetch failure if a Spark task attempt is interrupted (speculation enabled). Do not trigger fetch failure if the getReducerFileGroup RPC times out. This PR is intended for the celeborn-0.5 branch.
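The two guards described above can be sketched as a single predicate (flag names are hypothetical, not the actual Celeborn API):

```java
// Neither an interrupted attempt (e.g. a speculative task being killed)
// nor a getReducerFileGroup RPC timeout should be escalated to a shuffle
// fetch failure, which would otherwise trigger an unnecessary stage re-run.
public class FetchFailurePolicy {
  public static boolean shouldTriggerFetchFailure(
      boolean attemptInterrupted, boolean getReducerFileGroupTimedOut) {
    return !attemptInterrupted && !getReducerFileGroupTimedOut;
  }
}
```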
## Why are the changes needed?
Avoid unnecessary fetch failures and stage re-runs.
## Does this PR introduce any user-facing change?
NO.
## How was this patch tested?
1. GA.
2. Manually tested on cluster with spark speculation tasks.
Here is the test case
```scala
sc.parallelize(1 to 100, 100).flatMap(i => {
  (1 to 150000).iterator.map(num => num)
}).groupBy(i => i, 100)
  .map(i => {
    if (i._1 < 5) {
      Thread.sleep(15000)
    }
    i
  })
  .repartition(400).count
```
<img width="1384" alt="Screenshot 2025-01-18 16 16 16" src="https://github.com/user-attachments/assets/adf64857-5773-4081-a7d0-fa3439e751eb" /> <img width="1393" alt="Screenshot 2025-01-18 16 16 22" src="https://github.com/user-attachments/assets/ac9bf172-1ab4-4669-a930-872d009f2530" /> <img width="1258" alt="Screenshot 2025-01-18 16 19 15" src="https://github.com/user-attachments/assets/6a8ff3e1-c1fb-4ef2-84d8-b1fc6eb56fa6" /> <img width="892" alt="Screenshot 2025-01-18 16 17 27" src="https://github.com/user-attachments/assets/f9de3841-f7d4-4445-99a3-873235d4abd0" />
Closes #3070 from FMX/branch-0.5-b1838.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Closes #3080 from turboFei/b1838.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Rename `org.apache.celeborn.plugin.flink.readclient` to `org.apache.celeborn.plugin.flink.client`.
### Why are the changes needed?
`FlinkShuffleClientImpl` is designed to write and read shuffle data, including pushing and fetching shuffle data. Therefore, the package name of `FlinkShuffleClientImpl` should use `org.apache.celeborn.plugin.flink.client`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3048 from SteNicholas/shuffle-client-package.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
### What changes were proposed in this pull request?
Remove outdated Flink 1.14 and 1.15.
For more information, please see the discussion thread: https://lists.apache.org/thread/njho00zmkjx5qspcrbrkogy8s4zzmwv9
### Why are the changes needed?
Reduce maintenance burden.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Changes can be covered by existing tests.
Closes #3029 from codenohup/remove-flink14and15.
Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Add Tez packaging script.
### Why are the changes needed?
To support building the Tez client.
### Does this PR introduce _any_ user-facing change?
Yes, it enables Celeborn with Tez support.
### How was this patch tested?
Cluster test.
Closes #3028 from GH-Gloway/1737.
Lead-authored-by: hongguangwei <hongguangwei@bytedance.com>
Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
To support Spark 4.0.0 preview.
### Why are the changes needed?
1. Changed Scala to 2.13.
2. Introduced a columnar shuffle module for Spark 4.0.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test.
Closes #2813 from FMX/b1413.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Cancel all commit file jobs when handleCommitFiles times out.
2. Fix that timed-out commit jobs won't be set to `CommitInfo.COMMIT_FINISHED`.
### Why are the changes needed?
1. Pending tasks in commitThreadPool won't be canceled.
2. Timed-out commit jobs should set `commitInfo.status`.
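The timeout handling can be sketched with a plain `ExecutorService`, which simplifies the PR's commitThreadPool; `invokeAll` cancels any job that has not completed when the deadline fires, and every job still ends with a terminal status (the status strings are illustrative, not Celeborn's actual `CommitInfo` constants):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CommitWithTimeout {
  // Run all commit jobs against a deadline; each one -- finished, cancelled,
  // or failed -- still gets a terminal status.
  public static List<String> commitAll(List<Callable<String>> jobs, long timeoutMs)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2); // stand-in for commitThreadPool
    try {
      // invokeAll cancels every task that has not completed when the timeout fires
      List<Future<String>> futures = pool.invokeAll(jobs, timeoutMs, TimeUnit.MILLISECONDS);
      List<String> statuses = new ArrayList<>();
      for (Future<String> f : futures) {
        if (f.isCancelled()) {
          // Previously such jobs were left without a terminal status.
          statuses.add("COMMIT_FINISHED(timeout)");
        } else {
          try {
            statuses.add(f.get());
          } catch (Exception e) {
            statuses.add("COMMIT_FINISHED(error)");
          }
        }
      }
      return statuses;
    } finally {
      pool.shutdownNow();
    }
  }
}
```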

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
The `commitInfo.status` should be `COMMIT_FINISHED` when commitFile jobs time out.
Cluster test.

Closes #3004 from zaynt4606/clb1783.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
For RetryReviveTest, shut down the mini cluster after each test.
### Why are the changes needed?
Currently, the mini cluster is not shut down after each test.
ca8831e55f/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala (L43-L80)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA
Closes #3000 from turboFei/stop_miniCluster.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Fix commitInfo NPE in `LifecycleManagerCommitFilesSuite`, since not all the workers are assigned slots.
2. Add `assert` in the judgement logic.
### Why are the changes needed?
Errors in CI.

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing CI.
Closes#3001 from zaynt4606/clb1778.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Add Tez integration tests.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#2991 from GH-Gloway/1736.
Authored-by: hongguangwei <hongguangwei@bytedance.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support Flink fallback to the vanilla Flink built-in shuffle implementation.
### Why are the changes needed?
When quota is insufficient or workers are unavailable, `RemoteShuffleMaster` currently does not support falling back to `NettyShuffleMaster`, and `RemoteShuffleEnvironment` does not support falling back to `NettyShuffleEnvironment`. Flink should support falling back to the vanilla Flink built-in shuffle implementation when quota is insufficient or workers are unavailable.

### Does this PR introduce _any_ user-facing change?
- Introduce the `ShuffleFallbackPolicy` interface to determine whether to fall back to the vanilla Flink built-in shuffle implementation.
```
/**
 * The shuffle fallback policy determines whether to fall back to the vanilla Flink built-in
 * shuffle implementation.
 */
public interface ShuffleFallbackPolicy {

  /**
   * Returns whether to fall back to the vanilla Flink built-in shuffle implementation.
   *
   * @param shuffleContext The job shuffle context of Flink.
   * @param celebornConf The configuration of Celeborn.
   * @param lifecycleManager The {@link LifecycleManager} of Celeborn.
   * @return Whether to fall back to the vanilla Flink built-in shuffle implementation.
   */
  boolean needFallback(
      JobShuffleContext shuffleContext,
      CelebornConf celebornConf,
      LifecycleManager lifecycleManager);
}
```
- Introduce `celeborn.client.flink.shuffle.fallback.policy` config to support shuffle fallback policy configuration.
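  As an illustration, a custom policy implementation could be wired up via this config in the Flink configuration; the class name below is hypothetical, and the exact accepted values should be checked against the Celeborn docs:
```properties
# Hypothetical example: point the fallback policy config at a custom
# ShuffleFallbackPolicy implementation (class name is illustrative only).
celeborn.client.flink.shuffle.fallback.policy: com.example.QuotaAwareFallbackPolicy
```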
### How was this patch tested?
- `RemoteShuffleMasterSuiteJ#testRegisterJobWithForceFallbackPolicy`
- `WordCountTestBase#celeborn flink integration test with fallback - word count`
Closes#2932 from SteNicholas/CELEBORN-1700.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Use `setupMiniClusterWithRandomPorts` instead of `setUpMiniCluster`, and make `setUpMiniCluster` private.
### Why are the changes needed?
Fix flaky test.
```
HybridShuffleWordCountTest:
Nov 26, 2024 4:44:10 AM org.glassfish.jersey.server.wadl.WadlFeature configure
WARNING: JAXBContext implementation could not be found. WADL feature is disabled.
*** RUN ABORTED ***
java.io.IOException: Failed to bind to localhost/127.0.0.1:32886
at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:349)
at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:310)
at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:234)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
at org.apache.celeborn.server.common.http.HttpServer.start(HttpServer.scala:45)
at org.apache.celeborn.server.common.HttpService.startInternal(HttpService.scala:354)
at org.apache.celeborn.server.common.HttpService.startHttpServer(HttpService.scala:210)
at org.apache.celeborn.service.deploy.MiniClusterFeature.createMaster(MiniClusterFeature.scala:115)
at org.apache.celeborn.service.deploy.MiniClusterFeature.setUpMaster(MiniClusterFeature.scala:155)
...
Cause: java.net.BindException: Address already in use
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes#2950 from turboFei/fix_flaky_test.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Fix the flaky test in `CelebornHashCheckDiskSuite`.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
Closes#2937 from onebox-li/fix-flaky-test.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Correct the calculation of the worker diskInfo `actualUsableSpace`.
Rename the function that gets the reserve size for clarity (`getMinimumUsableSize` -> `getActualReserveSize`).
Let the deviceMonitor start checking after the first `storageManager.updateDiskInfos()` to avoid disks being misidentified as HIGH_DISK_USAGE.
Fix the `PushDataHandler#checkDiskFull` check.
### Why are the changes needed?
Make sure the worker disk reserve works correctly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test and UT.
Closes#2931 from onebox-li/fix-disk-usablespace.
Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
2. Change the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which enables Spark stage rerun by default.
### Why are the changes needed?
Users cannot directly tell that `celeborn.client.spark.fetch.throwsFetchFailure` controls whether to enable stage rerun; it literally means that the client throws `FetchFailedException` instead of `CelebornIOException`. It's recommended to introduce `celeborn.client.spark.stageRerun.enabled`, keeping `celeborn.client.spark.fetch.throwsFetchFailure` as an alternative name, to enable Spark stage rerun.
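For illustration, based on the config names above, the new key could be set in `spark-defaults.conf`; the `spark.` prefix follows the usual convention for passing Celeborn client configs through Spark, so verify it against the Celeborn deployment docs:
```properties
# Preferred new key (the legacy key celeborn.client.spark.fetch.throwsFetchFailure
# remains as an alternative); defaults to true after this change.
spark.celeborn.client.spark.stageRerun.enabled  true
```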
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#2920 from SteNicholas/CELEBORN-1719.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
The UT should test both the true and false conditions.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Existing UT.
Closes#2914 from zaynt4606/cmt-clb1717.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
1. Cache the available workers.
2. Only count the available workers' device free capacity.
3. Place `metrics_AvailableWorkerCount_Value` in the overall part and `metrics_WorkerCount_Value` in the `Master` part.
### Why are the changes needed?
Cache the available workers to reduce the computation of frequently looping over the workers.
To have an accurate device capacity overview that excludes the excluded workers, decommissioning workers, etc.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
<img width="1705" alt="image" src="https://github.com/user-attachments/assets/bee17b4e-785d-4112-8410-dbb684270ec0">
Closes#2827 from turboFei/device_free.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Trigger stage rerun if shuffle data is lost and throwing fetch failures is enabled.
### Why are the changes needed?
Rerun stage for shuffle lost scenarios.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes#2894 from FMX/b1701.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Follow up of [https://github.com/apache/celeborn/pull/2835]
Only use dynamic resources when candidates are not enough.
And change the way of getting availableWorkers from the heartbeat to the requestSlots RPC to avoid adding burden to the heartbeat.
### Why are the changes needed?
No
### Does this PR introduce _any_ user-facing change?
Add another configuration.
### How was this patch tested?
UT
Closes#2852 from zaynt4606/clb1636-flu2.
Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
This is the last PR in the CIP-6 series.
Fix the bug when hybrid shuffle encounters a buffer larger than 32K.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#2873 from reswqa/11-large-buffer-10month.
Lead-authored-by: Yuxin Tan <tanyuxinwork@gmail.com>
Co-authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Add Flink Hybrid Shuffle IT test cases
2. Fix bug in open stream.
### Why are the changes needed?
Test coverage for celeborn + hybrid shuffle
### Does this PR introduce _any_ user-facing change?
No
Closes#2859 from reswqa/10-itcase-10month.
Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Currently, the ChangePartitionManager retrieves workers from the LifeCycleManager's workerSnapshot. However, during the revival process in reallocateChangePartitionRequestSlotsFromCandidates, it does not account for newly added available workers resulting from elastic contraction and expansion. This PR addresses this issue by updating the candidate workers in the ChangePartitionManager to use the available workers reported in the heartbeat from the master.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#2835 from zaynt4606/clbdev.
Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
To support revising lost shuffle IDs in a long-running job such as flink batch jobs.
### Why are the changes needed?
1. To support revise lost shuffles.
2. To add an HTTP endpoint to revise lost shuffles manually.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster tests.
Closes#2746 from FMX/b1600.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
Because the worker port is in use, the driver's worker status may change from shutdown status to unknown, causing the test to fail.
https://github.com/apache/celeborn/actions/runs/10465286274/job/28980278764
```java
- celeborn spark integration test - pushdata timeout will add to pushExcludedWorkers *** FAILED ***
WORKER_UNKNOWN did not equal PUSH_DATA_TIMEOUT_PRIMARY, and WORKER_UNKNOWN did not equal PUSH_DATA_TIMEOUT_REPLICA (PushDataTimeoutTest.scala:150)
```
unit-tests.log
```
24/08/20 05:28:30,400 INFO [celeborn-dispatcher-7] Master: Receive ReportNodeFailure [
Host: localhost
RpcPort: 41487
PushPort: 34259
FetchPort: 45713
ReplicatePort: 35107
InternalPort: 41487
24/08/20 05:29:29,414 WARN [celeborn-client-lifecycle-manager-change-partition-executor-3] WorkerStatusTracker:
Reporting failed workers:
Host:localhost:RpcPort:42267:PushPort:43741:FetchPort:46483:ReplicatePort:43587 PUSH_DATA_TIMEOUT_PRIMARY 2024-08-19T22:29:29.414-0700
Current unknown workers:
Host:localhost:RpcPort:41487:PushPort:34259:FetchPort:45713:ReplicatePort:35107:InternalPort:41487 2024-08-19T22:29:29.108-0700
Current shutdown workers:
Host:localhost:RpcPort:41487:PushPort:34259:FetchPort:45713:ReplicatePort:35107:InternalPort:41487
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes#2697 from cxzl25/CELEBORN-1571.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
In order to speed up resource releasing, this PR unregisters shuffles in batches.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT & Local cluster testing
Closes#2701 from zaynt4606/batchUnregister.
Lead-authored-by: szt <zaynt4606@163.com>
Co-authored-by: Zaynt <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Enrich register shuffle related message to support segment-based shuffle.
Note: This version of CIP-6 still only works in blocking mode, but we have extended related fields to give it the potential to read while writing. Any subsequent changes needed to support reading while writing are recorded in TODOs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No need.
Closes#2719 from reswqa/cip6-3-pr.
Lead-authored-by: Weijie Guo <reswqa@163.com>
Co-authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR aims to introduce `-Ywarn-unused-import` in Scala.
### Why are the changes needed?
There are currently many invalid imports, which can be checked using `-Ywarn-unused-import`.
And through the `silencer` plugin we can suppress warnings for imports that are still required in Scala 2.11, such as:
```scala
import org.apache.celeborn.common.util.FunctionConverter._
```
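A minimal build sketch of how the warning could be enabled (the flag is a standard scalac 2.12 option; the silencer plugin's exact artifact coordinates are not shown here and should be taken from the plugin's docs):
```scala
// build.sbt sketch: enable unused-import warnings for Scala 2.12
scalacOptions += "-Ywarn-unused-import"
// The silencer compiler plugin (github.com/ghik/silencer) can then suppress
// the warning for imports that must stay for Scala 2.11 compatibility;
// see the plugin's docs for the coordinates matching your Scala version.
```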
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes#2689 from cxzl25/CELEBORN-1565.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shaoyun Chen <csy@apache.org>
### What changes were proposed in this pull request?
Fix `offerSlotsLoadAware`'s `actualUsableSpace` condition on `diskInfo`, and take `diskReserveSize` into account when `updateDiskInfos` runs in `StorageManager`, so the master doesn't need to calculate `usableSpace` during `offerSlots`.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes#2688 from zaynt4606/main.
Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>