[CELEBORN-445] Add CelebornRackResolver to support rack resolve (#1366)
This commit is contained in:
parent
87267a6493
commit
f16c7b414e
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.celeborn.common.network
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import com.google.common.base.Strings
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
|
||||
import org.apache.hadoop.net.{CachedDNSToSwitchMapping, DNSToSwitchMapping, NetworkTopology, Node, NodeBase, ScriptBasedMapping}
|
||||
import org.apache.hadoop.util.ReflectionUtils
|
||||
|
||||
import org.apache.celeborn.common.CelebornConf
|
||||
import org.apache.celeborn.common.internal.Logging
|
||||
import org.apache.celeborn.common.util.CelebornHadoopUtils
|
||||
|
||||
/**
 * Resolves host names to network locations (racks) by delegating to a Hadoop
 * `DNSToSwitchMapping` implementation.
 *
 * The implementation class is looked up in the Hadoop configuration built from
 * `celebornConf`, with `ScriptBasedMapping` passed as the default. Hosts for which no
 * rack name is returned are assigned `NetworkTopology.DEFAULT_RACK`.
 *
 * @param celebornConf Celeborn configuration used to build the Hadoop configuration
 */
class CelebornRackResolver(celebornConf: CelebornConf) extends Logging {

  // An implementation that is already a CachedDNSToSwitchMapping is used as is;
  // any other implementation is wrapped in one.
  private val dnsToSwitchMapping: DNSToSwitchMapping = {
    val hadoopConf: Configuration = CelebornHadoopUtils.newConfiguration(celebornConf)
    val mappingClass = hadoopConf.getClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      classOf[ScriptBasedMapping],
      classOf[DNSToSwitchMapping])
    val mapping: DNSToSwitchMapping =
      ReflectionUtils.newInstance(mappingClass, hadoopConf).asInstanceOf[DNSToSwitchMapping]
    mapping match {
      case cached: CachedDNSToSwitchMapping => cached
      case plain => new CachedDNSToSwitchMapping(plain)
    }
  }

  /**
   * Resolves a single host.
   *
   * @param hostName host name or address to look up
   * @return the network location of the host
   */
  def resolve(hostName: String): String =
    coreResolve(Seq(hostName)).head.getNetworkLocation

  /**
   * Resolves several hosts in one call.
   *
   * @param hostNames host names or addresses to look up
   * @return one node per resolved host, in the order of `hostNames`
   */
  def resolve(hostNames: Seq[String]): Seq[Node] = coreResolve(hostNames)

  /**
   * Shared implementation behind both `resolve` overloads.
   *
   * An empty input yields an empty result without consulting the mapping. If the
   * mapping returns `null` or an empty list, every host is placed in the default rack.
   * Otherwise hosts are paired positionally with the returned rack names, and a null or
   * empty rack name falls back to the default rack for that host only.
   */
  private def coreResolve(hostNames: Seq[String]): Seq[Node] = {
    if (hostNames.isEmpty) {
      Seq.empty
    } else {
      // dnsToSwitchMapping is thread-safe
      val rackNames = dnsToSwitchMapping.resolve(hostNames.toList.asJava).asScala
      if (rackNames == null || rackNames.isEmpty) {
        val defaulted =
          hostNames.map(host => new NodeBase(host, NetworkTopology.DEFAULT_RACK)).toList
        logInfo(s"Got an error when resolving hostNames. " +
          s"Falling back to ${NetworkTopology.DEFAULT_RACK} for all")
        defaulted
      } else {
        hostNames.zip(rackNames).map { case (hostName, rackName) =>
          if (Strings.isNullOrEmpty(rackName)) {
            val node = new NodeBase(hostName, NetworkTopology.DEFAULT_RACK)
            logDebug(s"Could not resolve $hostName. " +
              s"Falling back to ${NetworkTopology.DEFAULT_RACK}")
            node
          } else {
            new NodeBase(hostName, rackName)
          }
        }.toList
      }
    }
  }
}
|
||||
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.celeborn.common.util
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import org.apache.celeborn.common.CelebornConf
|
||||
|
||||
/** Helpers for deriving Hadoop `Configuration` objects from a `CelebornConf`. */
object CelebornHadoopUtils {

  /**
   * Creates a new Hadoop configuration and copies into it every Celeborn property
   * whose key starts with "celeborn.hadoop.", with that prefix removed.
   *
   * @param conf Celeborn configuration to read properties from
   * @return the newly created Hadoop configuration
   */
  private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
    val hadoopConf = new Configuration()
    copyCelebornHadoopProps(conf, hadoopConf)
    hadoopConf
  }

  /**
   * Copies any "celeborn.hadoop.foo=bar" Celeborn property into `hadoopConf` as "foo=bar".
   * Properties without the prefix are ignored.
   */
  private def copyCelebornHadoopProps(conf: CelebornConf, hadoopConf: Configuration): Unit = {
    val prefix = "celeborn.hadoop."
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith(prefix)) {
        hadoopConf.set(key.substring(prefix.length), value)
      }
    }
  }
}
|
||||
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.celeborn.common.network
|
||||
|
||||
import java.io.File
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY}
|
||||
import org.apache.hadoop.net.{Node, TableMapping}
|
||||
import org.apache.hadoop.shaded.com.google.common.base.Charsets
|
||||
import org.apache.hadoop.shaded.com.google.common.io.Files
|
||||
import org.junit.Assert.assertEquals
|
||||
|
||||
import org.apache.celeborn.CelebornFunSuite
|
||||
import org.apache.celeborn.common.CelebornConf
|
||||
|
||||
/**
 * Checks that `CelebornRackResolver` picks up a Hadoop `TableMapping` configured
 * through "celeborn.hadoop."-prefixed properties.
 */
class CelebornRackResolverSuite extends CelebornFunSuite {

  test("Test TableMapping") {
    val hostName1 = "1.2.3.4"
    val hostName2 = "5.6.7.8"

    // Topology table: the first entry is space-separated, the second tab-separated.
    val tableContents = hostName1 + " /rack1\n" + hostName2 + "\t/rack2\n"
    val mapFile: File = File.createTempFile(getClass.getSimpleName + ".testResolve", ".txt")
    Files.asCharSink(mapFile, Charsets.UTF_8).write(tableContents)
    mapFile.deleteOnExit()

    // Point the resolver at TableMapping and at the table file written above.
    val hadoopPrefix = "celeborn.hadoop."
    val conf = new CelebornConf
    conf.set(hadoopPrefix + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, classOf[TableMapping].getName)
    conf.set(hadoopPrefix + NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath)
    val resolver = new CelebornRackResolver(conf)

    val names = Seq(hostName1, hostName2)
    val result: Seq[Node] = resolver.resolve(names)

    // One node per host, each in the rack listed for it in the table.
    assertEquals(names.size, result.size)
    assertEquals("/rack1", result(0).getNetworkLocation)
    assertEquals("/rack2", result(1).getNetworkLocation)
  }
}
|
||||
Loading…
Reference in New Issue
Block a user