fix #140 Add a server side configuration to limit the query result size

This commit is contained in:
Kent Yao 2019-01-11 15:37:17 +08:00
parent 37625fd4dc
commit 7171f77d7a
3 changed files with 15 additions and 3 deletions

View File

@ -94,6 +94,7 @@ Name|Default|Description
---|---|---
spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.
---

View File

@ -366,6 +366,13 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
val OPERATION_RESULT_LIMIT: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.operation.result.limit")
.doc("In non-incremental result collection mode, set this to a positive value to limit the" +
" size of result collected to driver side.")
.intConf
.createWithDefault(-1)
/////////////////////////////////////////////////////////////////////////////////////////////////
// Containerization //
/////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -66,8 +66,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
private[this] val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST)
private[this] val incrementalCollect: Boolean =
conf.get(OPERATION_INCREMENTAL_COLLECT.key).toBoolean
private[this] val incrementalCollect: Boolean = conf.get(OPERATION_INCREMENTAL_COLLECT).toBoolean
def getBackgroundHandle: Future[_] = backgroundHandle
@ -323,7 +322,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
info("Executing query in incremental collection mode")
result.toLocalIterator().asScala
} else {
result.collect().toList.iterator
val resultLimit = conf.get(OPERATION_RESULT_LIMIT).toInt
if (resultLimit >= 0) {
result.take(resultLimit).toList.toIterator
} else {
result.collect().toList.iterator
}
}
setState(FINISHED)
KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId))