diff --git a/README.md b/README.md index 3733e19ef..95c2c2a38 100644 --- a/README.md +++ b/README.md @@ -311,10 +311,17 @@ spark.executor.userClassPathFirst false ``` ### Deploy Flink client + +**Important: Only Flink batch jobs are supported for now.** + Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`. #### Flink Configuration -To use Celeborn, the following flink configurations should be added. +Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20). + +To use Celeborn, you can choose one of them and add the following Flink configurations. + +##### Flink Remote Shuffle Service Configuration ```properties shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING @@ -337,6 +344,25 @@ taskmanager.memory.task.off-heap.size: 512m ``` **Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`. +##### Flink Hybrid Shuffle Configuration +```properties +shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory +taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory +execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL +jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED + +celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097 +celeborn.client.shuffle.batchHandleReleasePartition.enabled: true +celeborn.client.push.maxReqsInFlight: 128 +# Network connections between peers +celeborn.data.io.numConnectionsPerPeer: 16 +# threads number may vary according to your cluster but do not set to 1 +celeborn.data.io.threads: 32 +celeborn.client.shuffle.batchHandleCommitPartition.threads: 32 +celeborn.rpc.dispatcher.numThreads: 32 +``` +**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`. + ### Deploy MapReduce client Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`. Meanwhile, configure the following settings in YARN and MapReduce config. diff --git a/docs/README.md b/docs/README.md index d439ed221..64c50357d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -118,6 +118,9 @@ INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success wit ``` ## Start Flink with Celeborn + +**Important: Only Flink batch jobs are supported for now.** + #### Copy Celeborn Client to Flink's lib Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x and Flink 1.20.x, copy the corresponding client jar into Flink's `lib/` directory: @@ -138,12 +141,25 @@ vi conf/flink-conf.yaml cd $FLINK_HOME vi conf/config.yaml ``` + +Choose one of flink integration strategies and add the following configuration: + +**(Support Flink 1.14 and above versions) Flink Remote Shuffle Service Config** ```properties shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING ``` **Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`. +**(Support Flink 1.20 and above versions) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config** +```properties +shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory +taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory +execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL +jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED +``` +**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`. + Then deploy the example word count job to the running cluster: ```shell cd $FLINK_HOME diff --git a/docs/deploy.md b/docs/deploy.md index a8498b526..e67b827da 100644 --- a/docs/deploy.md +++ b/docs/deploy.md @@ -206,10 +206,17 @@ spark.executor.userClassPathFirst false ``` ## Deploy Flink client + +**Important: Only Flink batch jobs are supported for now.** + Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`. ### Flink Configuration -To use Celeborn, the following flink configurations should be added. +Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20). + +To use Celeborn, you can choose one of them and add the following Flink configurations. + +#### Flink Remote Shuffle Service Configuration ```properties shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING @@ -232,6 +239,25 @@ taskmanager.memory.task.off-heap.size: 512m ``` **Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`. +##### Flink Hybrid Shuffle Configuration +```properties +shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory +taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory +execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL +jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED + +celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097 +celeborn.client.shuffle.batchHandleReleasePartition.enabled: true +celeborn.client.push.maxReqsInFlight: 128 +# Network connections between peers +celeborn.data.io.numConnectionsPerPeer: 16 +# threads number may vary according to your cluster but do not set to 1 +celeborn.data.io.threads: 32 +celeborn.client.shuffle.batchHandleCommitPartition.threads: 32 +celeborn.rpc.dispatcher.numThreads: 32 +``` +**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`. + ## Deploy MapReduce client Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`. Meanwhile, configure the following settings in YARN and MapReduce config.