diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index f1c9d1f42..b384f8793 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -123,6 +123,11 @@
org.mockito
mockito-core
+
+
+ org.apache.hadoop
+ hadoop-minicluster
+
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala
index ae32fd8dd..88a560486 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.YarnApplicationState._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -41,7 +42,12 @@ private[kyuubi] object KyuubiHadoopUtil {
}
def getApplications: Seq[ApplicationReport] = {
- yarnClient.getApplications(Set("SPARK").asJava).asScala
+ yarnClient.getApplications(Set("SPARK").asJava).asScala.filter { p =>
+ p.getYarnApplicationState match {
+ case ACCEPTED | NEW | NEW_SAVING | SUBMITTED | RUNNING => true
+ case _ => false
+ }
+ }
}
def killYarnAppByName(appName: String): Unit = {
diff --git a/kyuubi-server/src/test/resources/kyuubi-test.conf b/kyuubi-server/src/test/resources/kyuubi-test.conf
index 959d1520e..1b18a9d1b 100644
--- a/kyuubi-server/src/test/resources/kyuubi-test.conf
+++ b/kyuubi-server/src/test/resources/kyuubi-test.conf
@@ -1 +1 @@
-spark.kyuubi.test 1
\ No newline at end of file
+spark.kyuubi.test=1
diff --git a/kyuubi-server/src/test/resources/yarn-site.xml b/kyuubi-server/src/test/resources/yarn-site.xml
new file mode 100644
index 000000000..c2d99557e
--- /dev/null
+++ b/kyuubi-server/src/test/resources/yarn-site.xml
@@ -0,0 +1,32 @@
+
+
+
+
+ yarn.resourcemanager.address
+ localhost:37001
+
+
+
+ yarn.resourcemanager.hostname
+
+
+
+ yarn.minicluster.fixed.ports
+ true
+
+
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/KyuubiConfSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/KyuubiConfSuite.scala
index 7dee58252..da0a1bdfa 100644
--- a/kyuubi-server/src/test/scala/org/apache/spark/KyuubiConfSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/spark/KyuubiConfSuite.scala
@@ -43,4 +43,10 @@ class KyuubiConfSuite extends SparkFunSuite {
}
assert(conf.get(KyuubiConf.AUTHENTICATION_METHOD) === "KERBEROS")
}
+
+ test("register") {
+ val e = intercept[IllegalArgumentException](
+ KyuubiConf.register(KyuubiConf.AUTHENTICATION_METHOD))
+ assert(e.getMessage.contains("spark.kyuubi.authentication has been registered"))
+ }
}
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/KyuubiConfSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/KyuubiConfSuite.scala
new file mode 100644
index 000000000..95c42a84d
--- /dev/null
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/KyuubiConfSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package yaooqinn.kyuubi
+
+import org.apache.spark.{KyuubiConf, SparkConf, SparkFunSuite}
+import org.apache.spark.KyuubiConf._
+
+class KyuubiConfSuite extends SparkFunSuite {
+ private val conf: SparkConf = new SparkConf()
+
+ KyuubiConf.getAllDefaults.foreach { case (k, v) => conf.set(k, v) }
+ test("implicits") {
+
+ assert(conf.getOption(AUTHORIZATION_ENABLE).nonEmpty)
+ assert(!conf.get(AUTHORIZATION_ENABLE).toBoolean)
+
+ assert(conf.getOption(YARN_CONTAINER_TIMEOUT).nonEmpty)
+ assert(conf.get(YARN_CONTAINER_TIMEOUT) === "60000ms")
+
+ assert(conf.getOption(BACKEND_SESSION_WAIT_OTHER_TIMES).nonEmpty)
+ assert(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt === 60)
+
+ assert(conf.getOption(AUTHENTICATION_METHOD).nonEmpty)
+ assert(conf.get(AUTHENTICATION_METHOD) === "NONE")
+ }
+
+}
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala
new file mode 100644
index 000000000..bd67357b6
--- /dev/null
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package yaooqinn.kyuubi.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource, YarnApplicationState}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.apache.hadoop.yarn.util.Records
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfterEach
+
+class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ private var cluster: MiniYARNCluster = _
+ private val yarnClient = YarnClient.createYarnClient()
+
+ override def beforeAll(): Unit = {
+ cluster = new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1)
+ val hadoopConf = new Configuration()
+ cluster.init(hadoopConf)
+ cluster.start()
+ yarnClient.init(hadoopConf)
+ yarnClient.start()
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ cluster.stop()
+ super.afterAll()
+ }
+
+ override def beforeEach(): Unit = {
+
+ super.beforeEach()
+ }
+
+ test("kill yarn application by name") {
+ withYarnApplication { id =>
+ KyuubiHadoopUtil.killYarnAppByName(id.toString)
+ assert(KyuubiHadoopUtil.getApplications.isEmpty)
+ }
+ }
+
+ test("kill yarn application by id") {
+ withYarnApplication { id =>
+ KyuubiHadoopUtil.killYarnApp(yarnClient.getApplicationReport(id))
+ assert(KyuubiHadoopUtil.getApplications.isEmpty)
+ }
+ }
+
+ test("do as") {
+
+ val user1 = UserGroupInformation.getCurrentUser
+ val userName1 = user1.getShortUserName
+ val userName2 = "test"
+ val user2 = UserGroupInformation.createProxyUser(userName2, user1)
+
+ def testf(expectedUser: String): Boolean = {
+ UserGroupInformation.getCurrentUser.getShortUserName == expectedUser
+ }
+
+ KyuubiHadoopUtil.doAs(user1) {
+ testf(userName1)
+ }
+
+ KyuubiHadoopUtil.doAs(user2) {
+ testf(userName2)
+ }
+ }
+
+ test("get applications") {
+ withYarnApplication { id =>
+ assert(KyuubiHadoopUtil.getApplications.head.getApplicationId === id)
+ }
+ }
+
+ def withYarnApplication(f: ApplicationId => Unit): Unit = {
+ val application = yarnClient.createApplication()
+ val response = application.getNewApplicationResponse
+ val applicationId = response.getApplicationId
+ val context = application.getApplicationSubmissionContext
+ context.setApplicationName(applicationId.toString)
+ context.setApplicationType("SPARK")
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemory(10)
+ capability.setVirtualCores(1)
+ context.setResource(capability)
+ context.setAMContainerSpec(Records.newRecord(classOf[ContainerLaunchContext]))
+ yarnClient.submitApplication(context)
+ assert(KyuubiHadoopUtil.getApplications.nonEmpty)
+ assert(KyuubiHadoopUtil.getApplications.head.getYarnApplicationState !==
+ YarnApplicationState.KILLED)
+ f(applicationId)
+ }
+}
diff --git a/pom.xml b/pom.xml
index f55ab90bd..2b14acee8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -324,6 +324,12 @@
1.10.19
test
+
+ org.apache.hadoop
+ hadoop-minicluster
+ ${hadoop.version}
+ test
+
@@ -345,7 +351,7 @@
spark-2.3
- 2.3.1
+ 2.3.2
3.0.3