### What changes were proposed in this pull request?
1. Celeborn supports storage type selection. HDD, SSD, and HDFS are available for now.
2. Add new buffer size for HDFS file writers.
3. Worker support empty working dirs.

### Why are the changes needed?
Support HDFS only scenario.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
UT and cluster.

Closes #1619 from FMX/CELEBORN-568.

Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>

---
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

# Deploy Celeborn

1. Unzip the tarball to `$CELEBORN_HOME`.
2. Modify environment variables in `$CELEBORN_HOME/conf/celeborn-env.sh`.

EXAMPLE:
```bash
#!/usr/bin/env bash
CELEBORN_MASTER_MEMORY=4g
CELEBORN_WORKER_MEMORY=2g
CELEBORN_WORKER_OFFHEAP_MEMORY=4g
```
3. Modify configurations in `$CELEBORN_HOME/conf/celeborn-defaults.conf`.

EXAMPLE: single master cluster
```properties
# used by client and worker to connect to master
celeborn.master.endpoints clb-master:9097

# used by master to bootstrap
celeborn.master.host clb-master
celeborn.master.port 9097

celeborn.metrics.enabled true
celeborn.worker.flusher.buffer.size 256k

# If Celeborn workers have both local disks and HDFS, the following configs should be added.
# If Celeborn workers have local disks, use the following config.
# The disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD

# If Celeborn workers don't have local disks, you can use HDFS.
# Do not set `celeborn.worker.storage.dirs`; use the following configs instead.
celeborn.storage.activeTypes HDFS
celeborn.worker.sortPartition.threads 64
celeborn.worker.commitFiles.timeout 240s
celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts use disk RAID or LVM, set celeborn.worker.monitor.disk.enabled to false
celeborn.worker.monitor.disk.enabled false
```
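
The `celeborn.worker.storage.dirs` value above is a comma-separated list of paths, each optionally followed by `:disktype=...`. The sketch below only illustrates that shape. The function `parse_storage_dirs` is a hypothetical helper, not Celeborn's own parser, and it handles just the `disktype` option shown in the example, falling back to HDD as the comment in the config states.

```python
def parse_storage_dirs(value, default_disk_type="HDD"):
    """Split a storage.dirs value into (path, disk_type) pairs (illustrative only)."""
    dirs = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # The first ':'-separated field is the path; the rest are key=value options.
        path, *options = entry.split(":")
        disk_type = default_disk_type
        for option in options:
            key, _, val = option.partition("=")
            if key.strip().lower() == "disktype":
                disk_type = val.strip().upper()
        dirs.append((path, disk_type))
    return dirs


# The second entry has no disktype, so it falls back to the HDD default.
example_value = "/mnt/disk1:disktype=SSD,/mnt/disk2"
parsed_dirs = parse_storage_dirs(example_value)
print(parsed_dirs)
```

Checking the value with a small script like this before rolling a config out to every worker can catch a missing comma or a misspelled option early.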

EXAMPLE: HA cluster
```properties
# used by client and worker to connect to master
celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097

# used by master nodes to bootstrap. Every node should know the topology of the whole cluster. For each node,
# `celeborn.master.ha.node.id` should be unique, and `celeborn.master.ha.node.<id>.host` is required.
celeborn.master.ha.enabled true
celeborn.master.ha.node.id 1
celeborn.master.ha.node.1.host clb-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.2.ratis.port 9872
celeborn.master.ha.node.3.host clb-3
celeborn.master.ha.node.3.port 9097
celeborn.master.ha.node.3.ratis.port 9872
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/

celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that the flush buffer size is at least 4MB.
celeborn.worker.flusher.buffer.size 256k

# If Celeborn workers have both local disks and HDFS, the following configs should be added.
# If Celeborn workers have local disks, use the following config.
# The disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD

# If Celeborn workers don't have local disks, you can use HDFS.
# Do not set `celeborn.worker.storage.dirs`; use the following configs instead.
celeborn.storage.activeTypes HDFS
celeborn.worker.sortPartition.threads 64
celeborn.worker.commitFiles.timeout 240s
celeborn.worker.commitFiles.threads 128
celeborn.master.slot.assign.policy roundrobin
celeborn.rpc.askTimeout 240s
celeborn.worker.flusher.hdfs.buffer.size 4m
celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn
celeborn.worker.replicate.fastFail.duration 240s

# If your hosts use disk RAID or LVM, set celeborn.worker.monitor.disk.enabled to false
celeborn.worker.monitor.disk.enabled false
```
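
In the HA example, every master shares the same `celeborn.master.ha.node.<id>.*` topology and only `celeborn.master.ha.node.id` differs per node. A minimal sketch of generating those lines for each master follows. The helper `ha_master_conf` is hypothetical and not part of Celeborn; the ports are the ones used in the example above.

```python
def ha_master_conf(node_id, hosts, rpc_port=9097, ratis_port=9872):
    """Build the HA bootstrap lines for one master node (illustrative only)."""
    if not 1 <= node_id <= len(hosts):
        raise ValueError(f"node_id must be between 1 and {len(hosts)}")
    lines = [
        "celeborn.master.ha.enabled true",
        # The only line that differs between master nodes.
        f"celeborn.master.ha.node.id {node_id}",
    ]
    # Every node carries the full cluster topology.
    for i, host in enumerate(hosts, start=1):
        lines.append(f"celeborn.master.ha.node.{i}.host {host}")
        lines.append(f"celeborn.master.ha.node.{i}.port {rpc_port}")
        lines.append(f"celeborn.master.ha.node.{i}.ratis.port {ratis_port}")
    return "\n".join(lines)


master_hosts = ["clb-1", "clb-2", "clb-3"]
for node_id, host in enumerate(master_hosts, start=1):
    print(f"# fragment for {host}")
    print(ha_master_conf(node_id, master_hosts))
    print()
```

Generating the fragments from a single host list keeps the topology identical on every node and avoids two masters accidentally sharing one `node.id`.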

Flink engine related configurations:
```properties
# These settings are needed if you are using Celeborn with Flink.
celeborn.worker.directMemoryRatioForReadBuffer 0.4
celeborn.worker.directMemoryRatioToResume 0.5
# These settings affect performance.
# If there is enough off-heap memory, you can try to increase the read buffers.
# Read buffer max memory usage for a data partition is `taskmanager.memory.segment-size * readBuffersMax`
celeborn.worker.partition.initial.readBuffersMin 512
celeborn.worker.partition.initial.readBuffersMax 1024
celeborn.worker.readBuffer.allocationWait 10ms
```
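
The read buffer formula in the comment above (`taskmanager.memory.segment-size * readBuffersMax`) can be worked through as follows. This is a rough sizing sketch: the 32 KiB segment size is an assumption (check `taskmanager.memory.segment-size` in your Flink setup), and the number of concurrently read partitions is a made-up figure for illustration.

```python
KIB = 1024
MIB = 1024 * KIB


def read_buffer_bytes(segment_size_bytes, read_buffers):
    """Read buffer memory for one data partition: segment size * buffer count."""
    return segment_size_bytes * read_buffers


SEGMENT_SIZE = 32 * KIB       # assumed value of taskmanager.memory.segment-size
READ_BUFFERS_MIN = 512        # celeborn.worker.partition.initial.readBuffersMin
READ_BUFFERS_MAX = 1024       # celeborn.worker.partition.initial.readBuffersMax
CONCURRENT_PARTITIONS = 100   # hypothetical workload figure

per_partition_min = read_buffer_bytes(SEGMENT_SIZE, READ_BUFFERS_MIN)
per_partition_max = read_buffer_bytes(SEGMENT_SIZE, READ_BUFFERS_MAX)
worst_case_total = per_partition_max * CONCURRENT_PARTITIONS

print(f"per partition: {per_partition_min // MIB} MiB to {per_partition_max // MIB} MiB")
print(f"worst case for {CONCURRENT_PARTITIONS} partitions: {worst_case_total // MIB} MiB")
```

Under these assumptions a single partition may use up to 32 MiB of read buffers, so the worker's off-heap memory should be sized with the expected number of concurrently read partitions in mind.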

4. Copy Celeborn and configurations to all nodes.
5. Start all services. If the Celeborn distribution is installed in the same path on every node and
   SSH login works across your cluster, you can fill in `$CELEBORN_HOME/conf/hosts` and
   run `$CELEBORN_HOME/sbin/start-all.sh` to start all services.
   If the installation paths are not identical, you will need to start each service manually.

   Start the Celeborn master:
   `$CELEBORN_HOME/sbin/start-master.sh`

   Start a Celeborn worker:
   `$CELEBORN_HOME/sbin/start-worker.sh`
6. If Celeborn starts successfully, the Master's log should look like this:
```
22/10/08 19:29:11,805 INFO [main] Dispatcher: Dispatcher numThreads: 64
22/10/08 19:29:11,875 INFO [main] TransportClientFactory: mode NIO threads 64
22/10/08 19:29:12,057 INFO [main] Utils: Successfully started service 'MasterSys' on port 9097.
22/10/08 19:29:12,113 INFO [main] Master: Metrics system enabled.
22/10/08 19:29:12,125 INFO [main] HttpServer: master: HttpServer started on port 9098.
22/10/08 19:29:12,126 INFO [main] Master: Master started.
22/10/08 19:29:57,842 INFO [dispatcher-event-loop-19] Master: Registered worker
Host: 192.168.15.140
RpcPort: 37359
PushPort: 38303
FetchPort: 37569
ReplicatePort: 37093
SlotsUsed: 0()
LastHeartbeat: 0
Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 448284381184, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , /mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/disk3, usableSpace: 450755608576, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , /mnt/disk2=DiskInfo(maxSlots: 6713, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/disk2, usableSpace: 450532900864, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , /mnt/disk4=DiskInfo(maxSlots: 6712, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/disk4, usableSpace: 450456805376, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs }
WorkerRef: null
```
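
A start-up check along the lines of the log above can be scripted. The sketch below assumes the log messages look exactly like the sample ("Master: Master started.", "Master: Registered worker", followed by a "Host:" line); the function name `summarize_master_log` is hypothetical, and the message format may differ between Celeborn versions.

```python
import re


def summarize_master_log(log_text):
    """Return (master_started, registration_count, worker_hosts) for a Master log."""
    master_started = "Master: Master started." in log_text
    registrations = log_text.count("Master: Registered worker")
    # In the sample output, each registration is followed by a "Host: <address>" line.
    hosts = re.findall(r"^\s*Host:\s*(\S+)", log_text, flags=re.MULTILINE)
    return master_started, registrations, hosts


# A shortened copy of the sample log shown above.
sample_log = """\
22/10/08 19:29:12,057 INFO [main] Utils: Successfully started service 'MasterSys' on port 9097.
22/10/08 19:29:12,126 INFO [main] Master: Master started.
22/10/08 19:29:57,842 INFO [dispatcher-event-loop-19] Master: Registered worker
Host: 192.168.15.140
RpcPort: 37359
"""

started, count, hosts = summarize_master_log(sample_log)
print(f"master started: {started}, workers registered: {count}, hosts: {hosts}")
```

In practice `sample_log` would be replaced by the contents of the Master's log file; comparing the registration count with the number of workers you started is a quick way to spot a worker that failed to join.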

## Deploy Spark client
Copy `$CELEBORN_HOME/spark/*.jar` to `$SPARK_HOME/jars/`.

### Spark Configuration
To use Celeborn, add the following Spark configurations.
```properties
spark.shuffle.manager org.apache.spark.shuffle.celeborn.RssShuffleManager
# The Kryo serializer is required because the Java serializer does not support relocation
spark.serializer org.apache.spark.serializer.KryoSerializer

# celeborn master
spark.celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097
spark.shuffle.service.enabled false

# options: hash, sort
# The hash shuffle writer uses (partition count) * (celeborn.client.push.buffer.max.size) * (spark.executor.cores) memory.
# The sort shuffle writer uses less memory than the hash shuffle writer. If your shuffle partition count is large, try the sort shuffle writer.
spark.celeborn.client.spark.shuffle.writer hash

# We recommend setting spark.celeborn.client.push.replicate.enabled to true to enable server-side data replication
# If you have only one worker, this setting must be false
# If your Celeborn is using HDFS, it is recommended to set this to false
spark.celeborn.client.push.replicate.enabled true

# Support for Spark AQE has only been tested under Spark 3
# We recommend setting localShuffleReader to false to get better performance from Celeborn
spark.sql.adaptive.localShuffleReader.enabled false

# If Celeborn is using HDFS
spark.celeborn.worker.storage.hdfs.dir hdfs://<namenode>/celeborn

# We recommend enabling AQE support to gain better performance
spark.sql.adaptive.enabled true
spark.sql.adaptive.skewJoin.enabled true
```
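
The hash shuffle writer memory formula from the comments above, (partition count) * (`celeborn.client.push.buffer.max.size`) * (`spark.executor.cores`), can be evaluated to decide between the `hash` and `sort` writers. This is a rough estimate only: the 64 KiB push buffer size and the 4 executor cores are assumed values for illustration, so substitute the settings from your own cluster.

```python
KIB = 1024
MIB = 1024 * KIB


def hash_writer_memory_bytes(partition_count, push_buffer_max_size_bytes, executor_cores):
    """Estimate per-executor hash shuffle writer memory using the documented formula."""
    return partition_count * push_buffer_max_size_bytes * executor_cores


PUSH_BUFFER_MAX_SIZE = 64 * KIB  # assumed celeborn.client.push.buffer.max.size
EXECUTOR_CORES = 4               # assumed spark.executor.cores

estimates = {
    partitions: hash_writer_memory_bytes(partitions, PUSH_BUFFER_MAX_SIZE, EXECUTOR_CORES)
    for partitions in (200, 2000, 20000)
}
for partitions, size in estimates.items():
    print(f"{partitions:>6} partitions -> {size / MIB:,.0f} MiB per executor")
```

With these assumed values, 2000 shuffle partitions already need about 500 MiB per executor, which is the kind of case where the comments above suggest trying the sort shuffle writer.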

## Deploy Flink client
Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.

### Flink Configuration
To use Celeborn, add the following Flink configurations.
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097

celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
celeborn.client.push.maxReqsInFlight: 128

# network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# The number of threads may vary with your cluster, but do not set it to 1
celeborn.data.io.threads: 32
celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32

# Floating buffers may require changes to `taskmanager.network.memory.fraction` and `taskmanager.network.memory.max`
taskmanager.network.memory.floating-buffers-per-gate: 4096
taskmanager.network.memory.buffers-per-channel: 0
taskmanager.memory.task.off-heap.size: 512m
```
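
The note about floating buffers can be made concrete with a rough estimate of the network memory they may occupy. The sketch below assumes each floating buffer is one memory segment of 32 KiB (the `taskmanager.memory.segment-size` setting; verify it for your Flink version). The number of input gates and the network memory limit are hypothetical values, not recommendations.

```python
KIB = 1024
MIB = 1024 * KIB


def floating_buffer_bytes(buffers_per_gate, segment_size_bytes, input_gates=1):
    """Network memory taken by floating buffers across the given input gates."""
    return buffers_per_gate * segment_size_bytes * input_gates


FLOATING_BUFFERS_PER_GATE = 4096  # taskmanager.network.memory.floating-buffers-per-gate
SEGMENT_SIZE = 32 * KIB           # assumed taskmanager.memory.segment-size
INPUT_GATES = 4                   # hypothetical number of input gates per TaskManager
NETWORK_MEMORY_MAX = 1024 * MIB   # hypothetical taskmanager.network.memory.max

per_gate = floating_buffer_bytes(FLOATING_BUFFERS_PER_GATE, SEGMENT_SIZE)
needed = floating_buffer_bytes(FLOATING_BUFFERS_PER_GATE, SEGMENT_SIZE, INPUT_GATES)
fits = needed <= NETWORK_MEMORY_MAX

print(f"floating buffers per gate: {per_gate // MIB} MiB")
print(f"needed for {INPUT_GATES} gates: {needed // MIB} MiB, fits in network memory: {fits}")
```

If the estimate exceeds the configured network memory, that is the situation in which `taskmanager.network.memory.fraction` and `taskmanager.network.memory.max` would need to be raised.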