diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/internal/Tests.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/internal/Tests.scala index 13089a642..8b8101e34 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/internal/Tests.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/internal/Tests.scala @@ -24,4 +24,9 @@ private[kyuubi] object Tests { .version("1.2.0") .booleanConf .createOptional + + val TESTING_HADOOP_CONF_DIR = ConfigBuilder("kyuubi.testing.hadoop.conf.dir") + .version("1.3.0") + .stringConf + .createOptional } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala index e6a009b93..736bad5b8 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala @@ -33,4 +33,10 @@ object KyuubiHadoopUtils { def getServerPrincipal(principal: String): String = { SecurityUtil.getServerPrincipal(principal, "0.0.0.0") } + + def toSparkPrefixedConf(hadoopConf: Map[String, String]): Map[String, String] = { + hadoopConf.map { case (key, value) => + "spark.hadoop." 
+ key -> value + } + } } diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml index 5f8b80d4e..49bc3de00 100644 --- a/kyuubi-main/pom.xml +++ b/kyuubi-main/pom.xml @@ -144,6 +144,12 @@ delta-core_${scala.binary.version} test + + + org.apache.hadoop + hadoop-client-minicluster + test + diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 1440b21eb..0f44068b8 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -27,16 +27,26 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE +import org.apache.kyuubi.config.internal.Tests import org.apache.kyuubi.engine.ProcBuilder class SparkProcessBuilder( override val proxyUser: String, override val conf: KyuubiConf, - override val env: Map[String, String] = sys.env) + envMap: Map[String, String] = sys.env) extends ProcBuilder with Logging { import SparkProcessBuilder._ + override protected def env: Map[String, String] = { + val testingHadoopConfDir = conf.getOption(Tests.TESTING_HADOOP_CONF_DIR.key) + if (Utils.isTesting && testingHadoopConfDir.isDefined) { + envMap ++ Map("HADOOP_CONF_DIR" -> testingHadoopConfDir.get) + } else { + envMap + } + } + override protected val executable: String = { val sparkHomeOpt = env.get("SPARK_HOME").orElse { val kyuubiPattern = "/kyuubi/" diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala index a18c522f2..980c50105 100644 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala @@ -51,7 +51,7 
@@ trait WithKyuubiServer extends KyuubiFunSuite { conf.set(HA_ZK_QUORUM, zkServer.getConnectString) conf.set(HA_ZK_ACL_ENABLED, false) - conf.set(ENGINE_INIT_TIMEOUT, 120000L) + conf.setIfMissing(ENGINE_INIT_TIMEOUT, 120000L) server = KyuubiServer.startServer(conf) super.beforeAll() Thread.sleep(1500) diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServerWithMiniYarnService.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServerWithMiniYarnService.scala new file mode 100644 index 000000000..f388e3344 --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/WithKyuubiServerWithMiniYarnService.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.internal.Tests.TESTING_HADOOP_CONF_DIR +import org.apache.kyuubi.server.MiniYarnService +import org.apache.kyuubi.util.KyuubiHadoopUtils + +trait WithKyuubiServerWithMiniYarnService extends WithKyuubiServer { + protected val kyuubiServerConf: KyuubiConf + protected val connectionConf: Map[String, String] + private var miniYarnService: MiniYarnService = _ + + override protected lazy final val conf: KyuubiConf = { + connectionConf.foreach { case (k, v) => + kyuubiServerConf.set(k, v) + } + kyuubiServerConf + } + + override def beforeAll(): Unit = { + miniYarnService = new MiniYarnService() + miniYarnService.initialize(new KyuubiConf(false)) + miniYarnService.start() + + KyuubiHadoopUtils.toSparkPrefixedConf(miniYarnService.getHadoopConf()).foreach { case (k, v) => + conf.set(k, v) + } + conf.set(TESTING_HADOOP_CONF_DIR, miniYarnService.getHadoopConfDir()) + super.beforeAll() + } + + override def afterAll(): Unit = { + if (miniYarnService != null) { + miniYarnService.stop() + miniYarnService = null + } + super.afterAll() + } +} diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala new file mode 100644 index 000000000..a7af8ce1e --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationYarnClusterSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.kyuubi.WithKyuubiServerWithMiniYarnService +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_INIT_TIMEOUT + +class KyuubiOperationYarnClusterSuite extends WithKyuubiServerWithMiniYarnService + with JDBCTestUtils { + + override protected val kyuubiServerConf: KyuubiConf = { + KyuubiConf().set(ENGINE_INIT_TIMEOUT, 300000L) + } + + override protected val connectionConf: Map[String, String] = Map( + "spark.master" -> "yarn", + "spark.executor.instances" -> "1" + ) + + test("KYUUBI #527- Support test with mini yarn cluster") { + withJdbcStatement() { statement => + val resultSet = statement.executeQuery("""SELECT "${spark.app.id}" as id""") + assert(resultSet.next()) + assert(resultSet.getString("id").startsWith("application_")) + } + } + + override protected def jdbcUrl: String = getJdbcUrl +} diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala new file mode 100644 index 000000000..83f0bb7d6 --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server + +import java.io.File +import java.net.InetAddress + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.server.MiniYARNCluster +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.service.AbstractService + +class MiniYarnService(name: String) extends AbstractService(name) { + def this() = this(classOf[MiniYarnService].getSimpleName) + + private var hadoopConfDir: File = _ + private var yarnConf: YarnConfiguration = _ + private var yarnCluster: MiniYARNCluster = _ + + private def newYarnConfig(): YarnConfiguration = { + val yarnConfig = new YarnConfiguration() + // Disable the disk utilization check to avoid the test hanging when people's disks are + // getting full. 
+ yarnConfig.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", "100.0") + + // capacity-scheduler.xml is missing in hadoop-client-minicluster so this is a workaround + yarnConfig.set("yarn.scheduler.capacity.root.queues", "default") + yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100) + yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1) + yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 100) + yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING") + yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications", "*") + yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*") + yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1) + + // Set bind host to localhost to avoid java.net.BindException + yarnConfig.set("yarn.resourcemanager.bind-host", "localhost") + + // enable proxy + val currentUser = UserGroupInformation.getCurrentUser.getShortUserName + yarnConfig.set(s"hadoop.proxyuser.$currentUser.groups", "*") + yarnConfig.set(s"hadoop.proxyuser.$currentUser.hosts", "*") + yarnConfig + } + + override def initialize(conf: KyuubiConf): Unit = { + hadoopConfDir = Utils.createTempDir().toFile + yarnConf = newYarnConfig() + yarnCluster = new MiniYARNCluster(name, 1, 1, 1) + yarnCluster.init(yarnConf) + super.initialize(conf) + } + + override def start(): Unit = { + yarnCluster.start() + val config = yarnCluster.getConfig() + eventually(timeout(10.seconds), interval(100.milliseconds)) { + assert(config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0") + } + info(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") + super.start() + } + + override def stop(): Unit = { + if (yarnCluster != null) yarnCluster.stop() + super.stop() + } + + def getHadoopConf(): Map[String, String] = { + val hostName = InetAddress.getLocalHost.getHostName + 
yarnCluster.getConfig.iterator().asScala.map { kv => + kv.getKey -> kv.getValue.replaceAll(hostName, "localhost") + }.toMap + } + + def getHadoopConfDir(): String = { + hadoopConfDir.getAbsolutePath + } +} diff --git a/pom.xml b/pom.xml index 0bd309997..f045df3a0 100644 --- a/pom.xml +++ b/pom.xml @@ -1082,6 +1082,12 @@ ${ldapsdk.version} + + org.apache.hadoop + hadoop-client-minicluster + ${hadoop.version} + + org.apache.zookeeper zookeeper