[CELEBORN-638] Migrate configurations celeborn.ha.master.* to celeborn.master.ha.*

### What changes were proposed in this pull request?

It was discussed during the last meeting, but abandoned due to the complication.

### Why are the changes needed?

Make the configuration unified.

### Does this PR introduce _any_ user-facing change?

Yes, but the legacy configurations still take effect.

### How was this patch tested?

New UTs.

Closes #1549 from pan3793/CELEBORN-638.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
Cheng Pan 2023-06-16 18:18:26 +08:00 committed by zky.zhoukeyong
parent 859d021eeb
commit e22379c3ab
15 changed files with 254 additions and 118 deletions

View File

@ -10,7 +10,7 @@ This project uses check-style plugins. Run some checks before you create a new p
If you have changed configuration, run following command to refresh docs.
```shell
UPDATE=1 build/mvn clean test -pl common -am -Pspark-3.3 -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite
UPDATE=1 build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite
```
## How to Contribute

View File

@ -125,19 +125,19 @@ EXAMPLE: HA cluster
celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097
# used by master nodes to bootstrap, every node should know the topology of whole cluster, for each node,
# `celeborn.ha.master.node.id` should be unique, and `celeborn.ha.master.node.<id>.host` is required.
celeborn.ha.enabled true
celeborn.ha.master.node.id 1
celeborn.ha.master.node.1.host clb-1
celeborn.ha.master.node.1.port 9097
celeborn.ha.master.node.1.ratis.port 9872
celeborn.ha.master.node.2.host clb-2
celeborn.ha.master.node.2.port 9097
celeborn.ha.master.node.2.ratis.port 9872
celeborn.ha.master.node.3.host clb-3
celeborn.ha.master.node.3.port 9097
celeborn.ha.master.node.3.ratis.port 9872
celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
# `celeborn.master.ha.node.id` should be unique, and `celeborn.master.ha.node.<id>.host` is required.
celeborn.master.ha.enabled true
celeborn.master.ha.node.id 1
celeborn.master.ha.node.1.host clb-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.2.ratis.port 9872
celeborn.master.ha.node.3.host clb-3
celeborn.master.ha.node.3.port 9097
celeborn.master.ha.node.3.ratis.port 9872
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that flush buffer size is at least 4MB or larger.
@ -299,6 +299,6 @@ because ratis meta will store expired states of the last running cluster.
Here are some instructions:
1. Stop all workers.
2. Stop all masters.
3. Clean all master's ratis meta storage directory(celeborn.ha.master.ratis.raft.server.storage.dir).
3. Clean all master's ratis meta storage directory(celeborn.master.ha.ratis.raft.server.storage.dir).
4. Start all masters.
5. Start all workers.

View File

@ -31,10 +31,10 @@ data:
{{- $namespace := .Release.Namespace }}
celeborn.master.endpoints={{ range until (.Values.masterReplicas |int) }}celeborn-master-{{ . }}.celeborn-master-svc.{{ $namespace }}.svc.{{ $.Values.cluster.name }}.local,{{ end }}
{{- range until (.Values.masterReplicas |int) }}
celeborn.ha.master.node.{{ . }}.host=celeborn-master-{{ . }}.celeborn-master-svc.{{ $namespace }}.svc.{{ $.Values.cluster.name }}.local
celeborn.master.ha.node.{{ . }}.host=celeborn-master-{{ . }}.celeborn-master-svc.{{ $namespace }}.svc.{{ $.Values.cluster.name }}.local
{{- end }}
{{- $dirs := .Values.volumes.master }}
celeborn.ha.master.ratis.raft.server.storage.dir={{ (index $dirs 0).mountPath }}
celeborn.master.ha.ratis.raft.server.storage.dir={{ (index $dirs 0).mountPath }}
{{- $path := "" }}
{{- range $worker := .Values.volumes.worker }}
{{- if eq $path "" }}

View File

@ -73,7 +73,7 @@ volumes:
# celeborn configurations
celeborn:
celeborn.ha.enabled: true
celeborn.master.ha.enabled: true
celeborn.metrics.enabled: true
celeborn.master.metrics.prometheus.port: 9098
celeborn.worker.metrics.prometheus.port: 9096

View File

@ -544,32 +544,44 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}
val nodeConfPrefix = extractPrefix(HA_MASTER_NODE_HOST.key, "<id>")
getAllWithPrefix(nodeConfPrefix)
val nodeIds = getAllWithPrefix(nodeConfPrefix)
.map(_._1)
.filterNot(_.equals("id"))
.map(k => extractPrefix(k, "."))
.distinct
// CELEBORN-638: compatible with `celeborn.ha.master.*`, expect to remove before 0.4.0
val legacyNodeConfPrefix = extractPrefix(HA_MASTER_NODE_HOST.alternatives.head._1, "<id>")
val legacyNodeIds = getAllWithPrefix(legacyNodeConfPrefix)
.map(_._1)
.filterNot(_.equals("id"))
.map(k => extractPrefix(k, "."))
.distinct
(nodeIds ++ legacyNodeIds).distinct
}
def haMasterNodeHost(nodeId: String): String = {
val key = HA_MASTER_NODE_HOST.key.replace("<id>", nodeId)
get(key, Utils.localHostName)
val legacyKey = HA_MASTER_NODE_HOST.alternatives.head._1.replace("<id>", nodeId)
get(key, get(legacyKey, Utils.localHostName))
}
def haMasterNodePort(nodeId: String): Int = {
val key = HA_MASTER_NODE_PORT.key.replace("<id>", nodeId)
getInt(key, HA_MASTER_NODE_PORT.defaultValue.get)
val legacyKey = HA_MASTER_NODE_PORT.alternatives.head._1.replace("<id>", nodeId)
getInt(key, getInt(legacyKey, HA_MASTER_NODE_PORT.defaultValue.get))
}
def haMasterRatisHost(nodeId: String): String = {
val key = HA_MASTER_NODE_RATIS_HOST.key.replace("<id>", nodeId)
val fallbackKey = HA_MASTER_NODE_HOST.key.replace("<id>", nodeId)
get(key, get(fallbackKey))
val legacyKey = HA_MASTER_NODE_RATIS_HOST.alternatives.head._1.replace("<id>", nodeId)
get(key, get(legacyKey, haMasterNodeHost(nodeId)))
}
def haMasterRatisPort(nodeId: String): Int = {
val key = HA_MASTER_NODE_RATIS_PORT.key.replace("<id>", nodeId)
getInt(key, HA_MASTER_NODE_RATIS_PORT.defaultValue.get)
val legacyKey = HA_MASTER_NODE_RATIS_PORT.alternatives.head._1.replace("<id>", nodeId)
getInt(key, getInt(legacyKey, HA_MASTER_NODE_RATIS_PORT.defaultValue.get))
}
def haMasterRatisRpcType: String = get(HA_MASTER_RATIS_RPC_TYPE)
@ -1499,156 +1511,175 @@ object CelebornConf extends Logging {
.createWithDefault(9097)
val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.ha.enabled")
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.doc("When true, master nodes run as Raft cluster mode.")
.booleanConf
.createWithDefault(false)
val HA_MASTER_NODE_ID: OptionalConfigEntry[String] =
buildConf("celeborn.ha.master.node.id")
buildConf("celeborn.master.ha.node.id")
.withAlternative("celeborn.ha.master.node.id")
.doc("Node id for master raft cluster in HA mode, if not define, " +
"will be inferred by hostname.")
.version("0.2.0")
.version("0.3.0")
.stringConf
.createOptional
val HA_MASTER_NODE_HOST: ConfigEntry[String] =
buildConf("celeborn.ha.master.node.<id>.host")
buildConf("celeborn.master.ha.node.<id>.host")
.withAlternative("celeborn.ha.master.node.<id>.host")
.categories("ha")
.doc("Host to bind of master node <id> in HA mode.")
.version("0.2.0")
.version("0.3.0")
.stringConf
.createWithDefaultString("<required>")
val HA_MASTER_NODE_PORT: ConfigEntry[Int] =
buildConf("celeborn.ha.master.node.<id>.port")
buildConf("celeborn.master.ha.node.<id>.port")
.withAlternative("celeborn.ha.master.node.<id>.port")
.categories("ha")
.doc("Port to bind of master node <id> in HA mode.")
.version("0.2.0")
.version("0.3.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "invalid port")
.createWithDefault(9097)
val HA_MASTER_NODE_RATIS_HOST: OptionalConfigEntry[String] =
buildConf("celeborn.ha.master.node.<id>.ratis.host")
buildConf("celeborn.master.ha.node.<id>.ratis.host")
.withAlternative("celeborn.ha.master.node.<id>.ratis.host")
.internal
.categories("ha")
.doc("Ratis host to bind of master node <id> in HA mode. If not provided, " +
s"fallback to ${HA_MASTER_NODE_HOST.key}.")
.version("0.2.0")
.version("0.3.0")
.stringConf
.createOptional
val HA_MASTER_NODE_RATIS_PORT: ConfigEntry[Int] =
buildConf("celeborn.ha.master.node.<id>.ratis.port")
buildConf("celeborn.master.ha.node.<id>.ratis.port")
.withAlternative("celeborn.ha.master.node.<id>.ratis.port")
.categories("ha")
.doc("Ratis port to bind of master node <id> in HA mode.")
.version("0.2.0")
.version("0.3.0")
.intConf
.checkValue(p => p >= 1024 && p < 65535, "invalid port")
.createWithDefault(9872)
val HA_MASTER_RATIS_RPC_TYPE: ConfigEntry[String] =
buildConf("celeborn.ha.master.ratis.raft.rpc.type")
buildConf("celeborn.master.ha.ratis.raft.rpc.type")
.withAlternative("celeborn.ha.master.ratis.raft.rpc.type")
.categories("ha")
.doc("RPC type for Ratis, available options: netty, grpc.")
.version("0.2.0")
.version("0.3.0")
.stringConf
.transform(_.toLowerCase)
.checkValues(Set("netty", "grpc"))
.createWithDefault("netty")
val HA_MASTER_RATIS_STORAGE_DIR: ConfigEntry[String] =
buildConf("celeborn.ha.master.ratis.raft.server.storage.dir")
buildConf("celeborn.master.ha.ratis.raft.server.storage.dir")
.withAlternative("celeborn.ha.master.ratis.raft.server.storage.dir")
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.stringConf
.createWithDefault("/tmp/ratis")
val HA_MASTER_RATIS_LOG_SEGMENT_SIZE_MAX: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.log.segment.size.max")
buildConf("celeborn.master.ha.ratis.raft.server.log.segment.size.max")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.segment.size.max")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4MB")
val HA_MASTER_RATIS_LOG_PREALLOCATED_SIZE: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.log.preallocated.size")
buildConf("celeborn.master.ha.ratis.raft.server.log.preallocated.size")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.preallocated.size")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4MB")
val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS: ConfigEntry[Int] =
buildConf("celeborn.ha.master.ratis.raft.server.log.appender.buffer.element-limit")
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.element-limit")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.element-limit")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.intConf
.createWithDefault(1024)
val HA_MASTER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.log.appender.buffer.byte-limit")
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.buffer.byte-limit")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.buffer.byte-limit")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("32MB")
val HA_MASTER_RATIS_LOG_INSTABLL_SNAPSHOT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
buildConf("celeborn.master.ha.ratis.raft.server.log.appender.install.snapshot.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.appender.install.snapshot.enabled")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.booleanConf
.createWithDefault(true)
val HA_MASTER_RATIS_LOG_PURGE_GAP: ConfigEntry[Int] =
buildConf("celeborn.ha.master.ratis.raft.server.log.purge.gap")
buildConf("celeborn.master.ha.ratis.raft.server.log.purge.gap")
.withAlternative("celeborn.ha.master.ratis.raft.server.log.purge.gap")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.intConf
.createWithDefault(1000000)
val HA_MASTER_RATIS_RPC_REQUEST_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.rpc.request.timeout")
buildConf("celeborn.master.ha.ratis.raft.server.rpc.request.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.request.timeout")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
val HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.retrycache.expirytime")
buildConf("celeborn.master.ha.ratis.raft.server.retrycache.expirytime")
.withAlternative("celeborn.ha.master.ratis.raft.server.retrycache.expirytime")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("600s")
val HA_MASTER_RATIS_RPC_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.rpc.timeout.min")
buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.min")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.min")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("3s")
val HA_MASTER_RATIS_RPC_TIMEOUT_MAX: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.rpc.timeout.max")
buildConf("celeborn.master.ha.ratis.raft.server.rpc.timeout.max")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.timeout.max")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.first.election.timeout.min")
buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
.internal
.categories("ha")
.version("0.3.0")
@ -1656,7 +1687,8 @@ object CelebornConf extends Logging {
.createWithDefaultString("3s")
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.first.election.timeout.max")
buildConf("celeborn.master.ha.ratis.first.election.timeout.max")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.max")
.internal
.categories("ha")
.version("0.3.0")
@ -1664,50 +1696,56 @@ object CelebornConf extends Logging {
.createWithDefaultString("5s")
val HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.notification.no-leader.timeout")
buildConf("celeborn.master.ha.ratis.raft.server.notification.no-leader.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.notification.no-leader.timeout")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("30s")
val HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.rpc.slowness.timeout")
buildConf("celeborn.master.ha.ratis.raft.server.rpc.slowness.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.rpc.slowness.timeout")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")
val HA_MASTER_RATIS_ROLE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.role.check.interval")
buildConf("celeborn.master.ha.ratis.raft.server.role.check.interval")
.withAlternative("celeborn.ha.master.ratis.raft.server.role.check.interval")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.enabled")
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.enabled")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.enabled")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.booleanConf
.createWithDefault(true)
val HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.threshold")
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.threshold")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.threshold")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.longConf
.createWithDefault(200000L)
val HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM: ConfigEntry[Int] =
buildConf("celeborn.ha.master.ratis.raft.server.snapshot.retention.file.num")
buildConf("celeborn.master.ha.ratis.raft.server.snapshot.retention.file.num")
.withAlternative("celeborn.ha.master.ratis.raft.server.snapshot.retention.file.num")
.internal
.categories("ha")
.version("0.2.0")
.version("0.3.0")
.intConf
.createWithDefault(3)

View File

@ -39,8 +39,8 @@ The followings are best practices of naming configs for some common cases:
1. When adding configs for a big feature, it's better to create an umbrella config that
can turn the feature on/off, with a name like `featureName.enabled`. The other configs
of this feature should be put under the `featureName` namespace. For example:
- celeborn.ha.enabled
- celeborn.ha.client.maxRetries
- celeborn.master.ha.enabled
- celeborn.master.ha.client.maxRetries
2. When adding a boolean config, the name should be a verb that describes what
happens if this config is set to true, e.g. `celeborn.worker.closeIdleConnections`.
3. When adding a config to specify a time duration, it's better to put the time unit

View File

@ -36,12 +36,12 @@ import org.apache.celeborn.common.util.Utils
*
* To run the entire test suite:
* {{{
* build/mvn clean test -pl common -am -Pspark-3.3 -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite
* build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite
* }}}
*
* To re-generate golden files for entire suite, run:
* {{{
* UPDATE=1 build/mvn clean test -pl common -am -Pspark-3.3 -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite
* UPDATE=1 build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite
* }}}
*/
class ConfigurationSuite extends AnyFunSuite {
@ -163,7 +163,7 @@ class ConfigurationSuite extends AnyFunSuite {
} finally writer.close()
} else {
val expected = Files.readAllLines(goldenFile).asScala
val updateCmd = "UPDATE=1 build/mvn clean test -pl common -am -Pspark-3.3 " +
val updateCmd = "UPDATE=1 build/mvn clean test -pl common -am " +
"-Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite"
val hint =

View File

@ -154,10 +154,10 @@ class CelebornConfSuite extends CelebornFunSuite {
test("extract masterNodeIds") {
val conf = new CelebornConf()
.set("celeborn.ha.master.node.id", "1")
.set("celeborn.ha.master.node.1.host", "clb-1")
.set("celeborn.ha.master.node.2.host", "clb-1")
.set("celeborn.ha.master.node.3.host", "clb-1")
.set("celeborn.master.ha.node.id", "1")
.set("celeborn.master.ha.node.1.host", "clb-1")
.set("celeborn.master.ha.node.2.host", "clb-1")
.set("celeborn.master.ha.node.3.host", "clb-1")
assert(conf.haMasterNodeIds.sorted === Array("1", "2", "3"))
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common
import org.apache.celeborn.CelebornFunSuite
class HAConfSuite extends CelebornFunSuite {
private def verifyConf(conf: CelebornConf): Unit = {
assert(conf.haMasterNodeIds.sorted === Array("1", "2", "3"))
assert(conf.haMasterNodeHost("1") === "clb-1")
assert(conf.haMasterNodeHost("2") === "clb-2")
assert(conf.haMasterNodeHost("3") === "clb-3")
assert(conf.haMasterNodePort("1") === 10000)
assert(conf.haMasterNodePort("2") === 20000)
assert(conf.haMasterNodePort("3") === 30000)
assert(conf.haMasterRatisHost("1") === "clb-1")
assert(conf.haMasterRatisHost("2") === "clb-2")
assert(conf.haMasterRatisHost("3") === "clb-3")
assert(conf.haMasterRatisPort("1") === 11111)
assert(conf.haMasterRatisPort("2") === 22222)
assert(conf.haMasterRatisPort("3") === 33333)
}
val OLD_HA_PREFIX = "celeborn.ha.master"
val NEW_HA_PREFIX = "celeborn.master.ha"
test("extract masterNodeIds - old") {
val conf = new CelebornConf()
.set(s"$OLD_HA_PREFIX.node.id", "1")
.set(s"$OLD_HA_PREFIX.node.1.host", "clb-1")
.set(s"$OLD_HA_PREFIX.node.1.port", "10000")
.set(s"$OLD_HA_PREFIX.node.1.ratis.port", "11111")
.set(s"$OLD_HA_PREFIX.node.2.host", "clb-2")
.set(s"$OLD_HA_PREFIX.node.2.port", "20000")
.set(s"$OLD_HA_PREFIX.node.2.ratis.port", "22222")
.set(s"$OLD_HA_PREFIX.node.3.host", "clb-3")
.set(s"$OLD_HA_PREFIX.node.3.port", "30000")
.set(s"$OLD_HA_PREFIX.node.3.ratis.port", "33333")
verifyConf(conf)
}
test("extract masterNodeIds - new") {
val conf = new CelebornConf()
.set(s"$NEW_HA_PREFIX.node.id", "1")
.set(s"$NEW_HA_PREFIX.node.1.host", "clb-1")
.set(s"$NEW_HA_PREFIX.node.1.port", "10000")
.set(s"$NEW_HA_PREFIX.node.1.ratis.port", "11111")
.set(s"$NEW_HA_PREFIX.node.2.host", "clb-2")
.set(s"$NEW_HA_PREFIX.node.2.port", "20000")
.set(s"$NEW_HA_PREFIX.node.2.ratis.port", "22222")
.set(s"$NEW_HA_PREFIX.node.3.host", "clb-3")
.set(s"$NEW_HA_PREFIX.node.3.port", "30000")
.set(s"$NEW_HA_PREFIX.node.3.ratis.port", "33333")
verifyConf(conf)
}
test("extract masterNodeIds - mix") {
val conf = new CelebornConf()
.set(s"$NEW_HA_PREFIX.node.id", "1")
.set(s"$OLD_HA_PREFIX.node.id", "invalid")
.set(s"$NEW_HA_PREFIX.node.1.host", "clb-1")
.set(s"$NEW_HA_PREFIX.node.1.port", "10000")
.set(s"$NEW_HA_PREFIX.node.1.ratis.port", "11111")
.set(s"$NEW_HA_PREFIX.node.2.host", "clb-2")
.set(s"$NEW_HA_PREFIX.node.2.port", "20000")
.set(s"$NEW_HA_PREFIX.node.2.ratis.port", "22222")
.set(s"$OLD_HA_PREFIX.node.2.host", "invalid")
.set(s"$OLD_HA_PREFIX.node.2.port", "44444")
.set(s"$OLD_HA_PREFIX.node.2.ratis.port", "44444")
.set(s"$OLD_HA_PREFIX.node.3.host", "clb-3")
.set(s"$OLD_HA_PREFIX.node.3.port", "30000")
.set(s"$OLD_HA_PREFIX.node.3.ratis.port", "33333")
verifyConf(conf)
}
}

View File

@ -32,16 +32,16 @@ celeborn.worker.storage.dirs /mnt/disk1,/mnt/disk2,/mnt/disk
celeborn.master.endpoints clb-1:9097,clb-2:9098,clb-3:9099
celeborn.ha.enabled true
celeborn.ha.master.node.1.host clb-1
celeborn.ha.master.node.1.port 9097
celeborn.ha.master.node.1.ratis.port 9872
celeborn.ha.master.node.2.host clb-2
celeborn.ha.master.node.2.port 9098
celeborn.ha.master.node.2.ratis.port 9873
celeborn.ha.master.node.3.host clb-3
celeborn.ha.master.node.3.port 9099
celeborn.ha.master.node.3.ratis.port 9874
celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.enabled true
celeborn.ha.master.ratis.raft.server.snapshot.auto.trigger.threshold 200000
celeborn.master.ha.enabled true
celeborn.master.ha.node.1.host clb-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-2
celeborn.master.ha.node.2.port 9098
celeborn.master.ha.node.2.ratis.port 9873
celeborn.master.ha.node.3.host clb-3
celeborn.master.ha.node.3.port 9099
celeborn.master.ha.node.3.ratis.port 9874
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.enabled true
celeborn.master.ha.ratis.raft.server.snapshot.auto.trigger.threshold 200000

View File

@ -73,7 +73,7 @@ $ celeborn-ratis sh -D<property=value> ...
**Note:**
Celeborn HA uses `NETTY` as the default RPC type, for details please refer to configuration `celeborn.ha.master.ratis.raft.rpc.type`. But Ratis uses `GRPC` as the default RPC type. So if the user wants to use Ratis shell to access Ratis cluster which uses `NETTY` RPC type, the generic option `-Draft.rpc.type=NETTY` should be set to change the RPC type of Ratis shell to Netty.
Celeborn HA uses `NETTY` as the default RPC type, for details please refer to configuration `celeborn.master.ha.ratis.raft.rpc.type`. But Ratis uses `GRPC` as the default RPC type. So if the user wants to use Ratis shell to access Ratis cluster which uses `NETTY` RPC type, the generic option `-Draft.rpc.type=NETTY` should be set to change the RPC type of Ratis shell to Netty.
## election
The `election` command manages leader election.

View File

@ -19,10 +19,10 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.ha.enabled | false | When true, master nodes run as Raft cluster mode. | 0.2.0 |
| celeborn.ha.master.node.&lt;id&gt;.host | &lt;required&gt; | Host to bind of master node <id> in HA mode. | 0.2.0 |
| celeborn.ha.master.node.&lt;id&gt;.port | 9097 | Port to bind of master node <id> in HA mode. | 0.2.0 |
| celeborn.ha.master.node.&lt;id&gt;.ratis.port | 9872 | Ratis port to bind of master node <id> in HA mode. | 0.2.0 |
| celeborn.ha.master.ratis.raft.rpc.type | netty | RPC type for Ratis, available options: netty, grpc. | 0.2.0 |
| celeborn.ha.master.ratis.raft.server.storage.dir | /tmp/ratis | | 0.2.0 |
| celeborn.master.ha.enabled | false | When true, master nodes run as Raft cluster mode. | 0.3.0 |
| celeborn.master.ha.node.&lt;id&gt;.host | &lt;required&gt; | Host to bind of master node <id> in HA mode. | 0.3.0 |
| celeborn.master.ha.node.&lt;id&gt;.port | 9097 | Port to bind of master node <id> in HA mode. | 0.3.0 |
| celeborn.master.ha.node.&lt;id&gt;.ratis.port | 9872 | Ratis port to bind of master node <id> in HA mode. | 0.3.0 |
| celeborn.master.ha.ratis.raft.rpc.type | netty | RPC type for Ratis, available options: netty, grpc. | 0.3.0 |
| celeborn.master.ha.ratis.raft.server.storage.dir | /tmp/ratis | | 0.3.0 |
<!--end-include-->

View File

@ -51,24 +51,24 @@ EXAMPLE: HA cluster
celeborn.master.endpoints clb-1:9097,clb-2:9097,clb-3:9097
# used by master nodes to bootstrap, every node should know the topology of whole cluster, for each node,
# `celeborn.ha.master.node.id` should be unique, and `celeborn.ha.master.node.<id>.host` is required.
celeborn.ha.enabled true
celeborn.ha.master.node.id 1
celeborn.ha.master.node.1.host clb-1
celeborn.ha.master.node.1.port 9097
celeborn.ha.master.node.1.ratis.port 9872
celeborn.ha.master.node.2.host clb-2
celeborn.ha.master.node.2.port 9097
celeborn.ha.master.node.2.ratis.port 9872
celeborn.ha.master.node.3.host clb-3
celeborn.ha.master.node.3.port 9097
celeborn.ha.master.node.3.ratis.port 9872
celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
# `celeborn.master.ha.node.id` should be unique, and `celeborn.master.ha.node.<id>.host` is required.
celeborn.master.ha.enabled true
celeborn.master.ha.node.id 1
celeborn.master.ha.node.1.host clb-1
celeborn.master.ha.node.1.port 9097
celeborn.master.ha.node.1.ratis.port 9872
celeborn.master.ha.node.2.host clb-2
celeborn.master.ha.node.2.port 9097
celeborn.master.ha.node.2.ratis.port 9872
celeborn.master.ha.node.3.host clb-3
celeborn.master.ha.node.3.port 9097
celeborn.master.ha.node.3.ratis.port 9872
celeborn.master.ha.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that flush buffer size is at least 4MB or larger.
celeborn.worker.flusher.buffer.size 256k
# Disk type is HDD by defaut.
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD
# If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false
celeborn.worker.monitor.disk.enabled false

View File

@ -27,3 +27,6 @@ license: |
- From 0.3.0 on the default value for `celeborn.worker.storage.workingDir` is changed from `hadoop/rss-worker/shuffle_data` to `rss-worker/shuffle_data`,
users who want to use origin working dir path should set this configuration.
- Since 0.3.0, configuration namespace `celeborn.ha.master` is deprecated, and will be removed in the future versions.
All configurations `celeborn.ha.master.*` should migrate to `celeborn.master.ha.*`.

View File

@ -53,7 +53,7 @@ volumes:
# celeborn configurations
celeborn:
celeborn.ha.enabled: false
celeborn.master.ha.enabled: false
celeborn.metrics.enabled: false
celeborn.master.metrics.prometheus.port: 9098
celeborn.worker.metrics.prometheus.port: 9096