---
hide:
  - navigation
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      https://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

Quick Start
===

This documentation gives a quick start guide for running Spark/Flink/MapReduce with Apache Celeborn™.

### Download Celeborn

Download the latest Celeborn binary from the [Downloading Page](https://celeborn.apache.org/download/).
Decompress the binary and set `$CELEBORN_HOME`.

```shell
tar -C <DST_DIR> -zxvf apache-celeborn-<VERSION>-bin.tgz
export CELEBORN_HOME=<DST_DIR>
```

## Configure Logging and Storage

#### Configure Logging

```shell
cd $CELEBORN_HOME/conf
cp log4j2.xml.template log4j2.xml
```

#### Configure Storage

Configure the directory to store shuffle data, for example `$CELEBORN_HOME/shuffle`.
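If you want to pre-create the shuffle directory and confirm the disk has room before starting the Worker, a minimal sketch (assuming a single local disk; `CELEBORN_HOME` is set by the steps above, and is defaulted to the working directory here only so the snippet runs standalone):

```shell
# Sketch: create the shuffle directory and check available space.
# CELEBORN_HOME is assumed to already be exported; the default below
# is only for standalone illustration.
CELEBORN_HOME="${CELEBORN_HOME:-$PWD}"
mkdir -p "$CELEBORN_HOME/shuffle"
df -h "$CELEBORN_HOME/shuffle"
```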
```shell
cd $CELEBORN_HOME/conf
echo "celeborn.worker.storage.dirs=$CELEBORN_HOME/shuffle" > celeborn-defaults.conf
```

## Start Celeborn Service

#### Start Master

```shell
cd $CELEBORN_HOME
./sbin/start-master.sh
```

You should see the Master's ip:port in the log:

```log
INFO [main] NettyRpcEnvFactory: Starting RPC Server [Master] on 192.168.2.109:9097 with advertised endpoint 192.168.2.109:9097
```

#### Start Worker

Use the Master's IP and port to start a Worker:

```shell
cd $CELEBORN_HOME
./sbin/start-worker.sh celeborn://<Master IP>:<Master Port>
```

You should see the following messages in the Worker's log:

```log
INFO [main] MasterClient: connect to master 192.168.2.109:9097.
INFO [main] Worker: Register worker successfully.
INFO [main] Worker: Worker started.
```

And also the following message in the Master's log:

```log
INFO [dispatcher-event-loop-9] Master: Registered worker
Host: 192.168.2.109
RpcPort: 57806
PushPort: 57807
FetchPort: 57809
ReplicatePort: 57808
SlotsUsed: 0
LastHeartbeat: 0
HeartbeatElapsedSeconds: xxx
Disks: DiskInfo0: xxx
UserResourceConsumption: empty
WorkerRef: null
```

## Start Spark with Celeborn

#### Copy Celeborn Client to Spark's jars

The Celeborn release binary contains clients for Spark 2.x and Spark 3.x. Copy the corresponding client jar into Spark's `jars/` directory:

```shell
cp $CELEBORN_HOME/spark/celeborn-client-spark-<spark.major.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $SPARK_HOME/jars/
```

#### Start spark-shell

Set `spark.shuffle.manager` to Celeborn's ShuffleManager, and turn off `spark.shuffle.service.enabled`:

```shell
cd $SPARK_HOME
./bin/spark-shell \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.shuffle.service.enabled=false
```

Then run the following test case:

```scala
spark.sparkContext
  .parallelize(1 to 10, 10)
  .flatMap(_ => (1 to 100).iterator.map(num => num))
  .repartition(10)
  .count
```

During the Spark job, you should see the following message in the Celeborn Master's log:

```log
Master: Offer slots successfully for 10 reducers of local-1690000152711-0 on 1 workers.
```

And the following messages in the Celeborn Worker's log:

```log
INFO [dispatcher-event-loop-9] Controller: Reserved 10 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-8] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 10 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
```

## Start Flink with Celeborn

**Important: Only Flink batch jobs are supported for now.**

#### Copy Celeborn Client to Flink's lib

The Celeborn release binary contains clients for Flink 1.16.x, 1.17.x, 1.18.x, 1.19.x, 1.20.x, 2.0.x and 2.1.x. Copy the corresponding client jar into Flink's `lib/` directory:

```shell
cp $CELEBORN_HOME/flink/celeborn-client-flink-<flink.binary.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $FLINK_HOME/lib/
```

#### Add Celeborn configuration to Flink's conf

Set `shuffle-service-factory.class` to Celeborn's ShuffleServiceFactory in the Flink configuration file:

- Flink 1.16.x, 1.17.x, 1.18.x

```shell
cd $FLINK_HOME
vi conf/flink-conf.yaml
```

- Flink 1.19.x, 1.20.x, 2.0.x, 2.1.x

```shell
cd $FLINK_HOME
vi conf/config.yaml
```

Choose one of the Flink integration strategies and add the following configuration:

**(Supported on Flink 1.16 and above) Flink Remote Shuffle Service config**

```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
```

**Note**: The config option `execution.batch-shuffle-mode` must be set to `ALL_EXCHANGES_BLOCKING`.
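If you script your cluster setup, the manual edit above can be automated by appending the two keys to the config file. A sketch assuming Flink 1.16–1.18 (`conf/flink-conf.yaml`; use `conf/config.yaml` on 1.19 and above), with `FLINK_HOME` defaulted to the working directory only so the snippet runs standalone:

```shell
# Sketch: append the Celeborn remote shuffle service settings.
# FLINK_HOME is assumed to point at a real Flink install; the default
# and mkdir below exist only so this runs outside one.
FLINK_HOME="${FLINK_HOME:-$PWD}"
mkdir -p "$FLINK_HOME/conf"
cat >> "$FLINK_HOME/conf/flink-conf.yaml" <<'EOF'
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
EOF
```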
**(Supported on Flink 1.20 and above) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) config**

```properties
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
```

**Note**: The config option `execution.batch-shuffle-mode` must be set to `ALL_EXCHANGES_HYBRID_FULL`.

Then deploy the example word count job to the running cluster:

```shell
cd $FLINK_HOME
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
```

During the Flink job, you should see the following message in the Celeborn Master's log:

```log
Master: Offer slots successfully for 1 reducers of local-1690000152711-0 on 1 workers.
```

And the following messages in the Celeborn Worker's log:

```log
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
```

## Start MapReduce With Celeborn

### Copy Celeborn Client to MapReduce's classpath

1. Copy `$CELEBORN_HOME/mr/*.jar` into directories on `mapreduce.application.classpath` and `yarn.application.classpath`.

    ```shell
    cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <a directory on mapreduce.application.classpath>
    cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <a directory on yarn.application.classpath>
    ```

2. Restart your YARN cluster.

### Add Celeborn configuration to MapReduce's conf

- Modify configurations in `${HADOOP_CONF_DIR}/yarn-site.xml`.
```xml
<property>
  <name>yarn.app.mapreduce.am.job.recovery.enable</name>
  <value>false</value>
</property>
<property>
  <name>yarn.app.mapreduce.am.command-opts</name>
  <value>org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn</value>
</property>
```

- Modify configurations in `${HADOOP_CONF_DIR}/mapred-site.xml`.

```xml
<property>
  <name>mapreduce.job.reduce.slowstart.completedmaps</name>
  <value>1</value>
</property>
<property>
  <name>mapreduce.celeborn.master.endpoints</name>
  <value>placeholder</value>
</property>
<property>
  <name>mapreduce.job.map.output.collector.class</name>
  <value>org.apache.hadoop.mapred.CelebornMapOutputCollector</value>
</property>
<property>
  <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
  <value>org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer</value>
</property>
```

**Note**: `MRAppMasterWithCeleborn` supports setting `mapreduce.celeborn.master.endpoints` via the environment variable `CELEBORN_MASTER_ENDPOINTS`. Meanwhile, `MRAppMasterWithCeleborn` disables `yarn.app.mapreduce.am.job.recovery.enable` and sets `mapreduce.job.reduce.slowstart.completedmaps` to 1 by default.

Then deploy the example word count job to the running cluster to verify that the above configurations are correct.

```shell
cd $HADOOP_HOME
./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /someinput /someoutput
```

During the MapReduce job, you should see the following message in the Celeborn Master's log:

```log
Master: Offer slots successfully for 1 reducers of application_1694674023293_0003-0 on 1 workers.
```

And the following messages in the Celeborn Worker's log:

```log
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for application_1694674023293_0003-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for application_1694674023293_0003-0
INFO [async-reply] Controller: CommitFiles for application_1694674023293_0003-0 success with 1 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
```
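When you are done with the quick start, the local Master and Worker can be stopped with the companion scripts in `sbin/` (a sketch, assuming the stop scripts ship in the binary release alongside the start scripts used above; the guards make the commands no-ops where the scripts are absent):

```shell
# Sketch: tear down the quick-start deployment.
# CELEBORN_HOME is assumed to be set; defaulted here only so the
# snippet runs standalone.
CELEBORN_HOME="${CELEBORN_HOME:-$PWD}"
if [ -x "$CELEBORN_HOME/sbin/stop-worker.sh" ]; then
  "$CELEBORN_HOME/sbin/stop-worker.sh"
fi
if [ -x "$CELEBORN_HOME/sbin/stop-master.sh" ]; then
  "$CELEBORN_HOME/sbin/stop-master.sh"
fi
```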