Commit Graph

470 Commits

Author SHA1 Message Date
Minchu Yang
44d772df75 [CELEBORN-1882] Support configuring the SSL handshake timeout for SSLHandler
### What changes were proposed in this pull request?
Support configuring the SSL handshake timeout for SSLHandler, for `rpc_app` and `rpc_service` transport modules.

### Why are the changes needed?
To make the SSLHandler handshake timeout configurable. We are working on ramping shuffle traffic to Celeborn internally, and have observed spark task failures which related to the connection timeout. This will make SSLHandler handshake timeout in line with our ecosystem’s production config, and should minimize those failures and improve robustness.

### Does this PR introduce _any_ user-facing change?
Introduces a new server side configuration.

### How was this patch tested?
Added a new UT, validated with existing UTs.

Closes #3120 from rmcyang/rmcyang/CELEBORN-1882.

Authored-by: Minchu Yang <minyang@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2025-02-27 15:43:32 -06:00
wangshengjie
d659e06d45 [CELEBORN-1319] Optimize skew partition logic for Reduce Mode to avoid sorting shuffle files
### What changes were proposed in this pull request?
Add logic to support avoid sorting shuffle files for Reduce mode when optimize skew partitions

### Why are the changes needed?
Current logic need sorting shuffle files when read Reduce mode skew partition shuffle files, we found some shuffle sorting timeout and performance issue

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster test and uts

Closes #2373 from wangshengjie123/optimize-skew-partition.

Lead-authored-by: wangshengjie <wangshengjie3@xiaomi.com>
Co-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.tb@gmail.com>
Co-authored-by: wangshengjie3 <soldier.sj.wang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-02-19 16:57:44 +08:00
KenGeng
2097fcdfea [CELEBORN-1870] Fix typos in in 'Developer' documents
### What changes were proposed in this pull request?
Fix typo in 'Developer' documents.

### Why are the changes needed?
Improve the accurary of the doc.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Only doc changed. No test.

Closes #3108 from bgeng777/CELEBORN-1870.

Authored-by: KenGeng <samuelgeng7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-02-19 14:31:14 +08:00
zhengtao
fc459c0f7d [CELEBORN-1757] Add retry when sending RPC to LifecycleManager
### What changes were proposed in this pull request?
Retry seding RPC to LifecycleManager when TimeoutException.

### Why are the changes needed?
RPC messages are processed by `Dispatcher.threadpool` which its numThreads depends on `numUsableCores`.
In some cases (k8s) the numThreads of LifecycleManager are not enough while the RPCs are a lot so there are TimeoutExceptions.
Add retry when there are TimeoutExceptions.

### Does this PR introduce _any_ user-facing change?
No.

Another way is to adjust the configuration `celeborn.lifecycleManager.rpc.dispatcher.threads` to add the numThreads.
This way is more affective.

### How was this patch tested?
Cluster testing.

Closes #3008 from zaynt4606/clb1757.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-02-17 11:27:02 -08:00
Nicholas Jiang
1455b6e2f3 [CELEBORN-1860] Remove unused celeborn.<module>.io.enableVerboseMetrics option
### What changes were proposed in this pull request?

Remove unused `celeborn.<module>.io.enableVerboseMetrics` option.

### Why are the changes needed?

`celeborn.<module>.io.enableVerboseMetrics` option is unused, which could be replaced with `celeborn.network.memory.allocator.verbose.metric`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`CelebornConfSuite`

Closes #3094 from SteNicholas/CELEBORN-1860.

Authored-by: Nicholas Jiang <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-02-12 11:42:26 +08:00
xinyuwang1
9f8a89e61e [CELEBORN-1841] Support custom implementation of EventExecutorChooser to avoid deadlock when calling await in EventLoop thread
### What changes were proposed in this pull request?
Support custom implementation of EventExecutorChooser to avoid deadlock when calling await in EventLoop thread

### Why are the changes needed?
In Flink Celeborn Client, you can create a new connection in the EventLoop thread. To wait for the connection to complete, cf.await is called, which can cause a deadlock because the thread bound to the newly connected channel may be the same as the current EventLoop thread. The current thread is suspended by wait and needs to wait for the current thread to notify. This change is to avoid binding the same thread.

### Does this PR introduce _any_ user-facing change?
celeborn.<module>.io.conflictAvoidChooser.enable is introduced.

### How was this patch tested?
manual test

Closes #3071 from littlexyw/fix_dead_lock.

Authored-by: xinyuwang1 <xinyuwang1@xiaohongshu.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-02-10 22:25:55 +08:00
sychen
b9e4bbb5a7
[MINOR] Change some config version
### What changes were proposed in this pull request?

0.6.0 -> 0.5.2

`celeborn.master.endpoints.resolver`

0.6.0 -> 0.5.1

`celeborn.client.chunk.prefetch.enabled`

`celeborn.client.inputStream.creation.window`

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3082 from cxzl25/config_version.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shaoyun Chen <csy@apache.org>
2025-02-08 17:55:56 +08:00
Xianming Lei
9131c1e07a [CELEBORN-1792] MemoryManager resume should use pinnedDirectMemory instead of usedDirectMemory
### What changes were proposed in this pull request?
Congestion and MemoryManager should use pinnedDirectMemory instead of usedDirectMemory

### Why are the changes needed?
In our production environment, after worker pausing, the usedDirectMemory keep high and does not decrease. The worker node is permanently blacklisted and cannot be used.

This problem has been bothering us for a long time. When the thred cache is turned off, in fact, **after ctx.channel().config().setAutoRead(false), the netty framework will still hold some ByteBufs**. This part of ByteBuf result in a lot of PoolChunks cannot be released.

In netty, if a chunk is 16M and 8k of this chunk has been allocated, then the pinnedMemory is 8k and the activeMemory is 16M. The remaining (16M-8k) memory can be allocated, but not yet allocated, netty allocates and releases memory in chunk units, so the 8k that has been allocated will result in 16M that cannot be returned to the operating system.

Here are some scenes from our production/test environment:

We config 10gb off-heap memory for worker, other configs as below:
```
celeborn.network.memory.allocator.allowCache                         false
celeborn.worker.monitor.memory.check.interval                         100ms
celeborn.worker.monitor.memory.report.interval                        10s
celeborn.worker.directMemoryRatioToPauseReceive                       0.75
celeborn.worker.directMemoryRatioToPauseReplicate                     0.85
celeborn.worker.directMemoryRatioToResume                             0.5
```

When receiving high traffic, the worker's usedDirectMemory increases. After triggering trim and pause, usedDirectMemory still does not reach the resume threshold, and worker was excluded.

![image](https://github.com/user-attachments/assets/40f5609e-fbf9-4841-84ec-69a69256edf4)

So we checked the heap snapshot of the abnormal worker, we can see that there are a large number of DirectByteBuffers in the heap memory. These DirectByteBuffers are all 4mb in size, which is exactly the size of chunksize. According to the path to gc root, DirectByteBuffer is held by PoolChunk, and these 4m only have 160k pinnedBytes.

![image](https://github.com/user-attachments/assets/3d755ef3-164c-4b5b-bec1-aaf039c0c0a5)

![image](https://github.com/user-attachments/assets/17907753-2f42-4617-a95e-1ee980553fb9)

There are many ByteBufs that are not released

![image](https://github.com/user-attachments/assets/b87eb1a9-313f-4f42-baa8-227fd49c19b6)

The stack shows that these ByteBufs are allocated by netty
![image](https://github.com/user-attachments/assets/f8783f99-507a-44a8-9de5-7215a5eed1db)

We tried to reproduce this situation in the test environment. When the same problem occurred, we added a restful api of the worker to force the worker to resume. After the resume, the worker returned to normal, and PushDataHandler handled many delayed requests.

![image](https://github.com/user-attachments/assets/be37039b-97b8-4ae8-a64f-a2003bea613e)

![image](https://github.com/user-attachments/assets/24b1c8ad-131c-4bd6-adcb-bad655cfbdbf)

So I think that when pinnedMemory is not high enough, we should not trigger pause and congestion, because at this time a large part of the memory can still be allocated.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #3018 from leixm/CELEBORN-1792.

Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-01-22 14:30:20 +08:00
SteNicholas
30e46eee28
[CELEBORN-1842] Bump ap-loader version from 3.0-8 to 3.0-9
### What changes were proposed in this pull request?

Bump ap-loader version from 3.0-8 to 3.0-9.

### Why are the changes needed?

ap-loader has already released v3.0-9, which should bump version from 3.0-8 for `JVMProfiler`.

Backport:

1. https://github.com/apache/spark/pull/46402
2. https://github.com/apache/spark/pull/49440

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3072 from SteNicholas/CELEBORN-1842.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-01-21 12:22:00 +08:00
zhengtao
f2751c2802 [CELEBORN-1829] Replace waitThreadPoll's thread pool with ScheduledExecutorService in Controller
### What changes were proposed in this pull request?
1. Replace waitThreadPoll's thread pool with ScheduledExecutorService.
2. commitFile should reply when shuffleCommitTimeout.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Cluster test & UT.

Closes #3059 from zaynt4606/clb1829.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-01-18 13:00:04 +08:00
zhengtao
ac0d335f40 [CELEBORN-1831] Add ratis commitIndex metrics
### What changes were proposed in this pull request?
Add two metrics (raft commitIndex of each master and maxCommitIndex - minCommitIndex value).

### Why are the changes needed?
To observe the metadata synchronization of the raft cluster.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Cluster test.
![image](https://github.com/user-attachments/assets/f354a3cd-e3b3-4af0-98c2-fc13330b2d81)

Closes #3063 from zaynt4606/clb1831.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-01-17 10:58:06 +08:00
Nan
ca60613f2f [CELEBORN-1817] add committed file size metrics
### What changes were proposed in this pull request?

this PR adds the file size metrics for workers

### Why are the changes needed?

the reason for us to add this metric is that we observed that, likely due to the delayed processing of split messages, we have jobs writing 40-50g files even the split threshold is 10g (we use soft split)

we want to have this metrics to monitor the severity of the issue

### Does this PR introduce _any_ user-facing change?

yes, one more metrics

### How was this patch tested?

(ignore the dashboard title, it's a dummy one)

![image](https://github.com/user-attachments/assets/d88c15e6-d740-4def-94d5-03666bbb38ca)

Closes #3047 from CodingCat/committed_file_size.

Authored-by: Nan <nzhu@pinterest.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-01-07 10:17:45 +08:00
wuziyi
f886751e80 [CELEBORN-1812] Distinguish sorting-file from sort-tasks waiting to be submitted
### What changes were proposed in this pull request?

Current implementation uses `
shuffleSortTaskDeque.size()` as current sorting file count.This value might be more appropriately described as the sort tasks waiting to be submitted to `fileSorterExecutors`. And the actual current sorting file number ( doing some disk-io operation etc) should be get from `sortingShuffleFiles`.

### Why are the changes needed?

Add metrics to monitor current sorting files which is making disk-io operations.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

![image](https://github.com/user-attachments/assets/6ffed37e-ad12-4d8d-a4aa-2b2695a92168)

Closes #3040 from Z1Wu/fix/sorting_file_metrics.

Authored-by: wuziyi <wuziyi02@corp.netease.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-01-04 10:27:53 +08:00
SteNicholas
16762c659c
[CELEBORN-1774][FOLLOWUP] Change celeborn.<module>.io.mode optional to explain default behavior in description
### What changes were proposed in this pull request?

Change `celeborn.<module>.io.mode` optional to explain default behavior in description.

### Why are the changes needed?

The default value of `celeborn.<module>.io.mode` in document could be changed by whether epoll mode is available for different os. Therefore, `celeborn.<module>.io.mode` should be changed to optional and explained the default behavior in description of option.

Follow up https://github.com/apache/celeborn/pull/3039#discussion_r1899340272.

### Does this PR introduce _any_ user-facing change?

`celeborn.<module>.io.mode` is optional and explains default behavior in description.

### How was this patch tested?

CI.

Closes #3044 from SteNicholas/CELEBORN-1774.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-01-02 21:15:19 +08:00
mingji
4ec02286e8
[CELEBORN-1811] Update default value for celeborn.master.slot.assign.extraSlots
### What changes were proposed in this pull request?
To avoid possible worker load skew for the stages with tiny reducer numbers.

### Why are the changes needed?
If a stage has tiny reducers and skewed partitions, The default value will lead to serious worker load imbalance cause some workers unable to handle shuffle data.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
GA and cluster test.

Closes #3039 from FMX/1811.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-12-31 15:37:28 +08:00
codenohup
a57238024e
[CELEBORN-1801] Remove out-of-dated flink 1.14 and 1.15
### What changes were proposed in this pull request?
Remove out-of-dated flink 1.14 and 1.15.

For more information, please see the discussion thread: https://lists.apache.org/thread/njho00zmkjx5qspcrbrkogy8s4zzmwv9

### Why are the changes needed?
Reduce maintenance burden.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Changes can be covered by existing tests.

Closes #3029 from codenohup/remove-flink14and15.

Authored-by: codenohup <huangxu.walker@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2024-12-30 15:33:44 +08:00
xiyu.zk
4b60dae0f0 [CELEBORN-1789][DOC] Document on Java Columnar Shuffle
### What changes were proposed in this pull request?
Introduction to Celeborn's Java Columnar Shuffle

### Why are the changes needed?
Introduction to Celeborn's Java Columnar Shuffle

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI

Closes #3010 from kerwin-zk/CELEBORN-1789.

Authored-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-12-24 11:40:18 +08:00
Sanskar Modi
80523214e4 [MINOR] Add documentation for CELEBORN_NO_DAEMONIZE
### What changes were proposed in this pull request?

Add documentation for `CELEBORN_NO_DAEMONIZE`

### Why are the changes needed?

Currently the celeborn processes starts in background and it was difficult to figure out how to change that behaviour. Setting this flag to true, will allow Celeborn processes to run in foreground.

### Does this PR introduce _any_ user-facing change?
NA

### How was this patch tested?
NA

Closes #3020 from s0nskar/no-daemonize-docs.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-12-23 10:31:37 +08:00
SteNicholas
0eb8af98de [CELEBORN-1774] Update default value of celeborn.<module>.io.mode to whether epoll mode is available
### What changes were proposed in this pull request?

Update default value of `celeborn.<module>.io.mode` to whether epoll mode is available. Meanwhile, the io mode of transport is `NIO` for unavailable epoll mode.

### Why are the changes needed?

The JDK NIO bug produces the situation that empty polling of `Selector` could cause CPU 100%, which refers to

1. [JDK-2147719 : (se) Selector doesn't block on Selector.select(timeout) (lnx)](https://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719)
2. [JDK-6403933 : (se) Selector doesn't block on Selector.select(timeout) (lnx)](https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933)

When the epoll mode is available, the default IO mode should be `EPOLL`, which backports [NettyServer.java#L92](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java#L92). Meanwhile, the transport io mode should be `NIO` when the epoll mode is unavailable.

### Does this PR introduce _any_ user-facing change?

Change the default value of `celeborn.<module>.io.mode`.

### How was this patch tested?

CI.

Closes #2994 from SteNicholas/CELEBORN-1774.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-17 15:26:01 +08:00
Sanskar Modi
91d8f955ca [CELEBORN-1622][CIP-11] Adding documentation for Worker Tags feature
### What changes were proposed in this pull request?

Adding documentation for Worker Tags feature

### Why are the changes needed?

https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?
NA

### How was this patch tested?
NA

Closes #2981 from s0nskar/tags_docu.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-12-10 15:56:58 +08:00
Sanskar Modi
782393af05 [CELEBORN-1748] Deprecate identity provider configs tied with quota
### What changes were proposed in this pull request?

Deprecate identity configs related with quota –
```
        "celeborn.quota.identity.provider"
        "celeborn.quota.identity.user-specific.tenant"
        "celeborn.quota.identity.user-specific.userName"
```
In favour of identity configs independent of quota

```
        "celeborn.identity.provider"
        "celeborn.identity.user-specific.tenant"
        "celeborn.identity.user-specific.userName"
```

### Why are the changes needed?

Current identity configs are tied with quota but identity should be free of quota because other pieces like tags are also using it. In future other new components can also make use of identity.

### Does this PR introduce _any_ user-facing change?

NA

### How was this patch tested?
Existing UTs

Closes #2952 from s0nskar/fix_identity.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-12-04 09:28:40 +08:00
zhaohehuhu
b204a26010 [CELEBORN-1755] Update doc to include S3 as one of storage layers
### What changes were proposed in this pull request?

as titile

### Why are the changes needed?

The doc fail to mention S3 as one of storage layers

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Closes #2963 from zhaohehuhu/dev-1128.

Authored-by: zhaohehuhu <luoyedeyi@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-12-02 11:00:18 +08:00
Wang, Fei
aea680d1cc
[CELEBORN-1752] Migration guide for unexpected shuffle RESTful api change since 0.5.0
### What changes were proposed in this pull request?

Add migration doc for RESTful api change for celeborn 0.5.0.

### Why are the changes needed?

There was a typo in https://github.com/apache/celeborn/pull/2371, the `/shuffles` api was renamed to `/shuffle`.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

GA.

Closes #2960 from turboFei/shuffles_api.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-28 16:16:47 +08:00
Sanskar Modi
259dfcd988 [CELEBORN-1621][FOLLOWUP] Support enabling worker tags via config
### What changes were proposed in this pull request?

- Adding support to enable/disable worker tags feature by a master config flag.
- Fixed BUG: After this change #2936, admins can also define the tagsExpr for users. In a case user is passing an empty tagsExpr current code will ignore the admin defined tagsExpr and allow job to use all workers.

### Why are the changes needed?

https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?

NA

### How was this patch tested?
Existing UTs

Closes #2953 from s0nskar/tags-enabled.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-28 11:22:35 +08:00
Wang, Fei
59163c2a23 [CELEBORN-1745] Remove application top disk usage code
### What changes were proposed in this pull request?
Remove the code for app top disk usage both in master and worker end.

Prefer to use below prometheus expr to figure out the top app usages.
```
topk(50, sum by (applicationId) (metrics_diskBytesWritten_Value{role="worker", applicationId!=""}))
```

### Why are the changes needed?
To address comments: https://github.com/apache/celeborn/pull/2947#issuecomment-2499564978

> Due to the application dimension resource consumption, this feature should be included in the deprecated features. Maybe you can remove the codes for application top disk usage.

### Does this PR introduce _any_ user-facing change?

Yes, remove the app top disk usage api.

### How was this patch tested?
GA.

Closes #2949 from turboFei/remove_app_top_usage.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-28 10:55:34 +08:00
SteNicholas
9cd6d96167 [CELEBORN-1700] Flink supports fallback to vanilla Flink built-in shuffle implementation
### What changes were proposed in this pull request?

Flink supports fallback to vanilla Flink built-in shuffle implementation.

### Why are the changes needed?

When quota is unenough or workers are unavailable, `RemoteShuffleMaster` does not support fallback to `NettyShuffleMaster`, and `RemoteShuffleEnvironment` does not support fallback to `NettyShuffleEnvironment` at present. Flink should support fallback to vanilla Flink built-in shuffle implementation for unenough quota and unavailable workers.

![Flink Shuffle Fallback](https://github.com/user-attachments/assets/538374b4-f14c-40f4-abfc-76e25b7af3ff)

### Does this PR introduce _any_ user-facing change?

- Introduce `ShuffleFallbackPolicy` interface to determine whether fallback to vanilla Flink built-in shuffle implementation.

```
/**
 * The shuffle fallback policy determines whether fallback to vanilla Flink built-in shuffle
 * implementation.
 */
public interface ShuffleFallbackPolicy {

  /**
   * Returns whether fallback to vanilla flink built-in shuffle implementation.
   *
   * param shuffleContext The job shuffle context of Flink.
   * param celebornConf The configuration of Celeborn.
   * param lifecycleManager The {link LifecycleManager} of Celeborn.
   * return Whether fallback to vanilla flink built-in shuffle implementation.
   */
  boolean needFallback(
      JobShuffleContext shuffleContext,
      CelebornConf celebornConf,
      LifecycleManager lifecycleManager);
}
```

- Introduce `celeborn.client.flink.shuffle.fallback.policy` config to support shuffle fallback policy configuration.

### How was this patch tested?

- `RemoteShuffleMasterSuiteJ#testRegisterJobWithForceFallbackPolicy`
- `WordCountTestBase#celeborn flink integration test with fallback - word count`

Closes #2932 from SteNicholas/CELEBORN-1700.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-27 21:44:07 +08:00
Sanskar Modi
712d9a496e [CELEBORN-1621][CIP-11] Predefined worker tags expr via dynamic configs
### What changes were proposed in this pull request?

Support predefined tags expression for tenant and users via dynamic config. Using this admin can configure tags for users/tenants and give permission to special users to provide custom tags expression.

### Why are the changes needed?

https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?

NA

### How was this patch tested?
UTs

Closes #2936 from s0nskar/admin_tags.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-26 20:40:30 +08:00
zhaohehuhu
a2d3972318 [CELEBORN-1530] support MPU for S3
### What changes were proposed in this pull request?

as title

### Why are the changes needed?
AWS S3 doesn't support append, so Celeborn had to copy the historical data from s3 to worker and write to s3 again, which heavily scales out the write. This PR implements a better solution via MPU to avoid copy-and-write.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

![WechatIMG257](https://github.com/user-attachments/assets/968d9162-e690-4767-8bed-e490e3055753)

I conducted an experiment with a 1GB input dataset to compare the performance of Celeborn using only S3 storage versus using SSD storage. The results showed that Celeborn with SSD storage was approximately three times faster than with only S3 storage.

<img width="1728" alt="Screenshot 2024-11-16 at 13 02 10" src="https://github.com/user-attachments/assets/8f879c47-c01a-4004-9eae-1c266c1f3ef2">

The above screenshot is the second test with 5000 mapper and reducer that I did.

Closes #2830 from zhaohehuhu/dev-1021.

Lead-authored-by: zhaohehuhu <luoyedeyi@163.com>
Co-authored-by: He Zhao <luoyedeyi459@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-22 15:03:53 +08:00
SteNicholas
1d0032b925
[CELEBORN-1719] Introduce celeborn.client.spark.stageRerun.enabled with alternative celeborn.client.spark.fetch.throwsFetchFailure to enable spark stage rerun
### What changes were proposed in this pull request?

1. Introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
2. Change the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which enables spark stage rerun at default.

### Why are the changes needed?

User could not directly understand the meaning of `celeborn.client.spark.fetch.throwsFetchFailure` as whether to enable stage rerun, which means that client throws `FetchFailedException` instead of `CelebornIOException`. It's recommended to introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #2920 from SteNicholas/CELEBORN-1719.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-20 19:30:26 +08:00
Erik.fang
8fd44b42ba [CELEBORN-1634] Support queue time/processing time metrics for rpc framework
### What changes were proposed in this pull request?

implement queue time/processing time metrics for rpc framework

### Why are the changes needed?

to identify rpc processing bottelneck

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

local

Closes #2784 from ErikFang/main-rpc-metrics.

Lead-authored-by: Erik.fang <fmerik@gmail.com>
Co-authored-by: 仲甫 <fangming@antgroup.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-18 09:39:56 +08:00
Wang, Fei
81a0d5113c [CELEBORN-1660] Cache available workers and only count the available workers device free capacity
### What changes were proposed in this pull request?
1. cache the available workers
2. Only count the available workers device free capacity.
3. place the metrics_AvailableWorkerCount_Value in overall and metrics_WorkerCount_Value in `Master` part

### Why are the changes needed?
Cache  the available workers to reduce the computation that need to loop the workers frequently.
To have an accurate device capacity overview that does not include the excluded workers, decommissioning workers, etc.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
UT.

<img width="1705" alt="image" src="https://github.com/user-attachments/assets/bee17b4e-785d-4112-8410-dbb684270ec0">

Closes #2827 from turboFei/device_free.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-14 11:10:45 +08:00
Wang, Fei
7fbf0e2fa5
[MINOR] Fix missing blanks in docs
### What changes were proposed in this pull request?

When looking into the source code, I found some blank delimiter is missing in the ConfigEntry doc.
In this PR, I go through all the ConfigEntry docs to fix the missing blank in the description.

### Why are the changes needed?
Fix typo.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

GA.

Closes #2917 from turboFei/nit_docs.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2024-11-14 10:42:36 +08:00
SteNicholas
9083dd401c [CELEBORN-1504][FOLLOWUP] Document adds Flink 1.16 support
### What changes were proposed in this pull request?

1. Document adds Flink 1.16 support including `README.md`, `deploy.md`.
2. Update description of `celeborn.client.shuffle.compression.codec` to change the supported Flink version for ZSTD.

### Why are the changes needed?

#2619 has supported Flink 1.16, which should update the document for the support. Meanwhile, since Flink version 1.16, zstd is supported for Flink shuffle client.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2904 from SteNicholas/CELEBORN-1504.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-13 21:47:29 +08:00
SteNicholas
169b6f6973 [CELEBORN-1685] ShuffleFallbackPolicy supports ShuffleFallbackCount metric
### What changes were proposed in this pull request?

1. `ShuffleFallbackPolicy` supports `ShuffleFallbackCount` metric to provide the shuffle fallback count of each fallback policy.
2. Introduce `ShuffleTotalCount` metric to record the total count of shuffle.
3. Fix Spark 2 does not increment shuffle count via `LifecycleManager`.

### Why are the changes needed?

The implementations of `ShuffleFallbackPolicy` does not support `ShuffleFallbackCount` metric at present. Meanwhile, Bilibili production practice needs `ShuffleFallbackCount` of different `ShuffleFallbackPolicy`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Cluster test.

Closes #2891 from SteNicholas/CELEBORN-1685.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-11 10:37:25 +08:00
Wang, Fei
71e3c03a13 [MINOR] Fix docs typo
### What changes were proposed in this pull request?
Fix docs typo.

### Why are the changes needed?

Fix docs typo.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

GA.

Closes #2890 from turboFei/nit.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-07 20:34:14 +08:00
Wang, Fei
f1bda46de4 [CELEBORN-1680] Introduce ShuffleFallbackCount metrics
### What changes were proposed in this pull request?

As title, introduce metrics_ShuffleFallbackCount_Value.

### Why are the changes needed?
To provide the insights that how many shuffles fallback to spark built-in shuffle service. It is helpful for us  to deprecate the ESS progressively.

Currently, we plan to set the `celeborn.client.spark.shuffle.fallback.numPartitionsThreshold` to fallback the shuffle with too large shuffle partitions number, for example: 50k.

In the future, we plan to limit the acceptable maximum shuffle partition number so that the bad job would be rejected and not impact the celeborn master health.

### Does this PR introduce _any_ user-facing change?
Yes, new metrics.

### How was this patch tested?
UT.
<img width="1188" alt="image" src="https://github.com/user-attachments/assets/8193c12c-5dc9-4783-b64b-6a8449a1bea4">

Closes #2866 from turboFei/record_fallback.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-07 11:42:17 +08:00
Weijie Guo
f2e9043028 [CELEBORN-1687] Highlight flink session cluster issue in doc
### What changes were proposed in this pull request?

If we use celeborn shuffle service, we can't submit both batch and streaming to the same flink session cluster. This should be highlight in doc.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

No need.

Closes #2879 from reswqa/session-doc.

Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-06 10:52:34 +08:00
szt
64f201dd83 [CELEBORN-1636][FOLLOWUP] Dynamic resources will only be utilized in case of candidates shortages
### What changes were proposed in this pull request?
Follow up of [https://github.com/apache/celeborn/pull/2835]
Only use dynamic resources when candidates are not enough.
And change the way geting availableWorkers form heartbeat to requestSlots RPC to avoid the burden of heartbeat.

### Why are the changes needed?
No

### Does this PR introduce _any_ user-facing change?
Add another configuration.

### How was this patch tested?
UT

Closes #2852 from zaynt4606/clb1636-flu2.

Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-11-05 18:10:01 +08:00
SteNicholas
38156414a1 [CELEBORN-1678][FOLLOWUP] Improve Celeborn CLI user guide
### What changes were proposed in this pull request?

Improve Celeborn CLI user guide including:

- Add license of Celeborn CLI user guide.
- Optimize the introduction of setup and usage for Celeborn CLI.
- Optimize the navigation of Celeborn CLI to combine Celeborn Ratis Shell.

### Why are the changes needed?

There is no license in Celeborn CLI user guide. Meanwhile, there are certain improvement in user guide including the license, navigation, and the introduction of setup and usage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2875 from SteNicholas/CELEBORN-1678.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-11-05 15:53:28 +08:00
szt
ec67366b7a
[CELEBORN-1684] Fix ambiguous client jar expression of document
### What changes were proposed in this pull request?
When users deploy using the release binary as outlined in the documentation, the instructions for copying the client JAR can be unclear.

### Why are the changes needed?
No

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
![image](https://github.com/user-attachments/assets/a4e7c415-8f0e-44bd-8d18-18462896e27c)

Closes #2877 from zaynt4606/md.

Authored-by: szt <zaynt4606@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-05 13:48:22 +08:00
Weijie Guo
41fdb8ade1
[CELEBORN-1490][CIP-6] Add Flink hybrid shuffle doc
### What changes were proposed in this pull request?

Add Flink hybrid shuffle doc

### Why are the changes needed?
We need the doc for the new hybrid shuffle mode.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

no neeed.

Closes #2867 from reswqa/add-hs-doc.

Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-11-01 13:37:14 +08:00
Shuang
14baec8388
[CELEBORN-1673] Support retry create client
### What changes were proposed in this pull request?
As title

### Why are the changes needed?
Currently, only Flink retries establishing a client when a connection problem occurs. This would be beneficial for all other engines to implement as well.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #2855 from RexXiong/CELEBORN-1673.

Lead-authored-by: Shuang <lvshuang.xjs@alibaba-inc.com>
Co-authored-by: lvshuang.xjs <lvshuang.xjs@taobao.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-10-31 14:45:48 +08:00
Aravind Patnam
12f25d3d0f [CELEBORN-1678] Add Celeborn CLI User guide in README
### What changes were proposed in this pull request?
adding user guide to README for cli

### Why are the changes needed?
better user experience when using CLI.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
N/A

Closes #2862 from akpatnam25/CELEBORN-1678.

Authored-by: Aravind Patnam <akpatnam25@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-10-30 19:58:34 +08:00
Xianming Lei
6ad02f14a9 [CELEBORN-1577][PHASE1] Storage quota should support interrupt shuffle
### What changes were proposed in this pull request?
Support interrupt shuffle on client side.

I will develop the following functions in order
1.  Client supports interrupt shuffle
2. Master supports calculating app-level shuffle usage

### Why are the changes needed?
The current storage quota logic can only limit new shuffles, and cannot limit the writing of existing shuffles. In our production environment, there is such an scenario: the cluster is small, but the user's app single shuffle is large which occupied disk resources, we want to interrupt those shuffle.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unable to test this part independently, Additional tests will be added after completing the second part.

Closes #2801 from leixm/CELEBORN-1577-1.

Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-10-30 16:28:09 +08:00
szt
7685fa7db2 [CELEBORN-1636] Client supports dynamic update of Worker resources on the server
### What changes were proposed in this pull request?
Currently, the ChangePartitionManager retrieves workers from the LifeCycleManager's workerSnapshot. However, during the revival process in reallocateChangePartitionRequestSlotsFromCandidates, it does not account for newly added available workers resulting from elastic contraction and expansion. This PR addresses this issue by updating the candidate workers in the ChangePartitionManager to use the available workers reported in the heartbeat from the master.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #2835 from zaynt4606/clbdev.

Authored-by: szt <zaynt4606@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-10-28 09:49:31 +08:00
SteNicholas
464a7c71a9 [CELEBORN-1651] Support ratio threshold of unhealthy disks for excluding worker
### What changes were proposed in this pull request?

Support ratio threshold of unhealthy disks for excluding worker with `celeborn.master.excludeWorker.unhealthyDiskRatioThreshold`.

### Why are the changes needed?

We often encounter issues such as disk input/output errors in production practice. When a bad disk occurs, the worker will be maintained to decommission for repairing the machine disk. The reason is that generally the fault will be repaired in time after it is discovered. It is possible that the machine will not trigger all disk failures if it is out of warranty. It can be replaced directly when it is under warranty. If the disk fails after it is out of warranty, you need to purchase the disk yourself for replacement. At the same time, submitting the disk for repair at one time will affect the failure rate judgment of the system group and scenario. In addition, the occurrence of bad disks will bring about some management problems, such as continuous alarms, and the handling of disk failures is relatively customized.

Therefore, it's recommended to configure ratio threshold of unhealthy disks for excluding worker, which does not need to wait for all unhealthy disks to exclude corresponding worker.

### Does this PR introduce _any_ user-facing change?

Introduce `celeborn.master.excludeWorker.unhealthyDiskRatioThreshold` to configure max ratio of unhealthy disks for excluding worker.

### How was this patch tested?

Cluster test.

Closes #2812 from SteNicholas/CELEBORN-1651.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-10-24 11:43:22 +08:00
mingji
df01fadc9f
[CELEBORN-1601] Support revise lost shuffles
### What changes were proposed in this pull request?
To support revising lost shuffle IDs in a long-running job such as flink batch jobs.

### Why are the changes needed?
1. To support revise lost shuffles.
2. To add an HTTP endpoint to revise lost shuffles manually.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
Cluster tests.

Closes #2746 from FMX/b1600.

Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2024-10-21 16:44:37 +08:00
Wang, Fei
bcb43183af [CELEBORN-1629][FOLLOWUP] Fix broken RESTful api link
### What changes were proposed in this pull request?
Fix the broken link.

### Why are the changes needed?
Followup for https://github.com/apache/celeborn/pull/2779.
The RESTful api docs was renamed from webapi.md to restapi.md in https://github.com/apache/celeborn/pull/2775.

And due these two PRs were merged in sequence nearly, so I did not aware this change.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

<img width="1255" alt="image" src="https://github.com/user-attachments/assets/a09aecf8-6e7e-458b-871d-f8dd5a0ac6b2">
<img width="937" alt="image" src="https://github.com/user-attachments/assets/bcefeecf-7a24-4616-9f5e-f2a11f464769">

Closes #2828 from turboFei/ratis_docs_link.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-10-21 11:42:52 +08:00
Xianming Lei
7c9a008a14 [CELEBORN-1487][PHASE2] CongestionController support dynamic config
### What changes were proposed in this pull request?
CongestionController support dynamic config

### Why are the changes needed?
Currently, Celeborn only supports quota management based on disk file bytes/count, and this quota management cannot cope with sudden increases in traffic, which will cause corrupt to the cluster.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #2817 from leixm/CELEBORN-1487-2.

Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2024-10-18 15:41:51 +08:00
SteNicholas
497bfdf5d7 [CELEBORN-1640] NettyMemoryMetrics supports numHeapArenas, numDirectArenas, tinyCacheSize, smallCacheSize, normalCacheSize, numThreadLocalCaches and chunkSize
### What changes were proposed in this pull request?

`NettyMemoryMetrics` supports `numHeapArenas`, `numDirectArenas`, `tinyCacheSize`, `smallCacheSize`, `normalCacheSize`, `numThreadLocalCaches` and `chunkSize` from `PooledByteBufAllocatorMetric`. Meanwhile, remove `server_` prefix from metric name of netty memory metric in `monitoring.md`.

### Why are the changes needed?

`PooledByteBufAllocatorMetric` provides the following API to support netty memory metrics:

```
public int numHeapArenas() {
  return this.allocator.numHeapArenas();
}

public int numDirectArenas() {
  return this.allocator.numDirectArenas();
}

public List<PoolArenaMetric> heapArenas() {
  return this.allocator.heapArenas();
}

public List<PoolArenaMetric> directArenas() {
  return this.allocator.directArenas();
}

public int numThreadLocalCaches() {
  return this.allocator.numThreadLocalCaches();
}

public int tinyCacheSize() {
  return this.allocator.tinyCacheSize();
}

public int smallCacheSize() {
  return this.allocator.smallCacheSize();
}

public int normalCacheSize() {
  return this.allocator.normalCacheSize();
}

public int chunkSize() {
  return this.allocator.chunkSize();
}

public long usedHeapMemory() {
  return this.allocator.usedHeapMemory();
}

public long usedDirectMemory() {
  return this.allocator.usedDirectMemory();
}
```

`NettyMemoryMetrics` only supports `usedHeapMemory` and `usedDirectMemory`, which could support `numHeapArenas`, `numDirectArenas`, `tinyCacheSize`, `smallCacheSize`, `normalCacheSize`, `numThreadLocalCaches` and `chunkSize` from `PooledByteBufAllocatorMetric`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

[Celeborn Grafana Dashboard](https://stenicholas.grafana.net/public-dashboards/a520ca36a33843a38bbde28387023f97)

Closes #2802 from SteNicholas/CELEBORN-1640.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2024-10-17 18:12:08 +08:00