### What changes were proposed in this pull request? Design doc - https://docs.google.com/document/d/1YqK0kua-5rMufJw57kEIrHHGbLnAF9iXM5GdDweMzzg/edit?tab=t.0#heading=h.n5ldma432qnd - End to End integrity checks provide additional confidence that Celeborn is producing complete as well as correct data - The checks are hidden behind a client side config that is false by default. Provides users optionality to enable these if required on a per app basis - Only compatible with Spark at the moment - No support for Flink (can be considered in future) - No support for Columnar Shuffle (can be considered in future) Writer - Whenever a mapper completes, it reports crc32 and bytes written on a per partition basis to the driver Driver - Driver aggregates the mapper reports - and computes aggregated CRC32 and bytes written on per partitionID basis Reader - Each CelebornInputStream will report (int shuffleId, int partitionId, int startMapIndex, int endMapIndex, int crc32, long bytes) to driver when it finished reading all data on the stream - On every report - Driver will aggregate the CRC32 and bytesRead for the partitionID - Driver will aggregate mapRange to determine when all sub-paritions have been read for partitionID have been read - It will then compare the aggregated CRC32 and bytes read with the expected CRC32 and bytes written for the partition - There is special handling for skewhandlingwithoutMapRangeSplit scenario as well - In this case, we report the number of sub-partitions and index of the sub-partition instead of startMapIndex and endMapIndex There is separate handling for skew handling with and without map range split As a follow up, I will do another PR that will harden up the checks and perform additional checks to add book keeping that every CelebornInputStream makes the required checks ### Why are the changes needed? https://issues.apache.org/jira/browse/CELEBORN-894 Note: I am putting up this PR even though some tests are failing, since I want to get some early feedback on the code changes. ### Does this PR introduce _any_ user-facing change? Not sure how to answer this. A new client side config is available to enable the checks if required ### How was this patch tested? Unit tests + Integration tests Closes #3261 from gauravkm/gaurav/e2e_checks_v3. Lead-authored-by: Gaurav Mittal <gaurav@stripe.com> Co-authored-by: Gaurav Mittal <gauravkm@gmail.com> Co-authored-by: Fei Wang <cn.feiwang@gmail.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com> |
||
|---|---|---|
| .. | ||
| assets | ||
| configuration | ||
| developers | ||
| celeborn_cli.md | ||
| celeborn_ratis_shell.md | ||
| cluster_planning.md | ||
| decommissioning.md | ||
| deploy_on_k8s.md | ||
| deploy.md | ||
| migration.md | ||
| monitoring.md | ||
| quota_management.md | ||
| README.md | ||
| restapi.md | ||
| security.md | ||
| upgrading.md | ||
| worker_tags.md | ||
| hide | license | |
|---|---|---|
|
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. |
Quick Start
This documentation gives a quick start guide for running Spark/Flink/MapReduce with Apache Celeborn™.
Download Celeborn
Download the latest Celeborn binary from the Downloading Page.
Decompress the binary and set $CELEBORN_HOME.
tar -C <DST_DIR> -zxvf apache-celeborn-<VERSION>-bin.tgz
export CELEBORN_HOME=<Decompressed path>
Configure Logging and Storage
Configure Logging
cd $CELEBORN_HOME/conf
cp log4j2.xml.template log4j2.xml
Configure Storage
Configure the directory to store shuffle data, for example $CELEBORN_HOME/shuffle.
cd $CELEBORN_HOME/conf
echo "celeborn.worker.storage.dirs=$CELEBORN_HOME/shuffle" > celeborn-defaults.conf
Start Celeborn Service
Start Master
cd $CELEBORN_HOME
./sbin/start-master.sh
You should see Master's ip:port in the log:
INFO [main] NettyRpcEnvFactory: Starting RPC Server [Master] on 192.168.2.109:9097 with advertised endpoint 192.168.2.109:9097
Start Worker
Use the Master's IP and Port to start Worker:
cd $CELEBORN_HOME
./sbin/start-worker.sh celeborn://<Master IP>:<Master Port>
You should see the following message in Worker's log:
INFO [main] MasterClient: connect to master 192.168.2.109:9097.
INFO [main] Worker: Register worker successfully.
INFO [main] Worker: Worker started.
And also the following message in Master's log:
INFO [dispatcher-event-loop-9] Master: Registered worker
Host: 192.168.2.109
RpcPort: 57806
PushPort: 57807
FetchPort: 57809
ReplicatePort: 57808
SlotsUsed: 0
LastHeartbeat: 0
HeartbeatElapsedSeconds: xxx
Disks:
DiskInfo0: xxx
UserResourceConsumption: empty
WorkerRef: null
Start Spark with Celeborn
Copy Celeborn Client to Spark's jars
Celeborn release binary contains clients for Spark 2.x and Spark 3.x, copy the corresponding client jar into Spark's
jars/ directory:
cp $CELEBORN_HOME/spark/celeborn-client-spark-<spark.major.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $SPARK_HOME/jars/
Start spark-shell
Set spark.shuffle.manager to Celeborn's ShuffleManager, and turn off spark.shuffle.service.enabled:
cd $SPARK_HOME
./bin/spark-shell \
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.shuffle.service.enabled=false
Then run the following test case:
spark.sparkContext
.parallelize(1 to 10, 10)
.flatMap(_ => (1 to 100).iterator.map(num => num))
.repartition(10)
.count
During the Spark Job, you should see the following message in Celeborn Master's log:
Master: Offer slots successfully for 10 reducers of local-1690000152711-0 on 1 workers.
And the following message in Celeborn Worker's log:
INFO [dispatcher-event-loop-9] Controller: Reserved 10 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-8] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 10 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
Start Flink with Celeborn
Important: Only Flink batch jobs are supported for now.
Copy Celeborn Client to Flink's lib
Celeborn release binary contains clients for Flink 1.16.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x, Flink 1.20.x and Flink 2.0.x, copy the corresponding client jar into Flink's
lib/ directory:
cp $CELEBORN_HOME/flink/celeborn-client-flink-<flink.version>-shaded_<scala.binary.version>-<celeborn.version>.jar $FLINK_HOME/lib/
Add Celeborn configuration to Flink's conf
Set shuffle-service-factory.class to Celeborn's ShuffleServiceFactory in Flink configuration file:
- Flink 1.16.x, Flink 1.17.x, Flink 1.18.x
cd $FLINK_HOME
vi conf/flink-conf.yaml
- Flink 1.19.x, Flink 1.20.x, Flink 2.0.x
cd $FLINK_HOME
vi conf/config.yaml
Choose one of flink integration strategies and add the following configuration:
(Support Flink 1.16 and above versions) Flink Remote Shuffle Service Config
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
Note: The config option execution.batch-shuffle-mode should configure as ALL_EXCHANGES_BLOCKING.
(Support Flink 1.20 and above versions) Flink hybrid shuffle Config
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
Note: The config option execution.batch-shuffle-mode should configure as ALL_EXCHANGES_HYBRID_FULL.
Then deploy the example word count job to the running cluster:
cd $FLINK_HOME
./bin/flink run examples/streaming/WordCount.jar --execution-mode BATCH
During the Flink Job, you should see the following message in Celeborn Master's log:
Master: Offer slots successfully for 1 reducers of local-1690000152711-0 on 1 workers.
And the following message in Celeborn Worker's log:
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for local-1690000152711-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for local-1690000152711-0
INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success with 1 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.
Start MapReduce With Celeborn
Copy Celeborn Client to MapReduce's classpath
- Copy
$CELEBORN_HOME/mr/*.jarintomapreduce.application.classpathandyarn.application.classpath.
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <mapreduce.application.classpath>
cp $CELEBORN_HOME/mr/celeborn-client-mr-shaded_<scala.binary.version>-<celeborn.version>.jar <yarn.application.classpath>
- Restart your yarn cluster.
Add Celeborn configuration to MapReduce's conf
- Modify configurations in
${HADOOP_CONF_DIR}/yarn-site.xml.
<configuration>
<property>
<name>yarn.app.mapreduce.am.job.recovery.enable</name>
<value>false</value>
</property>
<property>
<name>yarn.app.mapreduce.am.command-opts</name>
<!-- Append 'org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn' to this setting -->
<value>org.apache.celeborn.mapreduce.v2.app.MRAppMasterWithCeleborn</value>
</property>
</configuration>
- Modify configurations in
${HADOOP_CONF_DIR}/mapred-site.xml.
<configuration>
<property>
<name>mapreduce.job.reduce.slowstart.completedmaps</name>
<value>1</value>
</property>
<property>
<name>mapreduce.celeborn.master.endpoints</name>
<!-- Replace placeholder to the real master address -->
<value>placeholder</value>
</property>
<property>
<name>mapreduce.job.map.output.collector.class</name>
<value>org.apache.hadoop.mapred.CelebornMapOutputCollector</value>
</property>
<property>
<name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
<value>org.apache.hadoop.mapreduce.task.reduce.CelebornShuffleConsumer</value>
</property>
</configuration>
Note: MRAppMasterWithCeleborn supports setting mapreduce.celeborn.master.endpoints via environment variable CELEBORN_MASTER_ENDPOINTS.
Meanwhile, MRAppMasterWithCeleborn disables yarn.app.mapreduce.am.job.recovery.enable and sets mapreduce.job.reduce.slowstart.completedmaps to 1 by default.
Then deploy the example word count to the running cluster for verifying whether above configurations are correct.
cd $HADOOP_HOME
./bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar wordcount /someinput /someoutput
During the MapReduce Job, you should see the following message in Celeborn Master's log:
Master: Offer slots successfully for 1 reducers of application_1694674023293_0003-0 on 1 workers.
And the following message in Celeborn Worker's log:
INFO [dispatcher-event-loop-4] Controller: Reserved 1 primary location and 0 replica location for application_1694674023293_0003-0
INFO [dispatcher-event-loop-3] Controller: Start commitFiles for application_1694674023293_0003-0
INFO [async-reply] Controller: CommitFiles for application_1694674023293_0003-0 success with 1 committed primary partitions, 0 empty primary partitions , 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions , 0 failed replica partitions.