Merge pull request #141 from yaooqinn/KYUUBI-140
[KYUUBI-140] Add a server-side configuration to limit the query result size
This commit is contained in:
commit
b3ab6be6e1
@ -94,6 +94,7 @@ Name|Default|Description
|
||||
---|---|---
|
||||
spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
|
||||
spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
|
||||
spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the number of result rows collected to the driver side. A negative value (the default) means no limit.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@ -366,6 +366,13 @@ object KyuubiConf {
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val OPERATION_RESULT_LIMIT: ConfigEntry[Int] =
|
||||
KyuubiConfigBuilder("spark.kyuubi.operation.result.limit")
|
||||
.doc("In non-incremental result collection mode, set this to a positive value to limit the" +
|
||||
" size of result collected to driver side.")
|
||||
.intConf
|
||||
.createWithDefault(-1)
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Containerization //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@ -43,31 +43,30 @@ import yaooqinn.kyuubi.ui.KyuubiServerMonitor
|
||||
|
||||
class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
|
||||
|
||||
private[this] var state: OperationState = INITIALIZED
|
||||
private[this] val opHandle: OperationHandle =
|
||||
private var state: OperationState = INITIALIZED
|
||||
private val opHandle: OperationHandle =
|
||||
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
|
||||
|
||||
private[this] val conf = session.sparkSession.conf
|
||||
private val conf = session.sparkSession.conf
|
||||
|
||||
private[this] val operationTimeout =
|
||||
private val operationTimeout =
|
||||
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT.key))
|
||||
private[this] var lastAccessTime = System.currentTimeMillis()
|
||||
private var lastAccessTime = System.currentTimeMillis()
|
||||
|
||||
private[this] var hasResultSet: Boolean = false
|
||||
private[this] var operationException: KyuubiSQLException = _
|
||||
private[this] var backgroundHandle: Future[_] = _
|
||||
private[this] var operationLog: OperationLog = _
|
||||
private[this] var isOperationLogEnabled: Boolean = false
|
||||
private var hasResultSet: Boolean = false
|
||||
private var operationException: KyuubiSQLException = _
|
||||
private var backgroundHandle: Future[_] = _
|
||||
private var operationLog: OperationLog = _
|
||||
private var isOperationLogEnabled: Boolean = false
|
||||
|
||||
private[this] var result: DataFrame = _
|
||||
private[this] var iter: Iterator[Row] = _
|
||||
private[this] var statementId: String = _
|
||||
private var result: DataFrame = _
|
||||
private var iter: Iterator[Row] = _
|
||||
private var statementId: String = _
|
||||
|
||||
private[this] val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
|
||||
private val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
|
||||
Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST)
|
||||
|
||||
private[this] val incrementalCollect: Boolean =
|
||||
conf.get(OPERATION_INCREMENTAL_COLLECT.key).toBoolean
|
||||
private val incrementalCollect: Boolean = conf.get(OPERATION_INCREMENTAL_COLLECT).toBoolean
|
||||
|
||||
def getBackgroundHandle: Future[_] = backgroundHandle
|
||||
|
||||
@ -85,18 +84,18 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
|
||||
def getOperationLog: OperationLog = operationLog
|
||||
|
||||
private[this] def setOperationException(opEx: KyuubiSQLException): Unit = {
|
||||
private def setOperationException(opEx: KyuubiSQLException): Unit = {
|
||||
this.operationException = opEx
|
||||
}
|
||||
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def setState(newState: OperationState): Unit = {
|
||||
private def setState(newState: OperationState): Unit = {
|
||||
state.validateTransition(newState)
|
||||
this.state = newState
|
||||
this.lastAccessTime = System.currentTimeMillis()
|
||||
}
|
||||
|
||||
private[this] def checkState(state: OperationState): Boolean = {
|
||||
private def checkState(state: OperationState): Boolean = {
|
||||
this.state == state
|
||||
}
|
||||
|
||||
@ -105,14 +104,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def assertState(state: OperationState): Unit = {
|
||||
private def assertState(state: OperationState): Unit = {
|
||||
if (this.state ne state) {
|
||||
throw new KyuubiSQLException("Expected state " + state + ", but found " + this.state)
|
||||
}
|
||||
this.lastAccessTime = System.currentTimeMillis()
|
||||
}
|
||||
|
||||
private[this] def createOperationLog(): Unit = {
|
||||
private def createOperationLog(): Unit = {
|
||||
if (session.isOperationLogEnabled) {
|
||||
val logFile =
|
||||
new File(session.getSessionLogDir, opHandle.getHandleIdentifier.toString)
|
||||
@ -161,7 +160,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def registerCurrentOperationLog(): Unit = {
|
||||
private def registerCurrentOperationLog(): Unit = {
|
||||
if (isOperationLogEnabled) {
|
||||
if (operationLog == null) {
|
||||
warn("Failed to get current OperationLog object of Operation: "
|
||||
@ -174,7 +173,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def unregisterOperationLog(): Unit = {
|
||||
private def unregisterOperationLog(): Unit = {
|
||||
if (isOperationLogEnabled) {
|
||||
session.getSessionMgr.getOperationMgr
|
||||
.unregisterOperationLog(session.getUserName)
|
||||
@ -233,7 +232,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion)
|
||||
}
|
||||
|
||||
private[this] def setHasResultSet(hasResultSet: Boolean): Unit = {
|
||||
private def setHasResultSet(hasResultSet: Boolean): Unit = {
|
||||
this.hasResultSet = hasResultSet
|
||||
opHandle.setHasResultSet(hasResultSet)
|
||||
}
|
||||
@ -242,7 +241,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
* Verify if the given fetch orientation is part of the default orientation types.
|
||||
*/
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
|
||||
private def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
|
||||
validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET)
|
||||
}
|
||||
|
||||
@ -250,7 +249,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
* Verify if the given fetch orientation is part of the supported orientation types.
|
||||
*/
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def validateFetchOrientation(
|
||||
private def validateFetchOrientation(
|
||||
orientation: FetchOrientation,
|
||||
supportedOrientations: Set[FetchOrientation]): Unit = {
|
||||
if (!supportedOrientations.contains(orientation)) {
|
||||
@ -259,7 +258,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def runInternal(): Unit = {
|
||||
private def runInternal(): Unit = {
|
||||
setState(PENDING)
|
||||
setHasResultSet(true)
|
||||
|
||||
@ -300,7 +299,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def execute(): Unit = {
|
||||
private def execute(): Unit = {
|
||||
try {
|
||||
statementId = UUID.randomUUID().toString
|
||||
info(s"Running query '$statement' with $statementId")
|
||||
@ -323,7 +322,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
info("Executing query in incremental collection mode")
|
||||
result.toLocalIterator().asScala
|
||||
} else {
|
||||
result.collect().toList.iterator
|
||||
val resultLimit = conf.get(OPERATION_RESULT_LIMIT).toInt
|
||||
if (resultLimit >= 0) {
|
||||
result.take(resultLimit).toList.toIterator
|
||||
} else {
|
||||
result.collect().toList.iterator
|
||||
}
|
||||
}
|
||||
setState(FINISHED)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId))
|
||||
@ -362,7 +366,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def onStatementError(id: String, message: String, trace: String): Unit = {
|
||||
private def onStatementError(id: String, message: String, trace: String): Unit = {
|
||||
error(
|
||||
s"""
|
||||
|Error executing query as ${session.getUserName},
|
||||
@ -375,7 +379,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
.foreach(_.onStatementError(id, message, trace))
|
||||
}
|
||||
|
||||
private[this] def cleanup(state: OperationState) {
|
||||
private def cleanup(state: OperationState) {
|
||||
if (this.state != CLOSED) {
|
||||
setState(state)
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
package yaooqinn.kyuubi.server
|
||||
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
import yaooqinn.kyuubi.cli.GetInfoType
|
||||
@ -26,6 +26,8 @@ import yaooqinn.kyuubi.operation.{CANCELED, RUNNING}
|
||||
|
||||
class BackendServiceSuite extends SparkFunSuite {
|
||||
|
||||
import KyuubiConf._
|
||||
|
||||
var backendService: BackendService = _
|
||||
val user = KyuubiSparkUtil.getCurrentUserName
|
||||
val conf = new SparkConf(loadDefaults = true).setAppName("be test")
|
||||
@ -60,6 +62,13 @@ class BackendServiceSuite extends SparkFunSuite {
|
||||
"",
|
||||
"localhost",
|
||||
Map.empty)
|
||||
val session2 = backendService.openSession(
|
||||
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8,
|
||||
user,
|
||||
"",
|
||||
"localhost",
|
||||
Map(OPERATION_RESULT_LIMIT.key -> "1")
|
||||
)
|
||||
assert(
|
||||
backendService.getInfo(
|
||||
session, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server")
|
||||
@ -76,6 +85,7 @@ class BackendServiceSuite extends SparkFunSuite {
|
||||
val showTables = "show tables"
|
||||
val op1 = backendService.executeStatement(session, showTables)
|
||||
val op2 = backendService.executeStatementAsync(session, "show databases")
|
||||
val op3 = backendService.executeStatement(session, showTables)
|
||||
val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(session))
|
||||
assert(e1.toTStatus.getErrorMessage === "Method Not Implemented!")
|
||||
val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(session))
|
||||
@ -89,6 +99,8 @@ class BackendServiceSuite extends SparkFunSuite {
|
||||
|
||||
assert(backendService.getOperationStatus(op1).getState === RUNNING)
|
||||
assert(backendService.getOperationStatus(op2).getState === RUNNING)
|
||||
assert(backendService.getOperationStatus(op3).getState === RUNNING)
|
||||
|
||||
assert(backendService.getResultSetMetadata(op1).head.name === "Result")
|
||||
backendService.cancelOperation(op1)
|
||||
assert(backendService.getOperationStatus(op1).getState === CANCELED)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user