[KYUUBI-135][KYUUBI-142]add support for spark 2.4.0 release (#157)

* fix #135 spark 2.4.0 support

* fix #142

* import

* add ut

* fix ut

* fix ut

* fix ut

* fix ut
This commit is contained in:
Kent Yao 2019-03-02 01:16:50 +08:00 committed by GitHub
parent 185d60a273
commit 7401ca3c6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 374 additions and 46 deletions

View File

@ -28,6 +28,9 @@ deploy:
jobs:
include:
- stage: spark2.4
language: scala
script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.3
language: scala
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V

View File

@ -307,6 +307,9 @@
<environmentVariables>
<KYUUBI_JAR>${project.build.testOutputDirectory}/kyuubi-server-${project.version}.jar</KYUUBI_JAR>
</environmentVariables>
<systemProperties>
<spark.sql.warehouse.dir>${project.build.testOutputDirectory}/spark-warehouse</spark.sql.warehouse.dir>
</systemProperties>
</configuration>
<executions>
<execution>

View File

@ -139,8 +139,11 @@ class SparkEnv (
}
object SparkEnv extends Logging {
import scala.collection.JavaConverters._
info("Loaded Kyuubi Supplied SparkEnv Class...")
private val env = new ConcurrentHashMap[String, SparkEnv]()
@volatile private var env: SparkEnv = _
private val envs = new ConcurrentHashMap[String, SparkEnv]()
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"
@ -150,10 +153,11 @@ object SparkEnv extends Logging {
def set(e: SparkEnv) {
if (e == null) {
debug(s"Kyuubi: Removing SparkEnv for $user")
env.remove(user)
envs.remove(user)
} else {
debug(s"Kyuubi: Registering SparkEnv for $user")
env.put(user, e)
envs.put(user, e)
env = e
}
}
@ -162,7 +166,7 @@ object SparkEnv extends Logging {
*/
def get: SparkEnv = {
debug(s"Kyuubi: Get SparkEnv for $user")
env.getOrDefault(user, env.values().iterator().next())
envs.getOrDefault(user, envs.values().asScala.headOption.getOrElse(env))
}
/**

View File

@ -22,7 +22,8 @@ import java.net.URL
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.MutableURLClassLoader
@ -56,6 +57,8 @@ private[hive] class IsolatedClientLoader(
val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {
import KyuubiSparkUtil._
// Check to make sure that the root classloader does not know about Hive.
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
@ -75,7 +78,28 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
val ctor = classOf[HiveClientImpl].getConstructors.head
if (majorVersion(SPARK_VERSION) == 2 && minorVersion(SPARK_VERSION) > 3) {
val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
ctor.newInstance(
version,
warehouseDir,
sparkConf,
hadoopConf,
config,
baseClassLoader,
this).asInstanceOf[HiveClientImpl]
} else {
ctor.newInstance(
version,
sparkConf,
hadoopConf,
config,
baseClassLoader,
this).asInstanceOf[HiveClientImpl]
}
}
/**

View File

@ -27,8 +27,10 @@ import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState, SessionInfo}
/** Page for Spark Web UI that shows statistics of the kyuubi server */
class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
/**
* Page for Spark Web UI that shows statistics of the kyuubi server
*/
class KyuubiSessionPage(parent: KyuubiSessionTab) extends WebUIPage("") {
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime()
@ -44,10 +46,10 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
{listener.getOnlineSessionNum} session(s) are online,
running {listener.getTotalRunning} SQL statement(s)
</h4> ++
generateSessionStatsTable() ++
generateSQLStatsTable()
generateSessionStatsTable(request) ++
generateSQLStatsTable(request)
}
UIUtils.headerSparkPage("Kyuubi Server", content, parent, Some(5000))
KyuubiUIUtils.headerSparkPage(request, "Kyuubi Session - Application View", content, parent)
}
/** Generate basic stats of the kyuubi server program */
@ -64,7 +66,7 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
}
/** Generate stats of batch statements of the kyuubi server program */
private def generateSQLStatsTable(): Seq[Node] = {
private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
val numStatement = listener.getExecutionList.size
val table = if (numStatement > 0) {
val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
@ -73,7 +75,8 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
<a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
<a href={"%s/jobs/job?id=%s".format(
KyuubiUIUtils.prependBaseUri(request, parent.basePath), id)}>
[{id}]
</a>
}
@ -135,7 +138,7 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
}
/** Generate stats of batch sessions of the kyuubi server program */
private def generateSessionStatsTable(): Seq[Node] = {
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
val sessionList = listener.getSessionList
val numBatches = sessionList.size
val table = if (numBatches > 0) {
@ -143,8 +146,8 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
"Total Execute")
def generateDataRow(session: SessionInfo): Seq[Node] = {
val sessionLink = "%s/%s/session?id=%s"
.format(UIUtils.prependBaseUri(parent.basePath), parent.prefix, session.sessionId)
val sessionLink = "%s/%s/session?id=%s".format(
KyuubiUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
<tr>
<td> {session.userName} </td>
<td> {session.ip} </td>
@ -178,7 +181,6 @@ class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
content
}
/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/

View File

@ -28,7 +28,7 @@ import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState}
/** Page for Spark Web UI that shows statistics of jobs running in the thrift server */
class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("session") {
class KyuubiSessionSubPage(parent: KyuubiSessionTab) extends WebUIPage("session") {
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime
@ -52,9 +52,9 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
Session created at {formatDate(sessionStat.startTimestamp)},
Total run {sessionStat.totalExecution} SQL
</h4> ++
generateSQLStatsTable(sessionStat.sessionId)
generateSQLStatsTable(request, sessionStat.sessionId)
}
UIUtils.headerSparkPage("Kyuubi Session", content, parent, Some(5000))
KyuubiUIUtils.headerSparkPage(request, "Kyuubi Session", content, parent)
}
/** Generate basic stats of the kyuubi server program */
@ -71,7 +71,7 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
}
/** Generate stats of batch statements of the kyuubi server program */
private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
val executionList = listener.getExecutionList
.filter(_.sessionId == sessionID)
val numStatement = executionList.size
@ -82,7 +82,8 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
def generateDataRow(info: ExecutionInfo): Seq[Node] = {
val jobLink = info.jobId.map { id: String =>
<a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
<a href={"%s/jobs/job?id=%s"
.format(KyuubiUIUtils.prependBaseUri(request, parent.basePath), id)}>
[{id}]
</a>
}
@ -142,6 +143,7 @@ class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("sessio
}
<td>{errorSummary}{details}</td>
}
/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/

View File

@ -18,7 +18,7 @@
package org.apache.spark.ui
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.ui.KyuubiServerTab._
import org.apache.spark.ui.KyuubiSessionTab._
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
@ -26,22 +26,22 @@ import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
* Spark Web UI tab that shows statistics of jobs running in the thrift server.
* This assumes the given SparkContext has enabled its SparkUI.
*/
class KyuubiServerTab(userName: String, sparkContext: SparkContext)
class KyuubiSessionTab(userName: String, sparkContext: SparkContext)
extends SparkUITab(getSparkUI(sparkContext), userName) {
override val name = s"Kyuubi Tab 4 $userName"
val parent = getSparkUI(sparkContext)
// KyuubiServerTab renders by different listener's content, identified by user.
// KyuubiSessionTab renders by different listener's content, identified by user.
val listener = KyuubiServerMonitor.getListener(userName).getOrElse {
val lr = new KyuubiServerListener(sparkContext.conf)
KyuubiServerMonitor.setListener(userName, lr)
lr
}
attachPage(new KyuubiServerPage(this))
attachPage(new KyuubiServerSessionPage(this))
attachPage(new KyuubiSessionPage(this))
attachPage(new KyuubiSessionSubPage(this))
parent.attachTab(this)
def detach() {
@ -49,7 +49,7 @@ class KyuubiServerTab(userName: String, sparkContext: SparkContext)
}
}
object KyuubiServerTab {
object KyuubiSessionTab {
def getSparkUI(sparkContext: SparkContext): SparkUI = {
sparkContext.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.KyuubiSparkUtil._
import yaooqinn.kyuubi.utils.ReflectUtils
object KyuubiUIUtils {
private val className = "org.apache.spark.ui.UIUtils"
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(
request: HttpServletRequest,
title: String,
content: => Seq[Node],
activeTab: SparkUITab): Seq[Node] = {
val methodMirror = ReflectUtils.reflectStaticMethodScala(className, "headerSparkPage")
if (equalOrHigherThan("2.4")) {
methodMirror(request, title, content, activeTab, Some(5000), None, false, false)
.asInstanceOf[Seq[Node]]
} else {
methodMirror(title, content, activeTab, Some(5000), None, false, false)
.asInstanceOf[Seq[Node]]
}
}
def prependBaseUri(
request: HttpServletRequest,
basePath: String = "",
resource: String = ""): String = {
val method = ReflectUtils.reflectStaticMethodScala(className, "prependBaseUri")
if (equalOrHigherThan("2.4")) {
method(request, basePath, resource).asInstanceOf[String]
} else {
method(basePath, resource).asInstanceOf[String]
}
}
}

View File

@ -30,7 +30,7 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext}
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.sql.SparkSession
import org.apache.spark.ui.KyuubiServerTab
import org.apache.spark.ui.KyuubiSessionTab
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.author.AuthzHelper
@ -190,7 +190,7 @@ class SparkSessionWithUGI(
KyuubiServerMonitor.setListener(userName, new KyuubiServerListener(conf))
KyuubiServerMonitor.getListener(userName)
.foreach(_sparkSession.sparkContext.addSparkListener)
val uiTab = new KyuubiServerTab(userName, _sparkSession.sparkContext)
val uiTab = new KyuubiSessionTab(userName, _sparkSession.sparkContext)
KyuubiServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab)
}

View File

@ -20,12 +20,12 @@ package yaooqinn.kyuubi.ui
import scala.collection.mutable.HashMap
import org.apache.spark.SparkException
import org.apache.spark.ui.KyuubiServerTab
import org.apache.spark.ui.KyuubiSessionTab
object KyuubiServerMonitor {
private[this] val uiTabs = new HashMap[String, KyuubiServerTab]()
private[this] val uiTabs = new HashMap[String, KyuubiSessionTab]()
private[this] val listeners = new HashMap[String, KyuubiServerListener]()
@ -37,7 +37,7 @@ object KyuubiServerMonitor {
listeners.get(user)
}
def addUITab(user: String, ui: KyuubiServerTab): Unit = {
def addUITab(user: String, ui: KyuubiSessionTab): Unit = {
uiTabs.put(user, ui)
}

View File

@ -221,4 +221,27 @@ object ReflectUtils extends Logging {
case Failure(e) => throw KyuubiSparkUtil.findCause(e)
}
}
/**
* Call a static method of an Scala Object
*
* @param className scala object fully qualified name
* @param methodName the method name
* @return
*/
def reflectStaticMethodScala(
className: String,
methodName: String): scala.reflect.api.Mirrors#MethodMirror = {
try {
val mirror = ru.runtimeMirror(KyuubiSparkUtil.getContextOrSparkClassLoader)
val moduleSymbol = mirror.staticModule(className)
val moduleMirror = mirror.reflectModule(moduleSymbol)
val method = moduleMirror.symbol.typeSignature.decl(ru.TermName(methodName)).asMethod
val instanceMirror = mirror.reflect(moduleMirror.instance)
instanceMirror.reflectMethod(method)
} catch {
case e: Exception => throw KyuubiSparkUtil.findCause(e)
}
}
}

View File

@ -160,7 +160,7 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging {
assert(KyuubiSparkUtil.equalOrHigherThan("1.6.3"))
assert(KyuubiSparkUtil.equalOrHigherThan("2.0.2"))
assert(KyuubiSparkUtil.equalOrHigherThan(SPARK_COMPILE_VERSION))
assert(!KyuubiSparkUtil.equalOrHigherThan("2.4.1"))
assert(!KyuubiSparkUtil.equalOrHigherThan("2.9.1"))
assert(!KyuubiSparkUtil.equalOrHigherThan("3.0.0"))
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui
import javax.servlet.http.HttpServletRequest
import scala.util.Try
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
import org.scalatest.mock.MockitoSugar
class KyuubiSessionPageSuite extends SparkFunSuite with MockitoSugar {
var sc: SparkContext = _
var user: String = _
var tab: KyuubiSessionTab = _
override def beforeAll(): Unit = {
val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
sc = new SparkContext(conf)
user = KyuubiSparkUtil.getCurrentUserName
tab = new KyuubiSessionTab(user, sc)
}
override def afterAll(): Unit = {
sc.stop()
}
test("render kyuubi session page") {
val page = new KyuubiSessionPage(tab)
val request = mock[HttpServletRequest]
assert(Try { page.render(request) }.isSuccess )
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui
import javax.servlet.http.HttpServletRequest
import scala.util.Try
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar
import yaooqinn.kyuubi.ui.{ExecutionInfo, KyuubiServerListener, SessionInfo}
class KyuubiSessionSubPageSuite extends SparkFunSuite with MockitoSugar {
var sc: SparkContext = _
var user: String = _
var tab: KyuubiSessionTab = _
override def beforeAll(): Unit = {
val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
sc = new SparkContext(conf)
user = KyuubiSparkUtil.getCurrentUserName
tab = new KyuubiSessionTab(user, sc)
}
override def afterAll(): Unit = {
sc.stop()
}
test("render kyuubi session page") {
val page = new KyuubiSessionSubPage(tab)
val request = mock[HttpServletRequest]
intercept[IllegalArgumentException](page.render(request))
val id = "id1"
when(request.getParameter("id")).thenReturn(id)
intercept[IllegalArgumentException](page.render(request))
val sessionInfo = mock[SessionInfo]
val tab1 = mock[KyuubiSessionTab]
when(request.getParameter("id")).thenReturn(id)
val listener = mock[KyuubiServerListener]
when(tab1.listener).thenReturn(listener)
when(listener.getSession(id)).thenReturn(Some(sessionInfo))
when(sessionInfo.sessionId).thenReturn("1")
when(listener.getExecutionList).thenReturn(Seq[ExecutionInfo]())
when(tab1.appName).thenReturn("name")
when(tab1.headerTabs).thenReturn(Seq[WebUITab]())
val page2 = new KyuubiSessionSubPage(tab1)
assert(Try { page2.render(request) }.isSuccess )
}
}

View File

@ -21,17 +21,17 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
class KyuubiServerTabSuite extends SparkFunSuite {
class KyuubiSessionTabSuite extends SparkFunSuite {
var sc: SparkContext = _
var user: String = _
var tab: KyuubiServerTab = _
var tab: KyuubiSessionTab = _
override def beforeAll(): Unit = {
val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
sc = new SparkContext(conf)
user = KyuubiSparkUtil.getCurrentUserName
tab = new KyuubiServerTab(user, sc)
tab = new KyuubiSessionTab(user, sc)
}
override def afterAll(): Unit = {
@ -48,9 +48,9 @@ class KyuubiServerTabSuite extends SparkFunSuite {
assert(tab.listener !== lr)
assert(KyuubiServerMonitor.getListener(user).get === tab.listener)
KyuubiServerMonitor.setListener("user1", lr)
val tab2 = new KyuubiServerTab("user1", sc)
val tab2 = new KyuubiSessionTab("user1", sc)
assert(tab2.listener === lr)
val tab3 = new KyuubiServerTab("user2", sc)
val tab3 = new KyuubiSessionTab("user2", sc)
assert(tab3.listener === KyuubiServerMonitor.getListener("user2").get)
}
@ -60,12 +60,12 @@ class KyuubiServerTabSuite extends SparkFunSuite {
}
test("testDetach") {
assert(KyuubiServerTab.getSparkUI(sc).getTabs.contains(tab))
assert(KyuubiSessionTab.getSparkUI(sc).getTabs.contains(tab))
tab.detach()
assert(!KyuubiServerTab.getSparkUI(sc).getTabs.contains(tab))
assert(!KyuubiSessionTab.getSparkUI(sc).getTabs.contains(tab))
}
test("testGetSparkUI") {
assert(KyuubiServerTab.getSparkUI(sc) === sc.ui.get)
assert(KyuubiSessionTab.getSparkUI(sc) === sc.ui.get)
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui
import javax.servlet.http.HttpServletRequest
import scala.util.Try
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
import org.scalatest.mock.MockitoSugar
class KyuubiUIUtilsSuite extends SparkFunSuite with MockitoSugar {
var sc: SparkContext = _
var user: String = _
var tab: KyuubiSessionTab = _
override def beforeAll(): Unit = {
val conf = new SparkConf(loadDefaults = true).setMaster("local").setAppName("test")
KyuubiSparkUtil.setupCommonConfig(conf)
sc = new SparkContext(conf)
user = KyuubiSparkUtil.getCurrentUserName
tab = new KyuubiSessionTab(user, sc)
}
override def afterAll(): Unit = {
sc.stop()
}
test("spark page header") {
val request = mock[HttpServletRequest]
val title = "KyuubiServer test ui" * 10
val content = <td></td>
assert(Try { KyuubiUIUtils.headerSparkPage(request, title, content, tab) }.isSuccess)
}
test("prepend base uri") {
val request = mock[HttpServletRequest]
val baseUri = KyuubiUIUtils.prependBaseUri(request)
val basePath = KyuubiUIUtils.prependBaseUri(request, "1")
assert(basePath === baseUri + "1")
val resourcePath = KyuubiUIUtils.prependBaseUri(request, "1", "2")
assert(resourcePath === baseUri + "12")
}
}

View File

@ -118,6 +118,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
val op5 = operationMgr.newExecuteStatementOperation(session, statement)
op5.cancel()
operationMgr.cancelOperation(op5.getHandle)
ss.stop()
}
test("rm expired operations") {
@ -146,6 +147,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
operationMgr.cancelOperation(op2.getHandle) // isTerminal=true
Thread.sleep(1500) // timeout
operationMgr.removeExpiredOperations(handles) should be(Seq(op2)) // op2 is timeout and terminal
ss.stop()
}
test("get operation next row set") {
@ -170,6 +172,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
op1.close()
intercept[KyuubiSQLException](
operationMgr.getOperationNextRowSet(op1.getHandle, FetchOrientation.FETCH_NEXT, 5))
ss.stop()
}
test("get operation log row set") {
@ -190,6 +193,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
val e = intercept[KyuubiSQLException](
operationMgr.getOperationLogRowSet(op1.getHandle, FetchOrientation.FETCH_NEXT, 5))
e.getMessage should startWith("Couldn't find log associated with operation handle:")
ss.stop()
}
}

View File

@ -18,10 +18,10 @@
package yaooqinn.kyuubi.server
import java.net.InetAddress
import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hive.service.cli.thrift._
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
@ -299,7 +299,8 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val dt = new TExecuteStatementReq(handle, "drop table src2")
fe.ExecuteStatement(dt)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)

View File

@ -18,7 +18,7 @@
package yaooqinn.kyuubi.ui
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.ui.KyuubiServerTab
import org.apache.spark.ui.KyuubiSessionTab
class KyuubiServerMonitorSuite extends SparkFunSuite {
val conf = new SparkConf(loadDefaults = true).setAppName("monitor").setMaster("local")
@ -42,7 +42,7 @@ class KyuubiServerMonitorSuite extends SparkFunSuite {
val liOp = KyuubiServerMonitor.getListener(user)
assert(liOp.get === li)
assert(KyuubiServerMonitor.getListener("fake").isEmpty)
val tab = new KyuubiServerTab(user, sc)
val tab = new KyuubiSessionTab(user, sc)
KyuubiServerMonitor.addUITab(user, tab)
KyuubiServerMonitor.detachUITab(user)
KyuubiServerMonitor.detachAllUITabs()

View File

@ -132,6 +132,17 @@ class ReflectUtilsSuite extends SparkFunSuite {
ReflectUtils.reflectModule(className = "yaooqinn.kyuubi.TestRule2", silent = false))
assert(e.getMessage.contains("not found"))
}
test("reflect static method scala") {
val m1 = ReflectUtils.reflectStaticMethodScala("yaooqinn.kyuubi.utils.TestClass0", "staticTest")
assert(m1() === 1)
val m2 =
ReflectUtils.reflectStaticMethodScala("yaooqinn.kyuubi.utils.TestClass0", "staticTest2")
assert(m2(1) === 2)
intercept[IllegalArgumentException](m2("1"))
intercept[ScalaReflectionException](ReflectUtils.
reflectStaticMethodScala("yaooqinn.kyuubi.utils.TestClass0", "staticTest3"))
}
}
class TestTrait {
@ -148,6 +159,7 @@ class TestClass3 extends TestTrait {
object TestClass0 {
def staticTest(): Int = 1
def staticTest2(x: Int): Int = x + 1
val testInt = 1
val testObj = "1"
}

View File

@ -356,6 +356,14 @@
</properties>
</profile>
<profile>
<id>spark-2.4</id>
<properties>
<spark.version>2.4.0</spark.version>
<scalatest.version>3.0.3</scalatest.version>
</properties>
</profile>
<profile>
<id>hadoop-2.7</id>
<properties>