diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index ce878b87f..40e5d9548 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -287,6 +287,7 @@ Key | Default | Meaning | Since
 kyuubi\.operation\.idle\.timeout|PT3H|Operation will be closed when it's not accessed for this duration of time|1.0.0
 kyuubi\.operation\.interrupt\.on\.cancel|true|When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.|1.2.0
 kyuubi\.operation\.query\.timeout|PT0S|Set a query duration timeout in seconds in Kyuubi. If the timeout is set to a positive value, a running query will be cancelled automatically if timeout. Otherwise the query continues to run till completion. If timeout values are set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller than this configuration value, they take precedence. If you set this timeout and prefer to cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.|1.2.0
+kyuubi\.operation\.scheduler\.pool|&lt;undefined&gt;|The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.|1.1.1
 kyuubi\.operation\.status\.polling\.timeout|PT5S|Timeout(ms) for long polling asynchronous running sql query's status|1.0.0
### Session
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 11142d41b..c3d6975e2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -41,6 +41,10 @@ class ExecuteStatement(
private val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
+ private val schedulerPool =
+ spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
+ session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
+
private val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -64,20 +68,17 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}
- private def executeStatement(): Unit = {
+ private def executeStatement(): Unit = withLocalProperties {
try {
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
- spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
result = spark.sql(statement)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
- } finally {
- spark.sparkContext.clearJobGroup()
}
}
@@ -108,6 +109,22 @@ class ExecuteStatement(
}
}
+ private def withLocalProperties[T](f: => T): T = {
+ try {
+ spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
+ schedulerPool match {
+ case Some(pool) =>
+ spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+ case None =>
+ }
+
+ f
+ } finally {
+ spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
+ spark.sparkContext.clearJobGroup()
+ }
+ }
+
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 578afbdd6..25c96aa40 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -535,4 +535,11 @@ object KyuubiConf {
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ShareLevel.values.map(_.toString))
.createWithDefault(ShareLevel.USER.toString)
+
+ val OPERATION_SCHEDULER_POOL: OptionalConfigEntry[String] = buildConf("operation.scheduler.pool")
+ .doc("The scheduler pool of job. Note that, this config should be used after change Spark " +
+ "config spark.scheduler.mode=FAIR.")
+ .version("1.1.1")
+ .stringConf
+ .createOptional
}