diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala index 228890a1e..94859a08c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala @@ -68,14 +68,27 @@ class KyuubiBatchSessionImpl( override val sessionIdleTimeoutThreshold: Long = sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT) - // TODO: Support batch conf advisor override val normalizedConf: Map[String, String] = { sessionConf.getBatchConf(batchRequest.getBatchType) ++ sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap) } + private val optimizedConf: Map[String, String] = { + val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay( + user, + normalizedConf.asJava) + if (confOverlay != null) { + val overlayConf = new KyuubiConf(false) + confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) } + normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType) + } else { + warn(s"the server plugin return null value for user: $user, ignore it") + normalizedConf + } + } + override lazy val name: Option[String] = Option(batchRequest.getName).orElse( - normalizedConf.get(KyuubiConf.SESSION_NAME.key)) + optimizedConf.get(KyuubiConf.SESSION_NAME.key)) // whether the resource file is from uploading private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf @@ -88,7 +101,7 @@ class KyuubiBatchSessionImpl( name.orNull, batchRequest.getResource, batchRequest.getClassName, - normalizedConf, + optimizedConf, batchRequest.getArgs.asScala, recoveryMetadata) @@ -115,7 +128,7 @@ class KyuubiBatchSessionImpl( override def checkSessionAccessPathURIs(): Unit = { KyuubiApplicationManager.checkApplicationAccessPaths( batchRequest.getBatchType, - normalizedConf, + optimizedConf, sessionManager.getConf) if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE && !isResourceUploaded) { @@ -140,7 +153,7 @@ class KyuubiBatchSessionImpl( resource = batchRequest.getResource, className = batchRequest.getClassName, requestName = name.orNull, - requestConf = normalizedConf, + requestConf = optimizedConf, requestArgs = batchRequest.getArgs.asScala, createTime = createTime, engineType = batchRequest.getBatchType,