celeborn/docs
Fu Chen 349ee8b1cb Revert "[CELEBORN-255] Add counter of outstandingFetches, outstanding…
…Rpcs and outstandingPushes to metrics"

This reverts commit bfa341c32f.

### What changes were proposed in this pull request?

### Why are the changes needed?

https://github.com/apache/incubator-celeborn/pull/1992#issuecomment-1776760369

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #2032 from cfmcgrady/revert-pr-1992.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
2023-10-24 17:18:54 +08:00
..
assets [CELEBORN-905] Redraw the flowchart backpressure.svg after worker pause logic is reconstructed 2023-08-24 11:51:01 +08:00
configuration [CELEBORN-1046] Add an expiration time configuration for app directory to clean up 2023-10-17 19:23:49 +08:00
developers [CELEBORN-987][FOLLOWUP][DOC] README#Build and sbt#System Requirements should extend to Scala 2.13 and Spark 3.5 2023-10-14 09:54:22 +08:00
celeborn_ratis_shell.md [CELEBORN-877][DOC] Document on SBT 2023-08-11 12:17:55 +08:00
cluster_planning.md [CELEBORN-877][DOC] Document on SBT 2023-08-11 12:17:55 +08:00
deploy_on_k8s.md [CELEBORN-877][DOC] Document on SBT 2023-08-11 12:17:55 +08:00
deploy.md [CELEBORN-1010] Update docs about spark.shuffle.service.enabled 2023-10-08 09:15:42 +08:00
migration.md [CELEBORN-1021] Celeborn support arbitary Ratis configs and client rpc timeout 2023-10-18 10:26:11 +08:00
monitoring.md Revert "[CELEBORN-255] Add counter of outstandingFetches, outstanding… 2023-10-24 17:18:54 +08:00
README.md [MINOR] Remove unexpected $ symbol 2023-09-27 19:51:26 +08:00
upgrading.md [CELEBORN-877][DOC] Document on SBT 2023-08-11 12:17:55 +08:00

hide license
navigation
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Quick Start

This documentation gives a quick start guide for running Apache Spark/Flink with Apache Celeborn(Incubating).

Download Celeborn

Download the latest Celeborn binary from the Downloading Page. Decompress the binary and set $CELEBORN_HOME

tar -C <DST_DIR> -zxvf apache-celeborn-<VERSION>-bin.tgz
export CELEBORN_HOME=<Decompressed path>

Configure Logging and Storage

Configure Logging

cd $CELEBORN_HOME/conf
cp log4j2.xml.template log4j2.xml

Configure Storage

Configure the directory to store shuffle data, for example $CELEBORN_HOME/shuffle

cd $CELEBORN_HOME/conf
echo "celeborn.worker.storage.dirs=$CELEBORN_HOME/shuffle" > celeborn-defaults.conf

Start Celeborn Service

Start Master

cd $CELEBORN_HOME
./sbin/start-master.sh

You should see Master's ip:port in the log:

INFO [main] NettyRpcEnvFactory: Starting RPC Server [MasterSys] on 192.168.2.109:9097 with advisor endpoint 192.168.2.109:9097

Start Worker

Use the Master's IP and Port to start Worker:

cd $CELEBORN_HOME
./sbin/start-worker.sh celeborn://<Master IP>:<Master Port>

You should see the following message in Worker's log:

INFO [main] MasterClient: connect to master 192.168.2.109:9097.
INFO [main] Worker: Register worker successfully.
INFO [main] Worker: Worker started.

And also the following message in Master's log:

INFO [dispatcher-event-loop-9] Master: Registered worker
Host: 192.168.2.109
RpcPort: 57806
PushPort: 57807
FetchPort: 57809
ReplicatePort: 57808
SlotsUsed: 0
LastHeartbeat: 0
HeartbeatElapsedSeconds: xxx
Disks:
  DiskInfo0: xxx
UserResourceConsumption: empty
WorkerRef: null

Start Spark with Celeborn

Copy Celeborn Client to Spark's jars

Celeborn release binary contains clients for Spark 2.x and Spark 3.x, copy the corresponding client jar into Spark's jars/ directory:

cp $CELEBORN_HOME/spark/<Celeborn Client Jar> $SPARK_HOME/jars/

Start spark-shell

Set spark.shuffle.manager to Celeborn's ShuffleManager, and turn off spark.shuffle.service.enabled:

cd $SPARK_HOME

./bin/spark-shell \
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.shuffle.service.enabled=false

Then run the following test case:

spark.sparkContext
  .parallelize(1 to 10, 10)
  .flatMap(_ => (1 to 100).iterator.map(num => num))
  .repartition(10)
  .count

During the Spark Job, you should see the following message in Celeborn Master's log:

Master: Offer slots successfully for 10 reducers of local-1690000152711-0 on 1 workers.

And the following message in Celeborn Worker's log:

INFO [dispatcher-event-loop-9] Controller: Reserved 10 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-8] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 10 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.

Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x and Flink 1.17.x, copy the corresponding client jar into Flink's lib/ directory:

cp $CELEBORN_HOME/flink/<Celeborn Client Jar> $FLINK_HOME/lib/

Set shuffle-service-factory.class to Celeborn's ShuffleServiceFactory in Flink configuration file:

cd $FLINK_HOME
vi conf/flink-conf.yaml
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory

Then deploy the example word count job to the running cluster:

cd $FLINK_HOME

./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH

During the Flink Job, you should see the following message in Celeborn Master's log:

Master: Offer slots successfully for 1 reducers of local-1690000152711-0 on 1 workers.

And the following message in Celeborn Worker's log:

INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.

Start MapReduce With Celeborn

Add Celeborn client jar to MapReduce's classpath

1.Add $CELEBORN_HOME/mr/*.jar to mapreduce.application.classpath and yarn.application.classpath. 2.Restart your yarn cluster.

Add Celeborn configurations to MapReduce's conf

Modify ${HADOOP_CONF_DIR}/yarn-site.xml

<configuration>
    <property>
        <name>yarn.app.mapreduce.am.job.recovery.enable</name>
        <value>false</value>
    </property>

    <property>
        <name>yarn.app.mapreduce.am.command-opts</name>
        <!-- Append 'org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn' to this setting  -->
        <value>org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn</value>
    </property>
</configuration>

Modify ${HADOOP_CONF_DIR}/mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.job.reduce.slowstart.completedmaps</name>
        <value>1</value>
    </property>
    <property>
        <name>mapreduce.celeborn.master.endpoints</name>
        <!-- Replace placeholder to the real master address       -->
        <value>placeholder</value>
    </property>
    <property>
        <name>mapreduce.job.map.output.collector.class</name>
        <value>org.apache.hadoop.mapred.CelebornMapOutputCollector</value>
    </property>
    <property>
        <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
        <value>org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer</value>
    </property>
</configuration>

Then you can run a word count to check whether your configs are correct.

cd $HADOOP_HOME
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /sometext /someoutput

During the MapReduce Job, you should see the following message in Celeborn Master's log:

Master: Offer slots successfully for 1 reducers of application_1694674023293_0003-0 on 1 workers.

And the following message in Celeborn Worker's log:

INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for application_1694674023293_0003-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for application_1694674023293_0003-0
INFO [async-reply] Controller: CommitFiles for application_1694674023293_0003-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.