diff --git a/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala new file mode 100644 index 000000000..2af1e730d --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.network + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.google.common.base.Strings +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.CommonConfigurationKeysPublic +import org.apache.hadoop.net.{CachedDNSToSwitchMapping, DNSToSwitchMapping, NetworkTopology, Node, NodeBase, ScriptBasedMapping} +import org.apache.hadoop.util.ReflectionUtils + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.util.CelebornHadoopUtils + +class CelebornRackResolver(celebornConf: CelebornConf) extends Logging { + + private val dnsToSwitchMapping: DNSToSwitchMapping = { + val conf: Configuration = CelebornHadoopUtils.newConfiguration(celebornConf) + val dnsToSwitchMappingClass = + conf.getClass( + CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + classOf[ScriptBasedMapping], + classOf[DNSToSwitchMapping]) + ReflectionUtils.newInstance(dnsToSwitchMappingClass, conf) + .asInstanceOf[DNSToSwitchMapping] match { + case c: CachedDNSToSwitchMapping => c + case o => new CachedDNSToSwitchMapping(o) + } + } + + def resolve(hostName: String): String = { + coreResolve(Seq(hostName)).head.getNetworkLocation + } + + def resolve(hostNames: Seq[String]): Seq[Node] = { + coreResolve(hostNames) + } + + private def coreResolve(hostNames: Seq[String]): Seq[Node] = { + if (hostNames.isEmpty) { + return Seq.empty + } + val nodes = new ArrayBuffer[Node] + // dnsToSwitchMapping is thread-safe + val rNameList = dnsToSwitchMapping.resolve(hostNames.toList.asJava).asScala + if (rNameList == null || rNameList.isEmpty) { + hostNames.foreach(nodes += new NodeBase(_, NetworkTopology.DEFAULT_RACK)) + logInfo(s"Got an error when resolving hostNames. " + + s"Falling back to ${NetworkTopology.DEFAULT_RACK} for all") + } else { + for ((hostName, rName) <- hostNames.zip(rNameList)) { + if (Strings.isNullOrEmpty(rName)) { + nodes += new NodeBase(hostName, NetworkTopology.DEFAULT_RACK) + logDebug(s"Could not resolve $hostName. " + + s"Falling back to ${NetworkTopology.DEFAULT_RACK}") + } else { + nodes += new NodeBase(hostName, rName) + } + } + } + nodes.toList + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala new file mode 100644 index 000000000..39b70ff18 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.util + +import org.apache.hadoop.conf.Configuration + +import org.apache.celeborn.common.CelebornConf + +object CelebornHadoopUtils { + private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = { + val hadoopConf = new Configuration() + appendSparkHadoopConfigs(conf, hadoopConf) + hadoopConf + } + + private def appendSparkHadoopConfigs(conf: CelebornConf, hadoopConf: Configuration): Unit = { + // Copy any "celeborn.hadoop.foo=bar" celeborn properties into conf as "foo=bar" + for ((key, value) <- conf.getAll if key.startsWith("celeborn.hadoop.")) { + hadoopConf.set(key.substring("celeborn.hadoop.".length), value) + } + } +} diff --git a/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala new file mode 100644 index 000000000..268f3b75e --- /dev/null +++ b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.network + +import java.io.File + +import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY} +import org.apache.hadoop.net.{Node, TableMapping} +import org.apache.hadoop.shaded.com.google.common.base.Charsets +import org.apache.hadoop.shaded.com.google.common.io.Files +import org.junit.Assert.assertEquals + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf + +class CelebornRackResolverSuite extends CelebornFunSuite { + + test("Test TableMapping") { + val hostName1 = "1.2.3.4" + val hostName2 = "5.6.7.8" + val mapFile: File = File.createTempFile(getClass.getSimpleName + ".testResolve", ".txt") + Files.asCharSink(mapFile, Charsets.UTF_8).write( + hostName1 + " /rack1\n" + hostName2 + "\t/rack2\n") + mapFile.deleteOnExit() + + val conf = new CelebornConf + conf.set( + "celeborn.hadoop." + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + classOf[TableMapping].getName) + conf.set("celeborn.hadoop." + NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, mapFile.getCanonicalPath) + val resolver = new CelebornRackResolver(conf) + + val names = Seq(hostName1, hostName2) + + val result: Seq[Node] = resolver.resolve(names) + assertEquals(names.size, result.size) + assertEquals("/rack1", result(0).getNetworkLocation) + assertEquals("/rack2", result(1).getNetworkLocation) + } +}