[KYUUBI #527] Support test with MiniYARNCluster
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Support test with mini yarn cluster. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request Closes #538 from turboFei/KYUUBI_527_mini_yarn. Closes #527 8bedc707 [fwang12] use eventually 89794bc6 [fwang12] refactor b955b825 [fwang12] refactor code 2f2baacf [fwang12] save 0d6359ea [fwang12] remove scope in parent pom 442c2c42 [fwang12] [KYUUBI #527] Support test with MiniYARNCluster Authored-by: fwang12 <fwang12@ebay.com> Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
parent
c52b953a99
commit
5867893b32
@ -24,4 +24,9 @@ private[kyuubi] object Tests {
|
||||
.version("1.2.0")
|
||||
.booleanConf
|
||||
.createOptional
|
||||
|
||||
val TESTING_HADOOP_CONF_DIR = ConfigBuilder("kyuubi.testing.hadoop.conf.dir")
|
||||
.version("1.3.0")
|
||||
.stringConf
|
||||
.createOptional
|
||||
}
|
||||
|
||||
@ -33,4 +33,10 @@ object KyuubiHadoopUtils {
|
||||
def getServerPrincipal(principal: String): String = {
|
||||
SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
|
||||
}
|
||||
|
||||
def toSparkPrefixedConf(hadoopConf: Map[String, String]): Map[String, String] = {
|
||||
hadoopConf.map { case (key, value) =>
|
||||
"spark.hadoop." + key -> value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -144,6 +144,12 @@
|
||||
<artifactId>delta-core_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client-minicluster</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@ -27,16 +27,26 @@ import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
|
||||
import org.apache.kyuubi.config.internal.Tests
|
||||
import org.apache.kyuubi.engine.ProcBuilder
|
||||
|
||||
class SparkProcessBuilder(
|
||||
override val proxyUser: String,
|
||||
override val conf: KyuubiConf,
|
||||
override val env: Map[String, String] = sys.env)
|
||||
envMap: Map[String, String] = sys.env)
|
||||
extends ProcBuilder with Logging {
|
||||
|
||||
import SparkProcessBuilder._
|
||||
|
||||
override protected def env: Map[String, String] = {
|
||||
val testingHadoopConfDir = conf.getOption(Tests.TESTING_HADOOP_CONF_DIR.key)
|
||||
if (Utils.isTesting && testingHadoopConfDir.isDefined) {
|
||||
envMap ++ Map("HADOOP_CONF_DIR" -> testingHadoopConfDir.get)
|
||||
} else {
|
||||
envMap
|
||||
}
|
||||
}
|
||||
|
||||
override protected val executable: String = {
|
||||
val sparkHomeOpt = env.get("SPARK_HOME").orElse {
|
||||
val kyuubiPattern = "/kyuubi/"
|
||||
|
||||
@ -51,7 +51,7 @@ trait WithKyuubiServer extends KyuubiFunSuite {
|
||||
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
|
||||
conf.set(HA_ZK_ACL_ENABLED, false)
|
||||
|
||||
conf.set(ENGINE_INIT_TIMEOUT, 120000L)
|
||||
conf.setIfMissing(ENGINE_INIT_TIMEOUT, 120000L)
|
||||
server = KyuubiServer.startServer(conf)
|
||||
super.beforeAll()
|
||||
Thread.sleep(1500)
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.internal.Tests.TESTING_HADOOP_CONF_DIR
|
||||
import org.apache.kyuubi.server.MiniYarnService
|
||||
import org.apache.kyuubi.util.KyuubiHadoopUtils
|
||||
|
||||
trait WithKyuubiServerWithMiniYarnService extends WithKyuubiServer {
|
||||
protected val kyuubiServerConf: KyuubiConf
|
||||
protected val connectionConf: Map[String, String]
|
||||
private var miniYarnService: MiniYarnService = _
|
||||
|
||||
override protected lazy final val conf: KyuubiConf = {
|
||||
connectionConf.foreach { case (k, v) =>
|
||||
kyuubiServerConf.set(k, v)
|
||||
}
|
||||
kyuubiServerConf
|
||||
}
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
miniYarnService = new MiniYarnService()
|
||||
miniYarnService.initialize(new KyuubiConf(false))
|
||||
miniYarnService.start()
|
||||
|
||||
KyuubiHadoopUtils.toSparkPrefixedConf(miniYarnService.getHadoopConf()).foreach { case (k, v) =>
|
||||
conf.set(k, v)
|
||||
}
|
||||
conf.set(TESTING_HADOOP_CONF_DIR, miniYarnService.getHadoopConfDir())
|
||||
super.beforeAll()
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
if (miniYarnService != null) {
|
||||
miniYarnService.stop()
|
||||
miniYarnService = null
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.operation
|
||||
|
||||
import org.apache.kyuubi.WithKyuubiServerWithMiniYarnService
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT
|
||||
|
||||
class KyuubiOperationYarnClusterSuite extends WithKyuubiServerWithMiniYarnService
|
||||
with JDBCTestUtils {
|
||||
|
||||
override protected val kyuubiServerConf: KyuubiConf = {
|
||||
KyuubiConf().set(ENGINE_INIT_TIMEOUT, 300000L)
|
||||
}
|
||||
|
||||
override protected val connectionConf: Map[String, String] = Map(
|
||||
"spark.master" -> "yarn",
|
||||
"spark.executor.instances" -> "1"
|
||||
)
|
||||
|
||||
test("KYUUBI #527- Support test with mini yarn cluster") {
|
||||
withJdbcStatement() { statement =>
|
||||
val resultSet = statement.executeQuery("""SELECT "${spark.app.id}" as id""")
|
||||
assert(resultSet.next())
|
||||
assert(resultSet.getString("id").startsWith("application_"))
|
||||
}
|
||||
}
|
||||
|
||||
override protected def jdbcUrl: String = getJdbcUrl
|
||||
}
|
||||
@ -0,0 +1,102 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.server
|
||||
|
||||
import java.io.File
|
||||
import java.net.InetAddress
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster
|
||||
import org.scalatest.concurrent.Eventually._
|
||||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.service.AbstractService
|
||||
|
||||
class MiniYarnService(name: String) extends AbstractService(name) {
|
||||
def this() = this(classOf[MiniYarnService].getSimpleName)
|
||||
|
||||
private var hadoopConfDir: File = _
|
||||
private var yarnConf: YarnConfiguration = _
|
||||
private var yarnCluster: MiniYARNCluster = _
|
||||
|
||||
private def newYarnConfig(): YarnConfiguration = {
|
||||
val yarnConfig = new YarnConfiguration()
|
||||
// Disable the disk utilization check to avoid the test hanging when people's disks are
|
||||
// getting full.
|
||||
yarnConfig.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
|
||||
"100.0")
|
||||
|
||||
// capacity-scheduler.xml is missing in hadoop-client-minicluster so this is a workaround
|
||||
yarnConfig.set("yarn.scheduler.capacity.root.queues", "default")
|
||||
yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
|
||||
yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1)
|
||||
yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 100)
|
||||
yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING")
|
||||
yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications", "*")
|
||||
yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
|
||||
yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
|
||||
|
||||
// Set bind host to localhost to avoid java.net.BindException
|
||||
yarnConfig.set("yarn.resourcemanager.bind-host", "localhost")
|
||||
|
||||
// enable proxy
|
||||
val currentUser = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
yarnConfig.set(s"hadoop.proxyuser.$currentUser.groups", "*")
|
||||
yarnConfig.set(s"hadoop.proxyuser.$currentUser.hosts", "*")
|
||||
yarnConfig
|
||||
}
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
hadoopConfDir = Utils.createTempDir().toFile
|
||||
yarnConf = newYarnConfig()
|
||||
yarnCluster = new MiniYARNCluster(name, 1, 1, 1)
|
||||
yarnCluster.init(yarnConf)
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
override def start(): Unit = {
|
||||
yarnCluster.start()
|
||||
val config = yarnCluster.getConfig()
|
||||
eventually(timeout(10.seconds), interval(100.milliseconds)) {
|
||||
config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0"
|
||||
}
|
||||
info(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
|
||||
super.start()
|
||||
}
|
||||
|
||||
override def stop(): Unit = {
|
||||
if (yarnCluster != null) yarnCluster.stop()
|
||||
super.stop()
|
||||
}
|
||||
|
||||
def getHadoopConf(): Map[String, String] = {
|
||||
val hostName = InetAddress.getLocalHost.getHostName
|
||||
yarnCluster.getConfig.iterator().asScala.map { kv =>
|
||||
kv.getKey -> kv.getValue.replaceAll(hostName, "localhost")
|
||||
}.toMap
|
||||
}
|
||||
|
||||
def getHadoopConfDir(): String = {
|
||||
hadoopConfDir.getAbsolutePath
|
||||
}
|
||||
}
|
||||
6
pom.xml
6
pom.xml
@ -1082,6 +1082,12 @@
|
||||
<version>${ldapsdk.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client-minicluster</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user