From 7401ca3c6e7f250833766a160faa6d31a24fe2f5 Mon Sep 17 00:00:00 2001
From: Kent Yao <11215016@zju.edu.cn>
Date: Sat, 2 Mar 2019 01:16:50 +0800
Subject: [PATCH] [KYUUBI-135][KYUUBI-142]add support for spark 2.4.0 release
(#157)
* fix #135 spark 2.4.0 support
* fix #142
* import
* add ut
* fix ut
* fix ut
* fix ut
* fix ut
---
.travis.yml | 3 +
kyuubi-server/pom.xml | 3 +
.../scala/org/apache/spark/SparkEnv.scala | 12 ++--
.../hive/client/IsolatedClientLoader.scala | 28 +++++++-
...rverPage.scala => KyuubiSessionPage.scala} | 24 ++++---
...nPage.scala => KyuubiSessionSubPage.scala} | 12 ++--
...ServerTab.scala => KyuubiSessionTab.scala} | 12 ++--
.../org/apache/spark/ui/KyuubiUIUtils.scala | 59 ++++++++++++++++
.../kyuubi/spark/SparkSessionWithUGI.scala | 4 +-
.../kyuubi/ui/KyuubiServerMonitor.scala | 6 +-
.../yaooqinn/kyuubi/utils/ReflectUtils.scala | 23 ++++++
.../apache/spark/KyuubiSparkUtilSuite.scala | 2 +-
.../spark/ui/KyuubiSessionPageSuite.scala | 50 +++++++++++++
.../spark/ui/KyuubiSessionSubPageSuite.scala | 70 +++++++++++++++++++
...uite.scala => KyuubiSessionTabSuite.scala} | 16 ++---
.../apache/spark/ui/KyuubiUIUtilsSuite.scala | 63 +++++++++++++++++
.../operation/OperationManagerSuite.scala | 4 ++
.../kyuubi/server/FrontendServiceSuite.scala | 5 +-
.../kyuubi/ui/KyuubiServerMonitorSuite.scala | 4 +-
.../kyuubi/utils/ReflectUtilsSuite.scala | 12 ++++
pom.xml | 8 +++
21 files changed, 374 insertions(+), 46 deletions(-)
rename kyuubi-server/src/main/scala/org/apache/spark/ui/{KyuubiServerPage.scala => KyuubiSessionPage.scala} (88%)
rename kyuubi-server/src/main/scala/org/apache/spark/ui/{KyuubiServerSessionPage.scala => KyuubiSessionSubPage.scala} (91%)
rename kyuubi-server/src/main/scala/org/apache/spark/ui/{KyuubiServerTab.scala => KyuubiSessionTab.scala} (84%)
create mode 100644 kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiUIUtils.scala
create mode 100644 kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionPageSuite.scala
create mode 100644 kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionSubPageSuite.scala
rename kyuubi-server/src/test/scala/org/apache/spark/ui/{KyuubiServerTabSuite.scala => KyuubiSessionTabSuite.scala} (82%)
create mode 100644 kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiUIUtilsSuite.scala
diff --git a/.travis.yml b/.travis.yml
index 9367164b0..94a2b44be 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,6 +28,9 @@ deploy:
jobs:
include:
+ - stage: spark2.4
+ language: scala
+ script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.3
language: scala
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index a7a5bbba4..0915f7b2d 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -307,6 +307,9 @@
${project.build.testOutputDirectory}/kyuubi-server-${project.version}.jar
+
+ ${project.build.testOutputDirectory}/spark-warehouse
+
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala b/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala
index 3a95d00ac..ad95697f6 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -139,8 +139,11 @@ class SparkEnv (
}
object SparkEnv extends Logging {
+ import scala.collection.JavaConverters._
+
info("Loaded Kyuubi Supplied SparkEnv Class...")
- private val env = new ConcurrentHashMap[String, SparkEnv]()
+ @volatile private var env: SparkEnv = _
+ private val envs = new ConcurrentHashMap[String, SparkEnv]()
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"
@@ -150,10 +153,11 @@ object SparkEnv extends Logging {
def set(e: SparkEnv) {
if (e == null) {
debug(s"Kyuubi: Removing SparkEnv for $user")
- env.remove(user)
+ envs.remove(user)
} else {
debug(s"Kyuubi: Registering SparkEnv for $user")
- env.put(user, e)
+ envs.put(user, e)
+ env = e
}
}
@@ -162,7 +166,7 @@ object SparkEnv extends Logging {
*/
def get: SparkEnv = {
debug(s"Kyuubi: Get SparkEnv for $user")
- env.getOrDefault(user, env.values().iterator().next())
+ envs.getOrDefault(user, envs.values().asScala.headOption.getOrElse(env))
}
/**
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 5320f4851..616a29797 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -22,7 +22,8 @@ import java.net.URL
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.MutableURLClassLoader
@@ -56,6 +57,8 @@ private[hive] class IsolatedClientLoader(
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {
+ import KyuubiSparkUtil._
+
// Check to make sure that the root classloader does not know about Hive.
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
@@ -75,7 +78,28 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
- new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
+
+ val ctor = classOf[HiveClientImpl].getConstructors.head
+ if (majorVersion(SPARK_VERSION) == 2 && minorVersion(SPARK_VERSION) > 3) {
+ val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
+ ctor.newInstance(
+ version,
+ warehouseDir,
+ sparkConf,
+ hadoopConf,
+ config,
+ baseClassLoader,
+ this).asInstanceOf[HiveClientImpl]
+ } else {
+ ctor.newInstance(
+ version,
+ sparkConf,
+ hadoopConf,
+ config,
+ baseClassLoader,
+ this).asInstanceOf[HiveClientImpl]
+ }
+
}
/**
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerPage.scala b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionPage.scala
similarity index 88%
rename from kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerPage.scala
rename to kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionPage.scala
index 0867d70d9..a5790b232 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerPage.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionPage.scala
@@ -27,8 +27,10 @@ import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState, SessionInfo}
-/** Page for Spark Web UI that shows statistics of the kyuubi server */
-class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
+/**
+ * Page for Spark Web UI that shows statistics of the kyuubi server
+ */
+class KyuubiSessionPage(parent: KyuubiSessionTab) extends WebUIPage("") {
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime()
@@ -44,10 +46,10 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
{listener.getOnlineSessionNum} session(s) are online,
running {listener.getTotalRunning} SQL statement(s)
++
- generateSessionStatsTable() ++
- generateSQLStatsTable()
+ generateSessionStatsTable(request) ++
+ generateSQLStatsTable(request)
}
- UIUtils.headerSparkPage("Kyuubi Server", content, parent, Some(5000))
+ KyuubiUIUtils.headerSparkPage(request, "Kyuubi Session - Application View", content, parent)
}
/** Generate basic stats of the kyuubi server program */
@@ -64,7 +66,7 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
}
/** Generate stats of batch statements of the kyuubi server program */
- private def generateSQLStatsTable(): Seq[Node] = {
+ private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
val numStatement = listener.getExecutionList.size
val table = if (numStatement > 0) {
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
@@ -73,7 +75,8 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
-
+
[{id}]
}
@@ -135,7 +138,7 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
}
/** Generate stats of batch sessions of the kyuubi server program */
- private def generateSessionStatsTable(): Seq[Node] = {
+ private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
val sessionList = listener.getSessionList
val numBatches = sessionList.size
val table = if (numBatches > 0) {
@@ -143,8 +146,8 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
"Total Execute")
def generateDataRow(session: SessionInfo): Seq[Node] = {
- val sessionLink = "%s/%s/session?id=%s"
- .format(UIUtils.prependBaseUri(parent.basePath), parent.prefix, session.sessionId)
+ val sessionLink = "%s/%s/session?id=%s".format(
+ KyuubiUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
| {session.userName} |
{session.ip} |
@@ -178,7 +181,6 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
content
}
-
/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerSessionPage.scala b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionSubPage.scala
similarity index 91%
rename from kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerSessionPage.scala
rename to kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionSubPage.scala
index 56d0e6a01..50fbb9e46 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerSessionPage.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionSubPage.scala
@@ -28,7 +28,7 @@ import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState}
/** Page for Spark Web UI that shows statistics of jobs running in the thrift server */
-class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("session") {
+class KyuubiSessionSubPage(parent: KyuubiSessionTab) extends WebUIPage("session") {
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime
@@ -52,9 +52,9 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
Session created at {formatDate(sessionStat.startTimestamp)},
Total run {sessionStat.totalExecution} SQL
++
- generateSQLStatsTable(sessionStat.sessionId)
+ generateSQLStatsTable(request, sessionStat.sessionId)
}
- UIUtils.headerSparkPage("Kyuubi Session", content, parent, Some(5000))
+ KyuubiUIUtils.headerSparkPage(request, "Kyuubi Session", content, parent)
}
/** Generate basic stats of the kyuubi server program */
@@ -71,7 +71,7 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
}
/** Generate stats of batch statements of the kyuubi server program */
- private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
+ private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
val executionList = listener.getExecutionList
.filter(_.sessionId == sessionID)
val numStatement = executionList.size
@@ -82,7 +82,8 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
-
+
[{id}]
}
@@ -142,6 +143,7 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
}
{errorSummary}{details} |
}
+
/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerTab.scala b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionTab.scala
similarity index 84%
rename from kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerTab.scala
rename to kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionTab.scala
index a4c852fc1..ffa05d415 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiServerTab.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiSessionTab.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ui
import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.ui.KyuubiServerTab._
+import org.apache.spark.ui.KyuubiSessionTab._
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
@@ -26,22 +26,22 @@ import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
* Spark Web UI tab that shows statistics of jobs running in the thrift server.
* This assumes the given SparkContext has enabled its SparkUI.
*/
-class KyuubiServerTab(userName: String, sparkContext: SparkContext)
+class KyuubiSessionTab(userName: String, sparkContext: SparkContext)
extends SparkUITab(getSparkUI(sparkContext), userName) {
override val name = s"Kyuubi Tab 4 $userName"
val parent = getSparkUI(sparkContext)
- // KyuubiServerTab renders by different listener's content, identified by user.
+ // KyuubiSessionTab renders by different listener's content, identified by user.
val listener = KyuubiServerMonitor.getListener(userName).getOrElse {
val lr = new KyuubiServerListener(sparkContext.conf)
KyuubiServerMonitor.setListener(userName, lr)
lr
}
- attachPage(new KyuubiServerPage(this))
- attachPage(new KyuubiServerSessionPage(this))
+ attachPage(new KyuubiSessionPage(this))
+ attachPage(new KyuubiSessionSubPage(this))
parent.attachTab(this)
def detach() {
@@ -49,7 +49,7 @@ class KyuubiServerTab(userName: String, sparkContext: SparkContext)
}
}
-object KyuubiServerTab {
+object KyuubiSessionTab {
def getSparkUI(sparkContext: SparkContext): SparkUI = {
sparkContext.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiUIUtils.scala b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiUIUtils.scala
new file mode 100644
index 000000000..e9e840744
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/spark/ui/KyuubiUIUtils.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.KyuubiSparkUtil._
+
+import yaooqinn.kyuubi.utils.ReflectUtils
+
+object KyuubiUIUtils {
+
+ private val className = "org.apache.spark.ui.UIUtils"
+
+ /** Returns a spark page with correctly formatted headers */
+ def headerSparkPage(
+ request: HttpServletRequest,
+ title: String,
+ content: => Seq[Node],
+ activeTab: SparkUITab): Seq[Node] = {
+ val methodMirror = ReflectUtils.reflectStaticMethodScala(className, "headerSparkPage")
+ if (equalOrHigherThan("2.4")) {
+ methodMirror(request, title, content, activeTab, Some(5000), None, false, false)
+ .asInstanceOf[Seq[Node]]
+ } else {
+ methodMirror(title, content, activeTab, Some(5000), None, false, false)
+ .asInstanceOf[Seq[Node]]
+ }
+ }
+
+ def prependBaseUri(
+ request: HttpServletRequest,
+ basePath: String = "",
+ resource: String = ""): String = {
+ val method = ReflectUtils.reflectStaticMethodScala(className, "prependBaseUri")
+ if (equalOrHigherThan("2.4")) {
+ method(request, basePath, resource).asInstanceOf[String]
+ } else {
+ method(basePath, resource).asInstanceOf[String]
+ }
+ }
+}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala
index abe9f7f9f..3d6c85c8a 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext}
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.sql.SparkSession
-import org.apache.spark.ui.KyuubiServerTab
+import org.apache.spark.ui.KyuubiSessionTab
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.author.AuthzHelper
@@ -190,7 +190,7 @@ class SparkSessionWithUGI(
KyuubiServerMonitor.setListener(userName, new KyuubiServerListener(conf))
KyuubiServerMonitor.getListener(userName)
.foreach(_sparkSession.sparkContext.addSparkListener)
- val uiTab = new KyuubiServerTab(userName, _sparkSession.sparkContext)
+ val uiTab = new KyuubiSessionTab(userName, _sparkSession.sparkContext)
KyuubiServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab)
}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitor.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitor.scala
index 05adf4d81..06a189350 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitor.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitor.scala
@@ -20,12 +20,12 @@ package yaooqinn.kyuubi.ui
import scala.collection.mutable.HashMap
import org.apache.spark.SparkException
-import org.apache.spark.ui.KyuubiServerTab
+import org.apache.spark.ui.KyuubiSessionTab
object KyuubiServerMonitor {
- private[this] val uiTabs = new HashMap[String, KyuubiServerTab]()
+ private[this] val uiTabs = new HashMap[String, KyuubiSessionTab]()
private[this] val listeners = new HashMap[String, KyuubiServerListener]()
@@ -37,7 +37,7 @@ object KyuubiServerMonitor {
listeners.get(user)
}
- def addUITab(user: String, ui: KyuubiServerTab): Unit = {
+ def addUITab(user: String, ui: KyuubiSessionTab): Unit = {
uiTabs.put(user, ui)
}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/ReflectUtils.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/ReflectUtils.scala
index aa3dc6618..2fc309ffe 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/ReflectUtils.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/ReflectUtils.scala
@@ -221,4 +221,27 @@ object ReflectUtils extends Logging {
case Failure(e) => throw KyuubiSparkUtil.findCause(e)
}
}
+
+
+ /**
+ * Call a static method of an Scala Object
+ *
+ * @param className scala object fully qualified name
+ * @param methodName the method name
+ * @return
+ */
+ def reflectStaticMethodScala(
+ className: String,
+ methodName: String): scala.reflect.api.Mirrors#MethodMirror = {
+ try {
+ val mirror = ru.runtimeMirror(KyuubiSparkUtil.getContextOrSparkClassLoader)
+ val moduleSymbol = mirror.staticModule(className)
+ val moduleMirror = mirror.reflectModule(moduleSymbol)
+ val method = moduleMirror.symbol.typeSignature.decl(ru.TermName(methodName)).asMethod
+ val instanceMirror = mirror.reflect(moduleMirror.instance)
+ instanceMirror.reflectMethod(method)
+ } catch {
+ case e: Exception => throw KyuubiSparkUtil.findCause(e)
+ }
+ }
}
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala
index a65ae2595..16c54f2da 100644
--- a/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala
@@ -160,7 +160,7 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging {
assert(KyuubiSparkUtil.equalOrHigherThan("1.6.3"))
assert(KyuubiSparkUtil.equalOrHigherThan("2.0.2"))
assert(KyuubiSparkUtil.equalOrHigherThan(SPARK_COMPILE_VERSION))
- assert(!KyuubiSparkUtil.equalOrHigherThan("2.4.1"))
+ assert(!KyuubiSparkUtil.equalOrHigherThan("2.9.1"))
assert(!KyuubiSparkUtil.equalOrHigherThan("3.0.0"))
}
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionPageSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionPageSuite.scala
new file mode 100644
index 000000000..4086d370a
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionPageSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.util.Try
+
+import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
+import org.scalatest.mock.MockitoSugar
+
+class KyuubiSessionPageSuite extends SparkFunSuite with MockitoSugar {
+
+ var sc: SparkContext = _
+ var user: String = _
+ var tab: KyuubiSessionTab = _
+
+ override def beforeAll(): Unit = {
+ val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
+ sc = new SparkContext(conf)
+ user = KyuubiSparkUtil.getCurrentUserName
+ tab = new KyuubiSessionTab(user, sc)
+ }
+
+ override def afterAll(): Unit = {
+ sc.stop()
+ }
+
+ test("render kyuubi session page") {
+ val page = new KyuubiSessionPage(tab)
+ val request = mock[HttpServletRequest]
+ assert(Try { page.render(request) }.isSuccess )
+ }
+
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionSubPageSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionSubPageSuite.scala
new file mode 100644
index 000000000..81bb806ef
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionSubPageSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.util.Try
+
+import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
+import org.mockito.Mockito.when
+import org.scalatest.mock.MockitoSugar
+
+import yaooqinn.kyuubi.ui.{ExecutionInfo, KyuubiServerListener, SessionInfo}
+
+class KyuubiSessionSubPageSuite extends SparkFunSuite with MockitoSugar {
+
+ var sc: SparkContext = _
+ var user: String = _
+ var tab: KyuubiSessionTab = _
+
+ override def beforeAll(): Unit = {
+ val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
+ sc = new SparkContext(conf)
+ user = KyuubiSparkUtil.getCurrentUserName
+ tab = new KyuubiSessionTab(user, sc)
+ }
+
+ override def afterAll(): Unit = {
+ sc.stop()
+ }
+
+ test("render kyuubi session page") {
+ val page = new KyuubiSessionSubPage(tab)
+
+ val request = mock[HttpServletRequest]
+ intercept[IllegalArgumentException](page.render(request))
+
+ val id = "id1"
+ when(request.getParameter("id")).thenReturn(id)
+ intercept[IllegalArgumentException](page.render(request))
+
+ val sessionInfo = mock[SessionInfo]
+ val tab1 = mock[KyuubiSessionTab]
+ when(request.getParameter("id")).thenReturn(id)
+ val listener = mock[KyuubiServerListener]
+ when(tab1.listener).thenReturn(listener)
+ when(listener.getSession(id)).thenReturn(Some(sessionInfo))
+ when(sessionInfo.sessionId).thenReturn("1")
+ when(listener.getExecutionList).thenReturn(Seq[ExecutionInfo]())
+ when(tab1.appName).thenReturn("name")
+ when(tab1.headerTabs).thenReturn(Seq[WebUITab]())
+ val page2 = new KyuubiSessionSubPage(tab1)
+ assert(Try { page2.render(request) }.isSuccess )
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiServerTabSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionTabSuite.scala
similarity index 82%
rename from kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiServerTabSuite.scala
rename to kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionTabSuite.scala
index 5e30b103d..dca04eda7 100644
--- a/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiServerTabSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiSessionTabSuite.scala
@@ -21,17 +21,17 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
-class KyuubiServerTabSuite extends SparkFunSuite {
+class KyuubiSessionTabSuite extends SparkFunSuite {
var sc: SparkContext = _
var user: String = _
- var tab: KyuubiServerTab = _
+ var tab: KyuubiSessionTab = _
override def beforeAll(): Unit = {
val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
sc = new SparkContext(conf)
user = KyuubiSparkUtil.getCurrentUserName
- tab = new KyuubiServerTab(user, sc)
+ tab = new KyuubiSessionTab(user, sc)
}
override def afterAll(): Unit = {
@@ -48,9 +48,9 @@ class KyuubiServerTabSuite extends SparkFunSuite {
assert(tab.listener !== lr)
assert(KyuubiServerMonitor.getListener(user).get === tab.listener)
KyuubiServerMonitor.setListener("user1", lr)
- val tab2 = new KyuubiServerTab("user1", sc)
+ val tab2 = new KyuubiSessionTab("user1", sc)
assert(tab2.listener === lr)
- val tab3 = new KyuubiServerTab("user2", sc)
+ val tab3 = new KyuubiSessionTab("user2", sc)
assert(tab3.listener === KyuubiServerMonitor.getListener("user2").get)
}
@@ -60,12 +60,12 @@ class KyuubiServerTabSuite extends SparkFunSuite {
}
test("testDetach") {
- assert(KyuubiServerTab.getSparkUI(sc).getTabs.contains(tab))
+ assert(KyuubiSessionTab.getSparkUI(sc).getTabs.contains(tab))
tab.detach()
- assert(!KyuubiServerTab.getSparkUI(sc).getTabs.contains(tab))
+ assert(!KyuubiSessionTab.getSparkUI(sc).getTabs.contains(tab))
}
test("testGetSparkUI") {
- assert(KyuubiServerTab.getSparkUI(sc) === sc.ui.get)
+ assert(KyuubiSessionTab.getSparkUI(sc) === sc.ui.get)
}
}
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiUIUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiUIUtilsSuite.scala
new file mode 100644
index 000000000..cc8c9684a
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/spark/ui/KyuubiUIUtilsSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.util.Try
+
+import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
+import org.scalatest.mock.MockitoSugar
+
+class KyuubiUIUtilsSuite extends SparkFunSuite with MockitoSugar {
+
+ var sc: SparkContext = _
+ var user: String = _
+ var tab: KyuubiSessionTab = _
+
+ override def beforeAll(): Unit = {
+ val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
+ KyuubiSparkUtil.setupCommonConfig(conf)
+ sc = new SparkContext(conf)
+ user = KyuubiSparkUtil.getCurrentUserName
+ tab = new KyuubiSessionTab(user, sc)
+ }
+
+ override def afterAll(): Unit = {
+ sc.stop()
+ }
+
+ test("spark page header") {
+ val request = mock[HttpServletRequest]
+ val title = "KyuubiServer test ui" * 10
+ val content = |
+ assert(Try { KyuubiUIUtils.headerSparkPage(request, title, content, tab) }.isSuccess)
+ }
+
+ test("prepend base uri") {
+ val request = mock[HttpServletRequest]
+ val baseUri = KyuubiUIUtils.prependBaseUri(request)
+
+ val basePath = KyuubiUIUtils.prependBaseUri(request, "1")
+ assert(basePath === baseUri + "1")
+
+ val resourcePath = KyuubiUIUtils.prependBaseUri(request, "1", "2")
+ assert(resourcePath === baseUri + "12")
+ }
+
+}
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala
index d361f318f..f431300ba 100644
--- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala
@@ -118,6 +118,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
val op5 = operationMgr.newExecuteStatementOperation(session, statement)
op5.cancel()
operationMgr.cancelOperation(op5.getHandle)
+ ss.stop()
}
test("rm expired operations") {
@@ -146,6 +147,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
operationMgr.cancelOperation(op2.getHandle) // isTerminal=true
Thread.sleep(1500) // timeout
operationMgr.removeExpiredOperations(handles) should be(Seq(op2)) // op2 is timeout and terminal
+ ss.stop()
}
test("get operation next row set") {
@@ -170,6 +172,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
op1.close()
intercept[KyuubiSQLException](
operationMgr.getOperationNextRowSet(op1.getHandle, FetchOrientation.FETCH_NEXT, 5))
+ ss.stop()
}
test("get operation log row set") {
@@ -190,6 +193,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
val e = intercept[KyuubiSQLException](
operationMgr.getOperationLogRowSet(op1.getHandle, FetchOrientation.FETCH_NEXT, 5))
e.getMessage should startWith("Couldn't find log associated with operation handle:")
+ ss.stop()
}
}
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala
index 111203bf1..abc8bdc68 100644
--- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala
@@ -18,10 +18,10 @@
package yaooqinn.kyuubi.server
import java.net.InetAddress
+import java.util.UUID
import scala.collection.JavaConverters._
-import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hive.service.cli.thrift._
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
@@ -299,7 +299,8 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
-
+ val dt = new TExecuteStatementReq(handle, "drop table src2")
+ fe.ExecuteStatement(dt)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitorSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitorSuite.scala
index a022e8ead..3751a1866 100644
--- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitorSuite.scala
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/ui/KyuubiServerMonitorSuite.scala
@@ -18,7 +18,7 @@
package yaooqinn.kyuubi.ui
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.ui.KyuubiServerTab
+import org.apache.spark.ui.KyuubiSessionTab
class KyuubiServerMonitorSuite extends SparkFunSuite {
val conf = new SparkConf(loadDefaults = true).setAppName("monitor").setMaster("local")
@@ -42,7 +42,7 @@ class KyuubiServerMonitorSuite extends SparkFunSuite {
val liOp = KyuubiServerMonitor.getListener(user)
assert(liOp.get === li)
assert(KyuubiServerMonitor.getListener("fake").isEmpty)
- val tab = new KyuubiServerTab(user, sc)
+ val tab = new KyuubiSessionTab(user, sc)
KyuubiServerMonitor.addUITab(user, tab)
KyuubiServerMonitor.detachUITab(user)
KyuubiServerMonitor.detachAllUITabs()
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/ReflectUtilsSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/ReflectUtilsSuite.scala
index 95a52110d..0579b6d19 100644
--- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/ReflectUtilsSuite.scala
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/ReflectUtilsSuite.scala
@@ -132,6 +132,17 @@ class ReflectUtilsSuite extends SparkFunSuite {
ReflectUtils.reflectModule(className = "yaooqinn.kyuubi.TestRule2", silent = false))
assert(e.getMessage.contains("not found"))
}
+
+ test("reflect static method scala") {
+ val m1 = ReflectUtils.reflectStaticMethodScala("yaooqinn.kyuubi.utils.TestClass0", "staticTest")
+ assert(m1() === 1)
+ val m2 =
+ ReflectUtils.reflectStaticMethodScala("yaooqinn.kyuubi.utils.TestClass0", "staticTest2")
+ assert(m2(1) === 2)
+ intercept[IllegalArgumentException](m2("1"))
+ intercept[ScalaReflectionException](ReflectUtils.
+ reflectStaticMethodScala("yaooqinn.kyuubi.utils.TestClass0", "staticTest3"))
+ }
}
class TestTrait {
@@ -148,6 +159,7 @@ class TestClass3 extends TestTrait {
object TestClass0 {
def staticTest(): Int = 1
+ def staticTest2(x: Int): Int = x + 1
val testInt = 1
val testObj = "1"
}
diff --git a/pom.xml b/pom.xml
index ec0054b6d..ea4d515b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -356,6 +356,14 @@
+
+ spark-2.4
+
+ 2.4.0
+ 3.0.3
+
+
+
hadoop-2.7