diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index d0bce0f56..db4615d1c 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -302,10 +302,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging setState(ERROR) throw new KyuubiSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) - case NonFatal(e) => - error(s"Error executing query in background", e) - setState(ERROR) - throw e } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala index bb2a0b124..a5ad03590 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala @@ -17,85 +17,160 @@ package yaooqinn.kyuubi.server +import java.util.concurrent.{Executors, RejectedExecutionException} + import org.apache.hive.service.cli.thrift.TProtocolVersion -import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} import yaooqinn.kyuubi.KyuubiSQLException import yaooqinn.kyuubi.cli.GetInfoType -import yaooqinn.kyuubi.operation.{CANCELED, RUNNING} +import yaooqinn.kyuubi.operation.{FINISHED, OperationHandle} +import yaooqinn.kyuubi.session.SessionHandle class BackendServiceSuite extends SparkFunSuite { - import KyuubiConf._ - - var backendService: BackendService = _ - val user = KyuubiSparkUtil.getCurrentUserName - val conf = new SparkConf(loadDefaults = true).setAppName("be test") + private var backendService: BackendService = _ + private val user = KyuubiSparkUtil.getCurrentUserName + private val conf = new SparkConf(loadDefaults = true).setAppName("be test") KyuubiSparkUtil.setupCommonConfig(conf) conf.remove(KyuubiSparkUtil.CATALOG_IMPL) conf.setMaster("local") + private var sessionHandle: SessionHandle = _ + private val showTables = "show tables" + private val ip = "localhost" + private val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8 override protected def beforeAll(): Unit = { backendService = new BackendService() + backendService.init(conf) + backendService.start() + sessionHandle = backendService.openSession(proto, user, "", ip, Map.empty) } protected override def afterAll(): Unit = { backendService.stop() - backendService = null } - test("backend service function tests") { - assert(backendService.getName === classOf[BackendService].getSimpleName) - // before init - assert(backendService.getSessionManager === null) - assert(backendService.getConf === null) + test("open session") { + val sessionManager = backendService.getSessionManager + assert(sessionManager.getOpenSessionCount === 1) + val kyuubiSession = sessionManager.getSession(sessionHandle) + assert(kyuubiSession.getSessionHandle === sessionHandle) + assert(kyuubiSession.getUserName === user) + assert(!kyuubiSession.sparkSession.sparkContext.isStopped) + assert(kyuubiSession.ugi.getShortUserName === user) + assert(kyuubiSession.getResourcesSessionDir.exists()) + assert(kyuubiSession.getIpAddress === ip) + assert(kyuubiSession.getPassword.isEmpty) + assert(kyuubiSession.isOperationLogEnabled) + kyuubiSession.closeExpiredOperations + assert(kyuubiSession.getProtocolVersion === proto) + } - // after init - backendService.init(conf) - assert(backendService.getSessionManager !== null) - assert(backendService.getConf !== null) - backendService.start() - - val session = backendService.openSession( - TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8, - user, - "", - "localhost", - Map.empty) + test("get info") { assert( backendService.getInfo( - session, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server") + sessionHandle, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server") assert( backendService.getInfo( - session, GetInfoType.DBMS_NAME).toTGetInfoValue.getStringValue === "Spark SQL") + sessionHandle, GetInfoType.DBMS_NAME).toTGetInfoValue.getStringValue === "Spark SQL") assert( backendService.getInfo( - session, + sessionHandle, GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === KyuubiSparkUtil.SPARK_VERSION) + } - val showTables = "show tables" - val op1 = backendService.executeStatement(session, showTables) - val op2 = backendService.executeStatementAsync(session, "show databases") - val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(session)) + test("get type info") { + val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(sessionHandle)) assert(e1.toTStatus.getErrorMessage === "Method Not Implemented!") - val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(session)) + } + + test("get catalogs") { + val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(sessionHandle)) assert(e2.toTStatus.getErrorMessage === "Method Not Implemented!") assert(KyuubiSQLException.toTStatus(e2).getErrorMessage === "Method Not Implemented!") + } - intercept[KyuubiSQLException](backendService.getSchemas(session, "", "")) - intercept[KyuubiSQLException](backendService.getTables(session, "", "", "", null)) - intercept[KyuubiSQLException](backendService.getTableTypes(session)) - intercept[KyuubiSQLException](backendService.getFunctions(session, "", "", "")) + test("get schemas") { + intercept[KyuubiSQLException](backendService.getSchemas(sessionHandle, "", "")) + } - assert(backendService.getOperationStatus(op1).getState === RUNNING) - assert(backendService.getOperationStatus(op2).getState === RUNNING) + test("get tables") { + intercept[KyuubiSQLException](backendService.getTables(sessionHandle, "", "", "", null)) + } - assert(backendService.getResultSetMetadata(op1).head.name === "Result") - backendService.cancelOperation(op1) - assert(backendService.getOperationStatus(op1).getState === CANCELED) + test("get table types") { + intercept[KyuubiSQLException](backendService.getTableTypes(sessionHandle)) + } - backendService.getSessionManager.getSession(session).sparkSession.stop() + test("get functions") { + intercept[KyuubiSQLException](backendService.getFunctions(sessionHandle, "", "", "")) + } + + test("execute statement") { + val operationHandle = backendService.executeStatement(sessionHandle, showTables) + val operationMgr = backendService.getSessionManager.getOperationMgr + val kyuubiOperation = operationMgr.getOperation(operationHandle) + assert(kyuubiOperation.getHandle === operationHandle) + assert(kyuubiOperation.getProtocolVersion === proto) + assert(!kyuubiOperation.isTimedOut) + assert(!kyuubiOperation.isClosedOrCanceled) + var count = 0 + while (count < 100 && kyuubiOperation.getStatus.getState != FINISHED) { + Thread.sleep(50 ) + count = count + 1 + } + assert(kyuubiOperation.getStatus.getState === FINISHED) + assert(backendService.getOperationStatus(operationHandle).getState === FINISHED) + assert(backendService.getResultSetMetadata(operationHandle).head.name === "database") + + } + + test("execute statement async") { + val operationHandle = backendService.executeStatementAsync(sessionHandle, showTables) + val operationMgr = backendService.getSessionManager.getOperationMgr + val kyuubiOperation = operationMgr.getOperation(operationHandle) + assert(kyuubiOperation.getHandle === operationHandle) + assert(kyuubiOperation.getProtocolVersion === proto) + assert(!kyuubiOperation.isTimedOut) + assert(!kyuubiOperation.isClosedOrCanceled) + var count = 0 + while (count < 100 && kyuubiOperation.getStatus.getState != FINISHED) { + Thread.sleep(50 ) + count = count + 1 + } + assert(kyuubiOperation.getStatus.getState === FINISHED) + assert(backendService.getOperationStatus(operationHandle).getState === FINISHED) + assert(backendService.getResultSetMetadata(operationHandle).head.name === "database") + } + + test("cancel operation") { + val operationHandle = backendService.executeStatementAsync(sessionHandle, showTables) + val operationMgr = backendService.getSessionManager.getOperationMgr + backendService.cancelOperation(operationHandle) + val operation = operationMgr.getOperation(operationHandle) + assert(operation.isClosedOrCanceled || operation.getStatus.getState === FINISHED) + } + + test("close operation") { + val operationHandle = backendService.executeStatementAsync(sessionHandle, showTables) + val operationMgr = backendService.getSessionManager.getOperationMgr + val operation = operationMgr.getOperation(operationHandle) + backendService.closeOperation(operationHandle) + assert(operation.isClosedOrCanceled || operation.getStatus.getState === FINISHED) + } + + test("reject execution exception") { + val t = new Thread() { + override def run(): Unit = { + val exception = intercept[KyuubiSQLException]( + backendService.executeStatementAsync(sessionHandle, showTables)) + assert(exception.getCause.isInstanceOf[RejectedExecutionException]) + } + } + t.start() + t.interrupt() } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala index 05a01abd5..75949748c 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala @@ -57,5 +57,7 @@ class ServiceExceptionSuite extends SparkFunSuite { assert(tStatus3.getSqlState === null) assert(tStatus3.getErrorCode === 0) assert(tStatus3.getInfoMessages === KyuubiSQLException.toString(e4).asJava) + val e5 = new KyuubiSQLException(e4) + assert(e5.getMessage === e4.toString) } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala index 6fd60e1a2..86d1fcadf 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala @@ -25,15 +25,24 @@ class SparkSessionCacheSuite extends SparkFunSuite { .setMaster("local") .set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s") KyuubiSparkUtil.setupCommonConfig(conf) - private val spark = SparkSession.builder().config(conf).getOrCreate() + private var spark: SparkSession = _ + override def beforeAll(): Unit = { + spark = SparkSession.builder().config(conf).getOrCreate() + spark.sql("show tables") + super.beforeAll() + } override def afterAll(): Unit = { - spark.stop() + if (spark != null) spark.stop() super.afterAll() } test("spark session cache") { + val start = System.currentTimeMillis() val cache = SparkSessionCache.init(spark) + val end = System.currentTimeMillis() + print("init cache using " + (end - start)) + print("\n") assert(!cache.isCrashed) assert(!cache.isIdle) assert(!cache.needClear, s"cache status [crash:${cache.isCrashed}, expired:${cache.isExpired}]") @@ -51,7 +60,11 @@ class SparkSessionCacheSuite extends SparkFunSuite { } test("cache status idled") { + val start = System.currentTimeMillis() val cache = SparkSessionCache.init(spark) + val end = System.currentTimeMillis() + print("init cache using " + (end - start)) + print("\n") assert(!cache.isIdle, "cache is not idled, reuse time = 1") cache.decReuseTimeAndGet assert(!cache.isIdle, "cache is not idled, reuse time = 0, but latest logout is unset")