### What changes were proposed in this pull request? When users deploy using the release binary as outlined in the documentation, the instructions for copying the client JAR can be unclear. ### Why are the changes needed? No ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested?  Closes #2877 from zaynt4606/md. Authored-by: szt <zaynt4606@163.com> Signed-off-by: SteNicholas <programgeek@163.com>
244 lines
9.8 KiB
Markdown
244 lines
9.8 KiB
Markdown
---
|
|
hide:
|
|
- navigation
|
|
|
|
license: |
|
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
|
contributor license agreements. See the NOTICE file distributed with
|
|
this work for additional information regarding copyright ownership.
|
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
|
(the "License"); you may not use this file except in compliance with
|
|
the License. You may obtain a copy of the License at
|
|
|
|
https://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
---
|
|
Quick Start
|
|
===
|
|
This documentation gives a quick start guide for running Spark/Flink/MapReduce with Apache Celeborn™.
|
|
|
|
### Download Celeborn
|
|
Download the latest Celeborn binary from the [Downloading Page](https://celeborn.apache.org/download/).
|
|
Decompress the binary and set `$CELEBORN_HOME`.
|
|
```shell
|
|
tar -C <DST_DIR> -zxvf apache-celeborn-<VERSION>-bin.tgz
|
|
export CELEBORN_HOME=<Decompressed path>
|
|
```
|
|
|
|
## Configure Logging and Storage
|
|
#### Configure Logging
|
|
```shell
|
|
cd $CELEBORN_HOME/conf
|
|
cp log4j2.xml.template log4j2.xml
|
|
```
|
|
#### Configure Storage
|
|
Configure the directory to store shuffle data, for example `$CELEBORN_HOME/shuffle`.
|
|
```shell
|
|
cd $CELEBORN_HOME/conf
|
|
echo "celeborn.worker.storage.dirs=$CELEBORN_HOME/shuffle" > celeborn-defaults.conf
|
|
```
|
|
|
|
## Start Celeborn Service
|
|
#### Start Master
|
|
```shell
|
|
cd $CELEBORN_HOME
|
|
./sbin/start-master.sh
|
|
```
|
|
You should see `Master`'s ip:port in the log:
|
|
```log
|
|
INFO [main] NettyRpcEnvFactory: Starting RPC Server [Master] on 192.168.2.109:9097 with advertised endpoint 192.168.2.109:9097
|
|
```
|
|
#### Start Worker
|
|
Use the Master's IP and Port to start Worker:
|
|
```shell
|
|
cd $CELEBORN_HOME
|
|
./sbin/start-worker.sh celeborn://<Master IP>:<Master Port>
|
|
```
|
|
You should see the following message in Worker's log:
|
|
```log
|
|
INFO [main] MasterClient: connect to master 192.168.2.109:9097.
|
|
INFO [main] Worker: Register worker successfully.
|
|
INFO [main] Worker: Worker started.
|
|
```
|
|
And also the following message in Master's log:
|
|
```log
|
|
INFO [dispatcher-event-loop-9] Master: Registered worker
|
|
Host: 192.168.2.109
|
|
RpcPort: 57806
|
|
PushPort: 57807
|
|
FetchPort: 57809
|
|
ReplicatePort: 57808
|
|
SlotsUsed: 0
|
|
LastHeartbeat: 0
|
|
HeartbeatElapsedSeconds: xxx
|
|
Disks:
|
|
DiskInfo0: xxx
|
|
UserResourceConsumption: empty
|
|
WorkerRef: null
|
|
```
|
|
|
|
## Start Spark with Celeborn
|
|
#### Copy Celeborn Client to Spark's jars
|
|
Celeborn release binary contains clients for Spark 2.x and Spark 3.x, copy the corresponding client jar into Spark's
|
|
`jars/` directory:
|
|
```shell
|
|
cp $CELEBORN_HOME/spark/celeborn-client-spark-<spark.major.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $SPARK_HOME/jars/
|
|
```
|
|
#### Start spark-shell
|
|
Set `spark.shuffle.manager` to Celeborn's ShuffleManager, and turn off `spark.shuffle.service.enabled`:
|
|
```shell
|
|
cd $SPARK_HOME
|
|
|
|
./bin/spark-shell \
|
|
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
|
|
--conf spark.shuffle.service.enabled=false
|
|
```
|
|
Then run the following test case:
|
|
```scala
|
|
spark.sparkContext
|
|
.parallelize(1 to 10, 10)
|
|
.flatMap(_ => (1 to 100).iterator.map(num => num))
|
|
.repartition(10)
|
|
.count
|
|
```
|
|
During the Spark Job, you should see the following message in Celeborn Master's log:
|
|
```log
|
|
Master: Offer slots successfully for 10 reducers of local-1690000152711-0 on 1 workers.
|
|
```
|
|
And the following message in Celeborn Worker's log:
|
|
```log
|
|
INFO [dispatcher-event-loop-9] Controller: Reserved 10 primary location and 0 replica location for local-1690000152711-0
|
|
INFO [dispatcher-event-loop-8] Controller: Start commitFiles for local-1690000152711-0
|
|
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 10 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
|
|
```
|
|
|
|
## Start Flink with Celeborn
|
|
|
|
**Important: Only Flink batch jobs are supported for now.**
|
|
|
|
#### Copy Celeborn Client to Flink's lib
|
|
Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x and Flink 1.20.x, copy the corresponding client jar into Flink's
|
|
`lib/` directory:
|
|
```shell
|
|
cp $CELEBORN_HOME/flink/celeborn-client-flink-<flink.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $FLINK_HOME/lib/
|
|
```
|
|
#### Add Celeborn configuration to Flink's conf
|
|
Set `shuffle-service-factory.class` to Celeborn's ShuffleServiceFactory in Flink configuration file:
|
|
|
|
- Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x
|
|
```shell
|
|
cd $FLINK_HOME
|
|
vi conf/flink-conf.yaml
|
|
```
|
|
|
|
- Flink 1.19.x, Flink 1.20.x
|
|
```shell
|
|
cd $FLINK_HOME
|
|
vi conf/config.yaml
|
|
```
|
|
|
|
Choose one of flink integration strategies and add the following configuration:
|
|
|
|
**(Support Flink 1.14 and above versions) Flink Remote Shuffle Service Config**
|
|
```properties
|
|
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
|
|
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
|
|
```
|
|
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_BLOCKING`.
|
|
|
|
**(Support Flink 1.20 and above versions) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config**
|
|
```properties
|
|
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
|
|
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
|
|
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
|
|
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
|
|
```
|
|
**Note**: The config option `execution.batch-shuffle-mode` should configure as `ALL_EXCHANGES_HYBRID_FULL`.
|
|
|
|
Then deploy the example word count job to the running cluster:
|
|
```shell
|
|
cd $FLINK_HOME
|
|
|
|
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
|
|
```
|
|
During the Flink Job, you should see the following message in Celeborn Master's log:
|
|
```log
|
|
Master: Offer slots successfully for 1 reducers of local-1690000152711-0 on 1 workers.
|
|
```
|
|
And the following message in Celeborn Worker's log:
|
|
```log
|
|
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for local-1690000152711-0
|
|
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for local-1690000152711-0
|
|
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 1 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
|
|
```
|
|
|
|
## Start MapReduce With Celeborn
|
|
### Copy Celeborn Client to MapReduce's classpath
|
|
1. Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
|
|
```shell
|
|
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <mapreduce.application.classpath>
|
|
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <yarn.application.classpath>
|
|
```
|
|
2. Restart your yarn cluster.
|
|
### Add Celeborn configuration to MapReduce's conf
|
|
- Modify configurations in `${HADOOP_CONF_DIR}/yarn-site.xml`.
|
|
```xml
|
|
<configuration>
|
|
<property>
|
|
<name>yarn.app.mapreduce.am.job.recovery.enable</name>
|
|
<value>false</value>
|
|
</property>
|
|
|
|
<property>
|
|
<name>yarn.app.mapreduce.am.command-opts</name>
|
|
<!-- Append 'org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn' to this setting -->
|
|
<value>org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn</value>
|
|
</property>
|
|
</configuration>
|
|
```
|
|
- Modify configurations in `${HADOOP_CONF_DIR}/mapred-site.xml`.
|
|
```xml
|
|
<configuration>
|
|
<property>
|
|
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
|
|
<value>1</value>
|
|
</property>
|
|
<property>
|
|
<name>mapreduce.celeborn.master.endpoints</name>
|
|
<!-- Replace placeholder to the real master address -->
|
|
<value>placeholder</value>
|
|
</property>
|
|
<property>
|
|
<name>mapreduce.job.map.output.collector.class</name>
|
|
<value>org.apache.hadoop.mapred.CelebornMapOutputCollector</value>
|
|
</property>
|
|
<property>
|
|
<name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
|
|
<value>org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer</value>
|
|
</property>
|
|
</configuration>
|
|
```
|
|
**Note**: `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via environment variable `CELEBORN_MASTER_ENDPOINTS`.
|
|
Meanwhile, `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.
|
|
|
|
Then deploy the example word count to the running cluster for verifying whether above configurations are correct.
|
|
```shell
|
|
cd $HADOOP_HOME
|
|
|
|
./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /someinput /someoutput
|
|
```
|
|
During the MapReduce Job, you should see the following message in Celeborn Master's log:
|
|
```log
|
|
Master: Offer slots successfully for 1 reducers of application_1694674023293_0003-0 on 1 workers.
|
|
```
|
|
And the following message in Celeborn Worker's log:
|
|
```log
|
|
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for application_1694674023293_0003-0
|
|
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for application_1694674023293_0003-0
|
|
INFO [async-reply] Controller: CommitFiles for application_1694674023293_0003-0 success with 1 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
|
|
``` |