From 9cf22452c8a52e3ee8b144ea2b3cdc28848dee63 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Mon, 19 Aug 2024 23:07:54 -0700 Subject: [PATCH] [KYUUBI #6335][FOLLOWUP] Using sessionId for sessionUploadFolderPath MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— Followup for #6335 Just use the session handle identifier to build the batchResourceUploadFolderPath, and do not rely on the BatchJobSubmission operation initialization. ## Describe Your Solution ๐Ÿ”ง Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6629 from turboFei/batch_id. Closes #6335 28df03d4d [liangbowen] unused import 2eee2a64f [Wang, Fei] Move to KyuubiApplicationManager f23ecae1f [liangbowen] update batchResourceUploadFolderPath method for removing batch prefix and checking sessionId b30a7b996 [Wang, Fei] using session id Lead-authored-by: Wang, Fei Co-authored-by: liangbowen Signed-off-by: Wang, Fei --- .../apache/kyuubi/engine/KyuubiApplicationManager.scala | 7 +++++++ .../org/apache/kyuubi/server/api/v1/BatchesResource.scala | 8 +++++--- .../org/apache/kyuubi/session/KyuubiBatchSession.scala | 3 +-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 247f0c813..a80f13a86 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -24,6 +24,8 @@ import java.util.Locale import scala.util.control.NonFatal +import org.apache.commons.lang3.StringUtils + import org.apache.kyuubi.{KyuubiException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY @@ -211,4 +213,9 @@ object KyuubiApplicationManager { case _ => } } + + def sessionUploadFolderPath(sessionId: String): Path = { + require(StringUtils.isNotBlank(sessionId)) + uploadWorkDir.resolve(sessionId) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 182b28b0c..b5e98845e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -545,7 +545,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { resourceFileInputStream: InputStream, resourceFileName: String, formDataMultiPartOpt: Option[FormDataMultiPart]): Option[JPath] = { - val uploadFileFolderPath = batchResourceUploadFolderPath(batchId) + val uploadFileFolderPath = KyuubiApplicationManager.sessionUploadFolderPath(batchId) try { handleUploadingResourceFile( request, @@ -642,6 +642,8 @@ object BatchesResource { Option(batchState).exists(bt => VALID_BATCH_STATES.contains(bt.toUpperCase(Locale.ROOT))) } - def batchResourceUploadFolderPath(batchId: String): JPath = - KyuubiApplicationManager.uploadWorkDir.resolve(s"batch-$batchId") + def batchResourceUploadFolderPath(sessionId: String): JPath = { + require(StringUtils.isNotBlank(sessionId)) + KyuubiApplicationManager.uploadWorkDir.resolve(sessionId) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index 5dc3a605d..c8cce59e1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -28,7 +28,6 @@ import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.operation.OperationState -import org.apache.kyuubi.server.api.v1.BatchesResource import org.apache.kyuubi.server.metadata.api.Metadata import org.apache.kyuubi.session.SessionType.SessionType import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion @@ -83,7 +82,7 @@ class KyuubiBatchSession( sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf) private[kyuubi] def resourceUploadFolderPath: Path = - BatchesResource.batchResourceUploadFolderPath(batchJobSubmissionOp.batchId) + KyuubiApplicationManager.sessionUploadFolderPath(handle.identifier.toString) val optimizedConf: Map[String, String] = { val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(