From ec8596c9eee9b31c8d715ee4f53d53cd8b441304 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Thu, 20 Apr 2023 17:47:52 +0800 Subject: [PATCH] [KYUUBI #4731] Support batch session conf advisor ### _Why are the changes needed?_ As title. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4731 from turboFei/batch_session_conf_overlay. Closes #4731 5cf73d1db [fwang12] save Authored-by: fwang12 Signed-off-by: Cheng Pan --- .../session/KyuubiBatchSessionImpl.scala | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala index 228890a1e..94859a08c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala @@ -68,14 +68,27 @@ class KyuubiBatchSessionImpl( override val sessionIdleTimeoutThreshold: Long = sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT) - // TODO: Support batch conf advisor override val normalizedConf: Map[String, String] = { sessionConf.getBatchConf(batchRequest.getBatchType) ++ sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap) } + private val optimizedConf: Map[String, String] = { + val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay( + user, + normalizedConf.asJava) + if (confOverlay != null) { + val overlayConf = new KyuubiConf(false) + confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) } + normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType) + } else { + warn(s"the server plugin return null value for user: $user, ignore it") + normalizedConf + } + } + override lazy val name: Option[String] = Option(batchRequest.getName).orElse( - normalizedConf.get(KyuubiConf.SESSION_NAME.key)) + optimizedConf.get(KyuubiConf.SESSION_NAME.key)) // whether the resource file is from uploading private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf @@ -88,7 +101,7 @@ class KyuubiBatchSessionImpl( name.orNull, batchRequest.getResource, batchRequest.getClassName, - normalizedConf, + optimizedConf, batchRequest.getArgs.asScala, recoveryMetadata) @@ -115,7 +128,7 @@ class KyuubiBatchSessionImpl( override def checkSessionAccessPathURIs(): Unit = { KyuubiApplicationManager.checkApplicationAccessPaths( batchRequest.getBatchType, - normalizedConf, + optimizedConf, sessionManager.getConf) if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE && !isResourceUploaded) { @@ -140,7 +153,7 @@ class KyuubiBatchSessionImpl( resource = batchRequest.getResource, className = batchRequest.getClassName, requestName = name.orNull, - requestConf = normalizedConf, + requestConf = optimizedConf, requestArgs = batchRequest.getArgs.asScala, createTime = createTime, engineType = batchRequest.getBatchType,