Commit Graph

152 Commits

Wang, Fei
d038dd2b32 [CELEBORN-1258] Support to register application info with user identifier and extra info
### What changes were proposed in this pull request?
Support to register application info with user identifier and extra info.

### Why are the changes needed?
To provide more insight for the application information.

### Does this PR introduce _any_ user-facing change?
A new RESTful API.

### How was this patch tested?
UT.

Closes #3428 from turboFei/app_info_uid.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-09-01 11:15:40 +08:00
SteNicholas
75446a05d3 [CELEBORN-2093] Support Flink 2.1
### What changes were proposed in this pull request?

Support Flink 2.1.

### Why are the changes needed?

Flink 2.1 has already been released; for details, see [Release notes - Flink 2.1](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.1).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3404 from SteNicholas/CELEBORN-2093.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-04 14:12:55 +08:00
SteNicholas
ae40222351 [CELEBORN-2047] Support MapPartitionData on DFS
### What changes were proposed in this pull request?

Support `MapPartitionData` on DFS.

### Why are the changes needed?

`MapPartitionData` is currently only supported on local storage, not on DFS. Supporting `MapPartitionData` on DFS aligns MapPartition mode with the capabilities of ReducePartition mode.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`WordCountTestWithHDFS`.

Closes #3349 from SteNicholas/CELEBORN-2047.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-07-26 22:11:32 +08:00
Gaurav Mittal
cde33d953b [CELEBORN-894] End to End Integrity Checks
### What changes were proposed in this pull request?
Design doc - https://docs.google.com/document/d/1YqK0kua-5rMufJw57kEIrHHGbLnAF9iXM5GdDweMzzg/edit?tab=t.0#heading=h.n5ldma432qnd

- End to End integrity checks provide additional confidence that Celeborn is producing complete as well as correct data
- The checks are hidden behind a client side config that is false by default. Provides users optionality to enable these if required on a per app basis
- Only compatible with Spark at the moment
- No support for Flink (can be considered in future)
- No support for Columnar Shuffle (can be considered in future)

Writer
- Whenever a mapper completes, it reports crc32 and bytes written on a per partition basis to the driver

Driver
- Driver aggregates the mapper reports - and computes aggregated CRC32 and bytes written on per partitionID basis

Reader
- Each CelebornInputStream will report (int shuffleId, int partitionId, int startMapIndex, int endMapIndex, int crc32, long bytes) to the driver when it finishes reading all data on the stream
- On every report
  - Driver will aggregate the CRC32 and bytesRead for the partitionID
  - Driver will aggregate the mapRange to determine when all sub-partitions of the partitionID have been read
  - It will then compare the aggregated CRC32 and bytes read with the expected CRC32 and bytes written for the partition
  - There is special handling for the skew-handling-without-map-range-split scenario as well
  - In this case, we report the number of sub-partitions and the index of the sub-partition instead of startMapIndex and endMapIndex

There is separate handling for skew handling with and without map range split

As a follow-up, I will open another PR that hardens the checks and adds bookkeeping to verify that every CelebornInputStream performs the required checks.
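
The writer/driver/reader bookkeeping above can be sketched as follows. This is a hypothetical illustration, not the PR's actual code: the class and method names are invented, and the CRC aggregation (summing per-report checksums, which is order-independent) is an assumed simplification of whatever combination the patch really uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical driver-side bookkeeping for the end-to-end integrity checks.
public class IntegrityChecker {
  public static class PartitionStats {
    public long crcAgg;
    public long bytes;
  }

  private final Map<Integer, PartitionStats> written = new HashMap<>();
  private final Map<Integer, PartitionStats> read = new HashMap<>();

  public static long crc32(byte[] data) {
    CRC32 c = new CRC32();
    c.update(data);
    return c.getValue();
  }

  // Writer side: each mapper reports (partitionId, crc32, bytesWritten)
  // to the driver when it completes.
  public void reportWrite(int partitionId, long crc, long bytes) {
    PartitionStats s = written.computeIfAbsent(partitionId, k -> new PartitionStats());
    s.crcAgg += crc; // commutative aggregate: report order does not matter
    s.bytes += bytes;
  }

  // Reader side: each CelebornInputStream reports what it actually read.
  public void reportRead(int partitionId, long crc, long bytes) {
    PartitionStats s = read.computeIfAbsent(partitionId, k -> new PartitionStats());
    s.crcAgg += crc;
    s.bytes += bytes;
  }

  // Once all sub-ranges of the partition have been read, compare the
  // aggregated reader totals against the expected writer totals.
  public boolean verify(int partitionId) {
    PartitionStats w = written.get(partitionId);
    PartitionStats r = read.get(partitionId);
    return w != null && r != null && w.crcAgg == r.crcAgg && w.bytes == r.bytes;
  }
}
```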

### Why are the changes needed?
https://issues.apache.org/jira/browse/CELEBORN-894

Note: I am putting up this PR even though some tests are failing, since I want to get some early feedback on the code changes.

### Does this PR introduce _any_ user-facing change?
Yes: a new client-side config is available to enable the checks if required.

### How was this patch tested?
Unit tests + Integration tests

Closes #3261 from gauravkm/gaurav/e2e_checks_v3.

Lead-authored-by: Gaurav Mittal <gaurav@stripe.com>
Co-authored-by: Gaurav Mittal <gauravkm@gmail.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-06-28 09:19:57 +08:00
gaoyajun02
6a097944cf [CELEBORN-2042] Fix FetchFailure handling when TaskSetManager is not found
### What changes were proposed in this pull request?
Fixes the FetchFailure handling logic in the `shouldReportShuffleFetchFailure` method to properly handle cases where the TaskSetManager cannot be found for a given task ID.

### Why are the changes needed?
The current implementation incorrectly reports FetchFailure when TaskSetManager is not found, which leads to false positive failures in normal fault tolerance scenarios. This happens because:
1. Executor Lost scenarios: When executors are lost due to resource preemption or failures, the associated TaskSetManager gets cleaned up, making it unavailable for lookup
2. Stage cancellation: Cancelled or completed stages may have their TaskSetManager removed

These are all normal scenarios in Spark's fault tolerance mechanism and should not be treated as shuffle failures. The current behavior can cause unnecessary job failures and confusion in debugging actual shuffle issues.
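
The fixed decision rule can be sketched like this. It is an illustrative stand-in for the Spark internals the patch actually consults: the interface, the map, and the `isZombie` check are hypothetical names, not the real signatures.

```java
import java.util.Map;

// Hypothetical sketch of the fixed FetchFailure decision logic.
public class FetchFailureDecision {
  public interface TaskSetManager {
    boolean isZombie();
  }

  // After the fix: a missing TaskSetManager means the stage was already
  // cleaned up (executor lost, stage cancelled or completed) -- a normal
  // fault-tolerance path -- so the failure is NOT reported as a shuffle
  // failure. Before the fix, this case fell through to "report failure".
  public static boolean shouldReportShuffleFetchFailure(
      Map<Long, TaskSetManager> taskIdToTaskSetManager, long taskId) {
    TaskSetManager tsm = taskIdToTaskSetManager.get(taskId);
    if (tsm == null) {
      return false;
    }
    return !tsm.isZombie();
  }
}
```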

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT, Long-running Production Validation

Closes #3339 from gaoyajun02/CELEBORN-2042.

Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-18 10:22:10 -07:00
Xianming Lei
edeeb4b30a [CELEBORN-1719][FOLLOWUP] Rename throwsFetchFailure to stageRerunEnabled
### What changes were proposed in this pull request?
Rename throwsFetchFailure to stageRerunEnabled

### Why are the changes needed?
Make the code cleaner.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
existing UTs.

Closes #3324 from leixm/CELEBORN-2035.

Authored-by: Xianming Lei <xianming.lei@shopee.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-06-11 19:33:19 +08:00
Wang, Fei
60fa6d0ee7 [CELEBORN-1720][FOLLOWUP] Fix flakyTest - check if fetch failure task another attempt is running or successful
### What changes were proposed in this pull request?
Record the last reported shuffle fetch failure task id.

### Why are the changes needed?
Because the reported shuffle fetch failure task id might be cleaned up quickly after being recorded.

To prevent a flaky test, it is better to record the last reported task id for testing.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA for 3 times.

Closes #3301 from turboFei/app_id_debug.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-06-06 11:11:12 -07:00
Fei Wang
b44730771d [CELEBORN-1413][FOLLOWUP] Bump spark 4.0 version to 4.0.0
### What changes were proposed in this pull request?
Bump spark 4.0 version to 4.0.0.

### Why are the changes needed?
Spark 4.0.0 is ready.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA.

Closes #3282 from turboFei/spark_4.0.

Lead-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-05-28 17:56:08 +08:00
CodingCat
0b5a09a9f7 [CELEBORN-1896] delete data from failed to fetch shuffles
### What changes were proposed in this pull request?

This is joint work with YutingWang98.

Currently we have to wait for Spark shuffle object GC to clean the disk space occupied by Celeborn shuffles.

As a result, if a shuffle fails to fetch and is retried, the disk space occupied by the failed attempt cannot be cleaned. We hit this issue internally when dealing with hundreds of TB of shuffle data in a single Spark application; any hiccup in the servers can double or even triple the disk usage.

This PR implements a mechanism to delete files from failed-to-fetch shuffles.

The main idea is simple: cleanup is triggered in the LifecycleManager when it applies for a new Celeborn shuffle id for a retried shuffle write stage.

The tricky part is avoiding the deletion of shuffle files that are still referred to by multiple downstream stages: the PR introduces RunningStageManager to track the dependencies between stages.
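
The dependency tracking described above can be sketched minimally as a reference count per shuffle. The real RunningStageManager in the PR almost certainly differs; all names here are illustrative.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of tracking which running stages still consume a shuffle.
public class RunningStageManager {
  // shuffleId -> ids of stages that still read this shuffle's output
  private final Map<Integer, Set<Integer>> consumers = new HashMap<>();

  public void registerConsumer(int shuffleId, int stageId) {
    consumers.computeIfAbsent(shuffleId, k -> new HashSet<>()).add(stageId);
  }

  public void stageFinished(int shuffleId, int stageId) {
    Set<Integer> s = consumers.get(shuffleId);
    if (s != null) {
      s.remove(stageId);
    }
  }

  // Called when a retried write stage applies for a new celeborn shuffle id:
  // the old shuffle's files may be deleted only if no other running stage
  // still reads them.
  public boolean safeToDelete(int shuffleId) {
    Set<Integer> s = consumers.get(shuffleId);
    return s == null || s.isEmpty();
  }
}
```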

### Why are the changes needed?

saving disk space

### Does this PR introduce _any_ user-facing change?

a new config

### How was this patch tested?

We manually deleted some files.

![image](https://github.com/user-attachments/assets/4136cd52-78b2-44e7-8244-db3c5bf9d9c4)

From the above screenshot, we can see that originally we have shuffles 0 and 1; after shuffle 1 hit a chunk fetch failure, it triggered a retry of shuffle 0 (as shuffle 2), and by that moment shuffle 0 had already been deleted from the workers.

![image](https://github.com/user-attachments/assets/7d3b4d90-ae5a-4a54-8dec-a5005850ef0a)

In the logs, we can see that in the middle of the application, the unregister shuffle request was sent for shuffle 0.

Closes #3109 from CodingCat/delete_fi.

Lead-authored-by: CodingCat <zhunansjtu@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-05-21 11:23:11 +08:00
Wang, Fei
ec62d924c5 [CELEBORN-2000] Ignore the getReducerFileGroup timeout before shuffle stage end
### What changes were proposed in this pull request?

Ignore the getReducerFileGroup timeout before shuffle stage end.
### Why are the changes needed?

1. The getReducerFileGroup timeout can be caused by a LifecycleManager commitFiles timeout (stage not ended).
2. Many tasks may then fail without reporting a fetch failure,
3. which eventually causes the Spark application to fail.

The shuffle client should ignore the getReducerFileGroup timeout before the LifecycleManager completes commitFiles.
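
The rule above boils down to a single predicate on the client side. This is a minimal sketch with hypothetical names; the actual patch plumbs the stage-end state through the client rather than passing two booleans.

```java
// Hypothetical predicate: a getReducerFileGroup RPC timeout is only fatal
// once the shuffle stage has ended (i.e. the LifecycleManager has finished
// commitFiles). Before stage end, the timeout is expected and the client
// should retry instead of failing the application.
public class GetReducerFileGroupPolicy {
  public static boolean shouldFailApplication(boolean rpcTimedOut, boolean stageEnded) {
    return rpcTimedOut && stageEnded;
  }
}
```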

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

UT

Closes #3263 from turboFei/is_stage_end.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-05-20 14:16:46 +08:00
mingji
4205f83da3 [CELEBORN-1995] Optimize memory usage for push failed batches
### What changes were proposed in this pull request?
Aggregate push failed batches for the same map ID and attempt ID.

### Why are the changes needed?
To reduce memory usage.
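
The idea can be sketched as follows: instead of keeping one record object per failed push batch, failed batch ids are aggregated under a single (mapId, attemptId) key. The class, key shape, and the choice of `BitSet` are illustrative assumptions, not the PR's actual data structures.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: one compact BitSet of batch ids per (mapId, attemptId)
// instead of N per-batch objects, reducing memory usage.
public class FailedBatchTracker {
  public static final class Key {
    final int mapId;
    final int attemptId;

    Key(int mapId, int attemptId) {
      this.mapId = mapId;
      this.attemptId = attemptId;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Key && ((Key) o).mapId == mapId && ((Key) o).attemptId == attemptId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(mapId, attemptId);
    }
  }

  private final Map<Key, BitSet> failed = new HashMap<>();

  public void recordFailure(int mapId, int attemptId, int batchId) {
    failed.computeIfAbsent(new Key(mapId, attemptId), k -> new BitSet()).set(batchId);
  }

  public boolean hasFailed(int mapId, int attemptId, int batchId) {
    BitSet b = failed.get(new Key(mapId, attemptId));
    return b != null && b.get(batchId);
  }
}
```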

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA and cluster run.

Closes #3253 from FMX/b1995.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-05-18 07:19:26 -07:00
lijianfu03
045411ac34 [CELEBORN-1855] LifecycleManager return appshuffleId for non barrier stage when fetch fail has been reported
### What changes were proposed in this pull request?
For a non-barrier shuffle read stage, LifecycleManager#handleGetShuffleIdForApp always returns the appShuffleId, whether the fetch status is true or not.

### Why are the changes needed?

As described in [jira](https://issues.apache.org/jira/browse/CELEBORN-1855), if the LifecycleManager only returns appShuffleIds whose fetch status is success, the task will fail directly with "there is no finished map stage associated with", but the previously reported fetch failure event may not be fatal. So just give it a chance.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3090 from buska88/celeborn-1855.

Authored-by: lijianfu03 <lijianfu@meituan.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-05-13 16:14:03 +08:00
SteNicholas
9dd6587d15 [CELEBORN-1912] Client should send heartbeat to worker for processing heartbeat to avoid reading idleness of worker which enables heartbeat
### What changes were proposed in this pull request?

The client should reply to the worker's heartbeat, so that a worker with heartbeat enabled does not hit its read-idle timeout.

Follow-up to #1457.

### Why are the changes needed?

In Flink batch jobs, the following exception is caused by closed connection:
```
2025-04-27 23:30:28
java.io.IOException: Client /:9093 is lost, notify related stream 805472050177
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
	at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at org.apache.celeborn.common.network.client.ReconnectHandler.scheduleReconnect(ReconnectHandler.java:93)
	at org.apache.celeborn.common.network.client.ReconnectHandler.channelInactive(ReconnectHandler.java:63)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:991)
```
Troubleshooting via debug-level logs shows that the closed connection is caused by the read idleness of a worker that enables heartbeat.
```
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE: MessageWithHeader [headerLength: 17, bodyLength: 26]
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ 38B
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ COMPLETE
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:32:31,823 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:32:31,824 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] CLOSE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] INACTIVE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] UNREGISTERED
```
The worker's read idleness results from the one-way heartbeat from worker to client, which only keeps the client's channel active. The client should handle the heartbeat and send one back to the worker, keeping the worker's channel active as well.
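
The timing argument above can be shown with a Netty-free sketch. The class and timing model are illustrative assumptions: a one-way heartbeat only refreshes the client's read timer, while having the client reply also refreshes the worker's.

```java
// Hypothetical model of the two read-idle timers on a client-worker channel.
public class HeartbeatChannel {
  private long workerLastReadMs; // last time the worker read anything from the client
  private long clientLastReadMs; // last time the client read anything from the worker

  public void workerSendsHeartbeat(long nowMs, boolean clientReplies) {
    clientLastReadMs = nowMs; // the client read the heartbeat
    if (clientReplies) {
      workerLastReadMs = nowMs; // the worker read the client's reply
    }
  }

  // The worker's IdleStateHandler-style check: close if nothing was read
  // from the client within the idle timeout.
  public boolean workerWouldCloseIdle(long nowMs, long idleTimeoutMs) {
    return nowMs - workerLastReadMs > idleTimeoutMs;
  }
}
```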

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`HeartbeatTest`

Closes #3239 from SteNicholas/CELEBORN-1912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-05-08 10:09:50 +08:00
mingji
ec4ea9773c [CELEBORN-1979] Change partition manager should respect the celeborn.storage.availableTypes
### What changes were proposed in this pull request?
The change partition manager will respect the value of `celeborn.storage.availableTypes` when relocating partition locations.

### Why are the changes needed?
In the current implementation, partition locations created by the change partition manager use all available storage tiers, which is unexpected.
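
The fix amounts to filtering candidate tiers by the configured set. The enum and method below are simplified stand-ins for Celeborn's `StorageInfo` types, not the actual API.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: restrict candidate storage tiers to those listed in
// celeborn.storage.availableTypes instead of using every tier.
public class StorageTypeFilter {
  public enum StorageType { MEMORY, SSD, HDD, HDFS, S3 }

  public static List<StorageType> candidateTiers(
      List<StorageType> allTiers, Set<StorageType> availableTypes) {
    return allTiers.stream()
        .filter(availableTypes::contains)
        .collect(Collectors.toList());
  }
}
```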

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #3229 from FMX/b1979.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-04-28 11:23:04 +08:00
SteNicholas
553b3abc3b [CELEBORN-1969] Remove celeborn.client.shuffle.mapPartition.split.enabled to enable shuffle partition split at default for MapPartition
### What changes were proposed in this pull request?

Remove `celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle partition split at default for MapPartition.

### Why are the changes needed?

The default value of `celeborn.client.shuffle.mapPartition.split.enabled` is false, which causes the file writer to fill the disk for PushData, as follows:

```
2025-04-15 20:20:56,759 [push-server-6-4] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-6] ERROR storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-1704-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-0] ERROR storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN  worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
```

It's recommended to remove `celeborn.client.shuffle.mapPartition.split.enabled` so that shuffle partition split is enabled by default.

### Does this PR introduce _any_ user-facing change?

`celeborn.client.shuffle.mapPartition.split.enabled` is removed, enabling shuffle partition split by default for MapPartition.

### How was this patch tested?

No.

Closes #3217 from SteNicholas/CELEBORN-1969.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-04-22 11:37:53 +08:00
SteNicholas
2b8f3520f9 [CELEBORN-1925] Support Flink 2.0
### What changes were proposed in this pull request?

Support Flink 2.0. The major changes of Flink 2.0 include:

- https://github.com/apache/flink/pull/25406: Bump target Java version to 11 and drop support for Java 8.
- https://github.com/apache/flink/pull/25551: Replace `InputGateDeploymentDescriptor#getConsumedSubpartitionIndexRange` with `InputGateDeploymentDescriptor#getConsumedSubpartitionRange(index)`.
- https://github.com/apache/flink/pull/25314: Replace `NettyShuffleEnvironmentOptions#NETWORK_EXCLUSIVE_BUFFERS_REQUEST_TIMEOUT_MILLISECONDS` with `NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_REQUEST_TIMEOUT`.
- https://github.com/apache/flink/pull/25731: Introduce `InputGate#resumeGateConsumption`.

### Why are the changes needed?

Flink 2.0 is released; for details, refer to [Release notes - Flink 2.0](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.0).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3179 from SteNicholas/CELEBORN-1925.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
2025-04-07 15:23:20 +08:00
Wang, Fei
5e12b7d607 [CELEBORN-1921] Broadcast large GetReducerFileGroupResponse to prevent Spark driver network exhausted
### What changes were proposed in this pull request?

For a Spark Celeborn application, if the GetReducerFileGroupResponse is larger than the threshold, the Spark driver broadcasts the GetReducerFileGroupResponse to the executors. This prevents the driver from becoming the bottleneck of sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
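
The threshold-based decision can be sketched as follows. The names, the handle format, and the threshold value are illustrative assumptions; in the actual patch the large payload goes into a Spark broadcast variable and the RPC response carries only a reference to it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: small responses are sent directly; large ones are
// stored once in a broadcast and only a tiny handle is sent per executor.
public class FileGroupResponseSender {
  public static final long BROADCAST_THRESHOLD_BYTES = 512 * 1024; // illustrative value

  public interface Broadcaster {
    String broadcast(byte[] payload); // returns a broadcast id
  }

  public static byte[] prepareResponse(byte[] serialized, Broadcaster broadcaster) {
    if (serialized.length <= BROADCAST_THRESHOLD_BYTES) {
      return serialized; // small enough to send directly
    }
    // One full copy lives in the broadcast; executors fetch it from there
    // instead of from the driver's RPC endpoint.
    String id = broadcaster.broadcast(serialized);
    return ("BROADCAST:" + id).getBytes(StandardCharsets.UTF_8);
  }
}
```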

### Why are the changes needed?
To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).

### Does this PR introduce _any_ user-facing change?
No, the feature is not enabled by default.

### How was this patch tested?

UT.

Cluster testing with `spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true`.

The broadcast response size is consistently about 1 KB.
![image](https://github.com/user-attachments/assets/d5d1b751-762d-43c8-8a84-0674630a5638)
![image](https://github.com/user-attachments/assets/4841a29e-5d11-4932-9fa5-f6e78b7bc521)
The application succeeded.
![image](https://github.com/user-attachments/assets/9b570f70-1433-4457-90ae-b8292e5476ba)

Closes #3158 from turboFei/broadcast_rgf.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-04-01 08:29:21 -07:00
zhengtao
b5fab42604 [CELEBORN-1822] Respond to RegisterShuffle with max epoch PartitionLocation to avoid revive
### What changes were proposed in this pull request?
LifecycleManager respond to RegisterShuffle with max epoch PartitionLocation.

### Why are the changes needed?
Newly spun-up executors in a Spark job will still get the partition locations with the min epoch of the lost Celeborn worker.

These executors will fail to connect to the lost worker and then call into revive to get the latest PartitionLocation for a given partitionId in `ChangePartitionManager.getLatestPartition()`.

Returning the max epoch can reduce such revive requests.
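
The selection rule can be sketched as picking the highest-epoch location per partitionId. `Location` below is a simplified stand-in for Celeborn's `PartitionLocation`, not the real class.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: when answering RegisterShuffle, keep only the
// highest-epoch location per partitionId so new executors never start
// from a stale (possibly lost-worker) epoch.
public class MaxEpochSelector {
  public static final class Location {
    public final int partitionId;
    public final int epoch;
    public final String worker;

    public Location(int partitionId, int epoch, String worker) {
      this.partitionId = partitionId;
      this.epoch = epoch;
      this.worker = worker;
    }
  }

  public static Map<Integer, Location> latestPerPartition(List<Location> all) {
    return all.stream()
        .collect(Collectors.toMap(
            l -> l.partitionId,
            l -> l,
            (a, b) -> a.epoch >= b.epoch ? a : b)); // keep the max epoch
  }
}
```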

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT.

Closes #3135 from zaynt4606/clb1822.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-14 16:08:05 +08:00
Cheng Pan
e85207e2c7 [CELEBORN-1413][FOLLOWUP] Rename celeborn-client-spark-3-4 back to celeborn-client-spark-3
### What changes were proposed in this pull request?

This PR partially reverts the change of https://github.com/apache/celeborn/pull/2813, namely, restores the renaming of `celeborn-client-spark-3`

### Why are the changes needed?

The renaming is not necessary and might cause confusion; for example, I wrongly interpreted `spark-3-4` as Spark 3.4. It also increases the backport effort for branch-0.5.

### Does this PR introduce _any_ user-facing change?

No, it's dev only; before and after this change, end users always use the shaded client:

```
celeborn-client-spark-2-shaded_2.11-0.6.0-SNAPSHOT.jar
celeborn-client-spark-3-shaded_2.12-0.6.0-SNAPSHOT.jar
celeborn-client-spark-4-shaded_2.13-0.6.0-SNAPSHOT.jar
```

### How was this patch tested?

Pass GA.

Closes #3133 from pan3793/CELEBORN-1413-followup.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-04 22:25:10 +08:00
wangshengjie
d659e06d45 [CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files
### What changes were proposed in this pull request?
Add logic to avoid sorting shuffle files in Reduce mode when optimizing skew partitions.

### Why are the changes needed?
The current logic needs to sort shuffle files when reading Reduce-mode skew-partition shuffle files; we found shuffle sorting timeouts and performance issues.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster tests and UTs.

Closes #2373 from wangshengjie123/optimize-skew-partition.

Lead-authored-by: wangshengjie <wangshengjie3@xiaomi.com>
Co-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.tb@gmail.com>
Co-authored-by: wangshengjie3 <soldier.sj.wang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-02-19 16:57:44 +08:00
binjie yang
2b0a755870 [CELEBORN-1851] Disable spark ui when run celeborn spark-it
### What changes were proposed in this pull request?
Disable the Spark UI when running Celeborn spark-it.

### Why are the changes needed?
Enabling the Spark UI in integration tests does not make sense and incurs some overhead.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI

Closes #3087 from zwangsheng/CELEBORN-1851.

Authored-by: binjie yang <yangbinjie@dewu.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-02-06 16:22:58 +08:00
mingji
75b697d815 [CELEBORN-1838] Interrupt spark task should not report fetch failure
Backport https://github.com/apache/celeborn/pull/3070 to main branch.
## What changes were proposed in this pull request?
Do not trigger a fetch failure if a Spark task attempt is interrupted (speculation enabled). Do not trigger a fetch failure if the getReducerFileGroup RPC times out. This PR is intended for the celeborn-0.5 branch.

## Why are the changes needed?
Avoid unnecessary fetch failures and stage re-runs.

## Does this PR introduce any user-facing change?
NO.

## How was this patch tested?
1. GA.
2. Manually tested on cluster with spark speculation tasks.

Here is the test case
```scala
sc.parallelize(1 to 100, 100).flatMap(i => {
        (1 to 150000).iterator.map(num => num)
      }).groupBy(i => i, 100)
      .map(i => {
        if (i._1 < 5) {
          Thread.sleep(15000)
        }
        i
      })
      .repartition(400).count
```

<img width="1384" alt="截屏2025-01-18 16 16 16" src="https://github.com/user-attachments/assets/adf64857-5773-4081-a7d0-fa3439e751eb" /> <img width="1393" alt="截屏2025-01-18 16 16 22" src="https://github.com/user-attachments/assets/ac9bf172-1ab4-4669-a930-872d009f2530" /> <img width="1258" alt="截屏2025-01-18 16 19 15" src="https://github.com/user-attachments/assets/6a8ff3e1-c1fb-4ef2-84d8-b1fc6eb56fa6" /> <img width="892" alt="截屏2025-01-18 16 17 27" src="https://github.com/user-attachments/assets/f9de3841-f7d4-4445-99a3-873235d4abd0" />

Closes #3070 from FMX/branch-0.5-b1838.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

Closes #3080 from turboFei/b1838.

Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-01-23 14:46:36 +08:00
Wang, Fei
ad933815b6 [CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful
### What changes were proposed in this pull request?
Prevent stage re-run if another task attempt is running.

If a shuffle read task cannot read the shuffle data and another attempt of the task is running or successful, just throw a CelebornIOException instead of a FetchFailedException.

The app will not fail before reaching the task maxFailures.

<img width="1610" alt="image" src="https://github.com/user-attachments/assets/ffc6d80e-7c90-4729-adf7-6f8c46a8f226">

### Why are the changes needed?
I met the below issue because I set the wrong parameter: I should have set `spark.celeborn.data.io.connectTime=30s` but set `spark.celeborn.data.io.connectionTime=30s`, and the Disk IO Utils was high at that time.

0. speculation is enabled
1. one task failed to fetch shuffle 0 in stage 5.
2. then it triggered the stage 0 re-run (stage 4)
3. then stage 5 retried; however, no task ran in stage 5 (retry 1)
<img width="1212" alt="image" src="https://github.com/user-attachments/assets/555f36b0-0f0d-452d-af0b-1573601165e2">
4. because the speculative task succeeded, no task ran in stage 5 (retry 1)
<img width="1715" alt="image" src="https://github.com/user-attachments/assets/7f315149-1d5c-4c32-ae9b-87b099b3297f">

Because a stage re-run is heavy, I wonder whether we should ignore the shuffle fetch failure if there is another task attempt running.
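The decision described above can be sketched as follows. This is a hedged, illustrative sketch (the `TaskAttempt` record and method names are hypothetical, not Celeborn's `SparkUtils`): report a FetchFailedException only when no sibling attempt of the same task is still running or already successful.

```java
// Illustrative sketch: a stage re-run is only worthwhile when no other attempt
// of the same task can still produce the result; otherwise fail just this
// attempt (e.g. with a plain IO exception) and let the sibling attempt win.
import java.util.List;

public class FetchFailureCheck {
    record TaskAttempt(int attemptNumber, boolean running, boolean successful) {}

    static boolean shouldReportFetchFailure(List<TaskAttempt> attempts, int current) {
        return attempts.stream().noneMatch(
            a -> a.attemptNumber() != current && (a.running() || a.successful()));
    }

    public static void main(String[] args) {
        List<TaskAttempt> attempts = List.of(
            new TaskAttempt(0, false, false), // this attempt hit the fetch error
            new TaskAttempt(1, true, false)); // speculative sibling still running
        if (shouldReportFetchFailure(attempts, 0)) throw new AssertionError();
        System.out.println("no stage re-run needed");
    }
}
```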

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

A UT for the SparkUtils method only, because it is impossible to add a UT for speculation.

d5da49d56d/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala (L236-L244)

<img width="867" alt="image" src="https://github.com/user-attachments/assets/f93bd14f-0f34-4c81-a8db-13be511405d9">

For a local master, the speculationScheduler would not be started.

d5da49d56d/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (L322-L346)

<img width="1010" alt="image" src="https://github.com/user-attachments/assets/477729a4-2fc1-47e9-b128-522c6e2ceb48">

and it is also not allowed to launch a speculative task on the same host.

Closes #2921 from turboFei/task_id.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-01-16 11:09:44 +08:00
SteNicholas
4ccb0c7fce [MINOR] Rename org.apache.celeborn.plugin.flink.readclient to org.apache.celeborn.plugin.flink.client
### What changes were proposed in this pull request?

Rename `org.apache.celeborn.plugin.flink.readclient` to `org.apache.celeborn.plugin.flink.client`.

### Why are the changes needed?

`FlinkShuffleClientImpl` is designed to write and read shuffle data, including pushing and fetching shuffle data. Therefore, the package name of `FlinkShuffleClientImpl` should be `org.apache.celeborn.plugin.flink.client`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3048 from SteNicholas/shuffle-client-package.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
2025-01-03 20:53:54 +08:00
codenohup
a57238024e
[CELEBORN-1801] Remove out-of-dated flink 1.14 and 1.15
### What changes were proposed in this pull request?
Remove out-of-dated flink 1.14 and 1.15.

For more information, please see the discussion thread: https://lists.apache.org/thread/njho00zmkjx5qspcrbrkogy8s4zzmwv9

### Why are the changes needed?
Reduce maintenance burden.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Changes can be covered by existing tests.

Closes #3029 from codenohup/remove-flink14and15.

Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2024-12-30 15:33:44 +08:00
hongguangwei
d0d8edfe22 [CELEBORN-1737] Support build tez client package
### What changes were proposed in this pull request?
Add Tez packaging script.

### Why are the changes needed?
To support build tez client.

### Does this PR introduce _any_ user-facing change?
Yes, enable Celeborn with tez support.

### How was this patch tested?
Cluster test.

Closes #3028 from GH-Gloway/1737.

Lead-authored-by: hongguangwei <hongguangwei@bytedance.com>
Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-12-30 11:01:19 +08:00
mingji
fde6365f68 [CELEBORN-1413] Support Spark 4.0
### What changes were proposed in this pull request?
To support Spark 4.0.0 preview.

### Why are the changes needed?
1. Changed Scala to 2.13.
2. Introduce columnar shuffle module for spark 4.0.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Cluster test.

Closes #2813 from FMX/b1413.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-24 18:12:27 +08:00
Wang, Fei
680b072b5b [CELEBORN-1753] Optimize the code for exists and find method
### What changes were proposed in this pull request?

Optimize the code for `exists` and `find`.

1.  Enhance the performance to lookup workerInfo by workerUniqueId instead of looping the collection:
 74c1ec0a7f/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala (L65-L66)

Change the type to:
```
 type ShuffleAllocatedWorkers =
    ConcurrentHashMap[Int, ConcurrentHashMap[String, ShufflePartitionLocationInfo]]
```
And save the `WorkerInfo` into `ShufflePartitionLocationInfo`.
```
class ShufflePartitionLocationInfo(val workerInfo: WorkerInfo) {
...
}
```

So that, we can get the `WorkerInfo` by worker uniqueId fast.

2. Reduce the loop cost for below code: 33ba0e02f5/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala (L455-L466)
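The lookup change in item 1 can be sketched as follows. This is a hedged, illustrative sketch (the `WorkerInfo` record here is a stand-in, not Celeborn's class): keying the map by the worker's unique id turns the former `exists`/`find` loops into O(1) hash lookups.

```java
// Illustrative sketch: replace a linear scan over a worker collection with a
// direct lookup in a ConcurrentHashMap keyed by the worker's unique id.
import java.util.concurrent.ConcurrentHashMap;

public class WorkerLookup {
    record WorkerInfo(String host, int rpcPort) {
        String toUniqueId() { return host + ":" + rpcPort; }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, WorkerInfo> workers = new ConcurrentHashMap<>();
        WorkerInfo w = new WorkerInfo("worker-1", 9097);
        workers.put(w.toUniqueId(), w);

        // Before: loop over the collection comparing unique ids; after: one get().
        if (!w.equals(workers.get("worker-1:9097"))) throw new AssertionError();
        if (workers.get("worker-2:9097") != null) throw new AssertionError();
        System.out.println("lookup ok");
    }
}
```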

### Why are the changes needed?

Enhance the performance.
Address comments:
https://github.com/apache/celeborn/pull/2959#pullrequestreview-2466200199
https://github.com/apache/celeborn/pull/2959#issuecomment-2505137166

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

GA

Closes #2962 from turboFei/CELEBORN_1753_exists.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-23 17:56:20 +08:00
zhengtao
67971df68f [CELEBORN-1783] Fix Pending task in commitThreadPool wont be canceled
### What changes were proposed in this pull request?
1. Cancel all commit file jobs when handleCommitFiles times out.
2. Fix that timed-out commit jobs won't be set to `CommitInfo.COMMIT_FINISHED`.
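The cancellation step can be sketched as follows. This is a hedged, illustrative sketch (not Celeborn's Controller code): once the commit deadline passes, cancel every unfinished future so queued jobs are removed from the commit thread pool and every job reaches a terminal state.

```java
// Illustrative sketch: cancelling the futures of timed-out commit jobs.
// cancel(true) interrupts the running job and drops the still-queued ones;
// all of them report isDone() afterwards, so a terminal status can be recorded.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CommitTimeout {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<Future<?>> jobs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            jobs.add(pool.submit(() -> {
                try { Thread.sleep(60_000); } catch (InterruptedException ignored) {}
            }));
        }
        Thread.sleep(100); // pretend the handleCommitFiles timeout elapsed
        for (Future<?> job : jobs) job.cancel(true);
        for (Future<?> job : jobs) {
            if (!job.isDone()) throw new AssertionError("job still pending");
        }
        pool.shutdownNow();
        System.out.println("all commit jobs terminal");
    }
}
```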

### Why are the changes needed?
1. Pending tasks in commitThreadPool won't be canceled.
2. Timed-out commit jobs should set `commitInfo.status`.
![image](https://github.com/user-attachments/assets/38528460-8114-4c42-8dc2-a47ec396f99e)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT test.
The `commitInfo.status` should be `COMMIT_FINISHED` when commitFile jobs time out.
Cluster test.
![image](https://github.com/user-attachments/assets/01beb183-da7e-4e44-85e1-3836fcad3c79)

Closes #3004 from zaynt4606/clb1783.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-19 14:24:36 +08:00
Wang, Fei
d85fb7826f [CELEBORN-1711][TEST] RetryReviveTest - shutdownMiniCluster after each test
### What changes were proposed in this pull request?
For RetryReviveTest, shutdownMiniCluster after each test

### Why are the changes needed?

Currently, the minicluster is not shutdown after each test.
ca8831e55f/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RetryReviveTest.scala (L43-L80)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA

Closes #3000 from turboFei/stop_miniCluster.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-16 15:43:41 +08:00
zhengtao
4aabe37765 [CELEBORN-1778] Fix commitInfo NPE and add assert in LifecycleManagerCommitFilesSuite
### What changes were proposed in this pull request?
1. Fix the commitInfo NPE in LifecycleManagerCommitFilesSuite: not all the workers are assigned slots.
2. Add an `assert` in the judgement logic.

### Why are the changes needed?
Errors in CI.
![image](https://github.com/user-attachments/assets/d4335c65-7a10-4db9-8446-694093bbde31)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing CI.

Closes #3001 from zaynt4606/clb1778.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-16 15:41:37 +08:00
hongguangwei
ca8831e55f [CELEBORN-1736] Add tez integration tests
### What changes were proposed in this pull request?
Add tez integration tests

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #2991 from GH-Gloway/1736.

Authored-by: hongguangwei <hongguangwei@bytedance.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-12-13 14:06:08 +08:00
SteNicholas
9cd6d96167 [CELEBORN-1700] Flink supports fallback to vanilla Flink built-in shuffle implementation
### What changes were proposed in this pull request?

Flink supports fallback to vanilla Flink built-in shuffle implementation.

### Why are the changes needed?

When quota is insufficient or workers are unavailable, `RemoteShuffleMaster` does not support fallback to `NettyShuffleMaster`, and `RemoteShuffleEnvironment` does not support fallback to `NettyShuffleEnvironment` at present. Flink should support fallback to the vanilla Flink built-in shuffle implementation for insufficient quota and unavailable workers.

![Flink Shuffle Fallback](https://github.com/user-attachments/assets/538374b4-f14c-40f4-abfc-76e25b7af3ff)

### Does this PR introduce _any_ user-facing change?

- Introduce the `ShuffleFallbackPolicy` interface to determine whether to fall back to the vanilla Flink built-in shuffle implementation.

```
/**
 * The shuffle fallback policy determines whether fallback to vanilla Flink built-in shuffle
 * implementation.
 */
public interface ShuffleFallbackPolicy {

  /**
   * Returns whether fallback to vanilla flink built-in shuffle implementation.
   *
   * @param shuffleContext The job shuffle context of Flink.
   * @param celebornConf The configuration of Celeborn.
   * @param lifecycleManager The {@link LifecycleManager} of Celeborn.
   * @return Whether fallback to vanilla flink built-in shuffle implementation.
   */
  boolean needFallback(
      JobShuffleContext shuffleContext,
      CelebornConf celebornConf,
      LifecycleManager lifecycleManager);
}
```

- Introduce `celeborn.client.flink.shuffle.fallback.policy` config to support shuffle fallback policy configuration.

### How was this patch tested?

- `RemoteShuffleMasterSuiteJ#testRegisterJobWithForceFallbackPolicy`
- `WordCountTestBase#celeborn flink integration test with fallback - word count`

Closes #2932 from SteNicholas/CELEBORN-1700.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-27 21:44:07 +08:00
Wang, Fei
9255e4ff87
[CELEBORN-1747] Fix flaky test - HybridShuffleWordCountTest
### What changes were proposed in this pull request?
Use `setupMiniClusterWithRandomPorts` instead of `setUpMiniCluster`, and set `setUpMiniCluster` to be private.

### Why are the changes needed?
Fix flaky test.
```
HybridShuffleWordCountTest:
Nov 26, 2024 4:44:10 AM org.glassfish.jersey.server.wadl.WadlFeature configure
WARNING: JAXBContext implementation could not be found. WADL feature is disabled.
*** RUN ABORTED ***
  java.io.IOException: Failed to bind to localhost/127.0.0.1:32886
  at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:349)
  at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:310)
  at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
  at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:234)
  at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:73)
  at org.apache.celeborn.server.common.http.HttpServer.start(HttpServer.scala:45)
  at org.apache.celeborn.server.common.HttpService.startInternal(HttpService.scala:354)
  at org.apache.celeborn.server.common.HttpService.startHttpServer(HttpService.scala:210)
  at org.apache.celeborn.service.deploy.MiniClusterFeature.createMaster(MiniClusterFeature.scala:115)
  at org.apache.celeborn.service.deploy.MiniClusterFeature.setUpMaster(MiniClusterFeature.scala:155)
  ...
  Cause: java.net.BindException: Address already in use

```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA.

Closes #2950 from turboFei/fix_flaky_test.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-26 14:32:11 +08:00
onebox-li
05ccd96905 [CELEBORN-1727][FOLLOWUP] Fix CelebornHashCheckDiskSuite flaky test
### What changes were proposed in this pull request?
Fix CelebornHashCheckDiskSuite flaky test .

### Why are the changes needed?
Ditto.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #2937 from onebox-li/fix-flaky-test.

Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-22 17:13:54 +08:00
onebox-li
351173bacd [CELEBORN-1727] Correct the calculation of worker diskInfo actualUsableSpace
### What changes were proposed in this pull request?
Correct the calculation of worker diskInfo actualUsableSpace.
Make the name of the function that gets the reserve size clearer (`getMinimumUsableSize` -> `getActualReserveSize`).
Let deviceMonitor start checking after the first `storageManager.updateDiskInfos()` to avoid disks being misidentified as HIGH_DISK_USAGE.
Fix the PushDataHandler#checkDiskFull check.
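The corrected reserve arithmetic can be sketched as follows. This is a hedged, simplified sketch (method names are illustrative, not Celeborn's): the actually-usable space is the filesystem's free space minus the configured reserve, and a disk counts as HIGH_DISK_USAGE only when nothing usable remains.

```java
// Illustrative sketch of the disk-reserve arithmetic described above.
public class DiskUsableSpace {
    static final long GIB = 1L << 30;

    static long actualUsableSpace(long fsFreeBytes, long reserveBytes) {
        // Usable space never goes negative: the reserve is simply withheld.
        return Math.max(fsFreeBytes - reserveBytes, 0L);
    }

    static boolean isHighDiskUsage(long fsFreeBytes, long reserveBytes) {
        return actualUsableSpace(fsFreeBytes, reserveBytes) <= 0L;
    }

    public static void main(String[] args) {
        if (actualUsableSpace(100 * GIB, 5 * GIB) != 95 * GIB) throw new AssertionError();
        if (!isHighDiskUsage(4 * GIB, 5 * GIB)) throw new AssertionError();
        if (isHighDiskUsage(6 * GIB, 5 * GIB)) throw new AssertionError();
        System.out.println("disk reserve math ok");
    }
}
```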

### Why are the changes needed?
Make sure worker disk reserve work correctly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Cluster test and UT.

Closes #2931 from onebox-li/fix-disk-usablespace.

Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-21 16:17:46 +08:00
SteNicholas
1d0032b925
[CELEBORN-1719] Introduce celeborn.client.spark.stageRerun.enabled with alternative celeborn.client.spark.fetch.throwsFetchFailure to enable spark stage rerun
### What changes were proposed in this pull request?

1. Introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
2. Change the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which enables spark stage rerun by default.
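The key-with-alternative resolution can be sketched as follows. This is a hedged, illustrative sketch (Celeborn's actual config machinery differs): look up the new key first, fall back to the deprecated key, and default to the new default of `true`.

```java
// Illustrative sketch: resolve the new key with the old key as an alternative.
import java.util.Map;

public class StageRerunConf {
    static boolean stageRerunEnabled(Map<String, String> conf) {
        String v = conf.get("celeborn.client.spark.stageRerun.enabled");
        if (v == null) v = conf.get("celeborn.client.spark.fetch.throwsFetchFailure");
        return v == null || Boolean.parseBoolean(v); // new default: true
    }

    public static void main(String[] args) {
        if (!stageRerunEnabled(Map.of())) throw new AssertionError();
        if (stageRerunEnabled(
            Map.of("celeborn.client.spark.fetch.throwsFetchFailure", "false")))
            throw new AssertionError();
        // The new key wins when both keys are present.
        if (!stageRerunEnabled(Map.of(
            "celeborn.client.spark.stageRerun.enabled", "true",
            "celeborn.client.spark.fetch.throwsFetchFailure", "false")))
            throw new AssertionError();
        System.out.println("config resolution ok");
    }
}
```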

### Why are the changes needed?

User could not directly understand the meaning of `celeborn.client.spark.fetch.throwsFetchFailure` as whether to enable stage rerun, which means that client throws `FetchFailedException` instead of `CelebornIOException`. It's recommended to introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #2920 from SteNicholas/CELEBORN-1719.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-20 19:30:26 +08:00
zhengtao
36ebdf07dc
[CELEBORN-1717] Fix ReusedExchangedSuit UT bug
### What changes were proposed in this pull request?
The UT should test both the true and false conditions.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
existing UT

Closes #2914 from zaynt4606/cmt-clb1717.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-15 19:27:09 +08:00
Wang, Fei
81a0d5113c [CELEBORN-1660] Cache available workers and only count the available workers device free capacity
### What changes were proposed in this pull request?
1. Cache the available workers.
2. Only count the available workers' device free capacity.
3. Place metrics_AvailableWorkerCount_Value in the overall part and metrics_WorkerCount_Value in the `Master` part.

### Why are the changes needed?
Cache the available workers to reduce the computation that would otherwise loop over the workers frequently.
To have an accurate device capacity overview that does not include excluded workers, decommissioning workers, etc.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
UT.

<img width="1705" alt="image" src="https://github.com/user-attachments/assets/bee17b4e-785d-4112-8410-dbb684270ec0">

Closes #2827 from turboFei/device_free.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-14 11:10:45 +08:00
mingji
42d5d426a1 [CELEBORN-1071] Support stage rerun for shuffle data lost
### What changes were proposed in this pull request?
If shuffle data is lost and throwing fetch failures is enabled, trigger a stage rerun.

### Why are the changes needed?
Rerun stage for shuffle lost scenarios.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
GA.

Closes #2894 from FMX/b1701.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-12 10:07:26 +08:00
szt
64f201dd83 [CELEBORN-1636][FOLLOWUP] Dynamic resources will only be utilized in case of candidates shortages
### What changes were proposed in this pull request?
Follow-up of https://github.com/apache/celeborn/pull/2835.
Only use dynamic resources when candidates are not enough.
Also change the way of getting availableWorkers from heartbeat to the requestSlots RPC to avoid burdening the heartbeat.

### Why are the changes needed?
No

### Does this PR introduce _any_ user-facing change?
Add another configuration.

### How was this patch tested?
UT

Closes #2852 from zaynt4606/clb1636-flu2.

Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-05 18:10:01 +08:00
Yuxin Tan
7ebd168f80 [CELEBORN-1490][CIP-6] Support process large buffer in flink hybrid shuffle
### What changes were proposed in this pull request?

This is the last PR in the CIP-6 series.

Fix the bug when hybrid shuffle faces a buffer larger than 32K.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #2873 from reswqa/11-large-buffer-10month.

Lead-authored-by: Yuxin Tan <tanyuxinwork@gmail.com>
Co-authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-04 16:57:43 +08:00
Weijie Guo
c12e8881ab
[CELEBORN-1490][CIP-6] Add Flink Hybrid Shuffle IT test cases
### What changes were proposed in this pull request?
1. Add Flink Hybrid Shuffle IT test cases
2. Fix bug in open stream.

### Why are the changes needed?

Test coverage for celeborn + hybrid shuffle

### Does this PR introduce _any_ user-facing change?
No

Closes #2859 from reswqa/10-itcase-10month.

Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-01 17:27:24 +08:00
szt
7685fa7db2 [CELEBORN-1636] Client supports dynamic update of Worker resources on the server
### What changes were proposed in this pull request?
Currently, the ChangePartitionManager retrieves workers from the LifeCycleManager's workerSnapshot. However, during the revival process in reallocateChangePartitionRequestSlotsFromCandidates, it does not account for newly added available workers resulting from elastic contraction and expansion. This PR addresses this issue by updating the candidate workers in the ChangePartitionManager to use the available workers reported in the heartbeat from the master.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #2835 from zaynt4606/clbdev.

Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-10-28 09:49:31 +08:00
mingji
df01fadc9f
[CELEBORN-1601] Support revise lost shuffles
### What changes were proposed in this pull request?
To support revising lost shuffle IDs in long-running jobs such as Flink batch jobs.

### Why are the changes needed?
1. To support revise lost shuffles.
2. To add an HTTP endpoint to revise lost shuffles manually.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
Cluster tests.

Closes #2746 from FMX/b1600.

Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-10-21 16:44:37 +08:00
sychen
362865f2ce [CELEBORN-1571] Fix flaky test - pushdata timeout will add to pushExcludedWorker
### What changes were proposed in this pull request?

### Why are the changes needed?

Because the worker port is in use, the driver's worker status may change from shutdown status to unknown, causing the test to fail.

https://github.com/apache/celeborn/actions/runs/10465286274/job/28980278764
```java
- celeborn spark integration test - pushdata timeout will add to pushExcludedWorkers *** FAILED ***
  WORKER_UNKNOWN did not equal PUSH_DATA_TIMEOUT_PRIMARY, and WORKER_UNKNOWN did not equal PUSH_DATA_TIMEOUT_REPLICA (PushDataTimeoutTest.scala:150)
```

unit-tests.log
```
24/08/20 05:28:30,400 INFO [celeborn-dispatcher-7] Master: Receive ReportNodeFailure [
Host: localhost
RpcPort: 41487
PushPort: 34259
FetchPort: 45713
ReplicatePort: 35107
InternalPort: 41487

24/08/20 05:29:29,414 WARN [celeborn-client-lifecycle-manager-change-partition-executor-3] WorkerStatusTracker:
Reporting failed workers:
Host:localhost:RpcPort:42267:PushPort:43741:FetchPort:46483:ReplicatePort:43587   PUSH_DATA_TIMEOUT_PRIMARY   2024-08-19T22:29:29.414-0700
Current unknown workers:
Host:localhost:RpcPort:41487:PushPort:34259:FetchPort:45713:ReplicatePort:35107:InternalPort:41487   2024-08-19T22:29:29.108-0700
Current shutdown workers:
Host:localhost:RpcPort:41487:PushPort:34259:FetchPort:45713:ReplicatePort:35107:InternalPort:41487
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

Closes #2697 from cxzl25/CELEBORN-1571.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-10-11 14:13:49 +08:00
szt
6629be858b [CELEBORN-1574] Speed up unregister shuffle by batch processing
### What changes were proposed in this pull request?
To speed up resource releasing, this PR unregisters shuffles in batches.
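The batching idea can be sketched as follows. This is a hedged, illustrative sketch (the RPC is simulated by a counter; the method names are hypothetical): grouping shuffle ids and releasing one batch per call cuts the number of round trips from n to ceil(n / batchSize).

```java
// Illustrative sketch: unregister shuffles in batches instead of one per RPC.
import java.util.ArrayList;
import java.util.List;
import java.util.Math;

public class BatchUnregister {
    static int rpcCalls = 0;

    static void unregisterShuffles(List<Integer> batch) {
        rpcCalls++; // one simulated RPC releases a whole batch of shuffles
    }

    public static void main(String[] args) {
        List<Integer> shuffleIds = new ArrayList<>();
        for (int i = 1; i <= 100; i++) shuffleIds.add(i);
        int batchSize = 20;
        for (int i = 0; i < shuffleIds.size(); i += batchSize) {
            unregisterShuffles(
                shuffleIds.subList(i, Math.min(i + batchSize, shuffleIds.size())));
        }
        if (rpcCalls != 5) throw new AssertionError();
        System.out.println("rpcCalls = " + rpcCalls); // 5 RPCs instead of 100
    }
}
```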

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT & Local cluster testing

Closes #2701 from zaynt4606/batchUnregister.

Lead-authored-by: szt <zaynt4606@163.com>
Co-authored-by: Zaynt <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-10-08 14:03:41 +08:00
Weijie Guo
30f7da556f [CELEBORN-1490][CIP-6] Enrich register shuffle method
### What changes were proposed in this pull request?

Enrich register shuffle related message to support segment-based shuffle.

Note: This version of CIP-6 still only works in blocking mode, but we have extended related fields to give it the potential to reading while writing. Any subsequent changes needed to support reading while writing are recorded in TODO.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
No need.

Closes #2719 from reswqa/cip6-3-pr.

Lead-authored-by: Weijie Guo <reswqa@163.com>
Co-authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-09-10 18:09:29 +08:00
sychen
3ee672e15d
[CELEBORN-1565] Introduce warn-unused-import in Scala
### What changes were proposed in this pull request?
This PR aims to introduce `warn-unused-import` in Scala.

### Why are the changes needed?
There are currently many invalid imports, which can be checked using `-Ywarn-unused-import`.
And through the `silencer` plugin we can suppress warnings for some imports that are required in Scala 2.11.

```scala
import org.apache.celeborn.common.util.FunctionConverter._
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

Closes #2689 from cxzl25/CELEBORN-1565.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shaoyun Chen <csy@apache.org>
2024-08-29 13:43:17 +08:00
szt
9f0af3456a [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware condition on diskInfo
### What changes were proposed in this pull request?
Fix offerSlotsLoadAware's actualUsableSpace condition on diskInfo by
considering diskReserveSize when updateDiskInfos runs in StorageManager,
so the master doesn't need to calculate usableSpace when offering slots.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #2688 from zaynt4606/main.

Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-08-26 14:17:55 +08:00