[KYUUBI-120] define interface class IKyuubiOperation to allow different implementations of KyuubiOperation

---

---
fix #181 fix #120
---
Squashed commit of the following:

commit 3e7f81f3b8e5c4663fa54ed3e676312139693197
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Thu May 23 18:32:08 2019 +0800

    fix unit test

commit 73763d7ca8416156132cb9e8df3e86966bf8c6e6
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Thu May 23 09:48:21 2019 +0800

    set statementId to a val

commit 128effa6a9a89c5bb8f9c91f219e4e257ff96372
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Wed May 22 16:06:45 2019 +0800

    set isClosedOrCanceled to a protected method

commit 267b55f1d2497872e348fc494a3ff7132a6012f0
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Wed May 22 10:10:48 2019 +0800

    KYUUBI-120: define interface class IKyuubiOperation to allow different implementation of kyuubiOperation
This commit is contained in:
hustfeiwang 2019-05-24 11:55:32 +08:00 committed by Kent Yao
parent 416122675d
commit 5ad223556b
13 changed files with 625 additions and 289 deletions

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.operation
import java.security.PrivilegedExceptionAction
import java.util.UUID
import java.util.concurrent.{Future, RejectedExecutionException}
import scala.util.control.NonFatal
import org.apache.hive.service.cli.thrift.TProtocolVersion
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.session.KyuubiSession
/**
 * Skeleton implementation of [[IKyuubiOperation]]. It owns the operation
 * lifecycle state machine (transitions are validated by
 * `OperationState.validateTransition`), the asynchronous submission of the
 * statement to the session manager's background thread pool, and the
 * idle-timeout bookkeeping. Concrete subclasses provide `execute()` and
 * `operationTimeout`.
 *
 * @param session   the [[KyuubiSession]] this operation belongs to
 * @param statement the statement text this operation will run
 */
abstract class AbstractKyuubiOperation(session: KyuubiSession, statement: String)
extends IKyuubiOperation with Logging{
// Current lifecycle state; mutate only through setState so transitions are validated.
protected var state: OperationState = INITIALIZED
protected val opHandle: OperationHandle =
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
// Idle timeout in milliseconds, supplied by subclasses; values <= 0 disable
// timeout checking (see isTimedOut).
protected val operationTimeout: Long
// Refreshed on every state change/assertion; isTimedOut measures idleness from it.
protected var lastAccessTime = System.currentTimeMillis()
protected var hasResultSet: Boolean = false
// Last KyuubiSQLException raised by execute(), exposed to clients via getStatus.
protected var operationException: KyuubiSQLException = _
// Future of the background task submitted in runInternal; null until run() is called.
protected var backgroundHandle: Future[_] = _
// Unique id used in logs and, in subclasses, for job-group cancellation.
protected val statementId = UUID.randomUUID().toString
// Fetch orientations accepted by default: forward fetch and rewind-to-first.
protected val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST)
def getBackgroundHandle: Future[_] = backgroundHandle
def setBackgroundHandle(backgroundHandle: Future[_]): Unit = {
this.backgroundHandle = backgroundHandle
}
override def getSession: KyuubiSession = session
override def getHandle: OperationHandle = opHandle
override def getProtocolVersion: TProtocolVersion = opHandle.getProtocolVersion
// Status is a snapshot: current state plus the last recorded exception (may be null).
override def getStatus: OperationStatus = new OperationStatus(state, operationException)
protected def setOperationException(opEx: KyuubiSQLException): Unit = {
this.operationException = opEx
}
/**
 * Transition to `newState` after validating the transition, and refresh the
 * idle timestamp.
 */
@throws[KyuubiSQLException]
protected def setState(newState: OperationState): Unit = {
state.validateTransition(newState)
this.state = newState
this.lastAccessTime = System.currentTimeMillis()
}
protected def checkState(state: OperationState): Boolean = {
this.state == state
}
protected def isClosedOrCanceled: Boolean = {
checkState(CLOSED) || checkState(CANCELED)
}
/**
 * Fail unless the operation is exactly in `state`; also refreshes the idle
 * timestamp on success.
 */
@throws[KyuubiSQLException]
protected def assertState(state: OperationState): Unit = {
// NOTE(review): `ne` is reference inequality; this is only correct if every
// OperationState value is a singleton (case object) — confirm.
if (this.state ne state) {
throw new KyuubiSQLException("Expected state " + state + ", but found " + this.state)
}
this.lastAccessTime = System.currentTimeMillis()
}
override def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
debug(s"CLOSING $statementId")
cleanup(CLOSED)
}
override def cancel(): Unit = {
info(s"Cancel '$statement' with $statementId")
cleanup(CANCELED)
}
// Keeps the local flag and the thrift-visible handle flag in sync.
protected def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
opHandle.setHasResultSet(hasResultSet)
}
/**
* Verify if the given fetch orientation is part of the default orientation types.
*/
@throws[KyuubiSQLException]
protected def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET)
}
/**
* Verify if the given fetch orientation is part of the supported orientation types.
*/
@throws[KyuubiSQLException]
protected def validateFetchOrientation(
orientation: FetchOrientation,
supportedOrientations: Set[FetchOrientation]): Unit = {
if (!supportedOrientations.contains(orientation)) {
throw new KyuubiSQLException(
"The fetch type " + orientation.toString + " is not supported for this resultset", "HY106")
}
}
/**
 * Move to PENDING and hand `execute()` to the session manager's background
 * pool, running it as the session user. A KyuubiSQLException thrown by
 * execute() is recorded via setOperationException rather than propagated;
 * any other exception from the doAs call is wrapped and recorded likewise.
 */
protected def runInternal(): Unit = {
setState(PENDING)
setHasResultSet(true)
// Runnable impl to call runInternal asynchronously, from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
try {
session.ugi.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
execute()
} catch {
case e: KyuubiSQLException => setOperationException(e)
}
}
})
} catch {
case e: Exception => setOperationException(new KyuubiSQLException(e))
}
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
session.getSessionMgr.submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(ERROR)
throw new KyuubiSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
}
}
// Actual statement execution, supplied by subclasses; runs on a background thread.
protected def execute(): Unit
// Logs the failure context and moves the operation to ERROR.
protected def onStatementError(id: String, message: String, trace: String): Unit = {
error(
s"""
|Error executing query as ${session.getUserName},
|$statement
|Current operation state ${this.state},
|$trace
""".stripMargin)
setState(ERROR)
}
/**
 * Shared teardown: transition to the terminal `state` (unless already CLOSED)
 * and interrupt the background task if one was submitted.
 */
protected def cleanup(state: OperationState) {
if (this.state != CLOSED) {
setState(state)
}
val backgroundHandle = getBackgroundHandle
if (backgroundHandle != null) {
// cancel(true) also interrupts the task if it is already running.
backgroundHandle.cancel(true)
}
}
override def isTimedOut: Boolean = {
if (operationTimeout <= 0) {
false
} else {
// check only when it's in terminal state
state.isTerminal && lastAccessTime + operationTimeout <= System.currentTimeMillis()
}
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.operation
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.sql.types.StructType
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.schema.RowSet
import yaooqinn.kyuubi.session.KyuubiSession
/**
* Interface class of KyuubiOperation.
*/
trait IKyuubiOperation {
/**
* Get the [[KyuubiSession]] this operation belongs to.
*/
def getSession: KyuubiSession
/**
* Get the [[OperationHandle]] identifying this operation.
*/
def getHandle: OperationHandle
/**
* Get the thrift protocol version of this IKyuubiOperation.
*/
def getProtocolVersion: TProtocolVersion
/**
* Get the current status (state plus any recorded exception) of this
* IKyuubiOperation.
*/
def getStatus: OperationStatus
/**
* Get the operation log of this IKyuubiOperation, if operation logging is
* enabled.
*/
def getOperationLog: OperationLog
/**
* Run this IKyuubiOperation.
* @throws KyuubiSQLException if the operation cannot be started or fails
*/
@throws[KyuubiSQLException]
def run(): Unit
/**
* Close this IKyuubiOperation and release its resources.
*/
def close(): Unit
/**
* Cancel this IKyuubiOperation, interrupting its execution if running.
*/
def cancel(): Unit
/**
* Get the schema of the operation result set.
*/
def getResultSetSchema: StructType
/**
* Get the operation result set.
* @param order the fetch orientation, FETCH_FIRST or FETCH_NEXT.
* @param rowSetSize limit of result set.
* @return a [[RowSet]] containing at most `rowSetSize` rows
*/
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet
/**
* Check whether this IKyuubiOperation has run more than the configured timeout duration.
*/
def isTimedOut: Boolean
}

View File

@ -18,20 +18,15 @@
package yaooqinn.kyuubi.operation
import java.io.{File, FileNotFoundException}
import java.security.PrivilegedExceptionAction
import java.util.UUID
import java.util.concurrent.{Future, RejectedExecutionException}
import scala.collection.JavaConverters._
import scala.util.{Success, Try}
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
@ -41,85 +36,33 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{AddFileCommand, AddJarCommand, CreateFunctionCommand}
import org.apache.spark.sql.types._
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
import yaooqinn.kyuubi.session.KyuubiSession
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
class KyuubiClientOperation(session: KyuubiSession, statement: String)
extends AbstractKyuubiOperation(session, statement) {
import KyuubiOperation._
private var state: OperationState = INITIALIZED
private val opHandle: OperationHandle =
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
import KyuubiClientOperation._
private val sparkSession = session.sparkSession
private val conf = sparkSession.conf
private val operationTimeout =
override protected val operationTimeout =
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT))
private var lastAccessTime = System.currentTimeMillis()
private var hasResultSet: Boolean = false
private var operationException: KyuubiSQLException = _
private var backgroundHandle: Future[_] = _
private var operationLog: OperationLog = _
private var isOperationLogEnabled: Boolean = false
private var result: DataFrame = _
private var iter: Iterator[Row] = _
private var statementId: String = _
private val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST)
private val incrementalCollect: Boolean = conf.get(OPERATION_INCREMENTAL_COLLECT).toBoolean
def getBackgroundHandle: Future[_] = backgroundHandle
def setBackgroundHandle(backgroundHandle: Future[_]): Unit = {
this.backgroundHandle = backgroundHandle
}
def getSession: KyuubiSession = session
def getHandle: OperationHandle = opHandle
def getProtocolVersion: TProtocolVersion = opHandle.getProtocolVersion
def getStatus: OperationStatus = new OperationStatus(state, operationException)
def getOperationLog: OperationLog = operationLog
private def setOperationException(opEx: KyuubiSQLException): Unit = {
this.operationException = opEx
}
@throws[KyuubiSQLException]
private def setState(newState: OperationState): Unit = {
state.validateTransition(newState)
this.state = newState
this.lastAccessTime = System.currentTimeMillis()
}
private def checkState(state: OperationState): Boolean = {
this.state == state
}
def isClosedOrCanceled: Boolean = {
checkState(CLOSED) || checkState(CANCELED)
}
@throws[KyuubiSQLException]
private def assertState(state: OperationState): Unit = {
if (this.state ne state) {
throw new KyuubiSQLException("Expected state " + state + ", but found " + this.state)
}
this.lastAccessTime = System.currentTimeMillis()
}
override def getOperationLog: OperationLog = operationLog
private def createOperationLog(): Unit = {
if (session.isOperationLogEnabled) {
@ -191,7 +134,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
@throws[KyuubiSQLException]
def run(): Unit = {
override def run(): Unit = {
createOperationLog()
try {
runInternal()
@ -211,26 +154,19 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
}
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
debug(s"CLOSING $statementId")
cleanup(CLOSED)
override def close(): Unit = {
super.close()
cleanupOperationLog()
sparkSession.sparkContext.clearJobGroup()
}
def cancel(): Unit = {
info(s"Cancel '$statement' with $statementId")
cleanup(CANCELED)
}
def getResultSetSchema: StructType = if (result == null || result.schema.isEmpty) {
override def getResultSetSchema: StructType = if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
} else {
result.schema
}
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet = {
override def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(FINISHED)
setHasResultSet(true)
@ -242,69 +178,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion)
}
private def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
opHandle.setHasResultSet(hasResultSet)
}
/**
* Verify if the given fetch orientation is part of the default orientation types.
*/
@throws[KyuubiSQLException]
private def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET)
}
/**
* Verify if the given fetch orientation is part of the supported orientation types.
*/
@throws[KyuubiSQLException]
private def validateFetchOrientation(
orientation: FetchOrientation,
supportedOrientations: Set[FetchOrientation]): Unit = {
if (!supportedOrientations.contains(orientation)) {
throw new KyuubiSQLException(
"The fetch type " + orientation.toString + " is not supported for this resultset", "HY106")
}
}
private def runInternal(): Unit = {
setState(PENDING)
setHasResultSet(true)
// Runnable impl to call runInternal asynchronously, from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
try {
session.ugi.doAs(new PrivilegedExceptionAction[Unit]() {
registerCurrentOperationLog()
override def run(): Unit = {
try {
execute()
} catch {
case e: KyuubiSQLException => setOperationException(e)
}
}
})
} catch {
case e: Exception => setOperationException(new KyuubiSQLException(e))
}
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
session.getSessionMgr.submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(ERROR)
throw new KyuubiSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
}
}
private def localizeAndAndResource(path: String): Option[String] = try {
if (isResourceDownloadable(path)) {
val src = new Path(path)
@ -345,9 +218,9 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
case _ => plan
}
private def execute(): Unit = {
override protected def execute(): Unit = {
try {
statementId = UUID.randomUUID().toString
registerCurrentOperationLog()
info(s"Running query '$statement' with $statementId")
setState(RUNNING)
@ -443,48 +316,24 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
}
private def onStatementError(id: String, message: String, trace: String): Unit = {
error(
s"""
|Error executing query as ${session.getUserName},
|$statement
|Current operation state ${this.state},
|$trace
""".stripMargin)
setState(ERROR)
override protected def onStatementError(id: String, message: String, trace: String): Unit = {
super.onStatementError(id, message, trace)
KyuubiServerMonitor.getListener(session.getUserName)
.foreach(_.onStatementError(id, message, trace))
}
private def cleanup(state: OperationState) {
if (this.state != CLOSED) {
setState(state)
}
val backgroundHandle = getBackgroundHandle
if (backgroundHandle != null) {
backgroundHandle.cancel(true)
}
override protected def cleanup(state: OperationState) {
super.cleanup(state)
if (statementId != null) {
sparkSession.sparkContext.cancelJobGroup(statementId)
}
}
def isTimedOut: Boolean = {
if (operationTimeout <= 0) {
false
} else {
// check only when it's in terminal state
state.isTerminal && lastAccessTime + operationTimeout <= System.currentTimeMillis()
}
}
}
object KyuubiOperation {
val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT
val DEFAULT_FETCH_MAX_ROWS = 100
object KyuubiClientOperation {
def isResourceDownloadable(resource: String): Boolean = {
val scheme = new Path(resource).toUri.getScheme
StringUtils.equalsIgnoreCase(scheme, "hdfs")
}
}
}

View File

@ -41,7 +41,7 @@ private[kyuubi] class OperationManager private(name: String)
def this() = this(classOf[OperationManager].getSimpleName)
private[this] lazy val logSchema: StructType = new StructType().add("operation_log", "string")
private[this] val handleToOperation = new ConcurrentHashMap[OperationHandle, KyuubiOperation]
private[this] val handleToOperation = new ConcurrentHashMap[OperationHandle, IKyuubiOperation]
private[this] val userToOperationLog = new ConcurrentHashMap[String, OperationLog]()
override def init(conf: SparkConf): Unit = synchronized {
@ -85,13 +85,13 @@ private[kyuubi] class OperationManager private(name: String)
def newExecuteStatementOperation(
parentSession: KyuubiSession,
statement: String): KyuubiOperation = synchronized {
val operation = new KyuubiOperation(parentSession, statement)
statement: String): IKyuubiOperation = synchronized {
val operation = new KyuubiClientOperation(parentSession, statement)
addOperation(operation)
operation
}
def getOperation(operationHandle: OperationHandle): KyuubiOperation = {
def getOperation(operationHandle: OperationHandle): IKyuubiOperation = {
val operation = getOperationInternal(operationHandle)
if (operation == null) {
throw new KyuubiSQLException("Invalid OperationHandle " + operationHandle)
@ -102,7 +102,7 @@ private[kyuubi] class OperationManager private(name: String)
private[this] def getOperationInternal(operationHandle: OperationHandle) =
handleToOperation.get(operationHandle)
private[this] def addOperation(operation: KyuubiOperation): Unit = {
private[this] def addOperation(operation: IKyuubiOperation): Unit = {
handleToOperation.put(operation.getHandle, operation)
}
@ -110,7 +110,7 @@ private[kyuubi] class OperationManager private(name: String)
handleToOperation.remove(opHandle)
private def removeTimedOutOperation(
operationHandle: OperationHandle): Option[KyuubiOperation] = synchronized {
operationHandle: OperationHandle): Option[IKyuubiOperation] = synchronized {
Some(handleToOperation.get(operationHandle))
.filter(_.isTimedOut)
.map(_ => handleToOperation.remove(operationHandle))
@ -176,7 +176,7 @@ private[kyuubi] class OperationManager private(name: String)
fetchOrientation == FetchOrientation.FETCH_FIRST
}
def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[KyuubiOperation] = {
def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[IKyuubiOperation] = {
handles.flatMap(removeTimedOutOperation).map { op =>
warn("Operation " + op.getHandle + " is timed-out and will be closed")
op

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi
import yaooqinn.kyuubi.cli.FetchOrientation
/**
 * Package-level constants shared by all operation implementations.
 */
package object operation {
// Orientation used when the client does not request one explicitly.
val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT
// Maximum number of rows returned per fetch when the client gives no limit.
// Explicit type annotation: this is part of the public API of the package.
val DEFAULT_FETCH_MAX_ROWS: Int = 100
}

View File

@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.auth.KyuubiAuthFactory
import yaooqinn.kyuubi.cli._
import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager}
import yaooqinn.kyuubi.operation.{IKyuubiOperation, OperationHandle, OperationManager}
import yaooqinn.kyuubi.schema.RowSet
import yaooqinn.kyuubi.session.security.TokenCollector
import yaooqinn.kyuubi.spark.SparkSessionWithUGI
@ -281,7 +281,7 @@ private[kyuubi] class KyuubiSession(
}
}
private def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
private def closeTimedOutOperations(operations: Seq[IKyuubiOperation]): Unit = {
acquire(false)
try {
operations.foreach { op =>

View File

@ -147,7 +147,12 @@ object ReflectUtils extends Logging {
argTypes: Seq[Class[_]], params: Seq[AnyRef]): Any = {
require(o != null, "object could not be null!")
Try {
val method = o.getClass.getDeclaredMethod(name, argTypes: _*)
val method = try {
o.getClass.getDeclaredMethod(name, argTypes: _*)
} catch {
case e: NoSuchMethodException =>
o.getClass.getMethod(name, argTypes: _*)
}
method.setAccessible(true)
method.invoke(o, params: _*)
} match {

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.operation
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli.thrift.{TFetchOrientation, TProtocolVersion}
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.scalatest.mock.MockitoSugar
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.session.{KyuubiSession, SessionManager}
import yaooqinn.kyuubi.utils.ReflectUtils
/**
 * Shared test suite for [[IKyuubiOperation]] implementations: exercises the
 * lifecycle (cancel/close), handle/status accessors, timeout checking, and
 * the protected state-transition helpers via reflection.
 *
 * NOTE(review): `session` is left unassigned here and is closed in afterAll,
 * so concrete subclasses must assign a live KyuubiSession in their own
 * beforeAll — confirm each subclass does so.
 */
abstract class AbstractKyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
val conf = new SparkConf(loadDefaults = true).setAppName("operation test")
KyuubiSparkUtil.setupCommonConfig(conf)
// Use the default (in-memory) catalog and a local master for the test session.
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
conf.setMaster("local")
var sessionMgr: SessionManager = _
val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8
val user = UserGroupInformation.getCurrentUser
val userName = user.getShortUserName
val passwd = ""
val statement = "show tables"
var session: KyuubiSession = _
override protected def beforeAll(): Unit = {
sessionMgr = new SessionManager()
sessionMgr.init(conf)
sessionMgr.start()
}
override protected def afterAll(): Unit = {
session.close()
session = null
sessionMgr.stop()
}
// cancel() on a fresh operation moves it straight from INITIALIZED to CANCELED.
test("testCancel") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getStatus.getState === INITIALIZED)
op.cancel()
assert(op.getStatus.getState === CANCELED)
}
// The handle mirrors the hasResultSet flag, protocol version and operation type.
test("testGetHandle") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(!op.getHandle.isHasResultSet)
assert(!op.getHandle.toTOperationHandle.isHasResultSet)
op.getHandle.setHasResultSet(true)
assert(op.getHandle.isHasResultSet)
assert(op.getHandle.toTOperationHandle.isHasResultSet)
assert(op.getHandle.getProtocolVersion === TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(op.getHandle.getOperationType === EXECUTE_STATEMENT)
}
test("testGetStatus") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getStatus.getState === INITIALIZED)
assert(op.getStatus.getOperationException === null)
}
// operationTimeout <= 0 disables timeout checking entirely.
test("testIsTimedOut") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(!op.isTimedOut)
ReflectUtils.setFieldValue(op, "operationTimeout", 0)
assert(!op.isTimedOut)
}
test("testGetProtocolVersion") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getProtocolVersion === proto)
}
test("testGetOperationLog") {
// TODO
}
test("testClose") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getStatus.getState === INITIALIZED)
op.close()
assert(op.getStatus.getState === CLOSED)
}
test("testGetSession") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
val s = op.getSession
assert(s == session)
assert(s.getUserName === userName)
}
// close() after cancel() still ends in CLOSED; closing twice-created ops is safe,
// and a null statement does not prevent cancel/close.
test("is closed or canceled") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
op.cancel()
assert(op.getStatus.getState === CANCELED)
op.close()
assert(op.getStatus.getState === CLOSED)
val op2 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
op2.close()
assert(op2.getStatus.getState === CLOSED)
val op3 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, null)
op3.cancel()
op3.close()
assert(op3.getStatus.getState === CLOSED)
}
// Drives the protected setState/checkState/assertState helpers via reflection.
test("test set, check and assert state") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
ReflectUtils.invokeMethod(op, "setState", List(classOf[OperationState]), List(RUNNING))
assert(ReflectUtils.invokeMethod(
op, "checkState", List(classOf[OperationState]), List(RUNNING)) === true)
assert(ReflectUtils.invokeMethod(
op, "checkState", List(classOf[OperationState]), List(FINISHED)) === false)
ReflectUtils.invokeMethod(op, "assertState", List(classOf[OperationState]), List(RUNNING))
}
// An orientation outside the default set (NEXT/FIRST) must be rejected.
test("test validateDefaultFetchOrientation") {
case object FETCH_RELATIVE extends FetchOrientation {
override val toTFetchOrientation: TFetchOrientation = TFetchOrientation.FETCH_RELATIVE
}
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
val e = intercept[KyuubiSQLException](ReflectUtils.invokeMethod(op,
"validateDefaultFetchOrientation", List(classOf[FetchOrientation]), List(FETCH_RELATIVE)))
assert(e.getMessage === "The fetch type " + FETCH_RELATIVE.toString +
" is not supported for this resultset")
}
}

View File

@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,43 +17,32 @@
package yaooqinn.kyuubi.operation
import scala.collection.JavaConverters._
import java.io.File
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.KyuubiConf.LOGGING_OPERATION_LOG_DIR
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.CreateFunctionCommand
import org.apache.spark.sql.internal.SQLConf
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar
import scala.collection.JavaConverters._
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation.FETCH_NEXT
import yaooqinn.kyuubi.schema.ColumnBasedSet
import yaooqinn.kyuubi.session.{KyuubiSession, SessionManager}
import yaooqinn.kyuubi.session.KyuubiSession
import yaooqinn.kyuubi.spark.SparkSessionWithUGI
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
class KyuubiClientOperationSuite extends AbstractKyuubiOperationSuite {
val conf = new SparkConf(loadDefaults = true).setAppName("operation test")
KyuubiSparkUtil.setupCommonConfig(conf)
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
conf.setMaster("local")
var sessionMgr: SessionManager = _
val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8
val user = UserGroupInformation.getCurrentUser
val userName = user.getShortUserName
val passwd = ""
val statement = "show tables"
var session: KyuubiSession = _
var spark: SparkSession = _
var sparkWithUgi: SparkSessionWithUGI = _
override protected def beforeAll(): Unit = {
super.beforeAll()
val sc = ReflectUtils
.newInstance(classOf[SparkContext].getName, Seq(classOf[SparkConf]), Seq(conf))
.asInstanceOf[SparkContext]
@ -61,9 +50,6 @@ class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
classOf[SparkSession].getName,
Seq(classOf[SparkContext]),
Seq(sc)).asInstanceOf[SparkSession]
sessionMgr = new SessionManager()
sessionMgr.init(conf)
sessionMgr.start()
sparkWithUgi = new SparkSessionWithUGI(user, conf, sessionMgr.getCacheMgr)
ReflectUtils.setFieldValue(sparkWithUgi,
"yaooqinn$kyuubi$spark$SparkSessionWithUGI$$_sparkSession", spark)
@ -72,40 +58,9 @@ class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
ReflectUtils.setFieldValue(session, "sparkSessionWithUGI", sparkWithUgi)
}
protected override def afterAll(): Unit = {
session.close()
session = null
sessionMgr.stop()
override protected def afterAll(): Unit = {
spark.stop()
}
test("testCancel") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getStatus.getState === INITIALIZED)
op.cancel()
assert(op.getStatus.getState === CANCELED)
}
test("testGetHandle") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(!op.getHandle.isHasResultSet)
assert(!op.getHandle.toTOperationHandle.isHasResultSet)
op.getHandle.setHasResultSet(true)
assert(op.getHandle.isHasResultSet)
assert(op.getHandle.toTOperationHandle.isHasResultSet)
assert(op.getHandle.getProtocolVersion === TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(op.getHandle.getOperationType === EXECUTE_STATEMENT)
}
test("testGetStatus") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getStatus.getState === INITIALIZED)
assert(op.getStatus.getOperationException === null)
}
test("testIsTimedOut") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(!op.isTimedOut)
super.afterAll()
}
test("testGetNextRowSet") {
@ -119,48 +74,25 @@ class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
assert(rowSet.isInstanceOf[ColumnBasedSet])
}
test("testGetProtocolVersion") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getProtocolVersion === proto)
}
test("testGetOperationLog") {
// TODO
}
test("testClose") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(op.getStatus.getState === INITIALIZED)
op.close()
assert(op.getStatus.getState === CLOSED)
}
test("testGetSession") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
val s = op.getSession
assert(s.sparkSession === spark)
assert(s == session)
assert(s.getUserName === userName)
}
test("DEFAULT_FETCH_ORIENTATION") {
assert(KyuubiOperation.DEFAULT_FETCH_ORIENTATION === FETCH_NEXT)
assert(DEFAULT_FETCH_ORIENTATION === FETCH_NEXT)
}
test("DEFAULT_FETCH_MAX_ROWS") {
assert(KyuubiOperation.DEFAULT_FETCH_MAX_ROWS === 100)
assert(DEFAULT_FETCH_MAX_ROWS === 100)
}
test("is resource downloadable") {
intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable(null))
intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable(""))
assert(KyuubiOperation.isResourceDownloadable("hdfs://a/b/c.jar"))
assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar"))
assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar"))
intercept[IllegalArgumentException](KyuubiClientOperation.isResourceDownloadable(null))
intercept[IllegalArgumentException](KyuubiClientOperation.isResourceDownloadable(""))
assert(KyuubiClientOperation.isResourceDownloadable("hdfs://a/b/c.jar"))
assert(!KyuubiClientOperation.isResourceDownloadable("file://a/b/c.jar"))
assert(!KyuubiClientOperation.isResourceDownloadable("dfs://a/b/c.jar"))
}
test("transform plan") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
.asInstanceOf[KyuubiClientOperation]
val parser = new SparkSqlParser(new SQLConf)
val plan0 = parser.parsePlan("create temporary function a as 'a.b.c'")
@ -191,20 +123,24 @@ class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
assert(e3.getMessage.startsWith("Resource Type"))
}
test("is closed or canceled") {
test("test get operation log") {
val operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key))
operationLogRootDir.mkdirs()
session.setOperationLogSessionDir(operationLogRootDir)
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(!op.isClosedOrCanceled)
op.cancel()
assert(op.isClosedOrCanceled)
op.close()
assert(op.isClosedOrCanceled)
val op2 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
op2.close()
assert(op2.isClosedOrCanceled)
val op3 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, null)
op3.cancel()
op3.close()
assert(op3.isClosedOrCanceled)
assert(op.getOperationLog === null)
ReflectUtils.invokeMethod(op, "registerCurrentOperationLog")
assert(sessionMgr.getOperationMgr.getOperationLog === null)
ReflectUtils.invokeMethod(op, "createOperationLog")
assert(op.getOperationLog !== null)
ReflectUtils.invokeMethod(op, "createOperationLog")
assert(op.getOperationLog !== null)
ReflectUtils.invokeMethod(op, "unregisterOperationLog")
assert(sessionMgr.getOperationMgr.getOperationLog === null)
operationLogRootDir.delete()
}
}

View File

@ -27,8 +27,9 @@ import org.apache.spark.sql.internal.SQLConf
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiOperationWithHDFSSuite extends KyuubiOperationSuite {
class KyuubiClientOperationWithHDFSSuite extends KyuubiClientOperationSuite {
val hdfsConf = new HdfsConfiguration
hdfsConf.set("fs.hdfs.impl.disable.cache", "true")
var cluster: MiniDFSCluster = new MiniDFSCluster.Builder(hdfsConf).build()
cluster.waitClusterUp()
val fs = cluster.getFileSystem
@ -52,6 +53,7 @@ class KyuubiOperationWithHDFSSuite extends KyuubiOperationSuite {
test("transform logical plan") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
.asInstanceOf[KyuubiClientOperation]
val parser = new SparkSqlParser(new SQLConf)
val plan0 = parser.parsePlan(
s"create temporary function a as 'a.b.c' using file '$remoteUDFFile'")

View File

@ -24,7 +24,7 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.GetInfoType
import yaooqinn.kyuubi.operation.{FINISHED, OperationHandle}
import yaooqinn.kyuubi.operation.{CANCELED, CLOSED, FINISHED, OperationHandle}
import yaooqinn.kyuubi.session.SessionHandle
class BackendServiceSuite extends SparkFunSuite {
@ -116,7 +116,8 @@ class BackendServiceSuite extends SparkFunSuite {
assert(kyuubiOperation.getHandle === operationHandle)
assert(kyuubiOperation.getProtocolVersion === proto)
assert(!kyuubiOperation.isTimedOut)
assert(!kyuubiOperation.isClosedOrCanceled)
assert(kyuubiOperation.getStatus.getState !== CANCELED)
assert(kyuubiOperation.getStatus.getState !== CLOSED)
var count = 0
while (count < 100 && kyuubiOperation.getStatus.getState != FINISHED) {
Thread.sleep(50 )
@ -135,7 +136,8 @@ class BackendServiceSuite extends SparkFunSuite {
assert(kyuubiOperation.getHandle === operationHandle)
assert(kyuubiOperation.getProtocolVersion === proto)
assert(!kyuubiOperation.isTimedOut)
assert(!kyuubiOperation.isClosedOrCanceled)
assert(kyuubiOperation.getStatus.getState !== CANCELED)
assert(kyuubiOperation.getStatus.getState !== CLOSED)
var count = 0
while (count < 100 && kyuubiOperation.getStatus.getState != FINISHED) {
Thread.sleep(50 )
@ -151,7 +153,8 @@ class BackendServiceSuite extends SparkFunSuite {
val operationMgr = backendService.getSessionManager.getOperationMgr
backendService.cancelOperation(operationHandle)
val operation = operationMgr.getOperation(operationHandle)
assert(operation.isClosedOrCanceled || operation.getStatus.getState === FINISHED)
val opState = operation.getStatus.getState
assert(opState === CLOSED || opState === CANCELED || opState === FINISHED)
}
test("close operation") {
@ -159,7 +162,8 @@ class BackendServiceSuite extends SparkFunSuite {
val operationMgr = backendService.getSessionManager.getOperationMgr
val operation = operationMgr.getOperation(operationHandle)
backendService.closeOperation(operationHandle)
assert(operation.isClosedOrCanceled || operation.getStatus.getState === FINISHED)
val opState = operation.getStatus.getState
assert(opState === CLOSED || opState === CANCELED || opState === FINISHED)
}
test("reject execution exception") {

View File

@ -20,28 +20,32 @@ package yaooqinn.kyuubi.session
import java.io.File
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.hive.service.cli.thrift.{TGetInfoType, TProtocolVersion}
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkFunSuite}
import org.apache.spark.sql.SparkSession
import org.scalatest.mock.MockitoSugar
import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.auth.KyuubiAuthFactory
import yaooqinn.kyuubi.cli.GetInfoType
import yaooqinn.kyuubi.cli.{FetchOrientation, FetchType, GetInfoType}
import yaooqinn.kyuubi.operation.{CANCELED, OperationState}
import yaooqinn.kyuubi.schema.ColumnBasedSet
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiSessionSuite extends SparkFunSuite {
class KyuubiSessionSuite extends SparkFunSuite with MockitoSugar {
import KyuubiConf._
var server: KyuubiServer = _
var session: KyuubiSession = _
var spark: SparkSession = _
val statement = "show tables"
override def beforeAll(): Unit = {
System.setProperty(KyuubiConf.FRONTEND_BIND_PORT.key, "0")
System.setProperty("spark.master", "local")
System.setProperty("spark.sql.catalogImplementation", "hive")
server = KyuubiServer.startKyuubiServer()
val be = server.beService
@ -65,6 +69,7 @@ class KyuubiSessionSuite extends SparkFunSuite {
override def afterAll(): Unit = {
System.clearProperty(KyuubiConf.FRONTEND_BIND_PORT.key)
System.clearProperty("spark.master")
System.clearProperty("spark.sql.catalogImplementation")
if (session != null) {
if (session.sparkSession != null) session.sparkSession.stop()
session.close()
@ -181,4 +186,77 @@ class KyuubiSessionSuite extends SparkFunSuite {
assert(e.getMessage === "Delegation token only supported over kerberos authentication")
assert(e.toTStatus.getSqlState === "08S01")
}
test("test getProtocolVersion") {
assert(session.getProtocolVersion === TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
}
test("test close operation") {
val opMgr = session.getSessionMgr.getOperationMgr
val op = opMgr.newExecuteStatementOperation(session, statement)
val opHandle = op.getHandle
assert(opMgr.getOperation(opHandle) !== null)
session.closeOperation(opHandle)
val e = intercept[KyuubiSQLException](opMgr.getOperation(opHandle))
assert(e.getMessage === "Invalid OperationHandle " + opHandle)
}
test("test cancel operation") {
val opMgr = session.getSessionMgr.getOperationMgr
val op = opMgr.newExecuteStatementOperation(session, statement)
val opHandle = op.getHandle
session.cancelOperation(opHandle)
assert(ReflectUtils.getSuperField(op, "state").asInstanceOf[OperationState] === CANCELED)
}
test("test get info") {
assert(session.getInfo(GetInfoType.SERVER_NAME).toTGetInfoValue
.getStringValue === "Kyuubi Server")
assert(session.getInfo(GetInfoType.DBMS_NAME).toTGetInfoValue
.getStringValue === "Spark SQL")
assert(session.getInfo(GetInfoType.DBMS_VERSION).toTGetInfoValue
.getStringValue === spark.version)
case object UNSUPPORT_INFO extends GetInfoType {
override val tInfoType: TGetInfoType = TGetInfoType.CLI_USER_NAME
}
intercept[KyuubiSQLException](session.getInfo(UNSUPPORT_INFO))
}
test("test getNoOperationTime") {
val mockSession = mock[KyuubiSession]
assert(mockSession.getNoOperationTime === 0L)
}
test("test executeStatement") {
val sessionHadnle = server.beService.getSessionManager.openSession(
session.getProtocolVersion,
session.getUserName,
"",
session.getIpAddress,
Map.empty,
true)
val kyuubiSession = server.beService.getSessionManager.getSession(sessionHadnle)
kyuubiSession.getSessionMgr.getCacheMgr.set(session.getUserName, spark)
var opHandle = kyuubiSession.executeStatement("wrong statement")
Thread.sleep(5000)
var opException = kyuubiSession.getSessionMgr.getOperationMgr.getOperation(opHandle)
.getStatus.getOperationException
assert(opException.getSQLState === "ParseException")
opHandle = kyuubiSession.executeStatement("select * from tablea")
Thread.sleep(5000)
opException = kyuubiSession.getSessionMgr.getOperationMgr.getOperation(opHandle)
.getStatus.getOperationException
assert(opException.getSQLState === "AnalysisException")
opHandle = kyuubiSession.executeStatement("show tables")
Thread.sleep(5000)
val results = kyuubiSession.fetchResults(opHandle, FetchOrientation.FETCH_FIRST,
10, FetchType.QUERY_OUTPUT)
val logs = kyuubiSession.fetchResults(opHandle, FetchOrientation.FETCH_FIRST,
10, FetchType.LOG)
assert(results.isInstanceOf[ColumnBasedSet] && logs.isInstanceOf[ColumnBasedSet])
}
}

View File

@ -82,6 +82,8 @@ class ReflectUtilsSuite extends SparkFunSuite {
val t = new TestClass3
assert(ReflectUtils.invokeMethod(t, "test") === 1)
intercept[NoSuchMethodException](ReflectUtils.invokeMethod(t, "dummy"))
val t2 = new TestClass4
assert(ReflectUtils.invokeMethod(t2, "test") === 1)
}
test("testSuperField") {
@ -156,6 +158,7 @@ class TestClass2(arg1: String, arg2: TestClass0)
class TestClass3 extends TestTrait {
def test: Long = 1L
}
class TestClass4 extends TestClass3
object TestClass0 {
def staticTest(): Int = 1