diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 796d87b65..2e2eb310f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -19,13 +19,16 @@ package org.apache.kyuubi.engine.spark.operation import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit} +import scala.collection.JavaConverters._ + import org.apache.spark.kyuubi.SQLOperationListener +import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil} +import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, IterableFetchIterator, KyuubiSparkUtil} import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent} import org.apache.kyuubi.operation.{OperationState, OperationType} import org.apache.kyuubi.operation.OperationState.OperationState @@ -38,7 +41,8 @@ class ExecuteStatement( session: Session, protected override val statement: String, override val shouldRunAsync: Boolean, - queryTimeout: Long) + queryTimeout: Long, + incrementalCollect: Boolean) extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging { import org.apache.kyuubi.KyuubiSparkUtils._ @@ -88,11 +92,17 @@ class ExecuteStatement( // TODO: Make it configurable spark.sparkContext.addSparkListener(operationListener) result = spark.sql(statement) - // TODO( #921): COMPILED need consider eagerly executed commands + // TODO #921: COMPILED need consider eagerly 
executed commands statementEvent.queryExecution = result.queryExecution.toString() setState(OperationState.COMPILED) debug(result.queryExecution) - iter = new ArrayFetchIterator(result.collect()) + iter = if (incrementalCollect) { + info("Execute in incremental collect mode") + new IterableFetchIterator[Row](result.toLocalIterator().asScala.toIterable) + } else { + info("Execute in full collect mode") + new ArrayFetchIterator(result.collect()) + } setState(OperationState.FINISHED) } catch { onError(cancel = true) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index b34510bf6..ccda453b0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException -import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY, OperationModes} +import org.apache.kyuubi.config.KyuubiConf.{OPERATION_INCREMENTAL_COLLECT, OPERATION_PLAN_ONLY, OperationModes} import org.apache.kyuubi.config.KyuubiConf.OperationModes._ import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.{Operation, OperationManager} @@ -56,6 +56,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n def getOpenSparkSessionCount: Int = sessionToSpark.size() private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY) + private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT) override def newExecuteStatementOperation( session: Session, @@ -66,9 +67,13 @@ 
class SparkSQLOperationManager private (name: String) extends OperationManager(n val operationModeStr = spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault).toUpperCase(Locale.ROOT) + val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key) + .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault) val operation = OperationModes.withName(operationModeStr) match { - case NONE => new ExecuteStatement(spark, session, statement, runAsync, queryTimeout) - case mode => new PlanOnlyStatement(spark, session, statement, mode) + case NONE => + new ExecuteStatement(spark, session, statement, runAsync, queryTimeout, incrementalCollect) + case mode => + new PlanOnlyStatement(spark, session, statement, mode) } addOperation(operation) } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala index ea6a97c41..40585571b 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala @@ -20,8 +20,6 @@ package org.apache.kyuubi.engine.spark import java.util.concurrent.ConcurrentHashMap import org.apache.hive.service.rpc.thrift._ -import org.apache.hive.service.rpc.thrift.TCLIService.Iface -import org.apache.hive.service.rpc.thrift.TOperationState._ import org.apache.spark.scheduler.JobSucceeded import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ @@ -75,12 +73,4 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests assert(jobIdToJobInfoMap.size() === 1) } } - - private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = { - val req = new TGetOperationStatusReq(op) - var state = 
client.GetOperationStatus(req).getOperationState - while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) { - state = client.GetOperationStatus(req).getOperationState - } - } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index 8083c3645..0e9e03def 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -30,8 +30,6 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hive.common.util.HiveVersionInfo import org.apache.hive.service.cli.HiveSQLException import org.apache.hive.service.rpc.thrift._ -import org.apache.hive.service.rpc.thrift.TCLIService.Iface -import org.apache.hive.service.rpc.thrift.TOperationState._ import org.apache.spark.kyuubi.SparkContextHelper import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.types._ @@ -495,14 +493,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveJDBCTests { } } - private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = { - val req = new TGetOperationStatusReq(op) - var state = client.GetOperationStatus(req).getOperationState - while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) { - state = client.GetOperationStatus(req).getOperationState - } - - } test("basic open | execute | close") { withThriftClient { client => val operationManager = engine.backendService.sessionManager. 
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala index 2cc57d9b4..40c974704 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SQLOperationListenerSuite.scala @@ -43,12 +43,12 @@ class SQLOperationListenerSuite extends WithSparkSQLEngine with JDBCTestUtils { fetchResultsReq.setFetchType(1.toShort) eventually(timeout(90.seconds), interval(500.milliseconds)) { val resultsResp = client.FetchResults(fetchResultsReq) - val toSeq = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala.toSeq - assert(toSeq.exists(_.contains("started with 2 stages"))) - assert(toSeq.exists(_.contains("started with 1 tasks"))) - assert(toSeq.exists(_.contains("started with 3 tasks"))) - assert(toSeq.exists(_.contains("Finished stage:"))) - assert(toSeq.exists(_.contains("Job 0 succeeded"))) + val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala + assert(logs.exists(_.contains("started with 2 stages"))) + assert(logs.exists(_.contains("started with 1 tasks"))) + assert(logs.exists(_.contains("started with 3 tasks"))) + assert(logs.exists(_.contains("Finished stage:"))) + assert(logs.exists(_.contains("Job 0 succeeded"))) } } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 29d53c652..3447a18ed 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -687,6 +687,15 @@ object KyuubiConf { .checkValue(_ >= 1000, "must >= 1s if set") .createOptional + val OPERATION_INCREMENTAL_COLLECT: ConfigEntry[Boolean] = + 
buildConf("operation.incremental.collect") + .internal + .doc("When true, the executor side result will be sequentially calculated and returned to" + + " the Spark driver side.") + .version("1.4.0") + .booleanConf + .createWithDefault(false) + val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] = buildConf("operation.log.dir.root") .doc("Root directory for query operation log at server-side.") diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala index 1decd7bc6..4df59309a 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestUtils.scala @@ -20,9 +20,12 @@ package org.apache.kyuubi.operation import java.sql.{DriverManager, ResultSet, SQLException, Statement} import java.util.Locale -import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle} +import org.apache.hive.service.rpc.thrift._ +import org.apache.hive.service.rpc.thrift.TCLIService.Iface +import org.apache.hive.service.rpc.thrift.TOperationState._ import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.service.authentication.PlainSASLHelper @@ -169,4 +172,13 @@ trait JDBCTestUtils extends KyuubiFunSuite { assert(!rs.next()) assert(dbNames.size === count, "All expected schemas should be visited") } + + def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = { + val req = new TGetOperationStatusReq(op) + var state = client.GetOperationStatus(req).getOperationState + eventually(timeout(90.seconds), interval(100.milliseconds)) { + state = client.GetOperationStatus(req).getOperationState + assert(!Set(INITIALIZED_STATE, PENDING_STATE, 
RUNNING_STATE).contains(state)) + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala new file mode 100644 index 000000000..b7aa4f5e5 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiIncrementCollectSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.operation + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.hive.service.rpc.thrift._ +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.WithKyuubiServer +import org.apache.kyuubi.config.KyuubiConf + +class KyuubiIncrementCollectSuite extends WithKyuubiServer with JDBCTestUtils { + + override protected val conf: KyuubiConf = KyuubiConf() + .set(KyuubiConf.OPERATION_INCREMENTAL_COLLECT, true) + + override protected def jdbcUrl: String = getJdbcUrl + + test("change incremental collect mode using SET commands") { + val querySQL = "SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1" + withSessionHandle { (client, handle) => + def execute(sql: String): TOperationHandle = { + val req = new TExecuteStatementReq() + req.setSessionHandle(handle) + req.setStatement(sql) + val execStmtResp = client.ExecuteStatement(req) + execStmtResp.getOperationHandle + } + + def executeAndWait(sql: String): TOperationHandle = { + val opHandle = execute(sql) + waitForOperationToComplete(client, opHandle) + opHandle + } + + def queryAndCheckLog(sql: String, checkedText: String): Unit = { + val opHandle = execute(sql) + val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000) + fetchResultsReq.setFetchType(1.toShort) + eventually(timeout(10.seconds), interval(100.milliseconds)) { + val resultsResp = client.FetchResults(fetchResultsReq) + val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala + assert(logs.exists(_ contains checkedText)) + } + } + + queryAndCheckLog(querySQL, "Execute in incremental collect mode") + executeAndWait("SET kyuubi.operation.incremental.collect=false") + queryAndCheckLog(querySQL, "Execute in full collect mode") + executeAndWait("SET kyuubi.operation.incremental.collect=true") + queryAndCheckLog(querySQL, "Execute in incremental collect mode") + } + } + + 
test("incremental collect query result") { + withJdbcStatement() { statement => + val rs = statement.executeQuery("SELECT * FROM VALUES(1),(2),(3) AS t(c1) DISTRIBUTE BY c1") + val result = new ArrayBuffer[Int] + while (rs.next()) { + result += rs.getInt(1) + } + assert(result.toSet == Set(1, 2, 3)) + } + } +}