From 7171f77d7afa18c121ca68546ae1a9209e3abf57 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 11 Jan 2019 15:37:17 +0800 Subject: [PATCH] fix #140 Add a server side configuration to limit the query result size --- docs/configurations.md | 1 + .../src/main/scala/org/apache/spark/KyuubiConf.scala | 7 +++++++ .../yaooqinn/kyuubi/operation/KyuubiOperation.scala | 10 +++++++--- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/docs/configurations.md b/docs/configurations.md index c30937692..0c3a7399c 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -94,6 +94,7 @@ Name|Default|Description ---|---|--- spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time. spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side. +spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side. --- diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index 0b70c706d..aa7ca4516 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -366,6 +366,13 @@ object KyuubiConf { .booleanConf .createWithDefault(false) + val OPERATION_RESULT_LIMIT: ConfigEntry[Int] = + KyuubiConfigBuilder("spark.kyuubi.operation.result.limit") + .doc("In non-incremental result collection mode, set this to a positive value to limit the" + + " size of result collected to driver side.") + .intConf + .createWithDefault(-1) + ///////////////////////////////////////////////////////////////////////////////////////////////// // Containerization // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 3ce6f9325..d30dc0283 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -66,8 +66,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging private[this] val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] = Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST) - private[this] val incrementalCollect: Boolean = - conf.get(OPERATION_INCREMENTAL_COLLECT.key).toBoolean + private[this] val incrementalCollect: Boolean = conf.get(OPERATION_INCREMENTAL_COLLECT).toBoolean def getBackgroundHandle: Future[_] = backgroundHandle @@ -323,7 +322,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging info("Executing query in incremental collection mode") result.toLocalIterator().asScala } else { - result.collect().toList.iterator + val resultLimit = conf.get(OPERATION_RESULT_LIMIT).toInt + if (resultLimit >= 0) { + result.take(resultLimit).toList.toIterator + } else { + result.collect().toList.iterator + } } setState(FINISHED) KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId))