[KYUUBI #526][BRANCH-1.1] Support kyuubi.operation.scheduler.pool for queries
 [](https://github.com/yaooqinn/kyuubi/pull/530)     [<img width="16" alt="Powered by Pull Request Badge" src="https://user-images.githubusercontent.com/1393946/111216524-d2bb8e00-85d4-11eb-821b-ed4c00989c02.png">](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT --> backport #528 for branch-1.1 Closes #530 from ulysses-you/support-spark-scheduler-mode. 2ca4104 [ulysses-you] backport #528 Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
This commit is contained in:
parent
12fbef030d
commit
b15b42cadf
@ -287,6 +287,7 @@ Key | Default | Meaning | Since
|
||||
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.operation<br>\.query\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Set a query duration timeout in seconds in Kyuubi. If the timeout is set to a positive value, a running query will be cancelled automatically if timeout. Otherwise the query continues to run till completion. If timeout values are set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller than this configuration value, they take precedence. If you set this timeout and prefer to cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.operation<br>\.scheduler\.pool|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.</div>|<div style='width: 20pt'>1.1.1</div>
|
||||
kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
|
||||
### Session
|
||||
|
||||
@ -41,6 +41,10 @@ class ExecuteStatement(
|
||||
private val forceCancel =
|
||||
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
|
||||
|
||||
// Scheduler pool to run this statement's jobs in, if any. A value found in
// `spark.conf` under the Kyuubi key wins; otherwise fall back to the value
// from the session manager's conf. Stays None when neither defines it.
private val schedulerPool =
|
||||
spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
|
||||
session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
|
||||
|
||||
private val operationLog: OperationLog =
|
||||
OperationLog.createOperationLog(session.handle, getHandle)
|
||||
override def getOperationLog: Option[OperationLog] = Option(operationLog)
|
||||
@ -64,20 +68,17 @@ class ExecuteStatement(
|
||||
OperationLog.removeCurrentOperationLog()
|
||||
}
|
||||
|
||||
private def executeStatement(): Unit = {
|
||||
private def executeStatement(): Unit = withLocalProperties {
|
||||
try {
|
||||
setState(OperationState.RUNNING)
|
||||
info(KyuubiSparkUtil.diagnostics(spark))
|
||||
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
|
||||
spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
|
||||
result = spark.sql(statement)
|
||||
debug(result.queryExecution)
|
||||
iter = new ArrayFetchIterator(result.collect())
|
||||
setState(OperationState.FINISHED)
|
||||
} catch {
|
||||
onError(cancel = true)
|
||||
} finally {
|
||||
spark.sparkContext.clearJobGroup()
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,6 +109,22 @@ class ExecuteStatement(
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Evaluates `f` with this statement's SparkContext properties in place.
 *
 * Before `f` runs, the job group is set from `statementId`, `statement` and
 * `forceCancel`, and, when `schedulerPool` is defined, the
 * "spark.scheduler.pool" local property is set to that pool. Afterwards,
 * whether `f` returns or throws, the pool property is reset to null and the
 * job group is cleared.
 *
 * @param f the by-name body to evaluate
 * @tparam T result type of `f`
 * @return whatever `f` evaluates to
 */
private def withLocalProperties[T](f: => T): T = {
|
||||
try {
|
||||
spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
|
||||
schedulerPool match {
|
||||
case Some(pool) =>
|
||||
spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
|
||||
case None =>
|
||||
}
|
||||
|
||||
f
|
||||
} finally {
|
||||
// Reset unconditionally, even when no pool was configured, so the property
// set above is never left behind once the statement is done.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
|
||||
spark.sparkContext.clearJobGroup()
|
||||
}
|
||||
}
|
||||
|
||||
private def addTimeoutMonitor(): Unit = {
|
||||
if (queryTimeout > 0) {
|
||||
val timeoutExecutor =
|
||||
|
||||
@ -535,4 +535,11 @@ object KyuubiConf {
|
||||
.transform(_.toUpperCase(Locale.ROOT))
|
||||
.checkValues(ShareLevel.values.map(_.toString))
|
||||
.createWithDefault(ShareLevel.USER.toString)
|
||||
|
||||
// Optional entry (created without a default) naming the scheduler pool for a
// query's jobs; the doc text notes it is meant to be used with
// spark.scheduler.mode=FAIR.
val OPERATION_SCHEDULER_POOL: OptionalConfigEntry[String] = buildConf("operation.scheduler.pool")
|
||||
.doc("The scheduler pool of job. Note that, this config should be used after change Spark " +
|
||||
"config spark.scheduler.mode=FAIR.")
|
||||
.version("1.1.1")
|
||||
.stringConf
|
||||
.createOptional
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user