* [KYUUBI-186][FOLLOWUP]fix ut occasionally failure fix #186 * fix ut * code cover
This commit is contained in:
parent
3593f980e4
commit
ffbf5303fb
@ -302,10 +302,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
|||||||
setState(ERROR)
|
setState(ERROR)
|
||||||
throw new KyuubiSQLException("The background threadpool cannot accept" +
|
throw new KyuubiSQLException("The background threadpool cannot accept" +
|
||||||
" new task for execution, please retry the operation", rejected)
|
" new task for execution, please retry the operation", rejected)
|
||||||
case NonFatal(e) =>
|
|
||||||
error(s"Error executing query in background", e)
|
|
||||||
setState(ERROR)
|
|
||||||
throw e
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -17,85 +17,160 @@
|
|||||||
|
|
||||||
package yaooqinn.kyuubi.server
|
package yaooqinn.kyuubi.server
|
||||||
|
|
||||||
|
import java.util.concurrent.{Executors, RejectedExecutionException}
|
||||||
|
|
||||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||||
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
|
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
|
||||||
|
|
||||||
import yaooqinn.kyuubi.KyuubiSQLException
|
import yaooqinn.kyuubi.KyuubiSQLException
|
||||||
import yaooqinn.kyuubi.cli.GetInfoType
|
import yaooqinn.kyuubi.cli.GetInfoType
|
||||||
import yaooqinn.kyuubi.operation.{CANCELED, RUNNING}
|
import yaooqinn.kyuubi.operation.{FINISHED, OperationHandle}
|
||||||
|
import yaooqinn.kyuubi.session.SessionHandle
|
||||||
|
|
||||||
class BackendServiceSuite extends SparkFunSuite {
|
class BackendServiceSuite extends SparkFunSuite {
|
||||||
|
|
||||||
import KyuubiConf._
|
private var backendService: BackendService = _
|
||||||
|
private val user = KyuubiSparkUtil.getCurrentUserName
|
||||||
var backendService: BackendService = _
|
private val conf = new SparkConf(loadDefaults = true).setAppName("be test")
|
||||||
val user = KyuubiSparkUtil.getCurrentUserName
|
|
||||||
val conf = new SparkConf(loadDefaults = true).setAppName("be test")
|
|
||||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||||
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
|
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
|
||||||
conf.setMaster("local")
|
conf.setMaster("local")
|
||||||
|
private var sessionHandle: SessionHandle = _
|
||||||
|
private val showTables = "show tables"
|
||||||
|
private val ip = "localhost"
|
||||||
|
private val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8
|
||||||
|
|
||||||
override protected def beforeAll(): Unit = {
|
override protected def beforeAll(): Unit = {
|
||||||
backendService = new BackendService()
|
backendService = new BackendService()
|
||||||
|
backendService.init(conf)
|
||||||
|
backendService.start()
|
||||||
|
sessionHandle = backendService.openSession(proto, user, "", ip, Map.empty)
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override def afterAll(): Unit = {
|
protected override def afterAll(): Unit = {
|
||||||
backendService.stop()
|
backendService.stop()
|
||||||
backendService = null
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test("backend service function tests") {
|
test("open session") {
|
||||||
assert(backendService.getName === classOf[BackendService].getSimpleName)
|
val sessionManager = backendService.getSessionManager
|
||||||
// before init
|
assert(sessionManager.getOpenSessionCount === 1)
|
||||||
assert(backendService.getSessionManager === null)
|
val kyuubiSession = sessionManager.getSession(sessionHandle)
|
||||||
assert(backendService.getConf === null)
|
assert(kyuubiSession.getSessionHandle === sessionHandle)
|
||||||
|
assert(kyuubiSession.getUserName === user)
|
||||||
|
assert(!kyuubiSession.sparkSession.sparkContext.isStopped)
|
||||||
|
assert(kyuubiSession.ugi.getShortUserName === user)
|
||||||
|
assert(kyuubiSession.getResourcesSessionDir.exists())
|
||||||
|
assert(kyuubiSession.getIpAddress === ip)
|
||||||
|
assert(kyuubiSession.getPassword.isEmpty)
|
||||||
|
assert(kyuubiSession.isOperationLogEnabled)
|
||||||
|
kyuubiSession.closeExpiredOperations
|
||||||
|
assert(kyuubiSession.getProtocolVersion === proto)
|
||||||
|
}
|
||||||
|
|
||||||
// after init
|
test("get info") {
|
||||||
backendService.init(conf)
|
|
||||||
assert(backendService.getSessionManager !== null)
|
|
||||||
assert(backendService.getConf !== null)
|
|
||||||
backendService.start()
|
|
||||||
|
|
||||||
val session = backendService.openSession(
|
|
||||||
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8,
|
|
||||||
user,
|
|
||||||
"",
|
|
||||||
"localhost",
|
|
||||||
Map.empty)
|
|
||||||
assert(
|
assert(
|
||||||
backendService.getInfo(
|
backendService.getInfo(
|
||||||
session, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server")
|
sessionHandle, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server")
|
||||||
|
|
||||||
assert(
|
assert(
|
||||||
backendService.getInfo(
|
backendService.getInfo(
|
||||||
session, GetInfoType.DBMS_NAME).toTGetInfoValue.getStringValue === "Spark SQL")
|
sessionHandle, GetInfoType.DBMS_NAME).toTGetInfoValue.getStringValue === "Spark SQL")
|
||||||
|
|
||||||
assert(
|
assert(
|
||||||
backendService.getInfo(
|
backendService.getInfo(
|
||||||
session,
|
sessionHandle,
|
||||||
GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === KyuubiSparkUtil.SPARK_VERSION)
|
GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === KyuubiSparkUtil.SPARK_VERSION)
|
||||||
|
}
|
||||||
|
|
||||||
val showTables = "show tables"
|
test("get type info") {
|
||||||
val op1 = backendService.executeStatement(session, showTables)
|
val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(sessionHandle))
|
||||||
val op2 = backendService.executeStatementAsync(session, "show databases")
|
|
||||||
val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(session))
|
|
||||||
assert(e1.toTStatus.getErrorMessage === "Method Not Implemented!")
|
assert(e1.toTStatus.getErrorMessage === "Method Not Implemented!")
|
||||||
val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(session))
|
}
|
||||||
|
|
||||||
|
test("get catalogs") {
|
||||||
|
val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(sessionHandle))
|
||||||
assert(e2.toTStatus.getErrorMessage === "Method Not Implemented!")
|
assert(e2.toTStatus.getErrorMessage === "Method Not Implemented!")
|
||||||
assert(KyuubiSQLException.toTStatus(e2).getErrorMessage === "Method Not Implemented!")
|
assert(KyuubiSQLException.toTStatus(e2).getErrorMessage === "Method Not Implemented!")
|
||||||
|
}
|
||||||
|
|
||||||
intercept[KyuubiSQLException](backendService.getSchemas(session, "", ""))
|
test("get schemas") {
|
||||||
intercept[KyuubiSQLException](backendService.getTables(session, "", "", "", null))
|
intercept[KyuubiSQLException](backendService.getSchemas(sessionHandle, "", ""))
|
||||||
intercept[KyuubiSQLException](backendService.getTableTypes(session))
|
}
|
||||||
intercept[KyuubiSQLException](backendService.getFunctions(session, "", "", ""))
|
|
||||||
|
|
||||||
assert(backendService.getOperationStatus(op1).getState === RUNNING)
|
test("get tables") {
|
||||||
assert(backendService.getOperationStatus(op2).getState === RUNNING)
|
intercept[KyuubiSQLException](backendService.getTables(sessionHandle, "", "", "", null))
|
||||||
|
}
|
||||||
|
|
||||||
assert(backendService.getResultSetMetadata(op1).head.name === "Result")
|
test("get table types") {
|
||||||
backendService.cancelOperation(op1)
|
intercept[KyuubiSQLException](backendService.getTableTypes(sessionHandle))
|
||||||
assert(backendService.getOperationStatus(op1).getState === CANCELED)
|
}
|
||||||
|
|
||||||
backendService.getSessionManager.getSession(session).sparkSession.stop()
|
test("get functions") {
|
||||||
|
intercept[KyuubiSQLException](backendService.getFunctions(sessionHandle, "", "", ""))
|
||||||
|
}
|
||||||
|
|
||||||
|
test("execute statement") {
|
||||||
|
val operationHandle = backendService.executeStatement(sessionHandle, showTables)
|
||||||
|
val operationMgr = backendService.getSessionManager.getOperationMgr
|
||||||
|
val kyuubiOperation = operationMgr.getOperation(operationHandle)
|
||||||
|
assert(kyuubiOperation.getHandle === operationHandle)
|
||||||
|
assert(kyuubiOperation.getProtocolVersion === proto)
|
||||||
|
assert(!kyuubiOperation.isTimedOut)
|
||||||
|
assert(!kyuubiOperation.isClosedOrCanceled)
|
||||||
|
var count = 0
|
||||||
|
while (count < 100 && kyuubiOperation.getStatus.getState != FINISHED) {
|
||||||
|
Thread.sleep(50 )
|
||||||
|
count = count + 1
|
||||||
|
}
|
||||||
|
assert(kyuubiOperation.getStatus.getState === FINISHED)
|
||||||
|
assert(backendService.getOperationStatus(operationHandle).getState === FINISHED)
|
||||||
|
assert(backendService.getResultSetMetadata(operationHandle).head.name === "database")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
test("execute statement async") {
|
||||||
|
val operationHandle = backendService.executeStatementAsync(sessionHandle, showTables)
|
||||||
|
val operationMgr = backendService.getSessionManager.getOperationMgr
|
||||||
|
val kyuubiOperation = operationMgr.getOperation(operationHandle)
|
||||||
|
assert(kyuubiOperation.getHandle === operationHandle)
|
||||||
|
assert(kyuubiOperation.getProtocolVersion === proto)
|
||||||
|
assert(!kyuubiOperation.isTimedOut)
|
||||||
|
assert(!kyuubiOperation.isClosedOrCanceled)
|
||||||
|
var count = 0
|
||||||
|
while (count < 100 && kyuubiOperation.getStatus.getState != FINISHED) {
|
||||||
|
Thread.sleep(50 )
|
||||||
|
count = count + 1
|
||||||
|
}
|
||||||
|
assert(kyuubiOperation.getStatus.getState === FINISHED)
|
||||||
|
assert(backendService.getOperationStatus(operationHandle).getState === FINISHED)
|
||||||
|
assert(backendService.getResultSetMetadata(operationHandle).head.name === "database")
|
||||||
|
}
|
||||||
|
|
||||||
|
test("cancel operation") {
|
||||||
|
val operationHandle = backendService.executeStatementAsync(sessionHandle, showTables)
|
||||||
|
val operationMgr = backendService.getSessionManager.getOperationMgr
|
||||||
|
backendService.cancelOperation(operationHandle)
|
||||||
|
val operation = operationMgr.getOperation(operationHandle)
|
||||||
|
assert(operation.isClosedOrCanceled || operation.getStatus.getState === FINISHED)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("close operation") {
|
||||||
|
val operationHandle = backendService.executeStatementAsync(sessionHandle, showTables)
|
||||||
|
val operationMgr = backendService.getSessionManager.getOperationMgr
|
||||||
|
val operation = operationMgr.getOperation(operationHandle)
|
||||||
|
backendService.closeOperation(operationHandle)
|
||||||
|
assert(operation.isClosedOrCanceled || operation.getStatus.getState === FINISHED)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("reject execution exception") {
|
||||||
|
val t = new Thread() {
|
||||||
|
override def run(): Unit = {
|
||||||
|
val exception = intercept[KyuubiSQLException](
|
||||||
|
backendService.executeStatementAsync(sessionHandle, showTables))
|
||||||
|
assert(exception.getCause.isInstanceOf[RejectedExecutionException])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.start()
|
||||||
|
t.interrupt()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -57,5 +57,7 @@ class ServiceExceptionSuite extends SparkFunSuite {
|
|||||||
assert(tStatus3.getSqlState === null)
|
assert(tStatus3.getSqlState === null)
|
||||||
assert(tStatus3.getErrorCode === 0)
|
assert(tStatus3.getErrorCode === 0)
|
||||||
assert(tStatus3.getInfoMessages === KyuubiSQLException.toString(e4).asJava)
|
assert(tStatus3.getInfoMessages === KyuubiSQLException.toString(e4).asJava)
|
||||||
|
val e5 = new KyuubiSQLException(e4)
|
||||||
|
assert(e5.getMessage === e4.toString)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,15 +25,24 @@ class SparkSessionCacheSuite extends SparkFunSuite {
|
|||||||
.setMaster("local")
|
.setMaster("local")
|
||||||
.set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s")
|
.set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s")
|
||||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||||
private val spark = SparkSession.builder().config(conf).getOrCreate()
|
private var spark: SparkSession = _
|
||||||
|
|
||||||
|
override def beforeAll(): Unit = {
|
||||||
|
spark = SparkSession.builder().config(conf).getOrCreate()
|
||||||
|
spark.sql("show tables")
|
||||||
|
super.beforeAll()
|
||||||
|
}
|
||||||
override def afterAll(): Unit = {
|
override def afterAll(): Unit = {
|
||||||
spark.stop()
|
if (spark != null) spark.stop()
|
||||||
super.afterAll()
|
super.afterAll()
|
||||||
}
|
}
|
||||||
|
|
||||||
test("spark session cache") {
|
test("spark session cache") {
|
||||||
|
val start = System.currentTimeMillis()
|
||||||
val cache = SparkSessionCache.init(spark)
|
val cache = SparkSessionCache.init(spark)
|
||||||
|
val end = System.currentTimeMillis()
|
||||||
|
print("init cache using " + (end - start))
|
||||||
|
print("\n")
|
||||||
assert(!cache.isCrashed)
|
assert(!cache.isCrashed)
|
||||||
assert(!cache.isIdle)
|
assert(!cache.isIdle)
|
||||||
assert(!cache.needClear, s"cache status [crash:${cache.isCrashed}, expired:${cache.isExpired}]")
|
assert(!cache.needClear, s"cache status [crash:${cache.isCrashed}, expired:${cache.isExpired}]")
|
||||||
@ -51,7 +60,11 @@ class SparkSessionCacheSuite extends SparkFunSuite {
|
|||||||
}
|
}
|
||||||
|
|
||||||
test("cache status idled") {
|
test("cache status idled") {
|
||||||
|
val start = System.currentTimeMillis()
|
||||||
val cache = SparkSessionCache.init(spark)
|
val cache = SparkSessionCache.init(spark)
|
||||||
|
val end = System.currentTimeMillis()
|
||||||
|
print("init cache using " + (end - start))
|
||||||
|
print("\n")
|
||||||
assert(!cache.isIdle, "cache is not idled, reuse time = 1")
|
assert(!cache.isIdle, "cache is not idled, reuse time = 1")
|
||||||
cache.decReuseTimeAndGet
|
cache.decReuseTimeAndGet
|
||||||
assert(!cache.isIdle, "cache is not idled, reuse time = 0, but latest logout is unset")
|
assert(!cache.isIdle, "cache is not idled, reuse time = 0, but latest logout is unset")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user