From 41fdb8ade10b97615533438607a2cb1add3a4232 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Fri, 1 Nov 2024 13:37:14 +0800 Subject: [PATCH] [CELEBORN-1490][CIP-6] Add Flink hybrid shuffle doc ### What changes were proposed in this pull request? Add Flink hybrid shuffle doc ### Why are the changes needed? We need the doc for the new hybrid shuffle mode. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? no neeed. Closes #2867 from reswqa/add-hs-doc. Authored-by: Weijie Guo Signed-off-by: SteNicholas --- README.md | 28 +++++++++++++++++++++++++++- docs/README.md | 16 ++++++++++++++++ docs/deploy.md | 28 +++++++++++++++++++++++++++- 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3733e19ef..95c2c2a38 100644 --- a/README.md +++ b/README.md @@ -311,10 +311,17 @@ spark.executor.userClassPathFirst false ``` ### Deploy Flink client + +**Important: Only Flink batch jobs are supported for now.** + Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`. #### Flink Configuration -To use Celeborn, the following flink configurations should be added. +Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20). + +To use Celeborn, you can choose one of them and add the following Flink configurations. + +##### Flink Remote Shuffle Service Configuration ```properties shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING @@ -337,6 +344,25 @@ taskmanager.memory.task.off-heap.size: 512m ``` **Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`. +##### Flink Hybrid Shuffle Configuration +```properties +shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory +taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory +execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL +jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED + +celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097 +celeborn.client.shuffle.batchHandleReleasePartition.enabled: true +celeborn.client.push.maxReqsInFlight: 128 +# Network connections between peers +celeborn.data.io.numConnectionsPerPeer: 16 +# threads number may vary according to your cluster but do not set to 1 +celeborn.data.io.threads: 32 +celeborn.client.shuffle.batchHandleCommitPartition.threads: 32 +celeborn.rpc.dispatcher.numThreads: 32 +``` +**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`. + ### Deploy MapReduce client Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`. Meanwhile, configure the following settings in YARN and MapReduce config. diff --git a/docs/README.md b/docs/README.md index d439ed221..64c50357d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -118,6 +118,9 @@ INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success wit ``` ## Start Flink with Celeborn + +**Important: Only Flink batch jobs are supported for now.** + #### Copy Celeborn Client to Flink's lib Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x and Flink 1.20.x, copy the corresponding client jar into Flink's `lib/` directory: @@ -138,12 +141,25 @@ vi conf/flink-conf.yaml cd $FLINK_HOME vi conf/config.yaml ``` + +Choose one of flink integration strategies and add the following configuration: + +**(Support Flink 1.14 and above versions) Flink Remote Shuffle Service Config** ```properties shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING ``` **Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`. +**(Support Flink 1.20 and above versions) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config** +```properties +shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory +taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory +execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL +jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED +``` +**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`. + Then deploy the example word count job to the running cluster: ```shell cd $FLINK_HOME diff --git a/docs/deploy.md b/docs/deploy.md index a8498b526..e67b827da 100644 --- a/docs/deploy.md +++ b/docs/deploy.md @@ -206,10 +206,17 @@ spark.executor.userClassPathFirst false ``` ## Deploy Flink client + +**Important: Only Flink batch jobs are supported for now.** + Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`. ### Flink Configuration -To use Celeborn, the following flink configurations should be added. +Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20). + +To use Celeborn, you can choose one of them and add the following Flink configurations. + +#### Flink Remote Shuffle Service Configuration ```properties shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING @@ -232,6 +239,25 @@ taskmanager.memory.task.off-heap.size: 512m ``` **Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`. +##### Flink Hybrid Shuffle Configuration +```properties +shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory +taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory +execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL +jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED + +celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097 +celeborn.client.shuffle.batchHandleReleasePartition.enabled: true +celeborn.client.push.maxReqsInFlight: 128 +# Network connections between peers +celeborn.data.io.numConnectionsPerPeer: 16 +# threads number may vary according to your cluster but do not set to 1 +celeborn.data.io.threads: 32 +celeborn.client.shuffle.batchHandleCommitPartition.threads: 32 +celeborn.rpc.dispatcher.numThreads: 32 +``` +**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`. + ## Deploy MapReduce client Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`. Meanwhile, configure the following settings in YARN and MapReduce config.