[KYUUBI #4731] Support batch session conf advisor

### _Why are the changes needed?_

As title.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4731 from turboFei/batch_session_conf_overlay.

Closes #4731

5cf73d1db [fwang12] save

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
fwang12 2023-04-20 17:47:52 +08:00 committed by Cheng Pan
parent c6571344b9
commit ec8596c9ee
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D

View File

@ -68,14 +68,27 @@ class KyuubiBatchSessionImpl(
override val sessionIdleTimeoutThreshold: Long =
sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
// TODO: Support batch conf advisor
override val normalizedConf: Map[String, String] = {
sessionConf.getBatchConf(batchRequest.getBatchType) ++
sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
}
private val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
user,
normalizedConf.asJava)
if (confOverlay != null) {
val overlayConf = new KyuubiConf(false)
confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType)
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
normalizedConf
}
}
override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
normalizedConf.get(KyuubiConf.SESSION_NAME.key))
optimizedConf.get(KyuubiConf.SESSION_NAME.key))
// whether the resource file is from uploading
private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
@ -88,7 +101,7 @@ class KyuubiBatchSessionImpl(
name.orNull,
batchRequest.getResource,
batchRequest.getClassName,
normalizedConf,
optimizedConf,
batchRequest.getArgs.asScala,
recoveryMetadata)
@ -115,7 +128,7 @@ class KyuubiBatchSessionImpl(
override def checkSessionAccessPathURIs(): Unit = {
KyuubiApplicationManager.checkApplicationAccessPaths(
batchRequest.getBatchType,
normalizedConf,
optimizedConf,
sessionManager.getConf)
if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
&& !isResourceUploaded) {
@ -140,7 +153,7 @@ class KyuubiBatchSessionImpl(
resource = batchRequest.getResource,
className = batchRequest.getClassName,
requestName = name.orNull,
requestConf = normalizedConf,
requestConf = optimizedConf,
requestArgs = batchRequest.getArgs.asScala,
createTime = createTime,
engineType = batchRequest.getBatchType,