diff --git a/docs/configurations.md b/docs/configurations.md index c30937692..0c3a7399c 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -94,6 +94,7 @@ Name|Default|Description ---|---|--- spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time. spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side. +spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side. --- diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index 0b70c706d..aa7ca4516 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -366,6 +366,13 @@ object KyuubiConf { .booleanConf .createWithDefault(false) + val OPERATION_RESULT_LIMIT: ConfigEntry[Int] = + KyuubiConfigBuilder("spark.kyuubi.operation.result.limit") + .doc("In non-incremental result collection mode, set this to a positive value to limit the" + + " size of result collected to driver side.") + .intConf + .createWithDefault(-1) + ///////////////////////////////////////////////////////////////////////////////////////////////// // Containerization // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 3ce6f9325..8e0f1c75c 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -43,31 +43,30 @@ import yaooqinn.kyuubi.ui.KyuubiServerMonitor class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging { - private[this] var state: OperationState = INITIALIZED - private[this] val opHandle: OperationHandle = + private var state: OperationState = INITIALIZED + private val opHandle: OperationHandle = new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion) - private[this] val conf = session.sparkSession.conf + private val conf = session.sparkSession.conf - private[this] val operationTimeout = + private val operationTimeout = KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT.key)) - private[this] var lastAccessTime = System.currentTimeMillis() + private var lastAccessTime = System.currentTimeMillis() - private[this] var hasResultSet: Boolean = false - private[this] var operationException: KyuubiSQLException = _ - private[this] var backgroundHandle: Future[_] = _ - private[this] var operationLog: OperationLog = _ - private[this] var isOperationLogEnabled: Boolean = false + private var hasResultSet: Boolean = false + private var operationException: KyuubiSQLException = _ + private var backgroundHandle: Future[_] = _ + private var operationLog: OperationLog = _ + private var isOperationLogEnabled: Boolean = false - private[this] var result: DataFrame = _ - private[this] var iter: Iterator[Row] = _ - private[this] var statementId: String = _ + private var result: DataFrame = _ + private var iter: Iterator[Row] = _ + private var statementId: String = _ - private[this] val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] = + private val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] = Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST) - private[this] val incrementalCollect: Boolean = - conf.get(OPERATION_INCREMENTAL_COLLECT.key).toBoolean + private val incrementalCollect: Boolean = conf.get(OPERATION_INCREMENTAL_COLLECT).toBoolean def getBackgroundHandle: Future[_] = backgroundHandle @@ -85,18 +84,18 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging def getOperationLog: OperationLog = operationLog - private[this] def setOperationException(opEx: KyuubiSQLException): Unit = { + private def setOperationException(opEx: KyuubiSQLException): Unit = { this.operationException = opEx } @throws[KyuubiSQLException] - private[this] def setState(newState: OperationState): Unit = { + private def setState(newState: OperationState): Unit = { state.validateTransition(newState) this.state = newState this.lastAccessTime = System.currentTimeMillis() } - private[this] def checkState(state: OperationState): Boolean = { + private def checkState(state: OperationState): Boolean = { this.state == state } @@ -105,14 +104,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } @throws[KyuubiSQLException] - private[this] def assertState(state: OperationState): Unit = { + private def assertState(state: OperationState): Unit = { if (this.state ne state) { throw new KyuubiSQLException("Expected state " + state + ", but found " + this.state) } this.lastAccessTime = System.currentTimeMillis() } - private[this] def createOperationLog(): Unit = { + private def createOperationLog(): Unit = { if (session.isOperationLogEnabled) { val logFile = new File(session.getSessionLogDir, opHandle.getHandleIdentifier.toString) @@ -161,7 +160,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def registerCurrentOperationLog(): Unit = { + private def registerCurrentOperationLog(): Unit = { if (isOperationLogEnabled) { if (operationLog == null) { warn("Failed to get current OperationLog object of Operation: " @@ -174,7 +173,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def unregisterOperationLog(): Unit = { + private def unregisterOperationLog(): Unit = { if (isOperationLogEnabled) { session.getSessionMgr.getOperationMgr .unregisterOperationLog(session.getUserName) @@ -233,7 +232,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion) } - private[this] def setHasResultSet(hasResultSet: Boolean): Unit = { + private def setHasResultSet(hasResultSet: Boolean): Unit = { this.hasResultSet = hasResultSet opHandle.setHasResultSet(hasResultSet) } @@ -242,7 +241,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging * Verify if the given fetch orientation is part of the default orientation types. */ @throws[KyuubiSQLException] - private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = { + private def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = { validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET) } @@ -250,7 +249,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging * Verify if the given fetch orientation is part of the supported orientation types. */ @throws[KyuubiSQLException] - private[this] def validateFetchOrientation( + private def validateFetchOrientation( orientation: FetchOrientation, supportedOrientations: Set[FetchOrientation]): Unit = { if (!supportedOrientations.contains(orientation)) { @@ -259,7 +258,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def runInternal(): Unit = { + private def runInternal(): Unit = { setState(PENDING) setHasResultSet(true) @@ -300,7 +299,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def execute(): Unit = { + private def execute(): Unit = { try { statementId = UUID.randomUUID().toString info(s"Running query '$statement' with $statementId") @@ -323,7 +322,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging info("Executing query in incremental collection mode") result.toLocalIterator().asScala } else { - result.collect().toList.iterator + val resultLimit = conf.get(OPERATION_RESULT_LIMIT).toInt + if (resultLimit >= 0) { + result.take(resultLimit).toList.toIterator + } else { + result.collect().toList.iterator + } } setState(FINISHED) KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId)) @@ -362,7 +366,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def onStatementError(id: String, message: String, trace: String): Unit = { + private def onStatementError(id: String, message: String, trace: String): Unit = { error( s""" |Error executing query as ${session.getUserName}, @@ -375,7 +379,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging .foreach(_.onStatementError(id, message, trace)) } - private[this] def cleanup(state: OperationState) { + private def cleanup(state: OperationState) { if (this.state != CLOSED) { setState(state) } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala index c6dc88aca..db10154ed 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala @@ -18,7 +18,7 @@ package yaooqinn.kyuubi.server import org.apache.hive.service.cli.thrift.TProtocolVersion -import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} import yaooqinn.kyuubi.KyuubiSQLException import yaooqinn.kyuubi.cli.GetInfoType @@ -26,6 +26,8 @@ import yaooqinn.kyuubi.operation.{CANCELED, RUNNING} class BackendServiceSuite extends SparkFunSuite { + import KyuubiConf._ + var backendService: BackendService = _ val user = KyuubiSparkUtil.getCurrentUserName val conf = new SparkConf(loadDefaults = true).setAppName("be test") @@ -60,6 +62,13 @@ class BackendServiceSuite extends SparkFunSuite { "", "localhost", Map.empty) + val session2 = backendService.openSession( + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8, + user, + "", + "localhost", + Map(OPERATION_RESULT_LIMIT.key -> "1") + ) assert( backendService.getInfo( session, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server") @@ -76,6 +85,7 @@ class BackendServiceSuite extends SparkFunSuite { val showTables = "show tables" val op1 = backendService.executeStatement(session, showTables) val op2 = backendService.executeStatementAsync(session, "show databases") + val op3 = backendService.executeStatement(session, showTables) val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(session)) assert(e1.toTStatus.getErrorMessage === "Method Not Implemented!") val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(session)) @@ -89,6 +99,8 @@ class BackendServiceSuite extends SparkFunSuite { assert(backendService.getOperationStatus(op1).getState === RUNNING) assert(backendService.getOperationStatus(op2).getState === RUNNING) + assert(backendService.getOperationStatus(op3).getState === RUNNING) + assert(backendService.getResultSetMetadata(op1).head.name === "Result") backendService.cancelOperation(op1) assert(backendService.getOperationStatus(op1).getState === CANCELED)