[CELEBORN-1490][CIP-6] Add Flink hybrid shuffle doc

### What changes were proposed in this pull request?

Add Flink hybrid shuffle doc

### Why are the changes needed?
We need the doc for the new hybrid shuffle mode.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

no neeed.

Closes #2867 from reswqa/add-hs-doc.

Authored-by: Weijie Guo <reswqa@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
Weijie Guo 2024-11-01 13:37:14 +08:00 committed by SteNicholas
parent 165e914b9b
commit 41fdb8ade1
No known key found for this signature in database
GPG Key ID: 1FC79E01BA3B84D5
3 changed files with 70 additions and 2 deletions

View File

@ -311,10 +311,17 @@ spark.executor.userClassPathFirst false
```
### Deploy Flink client
**Important: Only Flink batch jobs are supported for now.**
Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.
#### Flink Configuration
To use Celeborn, the following flink configurations should be added.
Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20).
To use Celeborn, you can choose one of them and add the following Flink configurations.
##### Flink Remote Shuffle Service Configuration
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
@ -337,6 +344,25 @@ taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.
##### Flink Hybrid Shuffle Configuration
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
celeborn.client.push.maxReqsInFlight: 128
# Network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.
### Deploy MapReduce client
Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
Meanwhile, configure the following settings in YARN and MapReduce config.

View File

@ -118,6 +118,9 @@ INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success wit
```
## Start Flink with Celeborn
**Important: Only Flink batch jobs are supported for now.**
#### Copy Celeborn Client to Flink's lib
Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x and Flink 1.20.x, copy the corresponding client jar into Flink's
`lib/` directory:
@ -138,12 +141,25 @@ vi conf/flink-conf.yaml
cd $FLINK_HOME
vi conf/config.yaml
```
Choose one of flink integration strategies and add the following configuration:
**(Support Flink 1.14 and above versions) Flink Remote Shuffle Service Config**
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.
**(Support Flink 1.20 and above versions) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config**
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.
Then deploy the example word count job to the running cluster:
```shell
cd $FLINK_HOME

View File

@ -206,10 +206,17 @@ spark.executor.userClassPathFirst false
```
## Deploy Flink client
**Important: Only Flink batch jobs are supported for now.**
Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.
### Flink Configuration
To use Celeborn, the following flink configurations should be added.
Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20).
To use Celeborn, you can choose one of them and add the following Flink configurations.
#### Flink Remote Shuffle Service Configuration
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
@ -232,6 +239,25 @@ taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.
##### Flink Hybrid Shuffle Configuration
```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
celeborn.client.push.maxReqsInFlight: 128
# Network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32
```
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.
## Deploy MapReduce client
Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
Meanwhile, configure the following settings in YARN and MapReduce config.