[KYUUBI-180]add a sync execute statement operation (#192)

* fix #180 add sync execute statement operation

* fix ut
This commit is contained in:
Kent Yao 2019-06-04 18:51:07 +08:00 committed by GitHub
parent cf4ee6afb6
commit f8a4775365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 29 deletions

View File

@ -34,8 +34,7 @@ import yaooqinn.kyuubi.session.KyuubiSession
abstract class AbstractOperation(
session: KyuubiSession,
opType: OperationType,
runAsync: Boolean) extends KyuubiOperation with Logging {
opType: OperationType) extends KyuubiOperation with Logging {
private var state: OperationState = INITIALIZED
private val opHandle: OperationHandle = new OperationHandle(opType, session.getProtocolVersion)
protected val conf: SparkConf = session.getConf
@ -57,8 +56,6 @@ abstract class AbstractOperation(
this.backgroundHandle = backgroundHandle
}
override def shouldRunAsync: Boolean = runAsync
override def getSession: KyuubiSession = session
override def getHandle: OperationHandle = opHandle

View File

@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.operation.metadata._
import yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode
import yaooqinn.kyuubi.operation.statement.{ExecuteStatementInClientMode, ExecuteStatementOperation}
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
import yaooqinn.kyuubi.service.AbstractService
import yaooqinn.kyuubi.session.KyuubiSession
@ -87,8 +87,9 @@ private[kyuubi] class OperationManager private(name: String)
def newExecuteStatementOperation(
parentSession: KyuubiSession,
statement: String): ExecuteStatementInClientMode = synchronized {
val operation = new ExecuteStatementInClientMode(parentSession, statement)
statement: String,
runAsync: Boolean = true): ExecuteStatementOperation = synchronized {
val operation = new ExecuteStatementInClientMode(parentSession, statement, runAsync)
addOperation(operation)
operation
}

View File

@ -29,7 +29,7 @@ import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
import yaooqinn.kyuubi.session.KyuubiSession
abstract class MetadataOperation(session: KyuubiSession, opType: OperationType)
extends AbstractOperation(session, opType, false) {
extends AbstractOperation(session, opType) {
setHasResultSet(true)

View File

@ -42,8 +42,11 @@ import yaooqinn.kyuubi.session.KyuubiSession
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.ReflectUtils
class ExecuteStatementInClientMode(session: KyuubiSession, statement: String)
extends ExecuteStatementOperation(session, statement) {
class ExecuteStatementInClientMode(
session: KyuubiSession,
statement: String,
runAsync: Boolean = true)
extends ExecuteStatementOperation(session, statement, runAsync) {
import ExecuteStatementInClientMode._

View File

@ -30,8 +30,11 @@ import yaooqinn.kyuubi.session.KyuubiSession
* @param session Parent [[KyuubiSession]]
* @param statement sql statement
*/
abstract class ExecuteStatementOperation(session: KyuubiSession, statement: String)
extends AbstractOperation(session, EXECUTE_STATEMENT, true) {
abstract class ExecuteStatementOperation(
session: KyuubiSession,
statement: String,
runAsync: Boolean)
extends AbstractOperation(session, EXECUTE_STATEMENT) {
protected val statementId: String = UUID.randomUUID().toString
@ -61,9 +64,7 @@ abstract class ExecuteStatementOperation(session: KyuubiSession, statement: Stri
override protected def runInternal(): Unit = {
setState(PENDING)
setHasResultSet(true)
// Runnable impl to call runInternal asynchronously, from a different thread
val backgroundOperation = new Runnable() {
val task = new Runnable() {
override def run(): Unit = {
try {
session.ugi.doAs(new PrivilegedExceptionAction[Unit]() {
@ -82,16 +83,19 @@ abstract class ExecuteStatementOperation(session: KyuubiSession, statement: Stri
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
session.getSessionMgr.submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(ERROR)
throw new KyuubiSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
if (shouldRunAsync) {
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle = session.getSessionMgr.submitBackgroundOperation(task)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(ERROR)
throw new KyuubiSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
}
} else {
task.run()
}
}
@ -108,4 +112,6 @@ abstract class ExecuteStatementOperation(session: KyuubiSession, statement: Stri
}
}
override def shouldRunAsync: Boolean = runAsync
}

View File

@ -108,10 +108,10 @@ private[kyuubi] class KyuubiSession(
}
@throws[KyuubiSQLException]
private def executeStatementInternal(statement: String): OperationHandle = {
private def executeStatementInternal(statement: String, runAsync: Boolean): OperationHandle = {
acquire(true)
val operation =
operationManager.newExecuteStatementOperation(this, statement)
operationManager.newExecuteStatementOperation(this, statement, runAsync)
val opHandle = operation.getHandle
try {
operation.run()
@ -294,7 +294,7 @@ private[kyuubi] class KyuubiSession(
*/
@throws[KyuubiSQLException]
def executeStatement(statement: String): OperationHandle = {
executeStatementInternal(statement)
executeStatementInternal(statement, runAsync = false)
}
/**
@ -305,7 +305,7 @@ private[kyuubi] class KyuubiSession(
*/
@throws[KyuubiSQLException]
def executeStatementAsync(statement: String): OperationHandle = {
executeStatementInternal(statement)
executeStatementInternal(statement, runAsync = true)
}
/**

View File

@ -396,6 +396,39 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
}
}
test("execute statement sync") {
withFEServiceAndHandle { case (fe, handle) =>
val req = new TExecuteStatementReq(handle, "show databases")
req.setRunAsync(false)
val resp = fe.ExecuteStatement(req)
val req2 = new TGetOperationStatusReq(resp.getOperationHandle)
val statusResp = fe.GetOperationStatus(req2)
statusResp.getOperationState should be(TOperationState.FINISHED_STATE)
val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val fRes = fe.FetchResults(fReq)
val rows = fRes.getResults.getRows
rows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
}
}
test("execute statement async") {
withFEServiceAndHandle { case (fe, handle) =>
val req = new TExecuteStatementReq(handle, "show databases")
req.setRunAsync(true)
val resp = fe.ExecuteStatement(req)
val statusReq = new TGetOperationStatusReq(resp.getOperationHandle)
while(fe.GetOperationStatus(statusReq)
.getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) {
Thread.sleep(10)
}
Thread.sleep(2000)
val fReq = new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val fRes = fe.FetchResults(fReq)
val rows = fRes.getResults.getRows
rows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
}
}
test("alter database") {
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
val req = new TExecuteStatementReq(handle,