diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala index 95b03889c..3823633d0 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala @@ -34,8 +34,7 @@ import yaooqinn.kyuubi.session.KyuubiSession abstract class AbstractOperation( session: KyuubiSession, - opType: OperationType, - runAsync: Boolean) extends KyuubiOperation with Logging { + opType: OperationType) extends KyuubiOperation with Logging { private var state: OperationState = INITIALIZED private val opHandle: OperationHandle = new OperationHandle(opType, session.getProtocolVersion) protected val conf: SparkConf = session.getConf @@ -57,8 +56,6 @@ abstract class AbstractOperation( this.backgroundHandle = backgroundHandle } - override def shouldRunAsync: Boolean = runAsync - override def getSession: KyuubiSession = session override def getHandle: OperationHandle = opHandle diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index f8f918303..64d27e24a 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType import yaooqinn.kyuubi.{KyuubiSQLException, Logging} import yaooqinn.kyuubi.cli.FetchOrientation import yaooqinn.kyuubi.operation.metadata._ -import yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode +import yaooqinn.kyuubi.operation.statement.{ExecuteStatementInClientMode, ExecuteStatementOperation} import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder} import yaooqinn.kyuubi.service.AbstractService import yaooqinn.kyuubi.session.KyuubiSession @@ -87,8 +87,9 @@ private[kyuubi] class OperationManager private(name: String) def newExecuteStatementOperation( parentSession: KyuubiSession, - statement: String): ExecuteStatementInClientMode = synchronized { - val operation = new ExecuteStatementInClientMode(parentSession, statement) + statement: String, + runAsync: Boolean = true): ExecuteStatementOperation = synchronized { + val operation = new ExecuteStatementInClientMode(parentSession, statement, runAsync) addOperation(operation) operation } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/metadata/MetadataOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/metadata/MetadataOperation.scala index 2282d8b47..9e240bd4b 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/metadata/MetadataOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/metadata/MetadataOperation.scala @@ -29,7 +29,7 @@ import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder} import yaooqinn.kyuubi.session.KyuubiSession abstract class MetadataOperation(session: KyuubiSession, opType: OperationType) - extends AbstractOperation(session, opType, false) { + extends AbstractOperation(session, opType) { setHasResultSet(true) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala index ffc5bff9e..e44de854b 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala @@ -42,8 +42,11 @@ import yaooqinn.kyuubi.session.KyuubiSession import yaooqinn.kyuubi.ui.KyuubiServerMonitor import yaooqinn.kyuubi.utils.ReflectUtils -class ExecuteStatementInClientMode(session: KyuubiSession, statement: String) - extends ExecuteStatementOperation(session, statement) { +class ExecuteStatementInClientMode( + session: KyuubiSession, + statement: String, + runAsync: Boolean = true) + extends ExecuteStatementOperation(session, statement, runAsync) { import ExecuteStatementInClientMode._ diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala index 171c66e7c..87588cb66 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala @@ -30,8 +30,11 @@ import yaooqinn.kyuubi.session.KyuubiSession * @param session Parent [[KyuubiSession]] * @param statement sql statement */ -abstract class ExecuteStatementOperation(session: KyuubiSession, statement: String) - extends AbstractOperation(session, EXECUTE_STATEMENT, true) { +abstract class ExecuteStatementOperation( + session: KyuubiSession, + statement: String, + runAsync: Boolean) + extends AbstractOperation(session, EXECUTE_STATEMENT) { protected val statementId: String = UUID.randomUUID().toString @@ -61,9 +64,7 @@ abstract class ExecuteStatementOperation(session: KyuubiSession, statement: Stri override protected def runInternal(): Unit = { setState(PENDING) setHasResultSet(true) - - // Runnable impl to call runInternal asynchronously, from a different thread - val backgroundOperation = new Runnable() { + val task = new Runnable() { override def run(): Unit = { try { session.ugi.doAs(new PrivilegedExceptionAction[Unit]() { @@ -82,16 +83,19 @@ abstract class ExecuteStatementOperation(session: KyuubiSession, statement: Stri } } - try { - // This submit blocks if no background threads are available to run this operation - val backgroundHandle = - session.getSessionMgr.submitBackgroundOperation(backgroundOperation) - setBackgroundHandle(backgroundHandle) - } catch { - case rejected: RejectedExecutionException => - setState(ERROR) - throw new KyuubiSQLException("The background threadpool cannot accept" + - " new task for execution, please retry the operation", rejected) + if (shouldRunAsync) { + try { + // This submit blocks if no background threads are available to run this operation + val backgroundHandle = session.getSessionMgr.submitBackgroundOperation(task) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(ERROR) + throw new KyuubiSQLException("The background threadpool cannot accept" + + " new task for execution, please retry the operation", rejected) + } + } else { + task.run() } } @@ -108,4 +112,6 @@ abstract class ExecuteStatementOperation(session: KyuubiSession, statement: Stri } } + override def shouldRunAsync: Boolean = runAsync + } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index 450ca7ed0..6cebd87a3 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -108,10 +108,10 @@ private[kyuubi] class KyuubiSession( } @throws[KyuubiSQLException] - private def executeStatementInternal(statement: String): OperationHandle = { + private def executeStatementInternal(statement: String, runAsync: Boolean): OperationHandle = { acquire(true) val operation = - operationManager.newExecuteStatementOperation(this, statement) + operationManager.newExecuteStatementOperation(this, statement, runAsync) val opHandle = operation.getHandle try { operation.run() @@ -294,7 +294,7 @@ private[kyuubi] class KyuubiSession( */ @throws[KyuubiSQLException] def executeStatement(statement: String): OperationHandle = { - executeStatementInternal(statement) + executeStatementInternal(statement, runAsync = false) } /** @@ -305,7 +305,7 @@ private[kyuubi] class KyuubiSession( */ @throws[KyuubiSQLException] def executeStatementAsync(statement: String): OperationHandle = { - executeStatementInternal(statement) + executeStatementInternal(statement, runAsync = true) } /** diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index 230713782..c965c127e 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -396,6 +396,39 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu } } + test("execute statement sync") { + withFEServiceAndHandle { case (fe, handle) => + val req = new TExecuteStatementReq(handle, "show databases") + req.setRunAsync(false) + val resp = fe.ExecuteStatement(req) + val req2 = new TGetOperationStatusReq(resp.getOperationHandle) + val statusResp = fe.GetOperationStatus(req2) + statusResp.getOperationState should be(TOperationState.FINISHED_STATE) + val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + val fRes = fe.FetchResults(fReq) + val rows = fRes.getResults.getRows + rows.get(0).getColVals.get(0).getStringVal.getValue should be("default") + } + } + + test("execute statement async") { + withFEServiceAndHandle { case (fe, handle) => + val req = new TExecuteStatementReq(handle, "show databases") + req.setRunAsync(true) + val resp = fe.ExecuteStatement(req) + val statusReq = new TGetOperationStatusReq(resp.getOperationHandle) + while(fe.GetOperationStatus(statusReq) + .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { + Thread.sleep(10) + } + Thread.sleep(2000) + val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + val fRes = fe.FetchResults(fReq) + val rows = fRes.getResults.getRows + rows.get(0).getColVals.get(0).getStringVal.getValue should be("default") + } + } + test("alter database") { val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { val req = new TExecuteStatementReq(handle,