Merge pull request #151 from yaooqinn/KYUUBI-150

[KYUUBI-150] Localize remote jars to support the ADD JAR and CREATE FUNCTION commands
This commit is contained in:
Kent Yao 2019-02-21 22:03:12 +08:00 committed by GitHub
commit 6352c6aeb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 430 additions and 47 deletions

View File

@ -30,13 +30,13 @@ jobs:
include:
- stage: spark2.3
language: scala
script: ./build/mvn install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.2
language: scala
script: ./build/mvn install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.1
language: scala
script: ./build/mvn install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
after_success:
- bash <(curl -s https://codecov.io/bash)

View File

@ -6,4 +6,4 @@
\ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\
\/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/
/\___/
\/__/
\/__/

View File

@ -279,7 +279,7 @@
</goals>
<configuration>
<sources>
<source>src/gen/</source>
<source>src/gen/java</source>
</sources>
</configuration>
</execution>

View File

@ -165,7 +165,6 @@ object KyuubiConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.HOURS.toMillis(6L))
val FRONTEND_IDLE_SESSION_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.frontend.session.timeout")
.doc("The check interval for session/operation timeout, which can be disabled by setting" +
@ -373,6 +372,15 @@ object KyuubiConf {
.intConf
.createWithDefault(-1)
/**
 * Local root directory that remote resources (e.g. jars referenced by ADD JAR /
 * CREATE FUNCTION) are downloaded into. Defaults to "$KYUUBI_HOME/resources",
 * falling back to the JVM temp dir when KYUUBI_HOME is unset.
 */
val OPERATION_DOWNLOADED_RESOURCES_DIR: ConfigEntry[String] = {
  val defaultRoot = sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))
  KyuubiConfigBuilder("spark.kyuubi.operation.downloaded.resources.dir")
    .doc("Temporary local directory for added resources in the remote file system.")
    .stringConf
    .createWithDefault(defaultRoot + File.separator + "resources")
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// Containerization //
/////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -17,6 +17,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.types.DataType
@ -26,4 +27,15 @@ object SparkSQLUtils {
HiveUtils.toHiveString(a)
}
/**
 * Expose the session's shared `jarClassLoader`, the class loader Spark uses
 * for user-added jars, so operations can run with user classes visible.
 */
def getUserJarClassLoader(sparkSession: SparkSession): ClassLoader = {
sparkSession.sharedState.jarClassLoader
}
/**
 * Parse a SQL statement into an unresolved [[LogicalPlan]] using the session's
 * SQL parser, without executing it.
 */
def parsePlan(sparkSession: SparkSession, statement: String): LogicalPlan = {
sparkSession.sessionState.sqlParser.parsePlan(statement)
}
/**
 * Turn an already-parsed [[LogicalPlan]] into a [[DataFrame]] via the
 * package-private `Dataset.ofRows` (the reason this helper lives in
 * `org.apache.spark.sql`).
 */
def toDataFrame(sparkSession: SparkSession, plan: LogicalPlan): DataFrame = {
Dataset.ofRows(sparkSession, plan)
}
}

View File

@ -25,14 +25,18 @@ import java.util.concurrent.{Future, RejectedExecutionException}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.AddJarCommand
import org.apache.spark.sql.types._
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
@ -40,9 +44,12 @@ import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
import yaooqinn.kyuubi.session.KyuubiSession
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
import KyuubiOperation._
private var state: OperationState = INITIALIZED
private val opHandle: OperationHandle =
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
@ -50,7 +57,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
private val conf = session.sparkSession.conf
private val operationTimeout =
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT.key))
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT))
private var lastAccessTime = System.currentTimeMillis()
private var hasResultSet: Boolean = false
@ -299,11 +306,30 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
}
/**
 * Download a remote resource to this session's local resources dir and register
 * it on the session classpath.
 *
 * Only downloadable schemes (currently hdfs, per `isResourceDownloadable`) are
 * localized; other paths are silently ignored here. The file is copied via the
 * source path's own FileSystem, chmod'ed "ugo+rx" so executors/users can read
 * it, then added through Spark's `AddJarCommand`.
 *
 * Any failure is wrapped in a KyuubiSQLException naming the offending path.
 *
 * NOTE(review): method name looks like a typo for `localizeAndAddResource`;
 * renaming would require touching its call sites, so it is left as-is here.
 */
private def localizeAndAndResource(path: String): Unit = try {
if (isResourceDownloadable(path)) {
val src = new Path(path)
val destFileName = src.getName
// Canonical path inside the per-session resources directory.
val destFile =
new File(session.getResourcesSessionDir, destFileName).getCanonicalPath
val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration)
fs.copyToLocalFile(src, new Path(destFile))
FileUtil.chmod(destFile, "ugo+rx", true)
AddJarCommand(destFile).run(session.sparkSession)
}
} catch {
case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e)
}
private def execute(): Unit = {
try {
statementId = UUID.randomUUID().toString
info(s"Running query '$statement' with $statementId")
setState(RUNNING)
val classLoader = SparkSQLUtils.getUserJarClassLoader(session.sparkSession)
Thread.currentThread().setContextClassLoader(classLoader)
KyuubiServerMonitor.getListener(session.getUserName).foreach {
_.onStatementStart(
statementId,
@ -314,7 +340,21 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
session.sparkSession.sparkContext.setJobGroup(statementId, statement)
KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext)
result = session.sparkSession.sql(statement)
val parsedPlan = SparkSQLUtils.parsePlan(session.sparkSession, statement)
parsedPlan match {
case c if c.nodeName == "CreateFunctionCommand" =>
val resources =
ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]]
resources.foreach { case FunctionResource(_, uri) =>
localizeAndAndResource(uri)
}
case a if a.nodeName == "AddJarCommand" =>
val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String]
localizeAndAndResource(path)
case _ =>
}
result = SparkSQLUtils.toDataFrame(session.sparkSession, parsedPlan)
KyuubiServerMonitor.getListener(session.getUserName).foreach {
_.onStatementParsed(statementId, result.queryExecution.toString())
}
@ -406,4 +446,9 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
object KyuubiOperation {
// Fetch defaults shared by all operations.
val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT
val DEFAULT_FETCH_MAX_ROWS = 100
/**
 * Whether the resource lives in a remote file system and must be downloaded
 * locally before use. Only the "hdfs" scheme is treated as downloadable.
 * A null or empty resource path makes Hadoop's `Path` constructor throw
 * IllegalArgumentException (relied on by the unit tests).
 */
def isResourceDownloadable(resource: String): Boolean = {
val scheme = new Path(resource).toUri.getScheme
StringUtils.equalsIgnoreCase(scheme, "hdfs")
}
}

View File

@ -63,6 +63,7 @@ private[kyuubi] class KyuubiSession(
private val opHandleSet = new MHSet[OperationHandle]
private var _isOperationLogEnabled = false
private var sessionLogDir: File = _
private var sessionResourcesDir: File = _
@volatile private var lastAccessTime: Long = System.currentTimeMillis()
private var lastIdleTime = 0L
@ -136,6 +137,15 @@ private[kyuubi] class KyuubiSession(
}
}
/**
 * Best-effort removal of this session's downloaded-resources directory when the
 * session closes. Failures (including a never-initialized dir) are logged and
 * swallowed so close() can proceed.
 */
private def cleanupSessionResourcesDir(): Unit = {
  try {
    FileUtils.forceDelete(sessionResourcesDir)
  } catch {
    case e: Exception =>
      // Bug fix: the message said "session log dir" but this cleans the
      // session *resources* dir.
      error("Failed to cleanup session resources dir: " + sessionResourcesDir, e)
  }
}
def sparkSession: SparkSession = this.sparkSessionWithUGI.sparkSession
def ugi: UserGroupInformation = this.sessionUGI
@ -195,6 +205,8 @@ private[kyuubi] class KyuubiSession(
// Iterate through the opHandles and close their operations
opHandleSet.foreach(closeOperation)
opHandleSet.clear()
// Cleanup session resources directory
cleanupSessionResourcesDir()
// Cleanup session log directory.
cleanupSessionLogDir()
} finally {
@ -313,14 +325,14 @@ private[kyuubi] class KyuubiSession(
def isOperationLogEnabled: Boolean = _isOperationLogEnabled
/**
* Get the session dir, which is the parent dir of operation logs
* Get the session log dir, which is the parent dir of operation logs
*
* @return a file representing the parent directory of operation logs
*/
def getSessionLogDir: File = sessionLogDir
/**
* Set the session dir, which is the parent dir of operation logs
* Set the session log dir, which is the parent dir of operation logs
*
* @param operationLogRootDir the parent dir of the session dir
*/
@ -340,6 +352,32 @@ private[kyuubi] class KyuubiSession(
}
}
/**
 * Get the session resources dir, the local directory that remote resources
 * (e.g. jars added in this session) are downloaded into.
 *
 * @return a file representing this session's local resources directory
 */
def getResourcesSessionDir: File = sessionResourcesDir
/**
 * Create (if necessary) and set this session's resources dir: a per-user,
 * per-session-handle sub-directory ("<user>/<handleId>_resources") under the
 * server-wide downloaded-resources root dir.
 *
 * @param resourcesRootDir the parent dir under which the session dir is created
 * @throws RuntimeException if the path exists as a regular file, or the
 *                          directory cannot be created
 */
def setResourcesSessionDir(resourcesRootDir: File): Unit = {
sessionResourcesDir = new File(resourcesRootDir,
username + File.separator + sessionHandle.getHandleIdentifier.toString + "_resources")
if (sessionResourcesDir.exists() && !sessionResourcesDir.isDirectory) {
throw new RuntimeException("The resources directory exists but is not a directory: " +
sessionResourcesDir)
}
if (!sessionResourcesDir.exists() && !sessionResourcesDir.mkdirs()) {
throw new RuntimeException("Couldn't create session resources directory " +
sessionResourcesDir)
}
}
def getSessionHandle: SessionHandle = sessionHandle
def getPassword: String = password

View File

@ -31,7 +31,7 @@ import org.apache.spark.SparkConf
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.operation.OperationManager
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.service.CompositeService
import yaooqinn.kyuubi.service.{CompositeService, ServiceException}
import yaooqinn.kyuubi.spark.SparkSessionCacheManager
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.NamedThreadFactory
@ -41,25 +41,26 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory
*/
private[kyuubi] class SessionManager private(
name: String) extends CompositeService(name) with Logging {
private[this] val operationManager = new OperationManager()
private[this] val cacheManager = new SparkSessionCacheManager()
private[this] val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession]
private[this] var execPool: ThreadPoolExecutor = _
private[this] var isOperationLogEnabled = false
private[this] var operationLogRootDir: File = _
private[this] var checkInterval: Long = _
private[this] var sessionTimeout: Long = _
private[this] var checkOperation: Boolean = false
private[this] var shutdown: Boolean = false
private val operationManager = new OperationManager()
private val cacheManager = new SparkSessionCacheManager()
private val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession]
private var execPool: ThreadPoolExecutor = _
private var isOperationLogEnabled = false
private var operationLogRootDir: File = _
private var resourcesRootDir: File = _
private var checkInterval: Long = _
private var sessionTimeout: Long = _
private var checkOperation: Boolean = false
private var shutdown: Boolean = false
def this() = this(classOf[SessionManager].getSimpleName)
private[this] def createExecPool(): Unit = {
val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt
private def createExecPool(): Unit = {
val poolSize = conf.get(ASYNC_EXEC_THREADS).toInt
info("Background operation thread pool size: " + poolSize)
val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt
val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE).toInt
info("Background operation thread wait queue size: " + poolQueueSize)
val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME.key)
val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME)
info("Background operation thread keepalive time: " + keepAliveTime + " seconds")
val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool"
execPool =
@ -71,13 +72,13 @@ private[kyuubi] class SessionManager private(
new LinkedBlockingQueue[Runnable](poolQueueSize),
new NamedThreadFactory(threadPoolName))
execPool.allowCoreThreadTimeOut(true)
checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL.key)
sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT.key)
checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION.key).toBoolean
checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL)
sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT)
checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION).toBoolean
}
private[this] def initOperationLogRootDir(): Unit = {
operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key))
private def initOperationLogRootDir(): Unit = {
operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR))
isOperationLogEnabled = true
if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) {
info("The operation log root directory exists, but it is not a directory: "
@ -109,10 +110,30 @@ private[kyuubi] class SessionManager private(
}
}
/**
 * Ensure the server-wide root directory for downloaded resources exists,
 * failing fast with a ServiceException when the path is occupied by a regular
 * file or cannot be created. The directory is also scheduled for deletion on
 * JVM exit; failure to schedule that cleanup is only a warning.
 */
private def initResourcesRootDir(): Unit = {
resourcesRootDir = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR))
if (resourcesRootDir.exists() && !resourcesRootDir.isDirectory) {
throw new ServiceException("The operation downloaded resources directory exists but is not" +
s" a directory ${resourcesRootDir.getAbsolutePath}")
}
if (!resourcesRootDir.exists() && !resourcesRootDir.mkdirs()) {
throw new ServiceException("Unable to create the operation downloaded resources directory" +
s" ${resourcesRootDir.getAbsolutePath}")
}
try {
// Best-effort: register recursive delete-on-exit for the whole tree.
FileUtils.forceDeleteOnExit(resourcesRootDir)
} catch {
case e: IOException =>
warn("Failed to schedule clean up Kyuubi Server's root directory for downloaded" +
" resources", e)
}
}
/**
* Periodically close idle sessions in 'spark.kyuubi.frontend.session.check.interval' (default 6h)
*/
private[this] def startTimeoutChecker(): Unit = {
private def startTimeoutChecker(): Unit = {
val interval: Long = math.max(checkInterval, 3000L)
// minimum 3 seconds
val timeoutChecker = new Runnable() {
@ -143,7 +164,7 @@ private[kyuubi] class SessionManager private(
execPool.execute(timeoutChecker)
}
private[this] def sleepInterval(interval: Long): Unit = {
private def sleepInterval(interval: Long): Unit = {
try {
Thread.sleep(interval)
} catch {
@ -152,7 +173,7 @@ private[kyuubi] class SessionManager private(
}
}
private[this] def cleanupLoggingRootDir(): Unit = {
private def cleanupLoggingRootDir(): Unit = {
if (isOperationLogEnabled) {
try {
FileUtils.forceDelete(operationLogRootDir)
@ -164,10 +185,19 @@ private[kyuubi] class SessionManager private(
}
}
/**
 * Best-effort removal of the server-wide downloaded-resources root directory on
 * shutdown; failures are logged as warnings and never propagated.
 */
private def cleanupResourcesRootDir(): Unit = try {
  FileUtils.forceDelete(resourcesRootDir)
} catch {
  case e: Exception =>
    // Bug fix: the original message said "KyuubiServer logging" and printed
    // operationLogRootDir; this method cleans the *resources* root dir.
    warn("Failed to cleanup root dir of KyuubiServer's downloaded resources: "
      + resourcesRootDir.getAbsolutePath, e)
}
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
initResourcesRootDir()
// Create operation log root directory, if operation logging is enabled
if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) {
if (conf.get(LOGGING_OPERATION_ENABLED).toBoolean) {
initOperationLogRootDir()
}
createExecPool()
@ -188,7 +218,7 @@ private[kyuubi] class SessionManager private(
shutdown = true
if (execPool != null) {
execPool.shutdown()
val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key)
val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT)
try {
execPool.awaitTermination(timeout, TimeUnit.SECONDS)
} catch {
@ -198,6 +228,7 @@ private[kyuubi] class SessionManager private(
}
execPool = null
}
cleanupResourcesRootDir()
cleanupLoggingRootDir()
}
@ -227,6 +258,7 @@ private[kyuubi] class SessionManager private(
info(s"Opening session for $username")
kyuubiSession.open(sessionConf)
kyuubiSession.setResourcesSessionDir(resourcesRootDir)
if (isOperationLogEnabled) {
kyuubiSession.setOperationLogSessionDir(operationLogRootDir)
}

View File

@ -33,4 +33,4 @@
<level value="OFF" />
<appender-ref ref="console" />
</root>
</log4j:configuration>
</log4j:configuration>

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
// Verifies that a jar added via `ADD JAR` becomes visible through the user jar
// class loader exposed by SparkSQLUtils.
// NOTE(review): assumes `udf-test.jar` resolves relative to the test working
// directory — confirm the fixture is shipped with the test resources.
class SparkSQLUtilsSuite extends SparkFunSuite {
test("get user jar class loader") {
val sparkSession = SparkSession
.builder()
.appName(classOf[SparkSQLUtilsSuite].getSimpleName)
.master("local")
.getOrCreate()
sparkSession.sql("add jar udf-test.jar")
val loader = SparkSQLUtils.getUserJarClassLoader(sparkSession)
assert(loader.getResource("udf-test.jar") !== null)
sparkSession.stop()
}
}

View File

@ -136,11 +136,19 @@ class KyuubiOperationSuite extends SparkFunSuite {
assert(s.getUserName === userName)
}
test("testDEFAULT_FETCH_ORIENTATION") {
// The companion object's default fetch orientation must be FETCH_NEXT.
test("DEFAULT_FETCH_ORIENTATION") {
assert(KyuubiOperation.DEFAULT_FETCH_ORIENTATION === FETCH_NEXT)
}
test("testDEFAULT_FETCH_MAX_ROWS") {
// The companion object's default max fetch size must be 100 rows.
test("DEFAULT_FETCH_MAX_ROWS") {
assert(KyuubiOperation.DEFAULT_FETCH_MAX_ROWS === 100)
}
// Only hdfs:// resources are downloadable; file:// and unknown schemes are not,
// and null/empty paths are rejected by Hadoop Path's constructor.
test("is resource downloadable") {
intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable(null))
intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable(""))
assert(KyuubiOperation.isResourceDownloadable("hdfs://a/b/c.jar"))
assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar"))
assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar"))
}
}

View File

@ -21,6 +21,7 @@ import java.net.InetAddress
import scala.collection.JavaConverters._
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hive.service.cli.thrift._
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
@ -55,7 +56,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
super.afterAll()
}
test(" test new fe service") {
test("test new fe service") {
val feService = new FrontendService(beService)
feService.getConf should be(null)
feService.getStartTime should be(0)
@ -206,7 +207,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
val req3 = new TGetOperationStatusReq(resp2.getOperationHandle)
val resp3 = feService.GetOperationStatus(req3)
resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
Thread.sleep(10000)
Thread.sleep(5000)
val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val resp4 = feService.FetchResults(req4)
resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
@ -253,4 +254,159 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
feService.stop()
}
}
// ALTER DATABASE ... SET DBPROPERTIES succeeds and yields an empty result set.
// NOTE(review): fixed Thread.sleep waits for async completion are flaky;
// polling the operation status would be more robust.
test("alter database") {
withFEServiceAndHandle { (fe, handle) =>
val req = new TExecuteStatementReq(handle,
"alter database default set dbproperties ('kent'='yao')")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
}
// ALTER SCHEMA (synonym of ALTER DATABASE) succeeds with an empty result set.
test("alter schema") {
withFEServiceAndHandle { (fe, handle) =>
val req = new TExecuteStatementReq(handle,
"alter schema default set dbproperties ('kent'='yao')")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
}
// Create a table, then rename it; both steps should succeed with empty results.
test("alter table name") {
withFEServiceAndHandle { (fe, handle) =>
val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet")
fe.ExecuteStatement(ct)
Thread.sleep(5000)
val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
}
// ALTER TABLE ... SET TBLPROPERTIES on a freshly-created table succeeds.
test("alter table set properties") {
withFEServiceAndHandle { (fe, handle) =>
val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet")
fe.ExecuteStatement(ct)
Thread.sleep(5000)
val req = new TExecuteStatementReq(handle,
"alter table default.src set tblproperties ('kent'='yao')")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
}
// ALTER TABLE ... UNSET TBLPROPERTIES IF EXISTS removes properties set at
// table creation and succeeds with an empty result set.
test("alter table unset properties") {
withFEServiceAndHandle { (fe, handle) =>
val ct = new TExecuteStatementReq(handle,
"create table default.src(key int) using parquet tblproperties ('kent'='yao')")
fe.ExecuteStatement(ct)
Thread.sleep(5000)
val req = new TExecuteStatementReq(handle,
"alter table default.src unset tblproperties if exists ('kent', 'yao')")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
}
// ADD JAR with an hdfs:// path is accepted asynchronously, but the fetch later
// reports ERROR_STATUS because the fake remote jar cannot be downloaded.
test("add jar hdfs") {
withFEServiceAndHandle { (fe, handle) =>
val req = new TExecuteStatementReq(handle, "add jar hdfs://a/b/test.jar")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
}
}
// ADD JAR with a file:// path is not downloadable, so it falls through to
// Spark's AddJarCommand, which returns 0 on success.
test("add jar local") {
withFEServiceAndHandle { (fe, handle) =>
val req = new TExecuteStatementReq(handle, "add jar file://a/b/test.jar")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.get(0).getColVals.get(0).getI32Val.getValue should be(0)
}
}
// CREATE TEMPORARY FUNCTION ... USING JAR with an unreachable hdfs:// jar is
// accepted asynchronously but the operation ends in ERROR_STATUS.
test("create function") {
withFEServiceAndHandle { (fe, handle) =>
val req = new TExecuteStatementReq(handle,
"create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
}
}
/**
 * Test fixture: start a FrontendService, open a session, hand the service and
 * session handle to `block`, and always stop the service afterwards.
 */
def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = {
val feService = new FrontendService(beService)
try {
feService.init(conf)
feService.start()
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
val resp = feService.OpenSession(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val handle = resp.getSessionHandle
block(feService, handle)
} finally {
feService.stop()
}
}
}

View File

@ -22,7 +22,6 @@ import java.io.File
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkFunSuite}
import org.apache.spark.KyuubiConf.LOGGING_OPERATION_LOG_DIR
import org.apache.spark.sql.SparkSession
import yaooqinn.kyuubi.KyuubiSQLException
@ -34,6 +33,8 @@ import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiSessionSuite extends SparkFunSuite {
import KyuubiConf._
var server: KyuubiServer = _
var session: KyuubiSession = _
var spark: SparkSession = _
@ -105,10 +106,8 @@ class KyuubiSessionSuite extends SparkFunSuite {
}
test("set operation log session dir") {
val operationLogRootDir = new File(server.getConf.get(LOGGING_OPERATION_LOG_DIR.key))
operationLogRootDir.setReadable(true)
operationLogRootDir.setWritable(true)
operationLogRootDir.setExecutable(true)
val operationLogRootDir = new File(server.getConf.get(LOGGING_OPERATION_LOG_DIR))
operationLogRootDir.mkdirs()
session.setOperationLogSessionDir(operationLogRootDir)
assert(session.isOperationLogEnabled)
assert(operationLogRootDir.exists())
@ -128,6 +127,30 @@ class KyuubiSessionSuite extends SparkFunSuite {
operationLogRootDir.setExecutable(true)
}
// Exercises KyuubiSession.setResourcesSessionDir: creates the per-user,
// per-handle "_resources" dir, is idempotent, and throws when the path is a
// regular file or the parent is unwritable.
// NOTE(review): setWritable(false) is a no-op for the root user / on some
// filesystems, which would make the last intercept fail — confirm CI user.
test("set resources session dir") {
val resourceRoot = new File(server.getConf.get(OPERATION_DOWNLOADED_RESOURCES_DIR))
resourceRoot.mkdirs()
resourceRoot.deleteOnExit()
assert(resourceRoot.isDirectory)
session.setResourcesSessionDir(resourceRoot)
val subDir = resourceRoot.listFiles().head
assert(subDir.getName === KyuubiSparkUtil.getCurrentUserName)
val resourceDir = subDir.listFiles().head
assert(resourceDir.getName === session.getSessionHandle.getSessionId + "_resources")
session.setResourcesSessionDir(resourceRoot)
assert(subDir.listFiles().length === 1, "directory should already exists")
assert(resourceDir.delete())
resourceDir.createNewFile()
assert(resourceDir.isFile)
val e1 = intercept[RuntimeException](session.setResourcesSessionDir(resourceRoot))
assert(e1.getMessage.startsWith("The resources directory exists but is not a directory"))
resourceDir.delete()
subDir.setWritable(false)
val e2 = intercept[RuntimeException](session.setResourcesSessionDir(resourceRoot))
assert(e2.getMessage.startsWith("Couldn't create session resources directory"))
subDir.setWritable(true)
}
test("get no operation time") {
assert(session.getNoOperationTime !== 0L)
}

View File

@ -24,11 +24,13 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.service.State
import yaooqinn.kyuubi.service.{ServiceException, State}
import yaooqinn.kyuubi.utils.ReflectUtils
class SessionManagerSuite extends SparkFunSuite {
import KyuubiConf._
test("init operation log") {
val logRoot = UUID.randomUUID().toString
val logRoot2 = logRoot + "/sub"
@ -67,6 +69,29 @@ class SessionManagerSuite extends SparkFunSuite {
sessionManager4.stop()
}
// Exercises SessionManager.initResourcesRootDir: the root dir is created on
// init, re-init fails with ServiceException when the path is a regular file,
// and creation failure (unwritable parent) also raises.
// NOTE(review): setWritable(false) may be ineffective when tests run as root.
test("init resources root dir") {
val conf = new SparkConf(true).set(KyuubiConf.LOGGING_OPERATION_ENABLED.key, "false")
KyuubiSparkUtil.setupCommonConfig(conf)
val sessionManager = new SessionManager()
sessionManager.init(conf)
val resourcesRoot = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR))
assert(resourcesRoot.exists())
assert(resourcesRoot.isDirectory)
resourcesRoot.delete()
resourcesRoot.createNewFile()
val e1 = intercept[ServiceException](sessionManager.init(conf))
assert(e1.getMessage.startsWith(
"The operation downloaded resources directory exists but is not a directory"))
assert(resourcesRoot.delete())
resourcesRoot.getParentFile.setWritable(false)
try {
intercept[Exception](sessionManager.init(conf))
} finally {
resourcesRoot.getParentFile.setWritable(true)
}
}
test("start timeout checker") {
val conf = new SparkConf().set(KyuubiConf.FRONTEND_SESSION_CHECK_INTERVAL.key, "-1")
val sessionManager = new SessionManager()